[RFC 02/97] tests: Add NAN data path, bootstrapping and pairing tests

Andrei Otcheretianski andrei.otcheretianski at intel.com
Tue Apr 28 13:05:03 PDT 2026


Add NDP establishment, bootstrapping and pairing NAN tests.

Signed-off-by: Avraham Stern <avraham.stern at intel.com>
Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski at intel.com>
---
 tests/hwsim/test_nan.py | 793 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 789 insertions(+), 4 deletions(-)

diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index f7a212f7f8..ce7a65fb84 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -9,8 +9,10 @@ import logging
 logger = logging.getLogger()
 from utils import *
 import string
+from hwsim_utils import test_connectivity
 from hwsim import HWSimRadio
 from contextlib import contextmanager, ExitStack
+from test_p2p_channel import set_country
 
 @contextmanager
 def hwsim_nan_radios(count=2, n_channels=3):
@@ -41,10 +43,11 @@ def check_nan_capab(dev):
         raise HwsimSkip(f"NAN not supported: {capa}")
 
 class NanDevice:
-    def __init__(self, dev, ifname):
+    def __init__(self, dev, ifname, ndi_name=None):
         self.dev = dev
         self.ifname = ifname
         self.wpas = None
+        self.ndi_name = ndi_name
 
     def __enter__(self):
         self.start()
@@ -71,12 +74,19 @@ class NanDevice:
 
         logger.info(f"NAN device started on {self.ifname}")
 
+        # Add NDI
+        if self.ndi_name is not None:
+            self.dev.interface_add(self.ndi_name, if_type="nan_data", create=True)
+
     def stop(self):
         logger.info(f"NAN device stopping on {self.ifname}")
 
         if "OK" not in self.wpas.request("NAN_STOP"):
             raise Exception(f"Failed to stop NAN functionality on {self.ifname}")
 
+        if self.ndi_name is not None:
+            self.dev.interface_remove(self.ndi_name)
+
         self.dev.global_request(f"INTERFACE_REMOVE {self.ifname}")
         self.wpas.remove_ifname()
 
@@ -84,7 +94,7 @@ class NanDevice:
 
     def publish(self, service_name, ssi=None, unsolicited=1, solicited=1,
                 sync=1, match_filter_rx=None, match_filter_tx=None,
-                close_proximity=0):
+                close_proximity=0, pbm=0, nd_pmk=None, cipher_suites=None):
 
         cmd = f"NAN_PUBLISH service_name={service_name} sync={sync} srv_proto_type=2 fsd=0"
 
@@ -103,6 +113,77 @@ class NanDevice:
         if match_filter_tx:
             cmd += f" match_filter_tx={match_filter_tx}"
 
+        if pbm:
+            cmd += f" pbm={pbm}"
+
+        if cipher_suites is not None:
+            cmd += f" cipher_suites={cipher_suites}"
+
+        if nd_pmk is not None:
+            cmd += f" nd_pmk={nd_pmk}"
+
+        return self.wpas.request(cmd)
+
+    def schedule_config(self, *chans, map_id=1):
+        cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id} "
+        cmd += " ".join([f"{freq}:{bitmap}" for freq, bitmap in chans])
+        return self.wpas.request(cmd)
+
+    def remove_schedule(self, map_id=1):
+        cmd = f"NAN_SCHED_CONFIG_MAP map_id={map_id}"
+        return self.wpas.request(cmd)
+
+    def ndp_request(self, ndi, handle, peer_nmi, peer_id, ssi=None,
+                    qos_slots=0, qos_latency=0xffff, csid=None, password=None,
+                    pmk=None):
+        cmd = f"NAN_NDP_REQUEST handle={handle} ndi={ndi} peer_nmi={peer_nmi} peer_id={peer_id}"
+
+        params = [
+            ("ssi", ssi),
+            ("csid", csid),
+            ("password", password),
+            ("pmk", pmk),
+        ]
+
+        cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+        if qos_slots > 0 or qos_latency != 0xffff:
+            cmd += f" qos={qos_slots}:{qos_latency}"
+
+        return self.wpas.request(cmd)
+
+    def ndp_response(self, action, peer_nmi, ndi=None, peer_ndi=None,
+                     ndp_id=None, init_ndi=None, reason_code=None, ssi=None,
+                     qos_slots=0, qos_latency=0xffff, handle=None, csid=None,
+                     password=None, pmk=None):
+        if action not in ["accept", "reject"]:
+            raise Exception(f"Invalid action: {action}. Must be 'accept' or 'reject'")
+
+        cmd = f"NAN_NDP_RESPONSE {action} peer_nmi={peer_nmi}"
+
+        params = [
+            ("reason_code", reason_code),
+            ("ndi", ndi),
+            ("peer_ndi", peer_ndi),
+            ("ndp_id", ndp_id),
+            ("init_ndi", init_ndi),
+            ("handle", handle),
+            ("ssi", ssi),
+            ("csid", csid),
+            ("password", password),
+            ("pmk", pmk),
+        ]
+
+        cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
+
+        if qos_slots > 0 or qos_latency != 0xffff:
+            cmd += f" qos={qos_slots}:{qos_latency}"
+
+        return self.wpas.request(cmd)
+
+
+    def ndp_terminate(self, peer_nmi, init_ndi, ndp_id):
+        cmd = f"NAN_NDP_TERMINATE peer_nmi={peer_nmi} init_ndi={init_ndi} ndp_id={ndp_id}"
         return self.wpas.request(cmd)
 
     def subscribe(self, service_name, ssi=None, active=1,
@@ -138,6 +219,20 @@ class NanDevice:
 
         return self.wpas.request(cmd)
 
+    def bootstrap(self, peer, handle, peer_instance_id, pbm, auth=False):
+        logger.info(f"Bootstrapping NAN with peer {peer} pbm={pbm} auth={auth} on {self.ifname}")
+        auth_param = " auth" if auth else ""
+
+        if "OK" not in self.wpas.request(f"NAN_BOOTSTRAP {peer} handle={handle} "
+                         f"req_instance_id={peer_instance_id} method={pbm}" + auth_param):
+            raise Exception(f"{self.ifname}: failed to bootstrap with {peer}")
+
+    def bootstrap_reset(self, peer):
+        logger.info(f"Reset Bootstrapping NAN with peer {peer}")
+
+        if "OK" not in self.wpas.request(f"NAN_BOOTSTRAP_RESET {peer}"):
+            raise Exception(f"{self.ifname}: failed to reset bootstrap with {peer}")
+
     def cancel_publish(self, publish_id):
         logger.info(f"Cancelling publish with ID {publish_id} on {self.ifname}")
         if "OK" not in self.wpas.request(f"NAN_CANCEL_PUBLISH publish_id={publish_id}"):
@@ -172,11 +267,34 @@ class NanDevice:
         if "OK" not in self.wpas.request(cmd):
             raise Exception(f"{self.ifname}: failed to transmit NAN followup")
 
+    def pairing_request(self, peer, handle, peer_instance_id, mode, responder=False, password=None):
+        if mode == "SAE":
+            mode = 1
+        elif mode == "PASN":
+            mode = 0
+        elif mode == "PMK":
+            mode = 2
+
+        peer_nmi = peer.wpas.own_addr()
+        cmd = f"NAN_PAIR {peer_nmi} auth={mode} cipher=GCMP-256 handle={handle} peer_instance_id={peer_instance_id}"
+        if password is not None:
+            cmd += f" password={password}"
+        if responder:
+            cmd += " responder"
+
+        if "OK" not in self.wpas.request(cmd):
+            raise Exception("NAN_PAIR Failed on requesting device")
+
+    def pair_abort(self, peer_nmi):
+        cmd = f"NAN_PAIR_ABORT {peer_nmi}"
+        return self.wpas.request(cmd)
+
 def split_nan_event(ev):
     vals = dict()
     for p in ev.split(' ')[1:]:
-        name, val = p.split('=')
-        vals[name] = val
+        if '=' in p:
+            name, val = p.split('=', 1)
+            vals[name] = val
     return vals
 
 def nan_sync_verify_event(ev, addr, pid, sid, ssi):
@@ -197,6 +315,40 @@ def nan_sync_verify_event(ev, addr, pid, sid, ssi):
     if data['address'] != addr:
         raise Exception("Unexpected peer_addr: " + ev)
 
+def nan_ndp_verify_event(ev, peer_nmi, publish_inst_id=None, init_ndi=None,
+                         ssi=None, csid=None):
+    """Verify NAN-NDP-REQUEST or NAN-NDP-COUNTER-REQUEST event format and content"""
+    data = split_nan_event(ev)
+
+    if 'peer_nmi' not in data:
+        raise Exception(f"Missing peer_nmi in NDP event: {ev}")
+
+    if 'init_ndi' not in data:
+        raise Exception(f"Missing init_ndi in NDP event: {ev}")
+
+    if 'ndp_id' not in data:
+        raise Exception(f"Missing ndp_id in NDP event: {ev}")
+
+    if publish_inst_id is not None and 'publish_inst_id' not in data:
+        raise Exception(f"Missing publish_inst_id in NDP event: {ev}")
+
+    if data['peer_nmi'] != peer_nmi:
+        raise Exception(f"Unexpected peer_nmi: got {data['peer_nmi']}, expected {peer_nmi} in event: {ev}")
+
+    if init_ndi is not None and data['init_ndi'] != init_ndi:
+        raise Exception(f"Unexpected init_ndi: got {data['init_ndi']}, expected {init_ndi} in event: {ev}")
+
+    if (publish_inst_id is not None and data['publish_inst_id'] != publish_inst_id):
+        raise Exception(f"Unexpected publish_inst_id: got {data['publish_inst_id']}, expected {publish_inst_id} in event: {ev}")
+
+    if ssi is not None and 'ssi' in data:
+        if data['ssi'] != ssi:
+            raise Exception(f"Unexpected ssi: got {data['ssi']}, expected {ssi} in event: {ev}")
+
+    if csid is not None and 'csid' in data:
+        if data['csid'] != str(csid):
+            raise Exception(f"Unexpected csid: got {data['csid']}, expected {str(csid)} in event: {ev}")
+
 def nan_sync_discovery(pub, sub, service_name, pssi, sssi,
                        unsolicited=1, solicited=1, active=1,
                        expect_discovery=True,
@@ -834,3 +986,636 @@ def test_nan_config(dev, apdev, params):
         # and finally update the configuration
         logger.info("Updating NAN configuration")
         nan.update_config()
+
+def test_nan_sched(dev, apdev, params):
+    """NAN configure schedule"""
+    set_country("US")
+    try:
+        with hwsim_nan_radios() as (wpas1, wpas2), NanDevice(wpas1, "nan0") as pub:
+            if "OK" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "000000ff")):
+                raise Exception("Failed to configure schedule")
+            # Remove
+            if "OK" not in pub.remove_schedule():
+                raise Exception("Failed to remove schedule")
+            # Overlapping maps
+            if "FAIL" not in pub.schedule_config((2437, "03000000"), (5180, "0000ff00"), (5825, "050000ff")):
+                raise Exception("A schedule with overlapping time bitmaps was unexpectedly accepted")
+            # Same channel
+            if "FAIL" not in pub.schedule_config((2437, "03000000"), (2437, "0000ff00")):
+                raise Exception("A schedule with duplicate channel entries was unexpectedly accepted")
+            # Bad length
+            if "FAIL" not in pub.schedule_config((2437, "0300")):
+                raise Exception("Too short schedule bitmap accepted")
+    finally:
+        set_country("00")
+
+def _nan_discover_service(pub, sub, service_name, pssi, sssi):
+    paddr = pub.wpas.own_addr()
+    saddr = sub.wpas.own_addr()
+
+    pid = pub.publish(service_name, ssi=pssi)
+    sid = sub.subscribe(service_name, ssi=sssi, active=0)
+
+    logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+    ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+    if ev is None:
+        raise Exception(f"NAN-DISCOVERY-RESULT event not seen for {service_name}")
+
+    nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+
+    return pid, sid, paddr, saddr
+
+def _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi, resp_ssi, csid=None,
+                                password=None, pmk=None, counter=False, wrong_pwd=False, configure_schedule=True):
+    """
+    Request NDP from subscriber and accept on publisher.
+
+    Returns: (ndp_id, init_ndi) or None if wrong_pwd test completed
+    """
+    # Configure schedule on subscriber if needed
+    if configure_schedule:
+        if "OK" not in sub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+            raise Exception("Failed to configure schedule (sub)")
+
+    # NDP request
+    if "OK" not in sub.ndp_request(sub.ndi_name, sid, paddr, pid, req_ssi,
+                                   csid=csid, password=password, pmk=pmk):
+        raise Exception("NDP request failed")
+
+    ev = pub.wpas.wait_event(["NAN-NDP-REQUEST"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-REQUEST event not seen")
+
+    ndi_sub = sub.dev.get_iface_addr(sub.ndi_name)
+    nan_ndp_verify_event(ev, saddr, pid, ndi_sub, req_ssi)
+
+    data = split_nan_event(ev)
+    ndp_id = data['ndp_id']
+    init_ndi = data['init_ndi']
+
+    # Configure schedule on publisher
+    if configure_schedule:
+        if counter:
+            if "OK" not in pub.schedule_config((5745, "feffffff")):
+                raise Exception("Failed to configure schedule (pub)")
+        else:
+            if "OK" not in pub.schedule_config((2437, "0e000000"), (5180, "f0ffffff")):
+                raise Exception("Failed to configure schedule (pub)")
+
+    # Accept NDP request
+    accept_pwd = "WRONG_PWD" if wrong_pwd else password
+    if "OK" not in pub.ndp_response("accept", saddr, ndi=pub.ndi_name, ndp_id=ndp_id, init_ndi=init_ndi,
+                                    handle=pid, ssi=resp_ssi, csid=csid, password=accept_pwd, pmk=pmk):
+        raise Exception("NDP response (accept) failed")
+
+    # Verify disconnection on wrong password
+    if wrong_pwd:
+        ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+        if ev is None or "reason=3" not in ev:
+            raise Exception("NAN-NDP-DISCONNECTED event not seen on subscriber")
+        ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+        if ev is None or "reason=3" not in ev:
+            raise Exception("NAN-NDP-DISCONNECTED event not seen on publisher")
+        return None
+
+    # Handle counter proposal
+    if counter:
+        ev = sub.wpas.wait_event(["NAN-NDP-COUNTER-REQUEST"], timeout=5)
+        if ev is None:
+            raise Exception("NAN-NDP-COUNTER-REQUEST event not seen")
+
+        nan_ndp_verify_event(ev, paddr, init_ndi=ndi_sub, ssi=resp_ssi)
+
+        data = split_nan_event(ev)
+        ndp_id = data['ndp_id']
+        init_ndi = data['init_ndi']
+
+        if "OK" not in sub.schedule_config((5745, "feffffff")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in sub.ndp_response("accept", paddr, ndi=sub.ndi_name, ndp_id=ndp_id, handle=sid,
+                                        init_ndi=init_ndi, ssi="11223344", csid=csid, password=password, pmk=pmk):
+            raise Exception("NDP response (confirm) failed")
+
+    # Wait for NDP connected events
+    ev = pub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-CONNECTED event not seen on publisher")
+
+    ev = sub.wpas.wait_event(["NAN-NDP-CONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception("NAN-NDP-CONNECTED event not seen on subscriber")
+
+    logger.info("NDP connection established successfully")
+
+    return ndp_id, init_ndi
+
+def _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id):
+    """Terminate an NDP and wait for disconnect events on both sides."""
+
+    sub.ndp_terminate(paddr, init_ndi, ndp_id)
+
+    ev = pub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception(f"NAN-NDP-DISCONNECTED event not seen on publisher")
+
+    ev = sub.wpas.wait_event(["NAN-NDP-DISCONNECTED"], timeout=5)
+    if ev is None:
+        raise Exception(f"NAN-NDP-DISCONNECTED event not seen on subscriber")
+
+def _nan_test_connectivity(pub, sub):
+    """Test IP connectivity between publisher and subscriber NDI interfaces."""
+    wpas_ndi_pub = WpaSupplicant(ifname=pub.ndi_name)
+    wpas_ndi_sub = WpaSupplicant(ifname=sub.ndi_name)
+    test_connectivity(wpas_ndi_pub, wpas_ndi_sub, tos=0, ifname1=pub.ndi_name, ifname2=sub.ndi_name,
+                      max_tries=3, timeout=5, broadcast=False)
+
+def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+    if use_pmk:
+        pmk = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
+        pwd = None
+    else:
+        pwd = "NAN" if csid is not None else None
+        pmk = None
+
+    with hwsim_nan_radios() as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0", "ndi0") as pub, NanDevice(wpas2, "nan1", "ndi1") as sub:
+
+        pssi = "aabbccdd001122334455667788"
+        sssi = "ddbbccaa001122334455667788"
+
+        pid, sid, paddr, saddr= _nan_discover_service(pub, sub, "test_service", pssi, sssi)
+
+        # Log peer info (specific to this test)
+        peer_schedule = pub.wpas.request("NAN_PEER_INFO " + saddr + " schedule")
+        logger.info("\n" + peer_schedule)
+        potential = pub.wpas.request("NAN_PEER_INFO " + saddr + " potential")
+        logger.info("\n" + potential)
+        capa = pub.wpas.request("NAN_PEER_INFO " + saddr + " capa")
+        logger.info("\n" + capa)
+
+        result = _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi="aabbcc",
+                                             resp_ssi="ddeeff", csid=csid, password=pwd, pmk=pmk,
+                                             counter=counter, wrong_pwd=wrong_pwd)
+
+        if result is None:
+            # wrong_pwd test completed
+            return
+
+        ndp_id, init_ndi = result
+
+        _nan_test_connectivity(pub, sub)
+        _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id)
+
+def run_nan_dp(country="US", counter=False, csid=None, wrong_pwd=False, use_pmk=False):
+    set_country(country)
+    try:
+        _run_nan_dp(counter=counter, csid=csid, wrong_pwd=wrong_pwd, use_pmk=use_pmk)
+    finally:
+        set_country("00")
+
+def test_nan_dp_open(dev, apdev, params):
+    """NAN DP open"""
+    run_nan_dp()
+
+def test_nan_dp_open_counter(dev, apdev, params):
+    """NAN DP open with counter proposal"""
+    run_nan_dp(counter=True)
+
+def test_nan_dp_sk_ccmp128(dev, apdev, params):
+    """NAN DP - 2way NDL + SK CCMP security"""
+    run_nan_dp(csid=1)
+
+def test_nan_dp_sk_gcmp256(dev, apdev, params):
+    """NAN DP - 3way NDL + SK GCMP-256 security"""
+    run_nan_dp(counter=True, csid=2)
+
+def test_nan_dp_wrong_pwd(dev, apdev, params):
+    """NAN DP - Wrong password"""
+    run_nan_dp(csid=1, wrong_pwd=True)
+
+def test_nan_dp_pmk(dev, apdev, params):
+    """NAN DP - 3way NDL + SK CCMP security with PMK"""
+    run_nan_dp(counter=True, csid=1, use_pmk=True)
+
+def nan_pre_bootstrap(pub, sub, pmb=0x1):
+    paddr = pub.wpas.own_addr()
+    saddr = sub.wpas.own_addr()
+
+    pssi = "aabbccdd"
+    sssi = "ddbbccaa"
+
+    pid = pub.publish("test_service", ssi=pssi, unsolicited=0, pbm=pmb)
+    sid = sub.subscribe("test_service", ssi=sssi)
+
+    logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+    ev = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+    if ev is None:
+        raise Exception("NAN-DISCOVERY-RESULT event not seen")
+
+    nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+
+    ev = pub.wpas.wait_event(["NAN-REPLIED"], timeout=2)
+    if ev is None:
+        raise Exception("NAN-REPLIED event not seen")
+
+    nan_sync_verify_event(ev, saddr, pid, sid, sssi)
+
+    return pid, sid, paddr, saddr
+
+def test_nan_bootstrap_opportunistic(dev, apdev, params):
+    """NAN opportunistic bootstrap with auto accept"""
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub)
+
+        sub.bootstrap(paddr, sid, pid, 0x1)
+
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen")
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen")
+
+def test_nan_bootstrap_password(dev, apdev, params):
+    """NAN bootstrap with password"""
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub, pmb=0x4)
+
+        # Request bootstrap using the passphrase keypad method (BIT 6)
+        sub.bootstrap(paddr, sid, pid, 0x40)
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-REQUEST"], timeout=2)
+        if ev is None or "peer_nmi=" + saddr not in ev or "pbm=0x0004" not in ev:
+            raise Exception("NAN-BOOTSTRAP-REQUEST event not seen")
+
+        pub.bootstrap(saddr, pid, sid, 0x4, auth=True)
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None or "pbm=0x0040" not in ev:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (subscriber)")
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None or "pbm=0x0004" not in ev:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (publisher)")
+
+def test_nan_bootstrap_password_with_delays(dev, apdev, params):
+    """NAN bootstrap with password with delay and wrong method"""
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub, pmb=0x4)
+
+        # Request bootstrap using the passphrase keypad method (BIT 6)
+        sub.bootstrap(paddr, sid, pid, 0x40)
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-REQUEST"], timeout=2)
+        if ev is None or "peer_nmi=" + saddr not in ev or "pbm=0x0004" not in ev:
+            raise Exception("NAN-BOOTSTRAP-REQUEST event not seen")
+
+        # Do not authenticate the peer for 10 seconds and verify that no success event is sent
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=10)
+        if ev is not None:
+            raise Exception("Got unexpected NAN-BOOTSTRAP-SUCCESS event seen (subscriber)")
+
+        # now try with wrong method (QR code display, BIT 3)
+        pub.bootstrap(saddr, pid, sid, 0x8, auth=True)
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=5)
+        if ev is not None:
+            raise Exception("Got unexpected NAN-BOOTSTRAP-SUCCESS event seen (subscriber)")
+
+        # now authenticate properly
+        pub.bootstrap(saddr, pid, sid, 0x4, auth=True)
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None or "pbm=0x0040" not in ev:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (subscriber)")
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None or "pbm=0x0004" not in ev:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (publisher)")
+
+        pub.bootstrap_reset(saddr)
+        sub.bootstrap_reset(paddr)
+
+def test_nan_bootstrap_password_no_response(dev, apdev, params):
+    """NAN bootstrap with password with no response from publisher"""
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub, pmb=0x4)
+
+        # cancel the publish to simulate no response
+        pub.cancel_publish(pid)
+
+        # Request bootstrap using the passphrase keypad method (BIT 6)
+        sub.bootstrap(paddr, sid, pid, 0x40)
+
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=10)
+        if ev is not None:
+            raise Exception("Got unexpected NAN-BOOTSTRAP-SUCCESS event seen (subscriber)")
+
+        sub.bootstrap_reset(paddr)
+        sub.cancel_subscribe(sid)
+
+def test_nan_pair_abort(dev, apdev, params):
+    """NAN pair abort"""
+    with hwsim_nan_radios() as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub, pmb=0x4)
+
+        # Complete bootstrap
+        sub.bootstrap(paddr, sid, pid, 0x40)
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-REQUEST"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-REQUEST event not seen")
+        pub.bootstrap(saddr, pid, sid, 0x4, auth=True)
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (subscriber)")
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen (publisher)")
+
+        # Start pairing (mode "SAE") without the responder being ready
+        sub.pairing_request(pub, sid, pid, "SAE", responder=False, password="password123")
+
+        ev_sub = sub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=2)
+        if ev_sub is not None:
+            raise Exception("Unexpected PASN result seen on subscriber")
+
+        ev_pub = pub.wpas.wait_event(["NAN-PAIRING-REQUEST"], timeout=2)
+        if ev_pub is None:
+            raise Exception("PASN pairing request not seen on publisher")
+
+        if "OK" not in sub.pair_abort(paddr):
+            raise Exception("NAN_PAIR_ABORT failed on subscriber")
+        if "OK" not in pub.pair_abort(saddr):
+            raise Exception("NAN_PAIR_ABORT failed on publisher")
+
+        # After abort, we should be able to restart pairing successfully
+        sub.pairing_request(pub, sid, pid, "SAE", responder=False, password="password123")
+        ev_pub = pub.wpas.wait_event(["NAN-PAIRING-REQUEST"], timeout=2)
+        if ev_pub is None:
+            raise Exception("PASN pairing request not seen on publisher after abort")
+
+        pub.pairing_request(sub, sid, pid, "SAE", responder=True, password="password123")
+        ev_sub = sub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+        if ev_sub is None:
+            raise Exception("PASN result not seen on subscriber")
+        if "status=success" not in ev_sub:
+            raise Exception("NAN PASN pairing failed on subscriber after abort")
+        ev_pub = pub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+        if ev_pub is None:
+            raise Exception("PASN result not seen on publisher")
+        if "status=success" not in ev_pub:
+            raise Exception("NAN PASN pairing failed on publisher after abort")
+
+        # Test abort with invalid peer address
+        if "FAIL" not in sub.pair_abort("02:00:00:00:00:00"):
+            raise Exception("NAN_PAIR_ABORT with invalid peer address succeeded unexpectedly")
+
+def run_nan_pairing(sub, pub, pid, sid, pairing_type, password=None):
+    if pairing_type == "SAE":
+        if password is not None:
+            pass
+        else:
+            raise Exception("Password must be provided for SAE pairing")
+
+    pub.pairing_request(sub, sid, pid, pairing_type, responder=True, password=password)
+    sub.pairing_request(pub, sid, pid, pairing_type, responder=False, password=password)
+
+    ev_sub = sub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+    if ev_sub is None:
+        raise Exception("PASN result not seen on requesting device")
+    if "status=success" not in ev_sub:
+        raise Exception("NAN PASN pairing failed on requesting device")
+
+    ev_pub = pub.wpas.wait_event(["NAN-PAIRING-STATUS"], timeout=5)
+    if ev_pub is None:
+        raise Exception("PASN result not seen on publisher")
+    if "status=success" not in ev_pub:
+        raise Exception("NAN PASN pairing failed on publisher")
+
+    # Extract nd_pmk from both events
+    data_sub = split_nan_event(ev_sub)
+    data_pub = split_nan_event(ev_pub)
+
+    nd_pmk_sub = data_sub.get('nd_pmk')
+    nd_pmk_pub = data_pub.get('nd_pmk')
+
+    if nd_pmk_sub is None:
+        raise Exception("nd_pmk not found in subscriber pairing status event")
+    if nd_pmk_pub is None:
+        raise Exception("nd_pmk not found in publisher pairing status event")
+
+    if nd_pmk_sub != nd_pmk_pub:
+        raise Exception(f"nd_pmk mismatch: sub={nd_pmk_sub}, pub={nd_pmk_pub}")
+
+    logger.info(f"NAN pairing successful, nd_pmk={nd_pmk_sub}")
+
+    ev = pub.wpas.wait_event(["NAN-NIK-RECEIVED"], timeout=1)
+    ev = sub.wpas.wait_event(["NAN-NIK-RECEIVED"], timeout=1)
+
+    return nd_pmk_sub
+
+def run_nan_pairing_bootstrap(pairing_type, password=None):
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        if "OK" not in sub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in pub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (pub)")
+
+        pid, sid, paddr, saddr = nan_pre_bootstrap(pub, sub)
+        sub.bootstrap(paddr, sid, pid, 0x1)
+
+        ev = sub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen")
+
+        ev = pub.wpas.wait_event(["NAN-BOOTSTRAP-SUCCESS"], timeout=2)
+        if ev is None:
+            raise Exception("NAN-BOOTSTRAP-SUCCESS event not seen")
+
+        run_nan_pairing(sub, pub, pid, sid, pairing_type, password)
+
+        pub.cancel_publish(pid)
+        sub.cancel_subscribe(sid)
+
+def test_nan_sae_pairing_bootstrap(dev, apdev, params):
+    """NAN Pairing setup using opportunistic bootstrapping"""
+    run_nan_pairing_bootstrap("SAE", password="password123")
+
+def run_nan_pairing_verification(pairing_type, password=None):
+    with hwsim_nan_radios(count=2) as [wpas1, wpas2], \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        paddr = pub.wpas.own_addr()
+        saddr = sub.wpas.own_addr()
+
+        if "OK" not in sub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in pub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (pub)")
+
+        pid, sid, paddr, saddr = nan_sync_discovery(pub, sub, "test_service",
+                                                    pssi="aabbccdd",
+                                                    sssi="ddbbccaa",
+                                                    unsolicited=0)
+
+        if pairing_type == "SAE":
+            if password is not None:
+                pass
+            else:
+                raise Exception("Password must be provided for SAE pairing")
+
+        run_nan_pairing(sub, pub, pid, sid, pairing_type, password)
+
+        pub.cancel_publish(pid)
+        sub.cancel_subscribe(sid)
+
+        import time
+        time.sleep(1)
+        pub.wpas.dump_monitor()
+        sub.wpas.dump_monitor()
+
+        pid, sid, paddr, saddr = nan_sync_discovery(pub, sub, "test_service",
+                                                    pssi="aabbccee",
+                                                    sssi="ddbbccee",
+                                                    unsolicited=0,
+                                                    timeout=5)
+
+        run_nan_pairing(sub, pub, pid, sid, "PMK")
+
+def test_nan_opportunistic_pairing(dev, apdev, params):
+    """NAN opportunistic pairing (PASN) followed by PMK-based pairing verification"""
+    run_nan_pairing_verification("PASN")
+
+def test_nan_sae_pairing(dev, apdev, params):
+    """NAN Pairing setup using a password (SAE)"""
+    run_nan_pairing_verification("SAE", "nanpassword")
+
+def test_nan_publish_with_pmk(dev, apdev, params):
+    """NAN publish with PMK and cipher suites"""
+    with hwsim_nan_radios(count=2) as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0") as pub, NanDevice(wpas2, "nan1") as sub:
+        publisher_addr = pub.wpas.own_addr()
+
+        # 32-byte PMK written out as 64 hex characters
+        pmk_hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
+        # NAN_CS_SK_CCM_128, NAN_CS_SK_GCM_256
+        suites = "1,2"
+
+        # The publish request carries both the PMK and the cipher suite list
+        publish_id = pub.publish("secure_test", nd_pmk=pmk_hex,
+                                 cipher_suites=suites)
+        if "FAIL" in publish_id:
+            raise Exception(f"Failed to publish with PMK: {publish_id}")
+
+        logger.info(f"Published with PMK, ID: {publish_id}")
+
+        # Subscribe to the same service from the second device
+        subscribe_id = sub.subscribe("secure_test", ssi=None)
+
+        # The discovery result has to name this publisher and this publish ID
+        result = sub.wpas.wait_event(["NAN-DISCOVERY-RESULT"], timeout=2)
+        if result is None:
+            raise Exception("NAN-DISCOVERY-RESULT event not seen")
+        if f"address={publisher_addr}" not in result:
+            raise Exception(f"Unexpected publisher address in discovery: {result}")
+        if f"publish_id={publish_id}" not in result:
+            raise Exception(f"Unexpected publish ID in discovery: {result}")
+
+        logger.info(f"Discovery event: {result}")
+
+        pub.cancel_publish(publish_id)
+        sub.cancel_subscribe(subscribe_id)
+
+def _run_nan_pairing_bootstrap_ndp(pairing_type, password=None, csid=1):
+    """Run NAN discovery, bootstrapping and pairing, then a secured NDP.
+
+    pairing_type and password are passed to run_nan_pairing(); csid is
+    passed to _nan_ndp_request_and_accept() together with the value returned
+    by run_nan_pairing(). After the NDP is terminated, service discovery is
+    checked once more. Raises Exception when a schedule cannot be configured
+    or an expected event is not seen before its timeout.
+    """
+    # Function-scope import (as in run_nan_pairing_verification()) so that
+    # time.sleep() below does not depend on `time` existing at module level.
+    import time
+
+    def wait_for(nan_dev, event, where=""):
+        # Return the next matching event; fail the test if none shows up.
+        ev = nan_dev.wpas.wait_event([event], timeout=2)
+        if ev is None:
+            raise Exception(f"{event} event not seen{where}")
+        return ev
+
+    with hwsim_nan_radios() as (wpas1, wpas2), \
+        NanDevice(wpas1, "nan0", "ndi0") as pub, NanDevice(wpas2, "nan1", "ndi1") as sub:
+        paddr = pub.wpas.own_addr()
+        saddr = sub.wpas.own_addr()
+
+        # Configure schedules for both devices
+        if "OK" not in sub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (sub)")
+
+        if "OK" not in pub.schedule_config((2437, "0e000000")):
+            raise Exception("Failed to configure schedule (pub)")
+
+        # Step 1: Pre-bootstrap discovery
+        pssi = "aabbccdd"
+        sssi = "ddbbccaa"
+
+        pid = pub.publish("test_service", ssi=pssi, unsolicited=0, pbm=0x1)
+        sid = sub.subscribe("test_service", ssi=sssi)
+
+        logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
+
+        def verify_discovery():
+            # The subscriber's event is checked against the publisher's address
+            # and SSI, and the publisher's event against the subscriber's.
+            ev = wait_for(sub, "NAN-DISCOVERY-RESULT")
+            nan_sync_verify_event(ev, paddr, pid, sid, pssi)
+            ev = wait_for(pub, "NAN-REPLIED")
+            nan_sync_verify_event(ev, saddr, pid, sid, sssi)
+
+        verify_discovery()
+
+        # Step 2: Bootstrapping
+        sub.bootstrap(paddr, sid, pid, 0x1)
+
+        wait_for(sub, "NAN-BOOTSTRAP-SUCCESS", " on subscriber")
+        wait_for(pub, "NAN-BOOTSTRAP-SUCCESS", " on publisher")
+
+        # Step 3: Pairing
+        nd_pmk = run_nan_pairing(sub, pub, pid, sid, pairing_type, password)
+
+        logger.info(f"Pairing completed successfully: nd_pmk={nd_pmk}, now setting up NDP")
+
+        # Step 4: NDP setup with security using the derived nd_pmk
+        ndp_id, init_ndi = _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr,
+                                                       req_ssi="aabbcc", resp_ssi="ddeeff", csid=csid,
+                                                       pmk=nd_pmk, configure_schedule=True)
+
+        logger.info("NDP connection established successfully after pairing")
+
+        _nan_test_connectivity(pub, sub)
+        _nan_ndp_terminate(pub, sub, paddr, init_ndi, ndp_id)
+
+        # Once the NDP is removed, verify that service discovery still works
+        pub.wpas.dump_monitor()
+        sub.wpas.dump_monitor()
+        time.sleep(1)
+        verify_discovery()
+
+def test_nan_pairing_bootstrap_ndp_sk_ccmp128(dev, apdev, params):
+    """NAN SAE pairing and bootstrapping, then an NDP secured with SK CCMP-128 (csid 1)"""
+    set_country("US")  # country code in effect for the duration of the run
+    try:
+        _run_nan_pairing_bootstrap_ndp(pairing_type="SAE", password="password123", csid=1)
+    finally:
+        set_country("00")  # switched back even when the run raises
-- 
2.53.0




More information about the Hostap mailing list