[RFC 58/71] tests: Add NAN data path tests
Andrei Otcheretianski
andrei.otcheretianski at intel.com
Wed Apr 1 15:02:07 PDT 2026
Add several NDP establishment tests for open and encrypted connections,
including counter NDL proposal.
In addition add local schedule configuration test.
Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
tests/hwsim/test_nan.py | 320 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 319 insertions(+), 1 deletion(-)
diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index f7a212f7f8..24e10608ab 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -9,8 +9,10 @@ import logging
logger = logging.getLogger()
from utils import *
import string
+from hwsim_utils import test_connectivity
from hwsim import HWSimRadio
from contextlib import contextmanager, ExitStack
+from test_p2p_channel import set_country
@contextmanager
def hwsim_nan_radios(count=2, n_channels=3):
@@ -41,10 +43,11 @@ def check_nan_capab(dev):
raise HwsimSkip(f"NAN not supported: {capa}")
class NanDevice:
- def __init__(self, dev, ifname):
+ def __init__(self, dev, ifname, ndi_name=None):
self.dev = dev
self.ifname = ifname
self.wpas = None
+ self.ndi_name = ndi_name
def __enter__(self):
self.start()
@@ -71,12 +74,19 @@ class NanDevice:
logger.info(f"NAN device started on {self.ifname}")
+ # Add NDI
+ if self.ndi_name is not None:
+ self.dev.interface_add(self.ndi_name, if_type="nan_data", create=True)
+
def stop(self):
logger.info(f"NAN device stopping on {self.ifname}")
if "OK" not in self.wpas.request("NAN_STOP"):
raise Exception(f"Failed to stop NAN functionality on {self.ifname}")
+ if self.ndi_name is not None:
+ self.dev.interface_remove(self.ndi_name)
+
self.dev.global_request(f"INTERFACE_REMOVE {self.ifname}")
self.wpas.remove_ifname()
@@ -105,6 +115,68 @@ class NanDevice:
return self.wpas.request(cmd)
+ def schedule_config(self, *chans, map_id=1):
+ cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id} "
+ cmd += " ".join([f"{freq}:{bitmap}" for freq, bitmap in chans])
+ return self.wpas.request(cmd)
+
+ def remove_shedule(self, map_id=1):
+ cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id}"
+ return self.wpas.request(cmd)
+
+ def ndp_request(self, ndi, handle, peer_nmi, peer_id, ssi=None,
+ qos_slots=0, qos_latency=0xffff, csid=None, password=None,
+ pmk=None):
+ cmd = f"NAN_NDP_REQUEST handle={handle} ndi={ndi} peer_nmi={peer_nmi} peer_id={peer_id}"
+
+ params = [
+ ("ssi", ssi),
+ ("csid", csid),
+ ("password", password),
+ ("pmk", pmk),
+ ]
+
+ cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+ if qos_slots > 0 or qos_latency != 0xffff:
+ cmd += f" qos={qos_slots}:{qos_latency}"
+
+ return self.wpas.request(cmd)
+
+ def ndp_response(self, action, peer_nmi, ndi=None, peer_ndi=None,
+ ndp_id=None, init_ndi=None, reason_code=None, ssi=None,
+ qos_slots=0, qos_latency=0xffff, handle=None, csid=None,
+ password=None, pmk=None):
+ if action not in ["accept", "reject"]:
+ raise Exception(f"Invalid action: {action}. Must be 'accept' or 'reject'")
+
+ cmd = f"NAN_NDP_RESPONSE {action} peer_nmi={peer_nmi}"
+
+ params = [
+ ("reason_code", reason_code),
+ ("ndi", ndi),
+ ("peer_ndi", peer_ndi),
+ ("ndp_id", ndp_id),
+ ("init_ndi", init_ndi),
+ ("handle", handle),
+ ("ssi", ssi),
+ ("csid", csid),
+ ("password", password),
+ ("pmk", pmk),
+ ]
+
+ cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+ if qos_slots > 0 or qos_latency != 0xffff:
+ cmd += f" qos={qos_slots}:{qos_latency}"
+
+ return self.wpas.request(cmd)
+
+
+ def ndp_terminate(self, peer_nmi, init_ndi, ndp_id):
+ cmd = f"NAN_NDP_TERMINATE peer_nmi={peer_nmi} init_ndi={init_ndi} ndp_id={ndp_id}"
+ return self.wpas.request(cmd)
+
def subscribe(self, service_name, ssi=None, active=1,
sync=1, match_filter_rx=None, match_filter_tx=None,
srf_include=0, srf_mac_list=None, srf_bf_len=0,
@@ -197,6 +269,40 @@ def nan_sync_verify_event(ev, addr, pid, sid, ssi):
if data['address'] != addr:
raise Exception("Unexpected peer_addr: " + ev)
+def nan_ndp_verify_event(ev, peer_nmi, publish_inst_id=None, init_ndi=None,
+ ssi=None, csid=None):
+ """Verify NAN-NDP-REQUEST event format and content"""
+ data = split_nan_event(ev)
+
+ if 'peer_nmi' not in data:
+ raise Exception(f"Missing peer_nmi in NDP event: {ev}")
+
+ if 'init_ndi' not in data:
+ raise Exception(f"Missing init_ndi in NDP event: {ev}")
+
+ if 'ndp_id' not in data:
+ raise Exception(f"Missing ndp_id in NDP event: {ev}")
+
+ if publish_inst_id is not None and 'publish_inst_id' not in data:
+ raise Exception(f"Missing publish_inst_id in NDP event: {ev}")
+
+ if data['peer_nmi'] != peer_nmi:
+ raise Exception(f"Unexpected peer_nmi: got {data['peer_nmi']}, expected {peer_nmi} in event: {ev}")
+
+ if init_ndi is not None and data['init_ndi'] != init_ndi:
+ raise Exception(f"Unexpected init_ndi: got {data['init_ndi']}, expected {init_ndi} in event: {ev}")
+
+ if (publish_inst_id is not None and data['publish_inst_id'] != publish_inst_id):
+ raise Exception(f"Unexpected publish_inst_id: got {data['publish_inst_id']}, expected {publish_inst_id} in event: {ev}")
+
+ if ssi is not None and 'ssi' in data:
+ if data['ssi'] != ssi:
+ raise Exception(f"Unexpected ssi: got {data['ssi']}, expected {ssi} in event: {ev}")
+
+ if csid is not None and 'csid' in data:
+ if data['csid'] != str(csid):
+ raise Exception(f"Unexpected csid: got {data['csid']}, expected {str(csid)} in event: {ev}")
+
def nan_sync_discovery(pub, sub, service_name, pssi, sssi,
unsolicited=1, solicited=1, active=1,
expect_discovery=True,
@@ -834,3 +940,215 @@ def test_nan_config(dev, apdev, params):
# and finally update the configuration
logger.info("Updating NAN configuration")
nan.update_config()
+
+def test_nan_sched(dev, apdev, params):
+ """NAN configure schedule"""
+ set_country("US")
+ try:
+ with hwsim_nan_radios() as (wpas1, wpas2), NanDevice(wpas1, "nan0") as pub:
+ if "OK" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "000000ff")):
+ raise Exception("Failed to configure schedule")
+ # Remove
+ if "OK" not in pub.remove_shedule():
+ raise Exception("Failed to remove schedule")
+ # Overlapping maps
+ if "FAIL" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "050000ff")):
+ raise Exception("A schedule with overlapping time bitmaps was unexpectedly accepted")
+ # Same channel
+ if "FAIL" not in pub.schedule_config((2437, "03000000"), (2437, "0000ff00")):
+ raise Exception("A schedule with duplicate channel entries was unexpectedly accepted")
+ # Bad length
+ if "FAIL" not in pub.schedule_config((2437, "0300")):
+ raise Exception("Too short schedule bitmap accepted")
+ finally:
+ set_country("00")
+
+def _nan_discover_service(pub, sub, service_name, pssi, sssi):
+ paddr = pub.wpas.own_addr()
+ saddr = sub.wpas.own_addr()
+
+ pid = pub.publish(service_name, ssi=pssi)
+ sid = sub.subscribe(service_name, ssi=sssi, active=0)
+
+ logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+ ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+ if ev is None:
+ raise Exception(f"NAN-DISCOVERY-RESULT event not seen for {service_name}")
+
+ nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+
+ return pid, sid, paddr, saddr
+
+def _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi, resp_ssi, csid=None,
+ password=None, pmk=None, counter=False, wrong_pwd=False, configure_schedule=True):
+ """
+ Request NDP from subscriber and accept on publisher.
+
+ Returns: (ndp_id, init_ndi) or None if wrong_pwd test completed
+ """
+ # Configure schedule on subscriber if needed
+ if configure_schedule:
+ if "OK" not in sub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+ raise Exception("Failed to configure schedule (sub)")
+
+ # NDP request
+ if "OK" not in sub.ndp_request(sub.ndi_name, sid, paddr, pid, req_ssi,
+ csid=csid, password=password, pmk=pmk):
+ raise Exception("NDP request failed")
+
+ ev = pub.wpas.wait_event(["NAN-NDP-REQUEST"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-REQUEST event not seen")
+
+ ndi_sub = sub.dev.get_iface_addr(sub.ndi_name)
+ nan_ndp_verify_event(ev, saddr, pid, ndi_sub, req_ssi)
+
+ data = split_nan_event(ev)
+ ndp_id = data['ndp_id']
+ init_ndi = data['init_ndi']
+
+ # Configure schedule on publisher
+ if configure_schedule:
+ if counter:
+ if "OK" not in pub.schedule_config((5745, "feffffff")):
+ raise Exception("Failed to configure schedule (pub)")
+ else:
+ if "OK" not in pub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+ raise Exception("Failed to configure schedule (pub)")
+
+ # Accept NDP request
+ accept_pwd = "WRONG_PWD" if wrong_pwd else password
+ if "OK" not in pub.ndp_response("accept", saddr, ndi=pub.ndi_name, ndp_id=ndp_id, init_ndi=init_ndi,
+ handle=pid, ssi=resp_ssi, csid=csid, password=accept_pwd, pmk=pmk):
+ raise Exception("NDP response (accept) failed")
+
+ # Verify disconnection on wrong password
+ if wrong_pwd:
+ ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None or "reason=3" not in ev:
+ raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+ ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None or "reason=3" not in ev:
+ raise Exception("NAN-NDP-DISCONNECTED event not seen on publisher")
+ return None
+
+ # Handle counter proposal
+ if counter:
+ ev = sub.wpas.wait_event(["NAN-NDP-COUNTER-REQUEST"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-COUNTER-REQUEST event not seen")
+
+ nan_ndp_verify_event(ev, paddr, init_ndi=ndi_sub, ssi=resp_ssi)
+
+ data = split_nan_event(ev)
+ ndp_id = data['ndp_id']
+ init_ndi = data['init_ndi']
+
+ if "OK" not in sub.schedule_config((5745, "feffffff")):
+ raise Exception("Failed to configure schedule (sub)")
+
+ if "OK" not in sub.ndp_response("accept", paddr, ndi=sub.ndi_name, ndp_id=ndp_id, handle=sid,
+ init_ndi=init_ndi, ssi="11223344", csid=csid, password=password, pmk=pmk):
+ raise Exception("NDP response (confirm) failed")
+
+ # Wait for NDP connected events
+ ev = pub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-CONNECTED event not seen on publisher")
+
+ ev = sub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception("NAN-NDP-CONNECTED event not seen on subscriber")
+
+ logger.info("NDP connection established successfully")
+
+ return ndp_id, init_ndi
+
+def _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id):
+ """Terminate an NDP and wait for disconnect events on both sides."""
+
+ sub.ndp_terminate(paddr, init_ndi, ndp_id)
+
+ ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception(f"NAN-NDP-DISCONNECTED event not seen on publisher")
+
+ ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+ if ev is None:
+ raise Exception(f"NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+def _nan_test_connectivity(pub, sub):
+ """Test IP connectivity between publisher and subscriber NDI interfaces."""
+ wpas_ndi_pub = WpaSupplicant(ifname=pub.ndi_name)
+ wpas_ndi_sub = WpaSupplicant(ifname=sub.ndi_name)
+ test_connectivity(wpas_ndi_pub, wpas_ndi_sub, tos=0, ifname1=pub.ndi_name, ifname2=sub.ndi_name,
+ max_tries=3, timeout=5, broadcast=False)
+
+def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+ if use_pmk:
+ pmk = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
+ pwd = None
+ else:
+ pwd = "NAN" if csid is not None else None
+ pmk = None
+
+ with hwsim_nan_radios() as (wpas1, wpas2), \
+ NanDevice(wpas1, "nan0", "ndi0") as pub, NanDevice(wpas2, "nan1", "ndi1") as sub:
+
+ pssi = "aabbccdd001122334455667788"
+ sssi = "ddbbccaa001122334455667788"
+
+ pid, sid, paddr, saddr= _nan_discover_service(pub, sub, "test_service", pssi, sssi)
+
+ # Log peer info (specific to this test)
+ peer_schedule = pub.wpas.request("NAN_PEER_INFO " + saddr + " schedule")
+ logger.info("\n" + peer_schedule)
+ potential = pub.wpas.request("NAN_PEER_INFO " + saddr + " potential")
+ logger.info("\n" + potential)
+ capa = pub.wpas.request("NAN_PEER_INFO " + saddr + " capa")
+ logger.info("\n" + capa)
+
+ result = _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi="aabbcc",
+ resp_ssi="ddeeff", csid=csid, password=pwd, pmk=pmk,
+ counter=counter, wrong_pwd=wrong_pwd)
+
+ if result is None:
+ # wrong_pwd test completed
+ return
+
+ ndp_id, init_ndi = result
+
+ _nan_test_connectivity(pub, sub)
+ _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id)
+
+def run_nan_dp(country="US", counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+ set_country(country)
+ try:
+ _run_nan_dp(counter=counter, csid=csid, wrong_pwd=wrong_pwd, use_pmk=use_pmk)
+ finally:
+ set_country("00")
+
+def test_nan_dp_open(dev, apdev, params):
+ """NAN DP open"""
+ run_nan_dp()
+
+def test_nan_dp_open_counter(dev, apdev, params):
+ """NAN DP open with counter proposal"""
+ run_nan_dp(counter=True)
+
+def test_nan_dp_sk_ccmp128(dev, apdev, params):
+ """NAN DP - 2way NDL + SK CCMP security"""
+ run_nan_dp(csid=1)
+
+def test_nan_dp_sk_gcmp256(dev, apdev, params):
+ """NAN DP - 3way NDL + SK GCMP-256 security"""
+ run_nan_dp(counter=True, csid=2)
+
+def test_nan_dp_wrong_pwd(dev, apdev, params):
+ """NAN DP - Wrong password"""
+ run_nan_dp(csid=1, wrong_pwd=True)
+
+def test_nan_dp_pmk(dev, apdev, params):
+ """NAN DP - 3way NDL + SK CCMP security with PMK"""
+ run_nan_dp(counter=True, csid=1, use_pmk=True)
--
2.53.0
More information about the Hostap
mailing list