[PATCH 8/9] tests: test MLO rejection for incorrectly configured MLD APs

Benjamin Berg benjamin at sipsolutions.net
Wed Jun 18 05:35:30 PDT 2025


From: Benjamin Berg <benjamin.berg at intel.com>

Add a test that injects various problematic ML elements and verifies that
MLO is disabled in those cases.

Signed-off-by: Benjamin Berg <benjamin.berg at intel.com>
---
 tests/hwsim/test_eht.py      | 117 +++++++++++++++++++++++++++++++++++
 tests/hwsim/wpasupplicant.py |  44 +++++++++++++
 2 files changed, 161 insertions(+)

diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py
index f254d00b3f..42904a9401 100644
--- a/tests/hwsim/test_eht.py
+++ b/tests/hwsim/test_eht.py
@@ -2411,6 +2411,123 @@ def test_eht_mlo_color_change(dev, apdev):
         hapd0.dump_monitor()
         hapd1.dump_monitor()
 
+def test_eht_mld_invalid_ml_elem(dev, apdev):
+    """EHT AP MLD with various errors in its beacon"""
+
+    cases = {  # case name -> replacement ML element (hex, '__MLD_ADDR__' is a placeholder) and expected parse errors
+        '2nd link misses ML elem': {
+            'hapd1_ml_elem': '',
+            'hapd1_parse_error': True,
+        },
+        '2nd link has wrong MLD capa with 3 maximum links' : {
+            'hapd1_ml_elem': 'ff106bb0010d__MLD_ADDR__010181000200',
+        },
+        '2nd link has incorrect MLD addr': {
+            'hapd1_ml_elem': 'ff106bb0010d112233445566010181000100',
+        },
+        '2nd link missing EMLSR': {
+            'hapd1_ml_elem': 'ff106bb0010d__MLD_ADDR__010101000100',
+        },
+        '2nd link missing MLD capa': {
+            'hapd1_ml_elem': 'ff0e6bb0000b__MLD_ADDR__01018100',
+            'hapd1_parse_error': True
+        },
+        '2nd link added/different extended MLD capa': {
+            'hapd1_ml_elem': 'ff126bb0050f__MLD_ADDR__0101810001000100',
+            'hapd1_parse_error': True
+        },
+        '1st link missing MLD capa': {
+            'hapd0_ml_elem': 'ff0e6bb0000b__MLD_ADDR__00018100',
+            'hapd0_parse_error': True
+        },
+    }
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        passphrase = 'qwertyuiop'
+        ssid = "mld_ap_two_links"
+
+        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe="1")
+        hapd0 = eht_mld_enable_ap(hapd_iface, 0, params)
+
+        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe="1")
+        params['channel'] = '6'
+        hapd1 = eht_mld_enable_ap(hapd_iface, 1, params)
+
+        # Prefer hapd0's BSSID so that it is used as the primary link
+        wpas.set("mld_connect_bssid_pref", hapd0.own_addr())
+        wpas.set("sae_pwe", "1")
+        wpas.connect(ssid, scan_freq="2412 2437", psk=passphrase, key_mgmt="SAE", ieee80211w="2")
+
+        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3)
+        eht_verify_wifi_version(wpas)
+
+        # Save the unmodified BSS entries of both links; they are the templates for the overrides below
+        hapd0_bss = wpas.get_bss(hapd0.own_addr())
+        hapd1_bss = wpas.get_bss(hapd1.own_addr())
+
+        assert wpas.request('DISCONNECT').strip() == 'OK'
+
+        # Run each case: rescan, inject the modified BSS entries, check parsing, then reconnect
+        for name, case in cases.items():
+            logger.info(f"Testing case: {name}")
+            wpas.scan()
+            time.sleep(5)
+
+            new_hapd1_bss = hapd1_bss.copy()
+            if 'beacon_ie' in new_hapd1_bss:
+                del new_hapd1_bss['beacon_ie']
+            if 'hapd1_ml_elem' in case:
+                ml_addr = hapd0.own_mld_addr().replace(":", "")
+                orig_ml_elem = f'ff106bb0010d{ml_addr}010181000100'
+                new_ml_elem = case['hapd1_ml_elem'].replace('__MLD_ADDR__', ml_addr)
+                new_hapd1_bss['ie'] = \
+                    hapd1_bss['ie'].replace(orig_ml_elem, new_ml_elem)
+
+            new_hapd1_bss['est_throughput'] = 1
+
+            # hapd1 is always overridden (with the lowered est_throughput), even if its ML element is unchanged
+            override = {
+                new_hapd1_bss['bssid']: new_hapd1_bss
+            }
+
+            if 'hapd0_ml_elem' in case:
+                new_hapd0_bss = hapd0_bss.copy()
+                if 'beacon_ie' in new_hapd0_bss:
+                    del new_hapd0_bss['beacon_ie']
+
+                ml_addr = hapd0.own_mld_addr().replace(":", "")
+                orig_ml_elem = f'ff106bb0010d{ml_addr}000181000100'
+                new_ml_elem = case['hapd0_ml_elem'].replace('__MLD_ADDR__', ml_addr)
+                new_hapd0_bss['ie'] = \
+                    hapd0_bss['ie'].replace(orig_ml_elem, new_ml_elem)
+
+                override[new_hapd0_bss['bssid']] = new_hapd0_bss
+
+            wpas.override_scan(override)
+
+            # The presence of ap_mld_addr in the BSS entry is used to tell whether the ML element was parsed
+            if not case.get('hapd0_parse_error', False):
+                assert 'ap_mld_addr' in wpas.get_bss(hapd0_bss['bssid'])
+            else:
+                assert 'ap_mld_addr' not in wpas.get_bss(hapd0_bss['bssid'])
+                # Nothing else to test, not connecting in MLO mode
+                continue
+            if not case.get('hapd1_parse_error', False):
+                assert 'ap_mld_addr' in wpas.get_bss(hapd1_bss['bssid'])
+
+            # This will connect to hapd0 as an MLD, but with a single link: the second link is ignored
+            wpas.connect(ssid, psk=passphrase, key_mgmt="SAE", ieee80211w="2")
+
+            eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=1, active_links=1)
+            eht_verify_wifi_version(wpas)
+
+            assert wpas.request('DISCONNECT').strip() == 'OK'
+
 def test_eht_mld_control_socket_connectivity(dev, apdev):
     """AP MLD control socket connectivity"""
     with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py
index 4ea27a771b..6798748ca8 100644
--- a/tests/hwsim/wpasupplicant.py
+++ b/tests/hwsim/wpasupplicant.py
@@ -1242,6 +1242,50 @@ class WpaSupplicant:
             if len(res.splitlines()) > 1:
                 logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + res)
 
+    def override_scan(self, inject):
+        """
+        Simulate a scan reporting all existing BSSs, with the entries of the
+        inject dictionary (BSSID -> get_bss()-style dict) overriding/adding BSSs.
+        """
+        attrs = {  # DRIVER_EVENT SCAN_RES field name -> key in the BSS dictionary
+            'bssid': 'bssid',
+            'freq': 'freq',
+            'beacon_int': 'beacon_int',
+            'caps': 'capabilities',
+            'qual': 'qual',
+            'noise': 'noise',
+            'level': 'level',
+            'est_throughput': 'est_throughput',
+            'snr': 'snr',
+            'ie': 'ie',
+            'beacon_ie': 'beacon_ie',
+        }
+
+        all_aps = {}
+
+        # Grab all existing BSSs, so that they are also part of the simulated ("last") scan
+        for bssid in re.findall('bssid=(?P<bssid>.*?)\n', self.request("BSS RANGE=ALL MASK=0x2")):
+            scan_bss = self.get_bss(bssid)
+            all_aps[bssid] = scan_bss
+
+        # Replace or add the entries supplied by the caller (keyed by BSSID)
+        all_aps.update(inject)
+
+        assert self.request('DRIVER_EVENT SCAN_RES START').strip() == 'OK'
+
+        # Report every entry as one SCAN_RES BSS event; only fields present in the entry are sent
+        for bss in all_aps.values():
+            bss_info = ' '.join(f'{k}={bss[v]}' for k, v in attrs.items() if v in bss)
+
+            if 'tsf' in bss:  # tsf is converted to an integer and re-encoded as hexadecimal
+                bss_info += (f' tsf={int(bss["tsf"]):x}')
+
+            if bss['bssid'] in inject:
+                logger.info(f'Injecting BSS: {bss_info}')
+            assert self.request(f'DRIVER_EVENT SCAN_RES BSS {bss_info}').strip() == 'OK'
+
+        assert self.request('DRIVER_EVENT SCAN_RES END').strip() == 'OK'
+
     def disconnect_and_stop_scan(self):
         self.request("DISCONNECT")
         res = self.request("ABORT_SCAN")
-- 
2.49.0




More information about the Hostap mailing list