[RFC 58/71] tests: Add NAN data path tests

Andrei Otcheretianski andrei.otcheretianski at intel.com
Wed Apr 1 15:02:07 PDT 2026


Add several NDP establishment tests for open and encrypted connections,
including a counter NDL proposal.
In addition, add a local schedule configuration test.

Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
 tests/hwsim/test_nan.py | 320 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 319 insertions(+), 1 deletion(-)

diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index f7a212f7f8..24e10608ab 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -9,8 +9,10 @@ import logging
 logger = logging.getLogger()
 from utils import *
 import string
+from hwsim_utils import test_connectivity
 from hwsim import HWSimRadio
 from contextlib import contextmanager, ExitStack
+from test_p2p_channel import set_country
 
 @contextmanager
 def hwsim_nan_radios(count=2, n_channels=3):
@@ -41,10 +43,11 @@ def check_nan_capab(dev):
         raise HwsimSkip(f"NAN not supported: {capa}")
 
 class NanDevice:
-    def __init__(self, dev, ifname):
+    def __init__(self, dev, ifname, ndi_name=None):
         self.dev = dev
         self.ifname = ifname
         self.wpas = None
+        self.ndi_name = ndi_name
 
     def __enter__(self):
         self.start()
@@ -71,12 +74,19 @@ class NanDevice:
 
         logger.info(f"NAN device started on {self.ifname}")
 
+        # Add NDI
+        if self.ndi_name is not None:
+            self.dev.interface_add(self.ndi_name, if_type="nan_data", create=True)
+
     def stop(self):
         logger.info(f"NAN device stopping on {self.ifname}")
 
         if "OK" not in self.wpas.request("NAN_STOP"):
             raise Exception(f"Failed to stop NAN functionality on {self.ifname}")
 
+        if self.ndi_name is not None:
+            self.dev.interface_remove(self.ndi_name)
+
         self.dev.global_request(f"INTERFACE_REMOVE {self.ifname}")
         self.wpas.remove_ifname()
 
@@ -105,6 +115,68 @@ class NanDevice:
 
         return self.wpas.request(cmd)
 
+    def schedule_config(self, *chans, map_id=1):
+        cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id} "
+        cmd += " ".join([f"{freq}:{bitmap}" for freq, bitmap in chans])
+        return self.wpas.request(cmd)
+
+    def remove_schedule(self, map_id=1):  # NAN_SCHED_CONFIG_MAP with no freq:bitmap entries
+        return self.wpas.request(f"NAN_SCHED_CONFIG_MAP map_id={map_id}")
+    remove_shedule = remove_schedule  # misspelled original name, kept for existing callers
+
+    def ndp_request(self, ndi, handle, peer_nmi, peer_id, ssi=None,
+                    qos_slots=0, qos_latency=0xffff, csid=None, password=None,
+                    pmk=None):
+        cmd = f"NAN_NDP_REQUEST handle={handle} ndi={ndi} peer_nmi={peer_nmi} peer_id={peer_id}"
+
+        params = [
+            ("ssi", ssi),
+            ("csid", csid),
+            ("password", password),
+            ("pmk", pmk),
+        ]
+
+        cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+        if qos_slots > 0 or qos_latency != 0xffff:
+            cmd += f" qos={qos_slots}:{qos_latency}"
+
+        return self.wpas.request(cmd)
+
+    def ndp_response(self, action, peer_nmi, ndi=None, peer_ndi=None,
+                     ndp_id=None, init_ndi=None, reason_code=None, ssi=None,
+                     qos_slots=0, qos_latency=0xffff, handle=None, csid=None,
+                     password=None, pmk=None):
+        if action not in ["accept", "reject"]:
+            raise Exception(f"Invalid action: {action}. Must be 'accept' or 'reject'")
+
+        cmd = f"NAN_NDP_RESPONSE {action} peer_nmi={peer_nmi}"
+
+        params = [
+            ("reason_code", reason_code),
+            ("ndi", ndi),
+            ("peer_ndi", peer_ndi),
+            ("ndp_id", ndp_id),
+            ("init_ndi", init_ndi),
+            ("handle", handle),
+            ("ssi", ssi),
+            ("csid", csid),
+            ("password", password),
+            ("pmk", pmk),
+        ]
+
+        cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+        if qos_slots > 0 or qos_latency != 0xffff:
+            cmd += f" qos={qos_slots}:{qos_latency}"
+
+        return self.wpas.request(cmd)
+
+
+    def ndp_terminate(self, peer_nmi, init_ndi, ndp_id):
+        cmd = f"NAN_NDP_TERMINATE peer_nmi={peer_nmi} init_ndi={init_ndi} ndp_id={ndp_id}"
+        return self.wpas.request(cmd)
+
     def subscribe(self, service_name, ssi=None, active=1,
                   sync=1, match_filter_rx=None, match_filter_tx=None,
                   srf_include=0, srf_mac_list=None, srf_bf_len=0,
@@ -197,6 +269,40 @@ def nan_sync_verify_event(ev, addr, pid, sid, ssi):
     if data['address'] != addr:
         raise Exception("Unexpected peer_addr: " + ev)
 
+def nan_ndp_verify_event(ev, peer_nmi, publish_inst_id=None, init_ndi=None,
+                         ssi=None, csid=None):
+    """Verify NAN NDP request/counter-request event format and content"""
+    data = split_nan_event(ev)
+
+    if 'peer_nmi' not in data:
+        raise Exception(f"Missing peer_nmi in NDP event: {ev}")
+
+    if 'init_ndi' not in data:
+        raise Exception(f"Missing init_ndi in NDP event: {ev}")
+
+    if 'ndp_id' not in data:
+        raise Exception(f"Missing ndp_id in NDP event: {ev}")
+
+    if publish_inst_id is not None and 'publish_inst_id' not in data:  # required only when the caller expects it
+        raise Exception(f"Missing publish_inst_id in NDP event: {ev}")
+
+    if data['peer_nmi'] != peer_nmi:
+        raise Exception(f"Unexpected peer_nmi: got {data['peer_nmi']}, expected {peer_nmi} in event: {ev}")
+
+    if init_ndi is not None and data['init_ndi'] != init_ndi:
+        raise Exception(f"Unexpected init_ndi: got {data['init_ndi']}, expected {init_ndi} in event: {ev}")
+
+    if (publish_inst_id is not None and data['publish_inst_id'] != publish_inst_id):
+        raise Exception(f"Unexpected publish_inst_id: got {data['publish_inst_id']}, expected {publish_inst_id} in event: {ev}")
+
+    if ssi is not None and 'ssi' in data:  # compared only if the event carries ssi; a missing ssi is not an error
+        if data['ssi'] != ssi:
+            raise Exception(f"Unexpected ssi: got {data['ssi']}, expected {ssi} in event: {ev}")
+
+    if csid is not None and 'csid' in data:  # compared only if the event carries csid; expected value is stringified
+        if data['csid'] != str(csid):
+            raise Exception(f"Unexpected csid: got {data['csid']}, expected {str(csid)} in event: {ev}")
+
 def nan_sync_discovery(pub, sub, service_name, pssi, sssi,
                        unsolicited=1, solicited=1, active=1,
                        expect_discovery=True,
@@ -834,3 +940,215 @@ def test_nan_config(dev, apdev, params):
         # and finally update the configuration
         logger.info("Updating NAN configuration")
         nan.update_config()
+
+def test_nan_sched(dev, apdev, params):
+    """NAN configure schedule"""
+    set_country("US")
+    try:
+        with hwsim_nan_radios() as (wpas1, wpas2), NanDevice(wpas1, "nan0") as pub:
+            if "OK" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "000000ff")):
+                raise Exception("Failed to configure schedule")
+            # Remove
+            if "OK" not in pub.remove_shedule():
+                raise Exception("Failed to remove schedule")
+            # Overlapping maps
+            if "FAIL" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "050000ff")):
+                raise Exception("A schedule with overlapping time bitmaps was unexpectedly accepted")
+            # Same channel
+            if "FAIL" not in pub.schedule_config((2437, "03000000"), (2437, "0000ff00")):
+                raise Exception("A schedule with duplicate channel entries was unexpectedly accepted")
+            # Bad length
+            if "FAIL" not in pub.schedule_config((2437, "0300")):
+                raise Exception("Too short schedule bitmap accepted")
+    finally:
+        set_country("00")
+
+def _nan_discover_service(pub, sub, service_name, pssi, sssi):
+    paddr = pub.wpas.own_addr()
+    saddr = sub.wpas.own_addr()
+
+    pid = pub.publish(service_name, ssi=pssi)
+    sid = sub.subscribe(service_name, ssi=sssi, active=0)
+
+    logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+    ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+    if ev is None:
+        raise Exception(f"NAN-DISCOVERY-RESULT event not seen for {service_name}")
+
+    nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+
+    return pid, sid, paddr, saddr
+
+def _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi, resp_ssi, csid=None,
+                                password=None, pmk=None, counter=False, wrong_pwd=False, configure_schedule=True):
+    """
+    Request NDP from subscriber and accept on publisher.
+
+    Returns: (ndp_id, init_ndi) or None if wrong_pwd test completed
+    """
+    # Configure schedule on subscriber if needed
+    if configure_schedule:
+        if "OK" not in sub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+            raise Exception("Failed to configure schedule (sub)")
+
+    # NDP request
+    if "OK" not in sub.ndp_request(sub.ndi_name, sid, paddr, pid, req_ssi,
+                                   csid=csid, password=password, pmk=pmk):
+        raise Exception("NDP request failed")
+
+    ev = pub.wpas.wait_event(["NAN-NDP-REQUEST"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-REQUEST event not seen")
+
+    ndi_sub = sub.dev.get_iface_addr(sub.ndi_name)
+    nan_ndp_verify_event(ev, saddr, pid, ndi_sub, req_ssi)
+
+    data = split_nan_event(ev)
+    ndp_id = data['ndp_id']
+    init_ndi = data['init_ndi']
+
+    # Configure schedule on publisher
+    if configure_schedule:
+        if counter:
+            if "OK" not in pub.schedule_config((5745, "feffffff")):
+                raise Exception("Failed to configure schedule (pub)")
+        else:
+            if "OK" not in pub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+                raise Exception("Failed to configure schedule (pub)")
+
+    # Accept NDP request
+    accept_pwd = "WRONG_PWD" if wrong_pwd else password
+    if "OK" not in pub.ndp_response("accept", saddr, ndi=pub.ndi_name, ndp_id=ndp_id, init_ndi=init_ndi,
+                                    handle=pid, ssi=resp_ssi, csid=csid, password=accept_pwd, pmk=pmk):
+        raise Exception("NDP response (accept) failed")
+
+    # Verify disconnection on wrong password
+    if wrong_pwd:
+        ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+        if ev is None or "reason=3" not in ev:
+            raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+        ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+        if ev is None or "reason=3" not in ev:
+            raise Exception("NAN-NDP-DISCONNECTED event not seen on publisher")
+        return None
+
+    # Handle counter proposal
+    if counter:
+        ev = sub.wpas.wait_event(["NAN-NDP-COUNTER-REQUEST"], timeout=5)
+        if ev is None:
+            raise Exception("NAN-NDP-COUNTER-REQUEST event not seen")
+
+        nan_ndp_verify_event(ev, paddr, init_ndi=ndi_sub, ssi=resp_ssi)
+
+        data = split_nan_event(ev)
+        ndp_id = data['ndp_id']
+        init_ndi = data['init_ndi']
+
+        if "OK" not in sub.schedule_config((5745, "feffffff")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in sub.ndp_response("accept", paddr, ndi=sub.ndi_name, ndp_id=ndp_id, handle=sid,
+                                        init_ndi=init_ndi, ssi="11223344", csid=csid, password=password, pmk=pmk):
+            raise Exception("NDP response (confirm) failed")
+
+    # Wait for NDP connected events
+    ev = pub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-CONNECTED event not seen on publisher")
+
+    ev = sub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-CONNECTED event not seen on subscriber")
+
+    logger.info("NDP connection established successfully")
+
+    return ndp_id, init_ndi
+
+def _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id):
+    """Terminate an NDP and wait for disconnect events on both sides."""
+    if "OK" not in sub.ndp_terminate(paddr, init_ndi, ndp_id):  # fail fast instead of timing out below
+        raise Exception("NDP terminate failed")
+
+    ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-DISCONNECTED event not seen on publisher")
+
+    ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+def _nan_test_connectivity(pub, sub):
+    """Test IP connectivity between publisher and subscriber NDI interfaces."""
+    wpas_ndi_pub = WpaSupplicant(ifname=pub.ndi_name)
+    wpas_ndi_sub = WpaSupplicant(ifname=sub.ndi_name)
+    test_connectivity(wpas_ndi_pub, wpas_ndi_sub, tos=0, ifname1=pub.ndi_name, ifname2=sub.ndi_name,
+                      max_tries=3, timeout=5, broadcast=False)
+
+def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+    if use_pmk:
+        pmk = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
+        pwd = None
+    else:
+        pwd = "NAN" if csid is not None else None
+        pmk = None
+
+    with hwsim_nan_radios() as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0", "ndi0") as pub, NanDevice(wpas2, "nan1", "ndi1") as sub:
+
+        pssi = "aabbccdd001122334455667788"
+        sssi = "ddbbccaa001122334455667788"
+
+        pid, sid, paddr, saddr= _nan_discover_service(pub, sub, "test_service", pssi, sssi)
+
+        # Log peer info (specific to this test)
+        peer_schedule = pub.wpas.request("NAN_PEER_INFO " + saddr + " schedule")
+        logger.info("\n" + peer_schedule)
+        potential = pub.wpas.request("NAN_PEER_INFO " + saddr + " potential")
+        logger.info("\n" + potential)
+        capa = pub.wpas.request("NAN_PEER_INFO " + saddr + " capa")
+        logger.info("\n" + capa)
+
+        result = _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi="aabbcc",
+                                             resp_ssi="ddeeff", csid=csid, password=pwd, pmk=pmk,
+                                             counter=counter, wrong_pwd=wrong_pwd)
+
+        if result is None:
+            # wrong_pwd test completed
+            return
+
+        ndp_id, init_ndi = result
+
+        _nan_test_connectivity(pub, sub)
+        _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id)
+
+def run_nan_dp(country="US", counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+    set_country(country)
+    try:
+        _run_nan_dp(counter=counter, csid=csid, wrong_pwd=wrong_pwd, use_pmk=use_pmk)
+    finally:
+        set_country("00")
+
+def test_nan_dp_open(dev, apdev, params):
+    """NAN DP open"""
+    run_nan_dp()
+
+def test_nan_dp_open_counter(dev, apdev, params):
+    """NAN DP open with counter proposal"""
+    run_nan_dp(counter=True)
+
+def test_nan_dp_sk_ccmp128(dev, apdev, params):
+    """NAN DP - 2way NDL + SK CCMP security"""
+    run_nan_dp(csid=1)
+
+def test_nan_dp_sk_gcmp256(dev, apdev, params):
+    """NAN DP - 3way NDL + SK GCMP-256 security"""
+    run_nan_dp(counter=True, csid=2)
+
+def test_nan_dp_wrong_pwd(dev, apdev, params):
+    """NAN DP - Wrong password"""
+    run_nan_dp(csid=1, wrong_pwd=True)
+
+def test_nan_dp_pmk(dev, apdev, params):
+    """NAN DP - 3way NDL + SK CCMP security with PMK"""
+    run_nan_dp(counter=True, csid=1, use_pmk=True)
-- 
2.53.0




More information about the Hostap mailing list