[RFC v2 99/99] tests: Add NAN data path tests
Andrei Otcheretianski
andrei.otcheretianski at intel.com
Tue Dec 23 03:52:43 PST 2025
Add several NDP establishment tests for open and encrypted connections,
including counter NDL proposal.
In addition add local schedule configuration test.
Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
tests/hwsim/test_nan.py | 308 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 307 insertions(+), 1 deletion(-)
diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index b1633737be..2eb36c57d0 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -9,6 +9,8 @@ import logging
logger = logging.getLogger()
from utils import *
import string
+from test_p2p_channel import set_country
+from hwsim import HWSimRadio
def check_nan_capab(dev):
capa = dev.request("GET_CAPABILITY nan")
@@ -18,10 +20,11 @@ def check_nan_capab(dev):
raise HwsimSkip(f"NAN not supported: {capa}")
class NanDevice:
- def __init__(self, dev, ifname):
+ def __init__(self, dev, ifname, ndi_name=None):
self.dev = dev
self.ifname = ifname
self.wpas = None
+ self.ndi_name = ndi_name
def __enter__(self):
self.start()
@@ -48,12 +51,19 @@ class NanDevice:
logger.info(f"NAN device started on {self.ifname}")
+ # Add NDI
+ if self.ndi_name is not None:
+ self.dev.interface_add(self.ndi_name, if_type="nan_data", create=True)
+
def stop(self):
logger.info(f"NAN device stopping on {self.ifname}")
if "OK" not in self.wpas.request("NAN_STOP"):
raise Exception(f"Failed to stop NAN functionality on {self.ifname}")
+ if self.ndi_name is not None:
+ self.dev.interface_remove(self.ndi_name)
+
self.dev.global_request(f"INTERFACE_REMOVE {self.ifname}")
self.wpas.remove_ifname()
@@ -82,6 +92,68 @@ class NanDevice:
return self.wpas.request(cmd)
+ def schedule_config(self, *chans, map_id=1):
+ cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id} "
+ cmd += " ".join([f"{freq}:{bitmap}" for freq, bitmap in chans])
+ return self.wpas.request(cmd)
+
+ def remove_shedule(self, map_id=1):
+ cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id}"
+ return self.wpas.request(cmd)
+
+ def ndp_request(self, ndi, handle, peer_nmi, peer_id, ssi=None,
+ qos_slots=0, qos_latency=0xffff, csid=None, password=None,
+ pmk=None):
+ cmd = f"NAN_NDP_REQUEST handle={handle} ndi={ndi} peer_nmi={peer_nmi} peer_id={peer_id}"
+
+ params = [
+ ("ssi", ssi),
+ ("csid", csid),
+ ("password", password),
+ ("pmk", pmk),
+ ]
+
+ cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+ if qos_slots > 0 or qos_latency != 0xffff:
+ cmd += f" qos={qos_slots}:{qos_latency}"
+
+ return self.wpas.request(cmd)
+
+ def ndp_response(self, action, peer_nmi, ndi=None, peer_ndi=None,
+ ndp_id=None, init_ndi=None, reason_code=None, ssi=None,
+ qos_slots=0, qos_latency=0xffff, handle=None, csid=None,
+ password=None, pmk=None):
+ if action not in ["accept", "reject"]:
+ raise Exception(f"Invalid action: {action}. Must be 'accept' or 'reject'")
+
+ cmd = f"NAN_NDP_RESPONSE {action} peer_nmi={peer_nmi}"
+
+ params = [
+ ("reason_code", reason_code),
+ ("ndi", ndi),
+ ("peer_ndi", peer_ndi),
+ ("ndp_id", ndp_id),
+ ("init_ndi", init_ndi),
+ ("handle", handle),
+ ("ssi", ssi),
+ ("csid", csid),
+ ("password", password),
+ ("pmk", pmk),
+ ]
+
+ cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+ if qos_slots > 0 or qos_latency != 0xffff:
+ cmd += f" qos={qos_slots}:{qos_latency}"
+
+ return self.wpas.request(cmd)
+
+
+ def ndp_terminate(self, peer_nmi, init_ndi, ndp_id):
+ cmd = f"NAN_NDP_TERMINATE peer_nmi={peer_nmi} init_ndi={init_ndi} ndp_id={ndp_id}"
+ return self.wpas.request(cmd)
+
def subscribe(self, service_name, ssi=None, active=1,
sync=1, match_filter_rx=None, match_filter_tx=None,
srf_include=0, srf_mac_list=None, srf_bf_len=0,
@@ -174,6 +246,40 @@ def nan_sync_verify_event(ev, addr, pid, sid, ssi):
if data['address'] != addr:
raise Exception("Unexpected peer_addr: " + ev)
+def nan_ndp_verify_event(ev, peer_nmi, publish_inst_id=None, init_ndi=None,
+ ssi=None, csid=None):
+ """Verify NAN-NDP-REQUEST event format and content"""
+ data = split_nan_event(ev)
+
+ if 'peer_nmi' not in data:
+ raise Exception(f"Missing peer_nmi in NDP event: {ev}")
+
+ if 'init_ndi' not in data:
+ raise Exception(f"Missing init_ndi in NDP event: {ev}")
+
+ if 'ndp_id' not in data:
+ raise Exception(f"Missing ndp_id in NDP event: {ev}")
+
+ if publish_inst_id is not None and 'publish_inst_id' not in data:
+ raise Exception(f"Missing publish_inst_id in NDP event: {ev}")
+
+ if data['peer_nmi'] != peer_nmi:
+ raise Exception(f"Unexpected peer_nmi: got {data['peer_nmi']}, expected {peer_nmi} in event: {ev}")
+
+ if init_ndi is not None and data['init_ndi'] != init_ndi:
+ raise Exception(f"Unexpected init_ndi: got {data['init_ndi']}, expected {init_ndi} in event: {ev}")
+
+ if (publish_inst_id is not None and data['publish_inst_id'] != publish_inst_id):
+ raise Exception(f"Unexpected publish_inst_id: got {data['publish_inst_id']}, expected {publish_inst_id} in event: {ev}")
+
+ if ssi is not None and 'ssi' in data:
+ if data['ssi'] != ssi:
+ raise Exception(f"Unexpected ssi: got {data['ssi']}, expected {ssi} in event: {ev}")
+
+ if csid is not None and 'csid' in data:
+ if data['csid'] != str(csid):
+ raise Exception(f"Unexpected csid: got {data['csid']}, expected {str(csid)} in event: {ev}")
+
def nan_sync_discovery(pub, sub, service_name, pssi, sssi,
unsolicited=1, solicited=1, active=1,
expect_discovery=True,
@@ -774,3 +880,203 @@ def test_nan_config(dev, apdev, params):
# and finally update the configuration
logger.info("Updating NAN configuration")
nan.update_config()
+
+def test_nan_sched(dev, apdev, params):
+ """NAN configure schedule"""
+ set_country("US")
+ try:
+ with HWSimRadio(n_channels=3, use_nan=True) as (wpas_radio1, wpas_iface1):
+ wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas1.interface_add(wpas_iface1)
+ with NanDevice(wpas1, "nan5") as pub:
+ if "OK" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "000000ff")):
+ raise Exception("Failed to configure schedule")
+ # Remove
+ if "OK" not in pub.remove_shedule():
+ raise Exception("Failed to remove schedule")
+ # Overlapping maps
+ if "FAIL" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "050000ff")):
+ raise Exception("A schedule with overlapping time bitmaps was unexpectedly accepted")
+ # Same channel
+ if "FAIL" not in pub.schedule_config((2437, "03000000"), (2437, "0000ff00")):
+ raise Exception("A schedule with duplicate channel entries was unexpectedly accepted")
+ # Bad length
+ if "FAIL" not in pub.schedule_config((2437, "0300")):
+ raise Exception("Too short schedule bitmap accepted")
+ finally:
+ set_country("00")
+
+def _run_nan_dp(dev, counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+ if use_pmk:
+ pmk = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
+ pwd = None
+ else:
+ pwd = "NAN" if csid is not None else None
+ pmk = None
+
+ with HWSimRadio(n_channels=3, use_nan=True) as (wpas_radio1, wpas_iface1), \
+ HWSimRadio(n_channels=3, use_nan=True) as (wpas_radio2, wpas_iface2):
+ wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas1.interface_add(wpas_iface1)
+
+ wpas2 = WpaSupplicant(global_iface='/tmp/wpas-wlan6')
+ wpas2.interface_add(wpas_iface2)
+
+ with NanDevice(wpas1, "nan0", "ndi0") as pub, \
+ NanDevice(wpas2, "nan1", "ndi1") as sub:
+ paddr = pub.wpas.own_addr()
+ saddr = sub.wpas.own_addr()
+
+ pssi = "aabbccdd001122334455667788"
+ sssi = "ddbbccaa001122334455667788"
+
+ pid = pub.publish("test_service", ssi=pssi)
+ sid = sub.subscribe("test_service", ssi=sssi, active=0)
+
+ logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+ ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+ if ev is None:
+ raise Exception("NAN-DISCOVERY-RESULT event not seen")
+
+ nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+ ndi_sub = sub.dev.get_iface_addr(sub.ndi_name)
+ if "OK" not in sub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+ raise Exception("Failed to configure schedule (sub)")
+
+ if "OK" not in sub.ndp_request(sub.ndi_name, sid, paddr, pid, "aabbcc", csid=csid,
+ password=pwd, pmk=pmk):
+ raise Exception("NDP request failed")
+
+ ev = pub.wpas.wait_event(["NAN-NDP-REQUEST"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-REQUEST event not seen")
+
+ # Validate the NDP-REQUEST event
+ nan_ndp_verify_event(ev, saddr, pid, ndi_sub, "aabbcc")
+
+ peer_schedule = pub.wpas.request("NAN_PEER_INFO " + saddr + " schedule")
+ logger.info("\n" + peer_schedule)
+ potential = pub.wpas.request("NAN_PEER_INFO " + saddr + " potential")
+ logger.info("\n" + potential)
+ capa = pub.wpas.request("NAN_PEER_INFO " + saddr + " capa")
+ logger.info("\n" + capa)
+ # Extract NDP information from the event
+ data = split_nan_event(ev)
+ ndp_id = data['ndp_id']
+ init_ndi = data['init_ndi']
+
+ if counter:
+ if "OK" not in pub.schedule_config((5745, "feffffff")):
+ raise Exception("Failed to configure schedule (pub)")
+ else:
+ if "OK" not in pub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+ raise Exception("Failed to configure schedule (pub)")
+
+ # Accept the NDP request
+ if wrong_pwd:
+ pwd = "WRONG_PWD"
+ if "OK" not in pub.ndp_response("accept",
+ saddr,
+ ndi=pub.ndi_name,
+ ndp_id=ndp_id,
+ init_ndi=init_ndi,
+ handle=pid,
+ ssi="ddeeff",
+ csid=csid,
+ password=pwd,
+ pmk=pmk):
+ raise Exception("NDP response (accept) failed")
+
+ # Verify disconnection on wrong password and stop the test
+ if wrong_pwd:
+ ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None or "reason=3" not in ev:
+ raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+ ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None or "reason=3" not in ev:
+ raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+ return
+
+ if counter:
+ ev = sub.wpas.wait_event(["NAN-NDP-COUNTER-REQUEST"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-COUNTER-REQUEST event not seen")
+
+ # Validate the NDP-COUNTER-REQUEST event
+ nan_ndp_verify_event(ev, paddr, init_ndi=ndi_sub, ssi="ddeeff")
+
+ # Extract NDP information from the event
+ data = split_nan_event(ev)
+ ndp_id = data['ndp_id']
+ init_ndi = data['init_ndi']
+
+ # Configure compliant schedule
+ if "OK" not in sub.schedule_config((5745, "feffffff")):
+ raise Exception("Failed to configure schedule (sub)")
+
+ peer_schedule = sub.wpas.request("NAN_PEER_INFO " + paddr + " schedule")
+ logger.info("\n" + peer_schedule)
+ # Accept the NDP counter request
+ if "OK" not in sub.ndp_response("accept",
+ paddr,
+ ndi=sub.ndi_name,
+ ndp_id=ndp_id,
+ handle=sid,
+ init_ndi=init_ndi,
+ ssi="11223344",
+ csid=csid, password=pwd,
+ pmk=pmk):
+ raise Exception("NDP response (confirm) failed")
+
+ # Wait for NDP connected event
+ ev = pub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-CONNECTED event not seen on publisher")
+
+ ev = sub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-CONNECTED event not seen on subscriber")
+
+ logger.info("NDP connection established successfully")
+
+ sub.ndp_terminate(paddr, init_ndi, ndp_id)
+ ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-DISCONNECTED event not seen on publisher")
+
+ ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+def run_nan_dp(dev, country="US", counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+ set_country(country)
+ try:
+ _run_nan_dp(dev, counter=counter, csid=csid, wrong_pwd=wrong_pwd, use_pmk=use_pmk)
+ finally:
+ set_country("00")
+
+def test_nan_dp_open(dev, apdev, params):
+ """NAN DP open"""
+ run_nan_dp(dev)
+
+def test_nan_dp_open_counter(dev, apdev, params):
+ """NAN DP open with counter proposal"""
+ run_nan_dp(dev, counter=True)
+
+def test_nan_dp_sk_ccmp128(dev, apdev, params):
+ """NAN DP - 2way NDL + SK CCMP security"""
+ run_nan_dp(dev, csid=1)
+
+def test_nan_dp_sk_gcmp256(dev, apdev, params):
+ """NAN DP - 3way NDL + SK GCMP-256 security"""
+ run_nan_dp(dev, counter=True, csid=2)
+
+def test_nan_dp_wrong_pwd(dev, apdev, params):
+ """NAN DP - Wrong password"""
+ run_nan_dp(dev, csid=1, wrong_pwd=True)
+
+def test_nan_dp_pmk(dev, apdev, params):
+ """NAN DP - 3way NDL + SK CCMP security with PMK"""
+ run_nan_dp(dev, counter=True, csid=1, use_pmk=True)
--
2.49.0
More information about the Hostap
mailing list