[RFC v2 99/99] tests: Add NAN data path tests

Andrei Otcheretianski andrei.otcheretianski at intel.com
Tue Dec 23 03:52:43 PST 2025


Add several NDP establishment tests for open and encrypted connections,
including counter NDL proposal.
In addition add local schedule configuration test.

Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
 tests/hwsim/test_nan.py | 308 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 307 insertions(+), 1 deletion(-)

diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index b1633737be..2eb36c57d0 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -9,6 +9,8 @@ import logging
 logger = logging.getLogger()
 from utils import *
 import string
+from test_p2p_channel import set_country
+from hwsim import HWSimRadio
 
 def check_nan_capab(dev):
     capa = dev.request("GET_CAPABILITY nan")
@@ -18,10 +20,11 @@ def check_nan_capab(dev):
         raise HwsimSkip(f"NAN not supported: {capa}")
 
 class NanDevice:
-    def __init__(self, dev, ifname):
+    def __init__(self, dev, ifname, ndi_name=None):
         self.dev = dev
         self.ifname = ifname
         self.wpas = None
+        self.ndi_name = ndi_name
 
     def __enter__(self):
         self.start()
@@ -48,12 +51,19 @@ class NanDevice:
 
         logger.info(f"NAN device started on {self.ifname}")
 
+        # Add NDI
+        if self.ndi_name is not None:
+            self.dev.interface_add(self.ndi_name, if_type="nan_data", create=True)
+
     def stop(self):
         logger.info(f"NAN device stopping on {self.ifname}")
 
         if "OK" not in self.wpas.request("NAN_STOP"):
             raise Exception(f"Failed to stop NAN functionality on {self.ifname}")
 
+        if self.ndi_name is not None:
+            self.dev.interface_remove(self.ndi_name)
+
         self.dev.global_request(f"INTERFACE_REMOVE {self.ifname}")
         self.wpas.remove_ifname()
 
@@ -82,6 +92,68 @@ class NanDevice:
 
         return self.wpas.request(cmd)
 
+    def schedule_config(self, *chans, map_id=1):
+        cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id} "
+        cmd += " ".join([f"{freq}:{bitmap}" for freq, bitmap in chans])
+        return self.wpas.request(cmd)
+
+    def remove_shedule(self, map_id=1):
+        cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id}"
+        return self.wpas.request(cmd)
+
+    def ndp_request(self, ndi, handle, peer_nmi, peer_id, ssi=None,
+                    qos_slots=0, qos_latency=0xffff, csid=None, password=None,
+                    pmk=None):
+        cmd = f"NAN_NDP_REQUEST handle={handle} ndi={ndi} peer_nmi={peer_nmi} peer_id={peer_id}"
+
+        params = [
+            ("ssi", ssi),
+            ("csid", csid),
+            ("password", password),
+            ("pmk", pmk),
+        ]
+
+        cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+        if qos_slots > 0 or qos_latency != 0xffff:
+            cmd += f" qos={qos_slots}:{qos_latency}"
+
+        return self.wpas.request(cmd)
+
+    def ndp_response(self, action, peer_nmi, ndi=None, peer_ndi=None,
+                     ndp_id=None, init_ndi=None, reason_code=None, ssi=None,
+                     qos_slots=0, qos_latency=0xffff, handle=None, csid=None,
+                     password=None, pmk=None):
+        if action not in ["accept", "reject"]:
+            raise Exception(f"Invalid action: {action}. Must be 'accept' or 'reject'")
+
+        cmd = f"NAN_NDP_RESPONSE {action} peer_nmi={peer_nmi}"
+
+        params = [
+            ("reason_code", reason_code),
+            ("ndi", ndi),
+            ("peer_ndi", peer_ndi),
+            ("ndp_id", ndp_id),
+            ("init_ndi", init_ndi),
+            ("handle", handle),
+            ("ssi", ssi),
+            ("csid", csid),
+            ("password", password),
+            ("pmk", pmk),
+        ]
+
+        cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+        if qos_slots > 0 or qos_latency != 0xffff:
+            cmd += f" qos={qos_slots}:{qos_latency}"
+
+        return self.wpas.request(cmd)
+
+
+    def ndp_terminate(self, peer_nmi, init_ndi, ndp_id):
+        cmd = f"NAN_NDP_TERMINATE peer_nmi={peer_nmi} init_ndi={init_ndi} ndp_id={ndp_id}"
+        return self.wpas.request(cmd)
+
     def subscribe(self, service_name, ssi=None, active=1,
                   sync=1, match_filter_rx=None, match_filter_tx=None,
                   srf_include=0, srf_mac_list=None, srf_bf_len=0,
@@ -174,6 +246,40 @@ def nan_sync_verify_event(ev, addr, pid, sid, ssi):
     if data['address'] != addr:
         raise Exception("Unexpected peer_addr: " + ev)
 
+def nan_ndp_verify_event(ev, peer_nmi, publish_inst_id=None, init_ndi=None,
+                         ssi=None, csid=None):
+    """Verify NAN-NDP-REQUEST event format and content"""
+    data = split_nan_event(ev)
+
+    if 'peer_nmi' not in data:
+        raise Exception(f"Missing peer_nmi in NDP event: {ev}")
+
+    if 'init_ndi' not in data:
+        raise Exception(f"Missing init_ndi in NDP event: {ev}")
+
+    if 'ndp_id' not in data:
+        raise Exception(f"Missing ndp_id in NDP event: {ev}")
+
+    if publish_inst_id is not None and 'publish_inst_id' not in data:
+        raise Exception(f"Missing publish_inst_id in NDP event: {ev}")
+
+    if data['peer_nmi'] != peer_nmi:
+        raise Exception(f"Unexpected peer_nmi: got {data['peer_nmi']}, expected {peer_nmi} in event: {ev}")
+
+    if init_ndi is not None and data['init_ndi'] != init_ndi:
+        raise Exception(f"Unexpected init_ndi: got {data['init_ndi']}, expected {init_ndi} in event: {ev}")
+
+    if (publish_inst_id is not None and data['publish_inst_id'] != publish_inst_id):
+        raise Exception(f"Unexpected publish_inst_id: got {data['publish_inst_id']}, expected {publish_inst_id} in event: {ev}")
+
+    if ssi is not None and 'ssi' in data:
+        if data['ssi'] != ssi:
+            raise Exception(f"Unexpected ssi: got {data['ssi']}, expected {ssi} in event: {ev}")
+
+    if csid is not None and 'csid' in data:
+        if data['csid'] != str(csid):
+            raise Exception(f"Unexpected csid: got {data['csid']}, expected {str(csid)} in event: {ev}")
+
 def nan_sync_discovery(pub, sub, service_name, pssi, sssi,
                        unsolicited=1, solicited=1, active=1,
                        expect_discovery=True,
@@ -774,3 +880,203 @@ def test_nan_config(dev, apdev, params):
         # and finally update the configuration
         logger.info("Updating NAN configuration")
         nan.update_config()
+
+def test_nan_sched(dev, apdev, params):
+    """NAN configure schedule"""
+    set_country("US")
+    try:
+        with HWSimRadio(n_channels=3, use_nan=True) as (wpas_radio1, wpas_iface1):
+            wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+            wpas1.interface_add(wpas_iface1)
+            with NanDevice(wpas1, "nan5") as pub:
+                if "OK" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "000000ff")):
+                    raise Exception("Failed to configure schedule")
+                # Remove
+                if "OK" not in pub.remove_shedule():
+                    raise Exception("Failed to remove schedule")
+                # Overlapping maps
+                if "FAIL" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "050000ff")):
+                    raise Exception("A schedule with overlapping time bitmaps was unexpectedly accepted")
+                # Same channel
+                if "FAIL" not in pub.schedule_config((2437, "03000000"), (2437, "0000ff00")):
+                    raise Exception("A schedule with duplicate channel entries was unexpectedly accepted")
+                # Bad length
+                if "FAIL" not in pub.schedule_config((2437, "0300")):
+                    raise Exception("Too short schedule bitmap accepted")
+    finally:
+        set_country("00")
+
+def _run_nan_dp(dev, counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+    if use_pmk:
+        pmk = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
+        pwd = None
+    else:
+        pwd = "NAN" if csid is not None else None
+        pmk = None
+
+    with HWSimRadio(n_channels=3, use_nan=True) as (wpas_radio1, wpas_iface1), \
+        HWSimRadio(n_channels=3, use_nan=True) as (wpas_radio2, wpas_iface2):
+        wpas1 = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+        wpas1.interface_add(wpas_iface1)
+
+        wpas2 = WpaSupplicant(global_iface='/tmp/wpas-wlan6')
+        wpas2.interface_add(wpas_iface2)
+
+        with NanDevice(wpas1, "nan0", "ndi0") as pub, \
+             NanDevice(wpas2, "nan1", "ndi1") as sub:
+            paddr = pub.wpas.own_addr()
+            saddr = sub.wpas.own_addr()
+
+            pssi = "aabbccdd001122334455667788"
+            sssi = "ddbbccaa001122334455667788"
+
+            pid = pub.publish("test_service", ssi=pssi)
+            sid = sub.subscribe("test_service", ssi=sssi, active=0)
+
+            logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+            ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+            if ev is None:
+                raise Exception("NAN-DISCOVERY-RESULT event not seen")
+
+            nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+            ndi_sub = sub.dev.get_iface_addr(sub.ndi_name)
+            if "OK" not in sub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+                raise Exception("Failed to configure schedule (sub)")
+
+            if "OK" not in sub.ndp_request(sub.ndi_name, sid, paddr, pid, "aabbcc", csid=csid,
+                                           password=pwd, pmk=pmk):
+                raise Exception("NDP request failed")
+
+            ev = pub.wpas.wait_event(["NAN-NDP-REQUEST"], timeout=5)
+            if ev is None:
+                raise Exception("NAN-NDP-REQUEST event not seen")
+
+            # Validate the NDP-REQUEST event
+            nan_ndp_verify_event(ev, saddr, pid, ndi_sub, "aabbcc")
+
+            peer_schedule = pub.wpas.request("NAN_PEER_INFO " + saddr + " schedule")
+            logger.info("\n" + peer_schedule)
+            potential = pub.wpas.request("NAN_PEER_INFO " + saddr + " potential")
+            logger.info("\n" + potential)
+            capa = pub.wpas.request("NAN_PEER_INFO " + saddr + " capa")
+            logger.info("\n" + capa)
+            # Extract NDP information from the event
+            data = split_nan_event(ev)
+            ndp_id = data['ndp_id']
+            init_ndi = data['init_ndi']
+
+            if counter:
+                if "OK" not in pub.schedule_config((5745, "feffffff")):
+                    raise Exception("Failed to configure schedule (pub)")
+            else:
+                if "OK" not in pub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+                    raise Exception("Failed to configure schedule (pub)")
+
+            # Accept the NDP request
+            if wrong_pwd:
+                pwd = "WRONG_PWD"
+            if "OK" not in pub.ndp_response("accept",
+                                             saddr,
+                                             ndi=pub.ndi_name,
+                                             ndp_id=ndp_id,
+                                             init_ndi=init_ndi,
+                                             handle=pid,
+                                             ssi="ddeeff",
+                                             csid=csid,
+                                             password=pwd,
+                                             pmk=pmk):
+                raise Exception("NDP response (accept) failed")
+
+            # Verify disconnection on wrong password and stop the test
+            if wrong_pwd:
+                ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+                if ev is None or "reason=3" not in ev:
+                    raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+                ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+                if ev is None or "reason=3" not in ev:
+                    raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+                return
+
+            if counter:
+                ev = sub.wpas.wait_event(["NAN-NDP-COUNTER-REQUEST"], timeout=5)
+                if ev is None:
+                    raise Exception("NAN-NDP-COUNTER-REQUEST event not seen")
+
+                # Validate the NDP-COUNTER-REQUEST event
+                nan_ndp_verify_event(ev, paddr, init_ndi=ndi_sub, ssi="ddeeff")
+
+                # Extract NDP information from the event
+                data = split_nan_event(ev)
+                ndp_id = data['ndp_id']
+                init_ndi = data['init_ndi']
+
+                # Configure compliant schedule
+                if "OK" not in sub.schedule_config((5745, "feffffff")):
+                    raise Exception("Failed to configure schedule (sub)")
+
+                peer_schedule = sub.wpas.request("NAN_PEER_INFO " + paddr + " schedule")
+                logger.info("\n" + peer_schedule)
+                # Accept the NDP counter request
+                if "OK" not in sub.ndp_response("accept",
+                                                paddr,
+                                                ndi=sub.ndi_name,
+                                                ndp_id=ndp_id,
+                                                handle=sid,
+                                                init_ndi=init_ndi,
+                                                ssi="11223344",
+                                                csid=csid, password=pwd,
+                                                pmk=pmk):
+                    raise Exception("NDP response (confirm) failed")
+
+            # Wait for NDP connected event
+            ev = pub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+            if ev is None:
+                raise Exception("NAN-NDP-CONNECTED event not seen on publisher")
+
+            ev = sub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+            if ev is None:
+                raise Exception("NAN-NDP-CONNECTED event not seen on subscriber")
+
+            logger.info("NDP connection established successfully")
+
+            sub.ndp_terminate(paddr, init_ndi, ndp_id)
+            ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+            if ev is None:
+                raise Exception("NAN-NDP-DISCONNECTED event not seen on publisher")
+
+            ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+            if ev is None:
+                raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+def run_nan_dp(dev, country="US", counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+    set_country(country)
+    try:
+        _run_nan_dp(dev, counter=counter, csid=csid, wrong_pwd=wrong_pwd, use_pmk=use_pmk)
+    finally:
+        set_country("00")
+
+def test_nan_dp_open(dev, apdev, params):
+    """NAN DP open"""
+    run_nan_dp(dev)
+
+def test_nan_dp_open_counter(dev, apdev, params):
+    """NAN DP open with counter proposal"""
+    run_nan_dp(dev, counter=True)
+
+def test_nan_dp_sk_ccmp128(dev, apdev, params):
+    """NAN DP - 2way NDL + SK CCMP security"""
+    run_nan_dp(dev, csid=1)
+
+def test_nan_dp_sk_gcmp256(dev, apdev, params):
+    """NAN DP - 3way NDL + SK GCMP-256 security"""
+    run_nan_dp(dev, counter=True, csid=2)
+
+def test_nan_dp_wrong_pwd(dev, apdev, params):
+    """NAN DP - Wrong password"""
+    run_nan_dp(dev, csid=1, wrong_pwd=True)
+
+def test_nan_dp_pmk(dev, apdev, params):
+    """NAN DP - 3way NDL + SK CCMP security with PMK"""
+    run_nan_dp(dev, counter=True, csid=1, use_pmk=True)
-- 
2.49.0




More information about the Hostap mailing list