[RFC 92/92] tests: Add NAN hwsim pairing tests

Andrei Otcheretianski andrei.otcheretianski at intel.com
Wed Apr 22 05:24:23 PDT 2026


From: Avraham Stern <avraham.stern at intel.com>

Signed-off-by: Avraham Stern <avraham.stern at intel.com>
Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
 tests/hwsim/test_nan.py | 336 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 333 insertions(+), 3 deletions(-)

diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index 7278b4d79a..ce7a65fb84 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -94,7 +94,7 @@ class NanDevice:
 
     def publish(self, service_name, ssi=None, unsolicited=1, solicited=1,
                 sync=1, match_filter_rx=None, match_filter_tx=None,
-                close_proximity=0, pbm=0):
+                close_proximity=0, pbm=0, nd_pmk=None, cipher_suites=None):
 
         cmd = f"NAN_PUBLISH service_name={service_name} sync={sync} srv_proto_type=2 fsd=0"
 
@@ -116,6 +116,12 @@ class NanDevice:
         if pbm:
             cmd += f" pbm={pbm}"
 
+        if cipher_suites is not None:
+            cmd += f" cipher_suites={cipher_suites}"
+
+        if nd_pmk is not None:
+            cmd += f" nd_pmk={nd_pmk}"
+
         return self.wpas.request(cmd)
 
     def schedule_config(self, *chans, map_id=1):
@@ -261,11 +267,34 @@ class NanDevice:
         if "OK" not in self.wpas.request(cmd):
             raise Exception(f"{self.ifname}: failed to transmit NAN followup")
 
+    def pairing_request(self, peer, handle, peer_instance_id, mode, responder=False, password=None):
+        if mode == "SAE":
+            mode = 1
+        elif mode == "PASN":
+            mode = 0
+        elif mode == "PMK":
+            mode = 2
+
+        peer_nmi = peer.wpas.own_addr()
+        cmd = f"NAN_PAIR {peer_nmi} auth={mode} cipher=GCMP-256 handle={handle} peer_instance_id={peer_instance_id}"
+        if password is not None:
+            cmd += f" password={password}"
+        if responder:
+            cmd += " responder"
+
+        if "OK" not in self.wpas.request(cmd):
+            raise Exception("NAN_PAIR Failed on requesting device")
+
+    def pair_abort(self, peer_nmi):
+        cmd = f"NAN_PAIR_ABORT {peer_nmi}"
+        return self.wpas.request(cmd)
+
 def split_nan_event(ev):
     vals = dict()
     for p in ev.split(' ')[1:]:
-        name, val = p.split('=')
-        vals[name] = val
+        if '=' in p:
+            name, val = p.split('=', 1)
+            vals[name] = val
     return vals
 
 def nan_sync_verify_event(ev, addr, pid, sid, ssi):
@@ -1289,3 +1318,304 @@ def test_nan_bootstrap_password_no_response(dev, apdev, params):
 
         sub.bootstrap_reset(paddr)
         sub.cancel_subscribe(sid)
+
+def test_nan_pair_abort(dev, apdev, params):
+    """NAN pair abort"""
+    with hwsim_nan_radios() as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub, pmb=0x4)
+
+        # Complete bootstrap
+        sub.bootstrap(paddr, sid, pid, 0x40)
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-REQUEST"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-REQUEST event not seen")
+        pub.bootstrap(saddr, pid, sid, 0x4, auth=True)
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (subscriber)")
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (publisher)")
+
+        # Start SAE pairing without the responder being ready
+        sub.pairing_request(pub, sid, pid, "SAE", responder=False, password="password123")
+
+        ev_sub = sub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=2)
+        if ev_sub is not None:
+            raise Exception("Unexpected PASN result seen on subscriber")
+
+        ev_pub = pub.wpas.wait_event(["NAN-PAIRING-REQUEST"], timeout=2)
+        if ev_pub is None:
+            raise Exception("PASN pairing request not seen on publisher")
+
+        if "OK" not in sub.pair_abort(paddr):
+            raise Exception("NAN_PAIR_ABORT failed on subscriber")
+        if "OK" not in pub.pair_abort(saddr):
+            raise Exception("NAN_PAIR_ABORT failed on publisher")
+
+        # After abort, we should be able to restart pairing successfully
+        sub.pairing_request(pub, sid, pid, "SAE", responder=False, password="password123")
+        ev_pub = pub.wpas.wait_event(["NAN-PAIRING-REQUEST"], timeout=2)
+        if ev_pub is None:
+            raise Exception("PASN pairing request not seen on publisher after abort")
+
+        pub.pairing_request(sub, sid, pid, "SAE", responder=True, password="password123")
+        ev_sub = sub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+        if ev_sub is None:
+            raise Exception("PASN result not seen on subscriber")
+        if "status=success" not in ev_sub:
+            raise Exception("NAN PASN pairing failed on subscriber after abort")
+        ev_pub = pub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+        if ev_pub is None:
+            raise Exception("PASN result not seen on publisher")
+        if "status=success" not in ev_pub:
+            raise Exception("NAN PASN pairing failed on publisher after abort")
+
+        # Test abort with invalid peer address
+        if "FAIL" not in sub.pair_abort("02:00:00:00:00:00"):
+            raise Exception("NAN_PAIR_ABORT with invalid peer address succeeded unexpectedly")
+
+def run_nan_pairing(sub, pub, pid, sid, pairing_type, password=None):
+    if pairing_type == "SAE":
+        if password is not None:
+            pass
+        else:
+            raise Exception("Password must be provided for SAE pairing")
+
+    pub.pairing_request(sub, sid, pid, pairing_type, responder=True, password=password)
+    sub.pairing_request(pub, sid, pid, pairing_type, responder=False, password=password)
+
+    ev_sub = sub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+    if ev_sub is None:
+        raise Exception("PASN result not seen on requesting device")
+    if "status=success" not in ev_sub:
+        raise Exception("NAN PASN pairing failed on requesting device")
+
+    ev_pub = pub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+    if ev_pub is None:
+        raise Exception("PASN result not seen on publisher")
+    if "status=success" not in ev_pub:
+        raise Exception("NAN PASN pairing failed on publisher")
+
+    # Extract nd_pmk from both events
+    data_sub = split_nan_event(ev_sub)
+    data_pub = split_nan_event(ev_pub)
+
+    nd_pmk_sub = data_sub.get('nd_pmk')
+    nd_pmk_pub = data_pub.get('nd_pmk')
+
+    if nd_pmk_sub is None:
+        raise Exception("nd_pmk not found in subscriber pairing status event")
+    if nd_pmk_pub is None:
+        raise Exception("nd_pmk not found in publisher pairing status event")
+
+    if nd_pmk_sub != nd_pmk_pub:
+        raise Exception(f"nd_pmk mismatch: sub={nd_pmk_sub}, pub={nd_pmk_pub}")
+
+    logger.info(f"NAN pairing successful, nd_pmk={nd_pmk_sub}")
+
+    ev = pub.wpas.wait_event(["NAN-NIK-RECEIVED"], timeout=1)
+    ev = sub.wpas.wait_event(["NAN-NIK-RECEIVED"], timeout=1)
+
+    return nd_pmk_sub
+
+def run_nan_pairing_bootstrap(pairing_type, password=None):
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        if "OK" not in sub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in pub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (pub)")
+
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub)
+        sub.bootstrap(paddr, sid, pid, 0x1)
+
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen")
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen")
+
+        run_nan_pairing(sub, pub, pid, sid, pairing_type, password)
+
+        pub.cancel_publish(pid)
+        sub.cancel_subscribe(sid)
+
+def test_nan_sae_pairing_bootstrap(dev, apdev, params):
+    """NAN SAE pairing setup following opportunistic bootstrapping"""
+    run_nan_pairing_bootstrap("SAE", password="password123")
+
+def run_nan_pairing_verification(pairing_type, password=None):
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        paddr = pub.wpas.own_addr()
+        saddr = sub.wpas.own_addr()
+
+        if "OK" not in sub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in pub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (pub)")
+
+        pid, sid, paddr, saddr = nan_sync_discovery(pub, sub, "test_service",
+                                                    pssi="aabbccdd",
+                                                    sssi="ddbbccaa",
+                                                    unsolicited=0)
+
+        if pairing_type == "SAE":
+            if password is not None:
+                pass
+            else:
+                raise Exception("Password must be provided for SAE pairing")
+
+        run_nan_pairing(sub, pub, pid, sid, pairing_type, password)
+
+        pub.cancel_publish(pid)
+        sub.cancel_subscribe(sid)
+
+        import time
+        time.sleep(1)
+        pub.wpas.dump_monitor()
+        sub.wpas.dump_monitor()
+
+        pid, sid, paddr, saddr = nan_sync_discovery(pub, sub, "test_service",
+                                                    pssi="aabbccee",
+                                                    sssi="ddbbccee",
+                                                    unsolicited=0,
+                                                    timeout=5)
+
+        run_nan_pairing(sub, pub, pid, sid, "PMK")
+
+def test_nan_opportunistic_pairing(dev, apdev, params):
+    """NAN opportunistic pairing (PASN) followed by pairing verification (PMK)"""
+    run_nan_pairing_verification("PASN")
+
+def test_nan_sae_pairing(dev, apdev, params):
+    """NAN Pairing setup using a password (SAE)"""
+    run_nan_pairing_verification("SAE", "nanpassword")
+
+def test_nan_publish_with_pmk(dev, apdev, params):
+    """NAN publish with PMK and cipher suites"""
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        paddr = pub.wpas.own_addr()
+
+        # Test PMK - 32 bytes (64 hex characters)
+        nd_pmk = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
+        cipher_suites = "1,2"  # NAN_CS_SK_CCM_128, NAN_CS_SK_GCM_256
+
+        # Publish with PMK and cipher suites
+        pid = pub.publish("secure_test", nd_pmk=nd_pmk, cipher_suites=cipher_suites)
+        if "FAIL" in pid:
+            raise Exception(f"Failed to publish with PMK: {pid}")
+
+        logger.info(f"Published with PMK, ID: {pid}")
+
+        # Subscribe to the service
+        sid = sub.subscribe("secure_test", ssi=None)
+
+        # Wait for discovery with PMKIDs advertised
+        ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-DISCOVERY-RESULT event not seen")
+
+        if f"address={paddr}" not in ev:
+            raise Exception(f"Unexpected publisher address in discovery: {ev}")
+
+        if f"publish_id={pid}" not in ev:
+            raise Exception(f"Unexpected publish ID in discovery: {ev}")
+
+        logger.info(f"Discovery event: {ev}")
+
+        pub.cancel_publish(pid)
+        sub.cancel_subscribe(sid)
+
+def _run_nan_pairing_bootstrap_ndp(pairing_type, password=None, csid=1):
+    """Run NAN pairing bootstrap followed by NDP setup with security"""
+    with hwsim_nan_radios() as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0", "ndi0") as pub, NanDevice(wpas2, "nan1", "ndi1") as sub:
+        paddr = pub.wpas.own_addr()
+        saddr = sub.wpas.own_addr()
+
+        # Configure schedules for both devices
+        if "OK" not in sub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in pub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (pub)")
+
+        # Step 1: Pre-bootstrap discovery
+        pssi = "aabbccdd"
+        sssi = "ddbbccaa"
+
+        pid = pub.publish("test_service", ssi=pssi, unsolicited=0, pbm=0x1)
+        sid = sub.subscribe("test_service", ssi=sssi)
+
+        logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+        ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-DISCOVERY-RESULT event not seen")
+
+        nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+
+        ev = pub.wpas.wait_event(["NAN-REPLIED"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-REPLIED event not seen")
+
+        nan_sync_verify_event(ev, saddr, pid, sid, sssi)
+
+        # Step 2: Bootstrapping
+        sub.bootstrap(paddr, sid, pid, 0x1)
+
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen on subscriber")
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen on publisher")
+
+        # Step 3: Pairing
+        nd_pmk = run_nan_pairing(sub, pub, pid, sid, pairing_type, password)
+
+        logger.info(f"Pairing completed successfully: nd_pmk={nd_pmk}, now setting up NDP")
+
+        # Step 4: NDP setup with security using the derived nd_pmk
+        ndp_id, init_ndi = _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr,
+                                                       req_ssi="aabbcc", resp_ssi="ddeeff", csid=csid,
+                                                       pmk=nd_pmk, configure_schedule=True)
+
+        logger.info("NDP connection established successfully after pairing")
+
+        _nan_test_connectivity(pub, sub)
+        _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id)
+
+        # Once the NDP is removed, verify that service discovery still works
+        pub.wpas.dump_monitor()
+        sub.wpas.dump_monitor()
+        time.sleep(1)
+
+        ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-DISCOVERY-RESULT event not seen")
+
+        nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+
+        ev = pub.wpas.wait_event(["NAN-REPLIED"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-REPLIED event not seen")
+
+        nan_sync_verify_event(ev, saddr, pid, sid, sssi)
+
+def test_nan_pairing_bootstrap_ndp_sk_ccmp128(dev, apdev, params):
+    """NAN Pairing bootstrap followed by NDP with SK CCMP-128 security"""
+    set_country("US")
+    try:
+        _run_nan_pairing_bootstrap_ndp("SAE", password="password123", csid=1)
+    finally:
+        set_country("00")
-- 
2.53.0




More information about the Hostap mailing list