[RFC 87/97] tests: Add NAN NDP establishment with GTK, IGTK and BIGTK

Andrei Otcheretianski andrei.otcheretianski at intel.com
Tue Apr 28 13:06:28 PDT 2026


From: Ilan Peer <ilan.peer at intel.com>

Add NAN NDP tests with GTK, IGTK and BIGTK:

- Pairwise CCMP with CCMP GTK and BIP-CMAC-128 as management group
  cipher.
- Pairwise GCMP-256 with GCMP-256 GTK and BIP-GMAC-256 as management
  group cipher.

Have these tests (and others) include the pairwise csid and GTK csid
in the publish service configuration.

Signed-off-by: Ilan Peer <ilan.peer at intel.com>
---
 tests/hwsim/test_nan.py | 57 +++++++++++++++++++++++++++++++----------
 1 file changed, 43 insertions(+), 14 deletions(-)

diff --git a/tests/hwsim/test_nan.py b/tests/hwsim/test_nan.py
index 40874d4d74..93383f382c 100644
--- a/tests/hwsim/test_nan.py
+++ b/tests/hwsim/test_nan.py
@@ -43,12 +43,13 @@ def check_nan_capab(dev):
         raise HwsimSkip(f"NAN not supported: {capa}")
 
 class NanDevice:
-    def __init__(self, dev, ifname, ndi_name=None, nmi_addr=None):
+    def __init__(self, dev, ifname, ndi_name=None, nmi_addr=None, mgmt_group_cipher=None):
         self.dev = dev
         self.ifname = ifname
         self.wpas = None
         self.ndi_name = ndi_name
         self.nmi_addr = nmi_addr
+        self.mgmt_group_cipher = mgmt_group_cipher
 
     def __enter__(self):
         self.start()
@@ -65,6 +66,8 @@ class NanDevice:
         self.wpas = WpaSupplicant(ifname=self.ifname)
         self.set("master_pref", "10")
         self.set("dual_band", "0")
+        if self.mgmt_group_cipher is not None:
+            self.set("mgmt_group_cipher", self.mgmt_group_cipher)
 
         if "OK" not in self.wpas.request("NAN_START"):
             raise Exception(f"Failed to start NAN functionality on {self.ifname}")
@@ -140,7 +143,7 @@ class NanDevice:
 
     def ndp_request(self, ndi, handle, peer_nmi, peer_id, ssi=None,
                     qos_slots=0, qos_latency=0xffff, csid=None, password=None,
-                    pmk=None, interface_id=None):
+                    pmk=None, interface_id=None, gtk_csid=None):
         cmd = f"NAN_NDP_REQUEST handle={handle} ndi={ndi} peer_nmi={peer_nmi} peer_id={peer_id}"
 
         params = [
@@ -149,6 +152,7 @@ class NanDevice:
             ("password", password),
             ("pmk", pmk),
             ("interface_id", interface_id),
+            ("gtk_csid", gtk_csid),
         ]
 
         cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
@@ -161,7 +165,8 @@ class NanDevice:
     def ndp_response(self, action, peer_nmi, ndi=None, peer_ndi=None,
                      ndp_id=None, init_ndi=None, reason_code=None, ssi=None,
                      qos_slots=0, qos_latency=0xffff, handle=None, csid=None,
-                     password=None, pmk=None, interface_id=None):
+                     password=None, pmk=None, interface_id=None,
+                     gtk_csid=None):
         if action not in ["accept", "reject"]:
             raise Exception(f"Invalid action: {action}. Must be 'accept' or 'reject'")
 
@@ -179,6 +184,7 @@ class NanDevice:
             ("password", password),
             ("pmk", pmk),
             ("interface_id", interface_id),
+            ("gtk_csid", gtk_csid),
         ]
 
         cmd += "".join(f" {name}={value}" for name, value in params if value is not None)
@@ -1071,11 +1077,18 @@ def test_nan_sched(dev, apdev, params):
     finally:
         set_country("00")
 
-def _nan_discover_service(pub, sub, service_name, pssi, sssi, ttl=None):
+def _nan_discover_service(pub, sub, service_name, pssi, sssi, ttl=None,
+                          csid=None, gtk_csid=None):
     paddr = pub.wpas.own_addr()
     saddr = sub.wpas.own_addr()
 
-    pid = pub.publish(service_name, ssi=pssi, ttl=ttl)
+    cipher_suites = None
+    if csid is not None:
+        cipher_suites = f"{csid}"
+        if gtk_csid is not None:
+            cipher_suites += f",{gtk_csid}"
+
+    pid = pub.publish(service_name, ssi=pssi, ttl=ttl, cipher_suites=cipher_suites)
     sid = sub.subscribe(service_name, ssi=sssi, active=0)
 
     logger.info(f"Publish ID: {pid}, Subscribe ID: {sid}")
@@ -1090,7 +1103,8 @@ def _nan_discover_service(pub, sub, service_name, pssi, sssi, ttl=None):
 
 def _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi, resp_ssi, csid=None,
                                 password=None, pmk=None, counter=False, wrong_pwd=False,
-                                configure_schedule=True, pub_interface_id=None, sub_interface_id=None):
+                                configure_schedule=True, pub_interface_id=None, sub_interface_id=None,
+                                gtk_csid=None):
     """
     Request NDP from subscriber and accept on publisher.
 
@@ -1104,7 +1118,7 @@ def _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi, resp_
     # NDP request
     if "OK" not in sub.ndp_request(sub.ndi_name, sid, paddr, pid, req_ssi,
                                    csid=csid, password=password, pmk=pmk,
-                                   interface_id=sub_interface_id):
+                                   interface_id=sub_interface_id, gtk_csid=gtk_csid):
         raise Exception("NDP request failed")
 
     ev = pub.wpas.wait_event(["NAN-NDP-REQUEST"], timeout=5)
@@ -1131,7 +1145,8 @@ def _nan_ndp_request_and_accept(pub, sub, pid, sid, paddr, saddr, req_ssi, resp_
     accept_pwd = "WRONG_PWD" if wrong_pwd else password
     if "OK" not in pub.ndp_response("accept", saddr, ndi=pub.ndi_name, ndp_id=ndp_id, init_ndi=init_ndi,
                                     handle=pid, ssi=resp_ssi, csid=csid, password=accept_pwd, pmk=pmk,
-                                    interface_id=pub_interface_id):
+                                    interface_id=pub_interface_id,
+                                    gtk_csid=gtk_csid):
         raise Exception("NDP response (accept) failed")
 
     # Verify disconnection on wrong password
@@ -1211,7 +1226,8 @@ def _nan_test_connectivity(pub, sub):
                       max_tries=3, timeout=5, broadcast=True)
 
 def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False,
-                use_interface_id=False, verify_max_idle_period=False):
+                use_interface_id=False, verify_max_idle_period=False, gtk_csid=None,
+                mgmt_group_cipher=None):
     if use_pmk:
         pmk = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
         pwd = None
@@ -1224,12 +1240,14 @@ def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False,
     )
 
     with hwsim_nan_radios() as (wpas1, wpas2), \
-        NanDevice(wpas1, "nan0", "ndi0") as pub, NanDevice(wpas2, "nan1", "ndi1") as sub:
+        NanDevice(wpas1, "nan0", "ndi0", mgmt_group_cipher=mgmt_group_cipher) as pub, \
+        NanDevice(wpas2, "nan1", "ndi1", mgmt_group_cipher=mgmt_group_cipher) as sub:
 
         pssi = "aabbccdd001122334455667788"
         sssi = "ddbbccaa001122334455667788"
 
-        pid, sid, paddr, saddr= _nan_discover_service(pub, sub, "test_service", pssi, sssi)
+        pid, sid, paddr, saddr= _nan_discover_service(pub, sub, "test_service", pssi, sssi,
+                                                      csid=csid, gtk_csid=gtk_csid)
 
         # Log peer info (specific to this test)
         peer_schedule = pub.wpas.request("NAN_PEER_INFO " + saddr + " schedule")
@@ -1247,7 +1265,8 @@ def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False,
                                              resp_ssi="ddeeff", csid=csid, password=pwd, pmk=pmk,
                                              counter=counter, wrong_pwd=wrong_pwd,
                                              pub_interface_id=pub_interface_id,
-                                             sub_interface_id=sub_interface_id)
+                                             sub_interface_id=sub_interface_id,
+                                             gtk_csid=gtk_csid)
         if result is None:
             # wrong_pwd test completed
             return
@@ -1288,12 +1307,14 @@ def _run_nan_dp(counter=False, csid=None, wrong_pwd=False, use_pmk=False,
                 raise Exception(f"NAN-NDP-DISCONNECTED event not seen on subscriber or invalid data")
 
 def run_nan_dp(country="US", counter=False, csid=None, wrong_pwd=False, use_pmk=False,
-               use_interface_id=False, verify_max_idle_period=False):
+               use_interface_id=False, verify_max_idle_period=False, gtk_csid=None,
+               mgmt_group_cipher=None):
     set_country(country)
     try:
         _run_nan_dp(counter=counter, csid=csid, wrong_pwd=wrong_pwd, use_pmk=use_pmk,
                     use_interface_id=use_interface_id,
-                    verify_max_idle_period=verify_max_idle_period)
+                    verify_max_idle_period=verify_max_idle_period, gtk_csid=gtk_csid,
+                    mgmt_group_cipher=mgmt_group_cipher)
     finally:
         set_country("00")
 
@@ -1867,3 +1888,11 @@ def test_nan_ndp_reconnect_after_terminate(dev, apdev, params):
 def test_nan_dp_max_idle_period(dev, apdev, params):
     """NAN DP open with max idle period verification"""
     run_nan_dp(use_interface_id=True, verify_max_idle_period=True)
+
+def test_nan_dp_sk_ccmp128_with_gtk(dev, apdev, params):
+    """NAN DP - 2way NDL + SK CCMP security with GTK"""
+    run_nan_dp(csid=1, gtk_csid=5, mgmt_group_cipher="BIP-CMAC-128")
+
+def test_nan_dp_sk_gcmp256_with_gtk(dev, apdev, params):
+    """NAN DP - 2way NDL + SK GCMP-256 security with GTK"""
+    run_nan_dp(csid=2, gtk_csid=6, mgmt_group_cipher="BIP-GMAC-256")
-- 
2.53.0




More information about the Hostap mailing list