[PATCH v2 44/44] tests: Add basic MLD hwsim tests

Andrei Otcheretianski andrei.otcheretianski at intel.com
Mon May 22 12:34:12 PDT 2023


Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
 tests/hwsim/example-hostapd.config |   1 +
 tests/hwsim/hostapd.py             |  77 +++++++-
 tests/hwsim/hwsim.py               |  11 +-
 tests/hwsim/test_eht.py            | 293 +++++++++++++++++++++++++++++
 4 files changed, 375 insertions(+), 7 deletions(-)

diff --git a/tests/hwsim/example-hostapd.config b/tests/hwsim/example-hostapd.config
index 5b7130fdcd..e6f91fe388 100644
--- a/tests/hwsim/example-hostapd.config
+++ b/tests/hwsim/example-hostapd.config
@@ -117,3 +117,4 @@ CONFIG_DPP2=y
 CONFIG_WEP=y
 CONFIG_PASN=y
 CONFIG_AIRTIME_POLICY=y
+CONFIG_IEEE80211BE=y
diff --git a/tests/hwsim/hostapd.py b/tests/hwsim/hostapd.py
index 77b210b6e6..e47aa146db 100644
--- a/tests/hwsim/hostapd.py
+++ b/tests/hwsim/hostapd.py
@@ -101,6 +101,11 @@ class HostapdGlobal:
             if not ignore_error:
                 raise Exception("Could not add hostapd BSS")
 
+    def add_link(self, ifname, confname):
+        res = self.request("ADD " + ifname + " config=" + confname)
+        if "OK" not in res:
+            raise Exception("Could not add hostapd link")
+
     def remove(self, ifname):
         self.request("REMOVE " + ifname, timeout=30)
 
@@ -141,13 +146,13 @@ class HostapdGlobal:
         self.host.send_file(src, dst)
 
 class Hostapd:
-    def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
+    def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl, port=8877):
         self.hostname = hostname
         self.host = remotehost.Host(hostname, ifname)
         self.ifname = ifname
         if hostname is None:
-            self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
-            self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
+            self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname))
+            self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname))
             self.dbg = ifname
         else:
             self.ctrl = wpaspy.Ctrl(hostname, port)
@@ -156,6 +161,7 @@ class Hostapd:
         self.mon.attach()
         self.bssid = None
         self.bssidx = bssidx
+        self.mld_addr = None
 
     def cmd_execute(self, cmd_array, shell=False):
         if self.hostname is None:
@@ -184,8 +190,15 @@ class Hostapd:
             self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
         return self.bssid
 
+    def own_mld_addr(self):
+        if self.mld_addr is None:
+            self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx)
+        return self.mld_addr
+
     def get_addr(self, group=False):
-        return self.own_addr()
+        if self.own_mld_addr() is None:
+            return self.own_addr()
+        return self.own_mld_addr()
 
     def request(self, cmd):
         logger.debug(self.dbg + ": CTRL: " + cmd)
@@ -682,6 +695,33 @@ def add_iface(apdev, confname):
         raise Exception("Could not ping hostapd")
     return hapd
 
+def add_mld_link(apdev, params):
+    if isinstance(apdev, dict):
+        ifname = apdev['ifname']
+        try:
+            hostname = apdev['hostname']
+            port = apdev['port']
+            logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname)
+        except:
+            logger.info("Adding link on: ifname=" + ifname)
+            hostname = None
+            port = 8878
+    else:
+        ifname = apdev
+        logger.info("Adding link on: ifname=" + ifname)
+        hostname = None
+        port = 8878
+
+    hapd_global = HostapdGlobal(apdev)
+    confname, ctrl_iface = cfg_mld_link_file(ifname, params)
+    hapd_global.send_file(confname, confname)
+    hapd_global.add_link(ifname, confname)
+    port = hapd_global.get_ctrl_iface_port(ifname)
+    hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port)
+    if not hapd.ping():
+        raise Exception("Could not ping hostapd")
+    return hapd
+
 def remove_bss(apdev, ifname=None):
     if ifname == None:
         ifname = apdev['ifname']
@@ -904,3 +944,32 @@ def cfg_file(apdev, conf, ifname=None):
         return fname
 
     return conf
+
+idx = 0
+def cfg_mld_link_file(ifname, params):
+    global idx
+    ctrl_iface="/var/run/hostapd"
+    conf = "link-%d.conf" % idx
+
+    fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
+    f = os.fdopen(fd, 'w')
+
+    if idx != 0:
+        ctrl_iface="/var/run/hostapd_%d" % idx
+
+    f.write("ctrl_interface=%s\n" % ctrl_iface)
+    f.write("driver=nl80211\n")
+    f.write("ieee80211n=1\n")
+    f.write("ieee80211ac=1\n")
+    f.write("ieee80211ax=1\n")
+    f.write("ieee80211be=1\n")
+    f.write("interface=%s\n" % ifname)
+    f.write("mld_ap=1\n")
+    f.write("mld_id=0\n")
+
+    for k, v in list(params.items()):
+        f.write("{}={}\n".format(k,v))
+
+    idx = idx + 1
+
+    return fname, ctrl_iface
\ No newline at end of file
diff --git a/tests/hwsim/hwsim.py b/tests/hwsim/hwsim.py
index bc8aabdd49..5b1f858c95 100644
--- a/tests/hwsim/hwsim.py
+++ b/tests/hwsim/hwsim.py
@@ -17,6 +17,7 @@ HWSIM_ATTR_CHANNELS = 9
 HWSIM_ATTR_RADIO_ID = 10
 HWSIM_ATTR_SUPPORT_P2P_DEVICE = 14
 HWSIM_ATTR_USE_CHANCTX = 15
+HWSIM_ATTR_MLO_SUPPORT = 25
 
 # the controller class
 class HWSimController(object):
@@ -25,7 +26,7 @@ class HWSimController(object):
         self._fid = netlink.genl_controller.get_family_id(b'MAC80211_HWSIM')
 
     def create_radio(self, n_channels=None, use_chanctx=False,
-                     use_p2p_device=False):
+                     use_p2p_device=False, use_mlo=False):
         attrs = []
         if n_channels:
             attrs.append(netlink.U32Attr(HWSIM_ATTR_CHANNELS, n_channels))
@@ -33,6 +34,8 @@ class HWSimController(object):
             attrs.append(netlink.FlagAttr(HWSIM_ATTR_USE_CHANCTX))
         if use_p2p_device:
             attrs.append(netlink.FlagAttr(HWSIM_ATTR_SUPPORT_P2P_DEVICE))
+        if use_mlo:
+            attrs.append(netlink.FlagAttr(HWSIM_ATTR_MLO_SUPPORT))
 
         msg = netlink.GenlMessage(self._fid, HWSIM_CMD_CREATE_RADIO,
                                   flags=netlink.NLM_F_REQUEST |
@@ -50,17 +53,19 @@ class HWSimController(object):
 
 class HWSimRadio(object):
     def __init__(self, n_channels=None, use_chanctx=False,
-                 use_p2p_device=False):
+                 use_p2p_device=False, use_mlo=False):
         self._controller = HWSimController()
         self._n_channels = n_channels
         self._use_chanctx = use_chanctx
         self._use_p2p_dev = use_p2p_device
+        self._use_mlo = use_mlo
 
     def __enter__(self):
         self._radio_id = self._controller.create_radio(
               n_channels=self._n_channels,
               use_chanctx=self._use_chanctx,
-              use_p2p_device=self._use_p2p_dev)
+              use_p2p_device=self._use_p2p_dev,
+              use_mlo=self._use_mlo)
         if self._radio_id < 0:
             raise Exception("Failed to create radio (err:%d)" % self._radio_id)
         try:
diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py
index ebc846c0d4..17b52ba049 100644
--- a/tests/hwsim/test_eht.py
+++ b/tests/hwsim/test_eht.py
@@ -6,6 +6,57 @@
 
 import hostapd
 from utils import *
+from hwsim import HWSimRadio
+import hwsim_utils
+from wpasupplicant import WpaSupplicant
+import re
+
+def _eht_verify_wifi_version(dev):
+    status = dev.get_status()
+    logger.info("station status: " + str(status))
+
+    if 'wifi_generation' not in status:
+        raise Exception("Missing wifi_generation information")
+    if status['wifi_generation'] != "7":
+        raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])
+
+def _eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False, mld=False):
+    status = hapd.get_status()
+
+    logger.info("hostapd STATUS: " + str(status))
+    if is_ht and status["ieee80211n"] != "1":
+        raise Exception("Unexpected STATUS ieee80211n value")
+    if is_vht and status["ieee80211ac"] != "1":
+        raise Exception("Unexpected STATUS ieee80211ac value")
+    if status["ieee80211ax"] != "1":
+        raise Exception("Unexpected STATUS ieee80211ax value")
+    if status["ieee80211be"] != "1":
+        raise Exception("Unexpected STATUS ieee80211be value")
+
+    sta = hapd.get_sta(wpas.own_addr())
+    logger.info("hostapd STA: " + str(sta))
+    if is_ht and "[HT]" not in sta['flags']:
+        raise Exception("Missing STA flag: HT")
+    if is_vht and "[VHT]" not in sta['flags']:
+        raise Exception("Missing STA flag: VHT")
+    if "[HE]" not in sta['flags']:
+        raise Exception("Missing STA flag: HE")
+    if "[EHT]" not in sta['flags']:
+        raise Exception("Missing STA flag: EHT")
+
+    sig = wpas.request("SIGNAL_POLL").splitlines()
+
+    # TODO: with MLD connection, signal poll logic is still not implemented.
+    # While mac80211 maintains the station using the MLD address, the information
+    # is maintained in the link stations, but it is not sent to user space yet.
+    if not mld:
+        if "FREQUENCY=%s" % freq not in sig:
+            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
+        if "WIDTH=%s MHz" % bw not in sig:
+            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
+
+def _test_traffic(wpas, hapd):
+    hwsim_utils.test_connectivity(wpas, hapd)
 
 def test_eht_open(dev, apdev):
     """EHT AP with open mode configuration"""
@@ -118,3 +169,245 @@ def test_eht_sae_mlo(dev, apdev):
     finally:
         dev[0].set("sae_groups", "")
         dev[0].set("sae_pwe", "0")
+
+def _eht_mld_enable_ap(iface, params):
+    hapd = hostapd.add_mld_link(iface, params)
+    hapd.enable()
+
+    ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1)
+    if ev is None:
+        raise Exception("AP startup timed out")
+    if "AP-ENABLED" not in ev:
+        raise Exception("AP startup failed")
+
+    return hapd
+
+def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256", mfp="2", pwe=None,
+                           beacon_prot="1"):
+    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase, wpa_key_mgmt=key_mgmt, ieee80211w=mfp)
+
+    params['ieee80211n'] = '1'
+    params['ieee80211ax'] = '1'
+    params['ieee80211be'] = '1'
+    params['channel'] = '1'
+    params['hw_mode'] = 'g'
+    params['group_mgmt_cipher'] = "AES-128-CMAC"
+    params['beacon_prot'] = beacon_prot
+
+    if pwe is not None:
+        params['sae_pwe'] = pwe
+
+    return params
+
+def test_eht_mld_discovery(dev, apdev):
+    """EHT MLD AP discovery"""
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        ssid = "mld_ap"
+        link0_params = {"ssid": ssid,
+                        "hw_mode": "g",
+                        "channel": "1"}
+        link1_params = {"ssid": ssid,
+                        "hw_mode": "g",
+                        "channel": "2"}
+
+        hapd0 = _eht_mld_enable_ap(hapd_iface, link0_params)
+        hapd1 = _eht_mld_enable_ap(hapd_iface, link1_params)
+
+        res = wpas.request("SCAN freq=2412,2417")
+        if "FAIL" in res:
+            raise Exception("Failed to start scan")
+
+        ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
+        if ev is None:
+            raise Exception("Scan did not start")
+
+        ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
+        if ev is None:
+            raise Exception("Scan did not complete")
+
+        logger.info("Scan done")
+
+        rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=", re.MULTILINE)
+        ml_pattern = re.compile(".*multi-link:.*, MLD ID=0x0", re.MULTILINE)
+
+        bss = wpas.request("BSS " + hapd0.own_addr())
+        logger.info("BSS 0: " + str(bss))
+
+        if rnr_pattern.search(bss) is None:
+            raise Exception("RNR element not found for first link")
+
+        if ml_pattern.search(bss) is None:
+            raise Exception("ML element not found for first link")
+
+        bss = wpas.request("BSS " + hapd1.own_addr())
+        logger.info("BSS 1: " + str(bss))
+
+        if rnr_pattern.search(bss) is None:
+            raise Exception("RNR element not found for second link")
+
+        if ml_pattern.search(bss) is None:
+            raise Exception("ML element not found for second link")
+
+def _eht_mld_owe_two_links(dev, apdev):
+    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
+        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        ssid = "mld_ap_owe_two_link"
+        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
+
+        hapd0 = _eht_mld_enable_ap(hapd0_iface, params)
+
+        params['channel'] = '6'
+
+        hapd1 = _eht_mld_enable_ap(hapd0_iface, params)
+        # check legacy client connection
+        dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2")
+        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE", ieee80211w="2")
+
+        _eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
+        _eht_verify_wifi_version(wpas)
+        _test_traffic(wpas, hapd0)
+        _test_traffic(wpas, hapd1)
+
+def test_eht_mld_owe_two_links(dev, apdev):
+    """EHT MLD AP with MLD client OWE connection using two links"""
+    _eht_mld_owe_two_links(dev, apdev)
+
+def test_eht_mld_sae_single_link(dev, apdev):
+    """EHT MLD AP with MLD client SAE H2E connection using single link"""
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+            HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        passphrase = 'qwertyuiop'
+        ssid = "mld_ap_sae_single_link"
+        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe='2')
+
+        hapd0 = _eht_mld_enable_ap(hapd_iface, params)
+
+        wpas.set("sae_pwe", "1")
+        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412", key_mgmt="SAE", ieee80211w="2")
+
+        _eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
+        _eht_verify_wifi_version(wpas)
+        _test_traffic(wpas, hapd0)
+
+def _test_eht_mld_sae_two_links(dev, apdev, beacon_prot="1"):
+    """EHT MLD AP with MLD client SAE H2E connection using two links"""
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        passphrase = 'qwertyuiop'
+        ssid = "mld_ap_sae_two_link"
+        params = eht_mld_ap_wpa2_params(ssid, passphrase,
+                                        key_mgmt="SAE", mfp="2", pwe='1', beacon_prot=beacon_prot)
+
+        hapd0 = _eht_mld_enable_ap(hapd_iface, params)
+
+        params['channel'] = '6'
+
+        hapd1 = _eht_mld_enable_ap(hapd_iface, params)
+
+        wpas.set("sae_pwe", "1")
+        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
+                     key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
+
+        _eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
+        _eht_verify_wifi_version(wpas)
+        _test_traffic(wpas, hapd0)
+        _test_traffic(wpas, hapd1)
+
+def test_eht_mld_sae_two_links(dev, apdev):
+    """EHT MLD AP with MLD client SAE H2E connection using two links"""
+    _test_eht_mld_sae_two_links(dev, apdev)
+
+def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev):
+    """EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection"""
+    _test_eht_mld_sae_two_links(dev, apdev, beacon_prot="0")
+
+def test_eht_mld_sae_ext_one_link(dev, apdev):
+    """EHT MLD AP with MLD client SAE-EXT H2E connection using single link"""
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        passphrase = 'qwertyuiop'
+        ssid = "mld_ap_sae_ext_single_link"
+        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY")
+
+        hapd0 = _eht_mld_enable_ap(hapd_iface, params)
+
+        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412", key_mgmt="SAE-EXT-KEY",
+                     ieee80211w="2")
+
+        _eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
+        _eht_verify_wifi_version(wpas)
+        _test_traffic(wpas, hapd0)
+
+def test_eht_mld_sae_ext_two_links(dev, apdev):
+    """EHT MLD AP with MLD client SAE-EXT H2E connection using two links"""
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        passphrase = 'qwertyuiop'
+        ssid = "mld_ap_sae_two_link"
+        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE-EXT-KEY")
+
+        hapd0 = _eht_mld_enable_ap(hapd_iface, params)
+
+        params['channel'] = '6'
+
+        hapd1 = _eht_mld_enable_ap(hapd_iface, params)
+
+        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
+                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")
+
+        _eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True)
+        _eht_verify_wifi_version(wpas)
+        _test_traffic(wpas, hapd0)
+        _test_traffic(wpas, hapd1)
+
+def test_eht_mld_sae_legacy_client(dev, apdev):
+    """EHT MLD AP with legacy client SAE H2E connection"""
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface):
+        passphrase = 'qwertyuiop'
+        ssid = "mld_ap_sae_two_link"
+        params = eht_mld_ap_wpa2_params(ssid, passphrase,
+                                        key_mgmt="SAE", mfp="2", pwe='1')
+
+        hapd0 = _eht_mld_enable_ap(hapd_iface, params)
+
+        params['channel'] = '6'
+
+        hapd1 = _eht_mld_enable_ap(hapd_iface, params)
+
+        dev[0].set("sae_pwe", "1")
+        dev[0].connect(ssid, sae_password=passphrase, scan_freq="2412",
+                       key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
+
+        _eht_verify_status(dev[0], hapd0, 2412, 20, is_ht=True)
+        _test_traffic(dev[0], hapd0)
-- 
2.38.1




More information about the Hostap mailing list