[PATCH 8/9] tests: test MLO rejection for incorrectly configured MLD APs
Benjamin Berg
benjamin at sipsolutions.net
Wed Jun 18 05:35:30 PDT 2025
From: Benjamin Berg <benjamin.berg at intel.com>
Add a test that injects various problematic ML elements and verifies that
MLO is disabled in each of those cases.
Signed-off-by: Benjamin Berg <benjamin.berg at intel.com>
---
tests/hwsim/test_eht.py | 117 +++++++++++++++++++++++++++++++++++
tests/hwsim/wpasupplicant.py | 44 +++++++++++++
2 files changed, 161 insertions(+)
diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py
index f254d00b3f..42904a9401 100644
--- a/tests/hwsim/test_eht.py
+++ b/tests/hwsim/test_eht.py
@@ -2411,6 +2411,123 @@ def test_eht_mlo_color_change(dev, apdev):
hapd0.dump_monitor()
hapd1.dump_monitor()
+def test_eht_mld_invalid_ml_elem(dev, apdev):
+    """EHT AP MLD with various errors in its beacon"""
+
+    # Each case swaps the Multi-Link element in the scan entry of one link.
+    # __MLD_ADDR__ is replaced with the AP MLD address. hapd0_parse_error
+    # means the hapd0 BSS must end up without ap_mld_addr; hapd1_parse_error
+    # only skips the check that the hapd1 BSS has an ap_mld_addr.
+    cases = {
+        '2nd link misses ML elem': {
+            'hapd1_ml_elem': '',
+            'hapd1_parse_error': True,
+        },
+        '2nd link has wrong MLD capa with 3 maximum links': {
+            'hapd1_ml_elem': 'ff106bb0010d__MLD_ADDR__010181000200',
+        },
+        '2nd link has incorrect MLD addr': {
+            'hapd1_ml_elem': 'ff106bb0010d112233445566010181000100',
+        },
+        '2nd link missing EMLSR': {
+            'hapd1_ml_elem': 'ff106bb0010d__MLD_ADDR__010101000100',
+        },
+        '2nd link missing MLD capa': {
+            'hapd1_ml_elem': 'ff0e6bb0000b__MLD_ADDR__01018100',
+            'hapd1_parse_error': True,
+        },
+        '2nd link added/different extended MLD capa': {
+            'hapd1_ml_elem': 'ff126bb0050f__MLD_ADDR__0101810001000100',
+            'hapd1_parse_error': True,
+        },
+        '1st link missing MLD capa': {
+            'hapd0_ml_elem': 'ff0e6bb0000b__MLD_ADDR__00018100',
+            'hapd0_parse_error': True,
+        },
+    }
+
+    passphrase = 'qwertyuiop'
+    ssid = "mld_ap_two_links"
+
+    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
+        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas.interface_add(wpas_iface)
+
+        # Bring up both links of the AP MLD; the second one uses channel 6
+        link0_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe="1")
+        hapd0 = eht_mld_enable_ap(hapd_iface, 0, link0_params)
+
+        link1_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe="1")
+        link1_params['channel'] = '6'
+        hapd1 = eht_mld_enable_ap(hapd_iface, 1, link1_params)
+
+        # hapd0 as the primary link
+        wpas.set("mld_connect_bssid_pref", hapd0.own_addr())
+        wpas.set("sae_pwe", "1")
+        wpas.connect(ssid, scan_freq="2412 2437", psk=passphrase, key_mgmt="SAE", ieee80211w="2")
+
+        # Baseline connection with the unmodified elements
+        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3)
+        eht_verify_wifi_version(wpas)
+
+        # Keep the unmodified BSS entries as templates for the injections
+        hapd0_bss = wpas.get_bss(hapd0.own_addr())
+        hapd1_bss = wpas.get_bss(hapd1.own_addr())
+
+        assert wpas.request('DISCONNECT').strip() == 'OK'
+
+        for name, case in cases.items():
+            logger.info(f"Testing case: {name}")
+            wpas.scan()
+            time.sleep(5)
+
+            # hapd1 is always injected, without beacon_ie and with
+            # est_throughput set to 1
+            fake_hapd1 = {k: v for k, v in hapd1_bss.items() if k != 'beacon_ie'}
+            if 'hapd1_ml_elem' in case:
+                mld = hapd0.own_mld_addr().replace(":", "")
+                expected = f'ff106bb0010d{mld}010181000100'
+                replacement = case['hapd1_ml_elem'].replace('__MLD_ADDR__', mld)
+                fake_hapd1['ie'] = hapd1_bss['ie'].replace(expected, replacement)
+
+            fake_hapd1['est_throughput'] = 1
+
+            # Scan entries to replace, keyed by BSSID
+            override = {fake_hapd1['bssid']: fake_hapd1}
+
+            # hapd0 is only replaced by cases that carry a hapd0_ml_elem
+            if 'hapd0_ml_elem' in case:
+                fake_hapd0 = {k: v for k, v in hapd0_bss.items() if k != 'beacon_ie'}
+                mld = hapd0.own_mld_addr().replace(":", "")
+                expected = f'ff106bb0010d{mld}000181000100'
+                replacement = case['hapd0_ml_elem'].replace('__MLD_ADDR__', mld)
+                fake_hapd0['ie'] = hapd0_bss['ie'].replace(expected, replacement)
+
+                override[fake_hapd0['bssid']] = fake_hapd0
+
+            wpas.override_scan(override)
+
+            # Ensure we could parse the ML information
+            hapd0_is_mld = 'ap_mld_addr' in wpas.get_bss(hapd0_bss['bssid'])
+            if case.get('hapd0_parse_error', False):
+                assert not hapd0_is_mld
+                # Nothing else to test, not connecting in MLO mode
+                continue
+            assert hapd0_is_mld
+
+            if not case.get('hapd1_parse_error', False):
+                assert 'ap_mld_addr' in wpas.get_bss(hapd1_bss['bssid'])
+
+            # This will connect to hapd0, but the second link is ignored
+            wpas.connect(ssid, psk=passphrase, key_mgmt="SAE", ieee80211w="2")
+
+            eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=1, active_links=1)
+            eht_verify_wifi_version(wpas)
+
+            assert wpas.request('DISCONNECT').strip() == 'OK'
+
def test_eht_mld_control_socket_connectivity(dev, apdev):
"""AP MLD control socket connectivity"""
with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py
index 4ea27a771b..6798748ca8 100644
--- a/tests/hwsim/wpasupplicant.py
+++ b/tests/hwsim/wpasupplicant.py
@@ -1242,6 +1242,50 @@ class WpaSupplicant:
if len(res.splitlines()) > 1:
logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + res)
+    def override_scan(self, inject):
+        """
+        Replay the known BSS table as a simulated scan, with overrides.
+
+        Every BSS listed by "BSS RANGE=ALL" is reported again through
+        DRIVER_EVENT SCAN_RES. Entries of the inject dictionary (BSSID ->
+        BSS dictionary as returned by get_bss()) replace or extend that set.
+        """
+        # DRIVER_EVENT SCAN_RES field name -> key in the BSS dictionary
+        field_map = {
+            'bssid': 'bssid',
+            'freq': 'freq',
+            'beacon_int': 'beacon_int',
+            'caps': 'capabilities',
+            'qual': 'qual',
+            'noise': 'noise',
+            'level': 'level',
+            'est_throughput': 'est_throughput',
+            'snr': 'snr',
+            'ie': 'ie',
+            'beacon_ie': 'beacon_ie',
+        }
+
+        # Keep existing entries in the "last" scan; injected ones win
+        listing = self.request("BSS RANGE=ALL MASK=0x2")
+        known = re.findall('bssid=(?P<bssid>.*?)\n', listing)
+        entries = {addr: self.get_bss(addr) for addr in known}
+        entries.update(inject)
+
+        assert self.request('DRIVER_EVENT SCAN_RES START').strip() == 'OK'
+
+        for entry in entries.values():
+            pairs = (f'{dst}={entry[src]}'
+                     for dst, src in field_map.items() if src in entry)
+            bss_info = ' '.join(pairs)
+            if 'tsf' in entry:
+                # int() parses the stored value; it is sent as bare hex
+                bss_info += f' tsf={int(entry["tsf"]):x}'
+            if entry['bssid'] in inject:
+                logger.info(f'Injecting BSS: {bss_info}')
+            assert self.request(f'DRIVER_EVENT SCAN_RES BSS {bss_info}').strip() == 'OK'
+
+        assert self.request('DRIVER_EVENT SCAN_RES END').strip() == 'OK'
+
def disconnect_and_stop_scan(self):
self.request("DISCONNECT")
res = self.request("ABORT_SCAN")
--
2.49.0
More information about the Hostap
mailing list