[PATCH v9 7/7] tests: Add AFC hwsim tests
Allen Ye
allen.ye at mediatek.com
Mon Dec 1 03:10:42 PST 2025
Add hwsim tests that use a mock AFCD and canned AFC responses to verify the
Automated Frequency Coordination (AFC) feature.
Signed-off-by: Allen Ye <allen.ye at mediatek.com>
CR-Id: WCNCR00259302
---
tests/hwsim/test_afc.py | 276 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 276 insertions(+)
create mode 100644 tests/hwsim/test_afc.py
diff --git a/tests/hwsim/test_afc.py b/tests/hwsim/test_afc.py
new file mode 100644
index 000000000..a64f6d164
--- /dev/null
+++ b/tests/hwsim/test_afc.py
@@ -0,0 +1,276 @@
+# AFC tests
+# Copyright (c) 2025, Mediatek Corporation
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import logging
+logger = logging.getLogger()
+import os
+import threading
+from datetime import datetime, timedelta
+import socket
+import json
+
+import hwsim_utils
+import hostapd
+from utils import *
+
+class AFCDServer:
+    """Mock AFC daemon (afcd) that answers a single AFC request.
+
+    The server listens on the UNIX domain stream socket at self.path,
+    accepts one connection, reads one request and, if the request contains
+    a parsable requestId, replies with a SUCCESS response built from the
+    frequency/channel lists supplied by the caller.
+    """
+
+    def __init__(self):
+        # Bind the attributes first so that close()/__del__() are safe even
+        # if creating or binding the socket below fails.
+        self.s = None
+        self.conn = None
+        self.path = '/tmp/afcd.sock'
+        # Remove a stale socket file; bind() would fail if it still exists
+        if os.path.exists(self.path):
+            os.remove(self.path)
+        self.s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.s.bind(self.path)
+        self.s.listen(1)
+
+    def listen_reply(self, freq_info, chan_info):
+        """Accept one connection and reply to its AFC request.
+
+        freq_info: list used as availableFrequencyInfo (None -> empty list)
+        chan_info: list used as availableChannelInfo (None -> empty list)
+
+        No response is sent if no client connects within one second, if no
+        data is received within one second, or if the request does not
+        contain a parsable requestId.
+        """
+        from datetime import timezone
+
+        if freq_info is None:
+            freq_info = []
+        if chan_info is None:
+            chan_info = []
+
+        try:
+            self.s.settimeout(1)
+            self.conn, _ = self.s.accept()
+        except OSError:
+            logger.info("hostapd failed to connect to afcd")
+            self.close()
+            return
+
+        # A socket returned by accept() on a timeout-mode listener is in
+        # blocking mode, so the timeout has to be set explicitly for recv()
+        # to be able to time out.
+        self.conn.settimeout(1)
+        try:
+            # A single read is assumed to be enough to get the full request
+            req_raw = self.conn.recv(8192).decode()
+        except socket.timeout:
+            logger.info("AFCD failed to get request")
+            req_raw = None
+        if not req_raw:
+            return
+
+        logger.info(f"Received: {req_raw}")
+        if 'availableSpectrumInquiryRequests' not in req_raw:
+            return
+
+        try:
+            req_json = json.loads(req_raw)
+            requestId = req_json["availableSpectrumInquiryRequests"][0]["requestId"]
+        except (ValueError, KeyError, IndexError, TypeError) as e:
+            logger.info(f"Failed to parse requestId: {e}")
+            self.close()
+            # Without a requestId there is nothing valid to send back
+            return
+
+        # The trailing "Z" designates UTC, so the timestamp has to be
+        # generated from UTC time instead of local time.
+        expire_time = (datetime.now(timezone.utc) +
+                       timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%SZ")
+
+        resp = {
+            "availableSpectrumInquiryResponses": [
+                {
+                    "response": {
+                        "responseCode": 0,
+                        "shortDescription": "SUCCESS"
+                    },
+                    "availableFrequencyInfo": freq_info,
+                    "availableChannelInfo": chan_info,
+                    "requestId": requestId,
+                    "availabilityExpireTime": expire_time,
+                    "rulesetId": "US_47_CFR_PART_15_SUBPART_E"
+                }
+            ],
+            "version": "1.4"
+        }
+        reply = json.dumps(resp)
+        self.conn.sendall(reply.encode())
+        logger.info(f"response Sent: {reply}")
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close both sockets and remove the socket file.
+
+        Safe to call more than once. The socket file is removed only by the
+        call that actually closes the listening socket, so a late __del__()
+        cannot delete a socket file created by a newer server instance.
+        """
+        conn = getattr(self, 'conn', None)
+        if conn is not None:
+            conn.close()
+            self.conn = None
+
+        s = getattr(self, 's', None)
+        if s is None:
+            return
+        s.close()
+        self.s = None
+        try:
+            os.unlink(self.path)
+        except FileNotFoundError:
+            pass
+
+def run_afcd_server(freq_info=None, chan_info=None):
+    """Serve a single AFC request with a mock afcd; used as a thread target.
+
+    freq_info: list for availableFrequencyInfo in the response (or None)
+    chan_info: list for availableChannelInfo in the response (or None)
+    """
+    server = AFCDServer()
+    try:
+        server.listen_reply(freq_info, chan_info)
+    finally:
+        # Release the listening socket even if handling the request failed
+        server.close()
+
+def init_afc_dut_params():
+    """Return a new dict of hostapd parameters shared by all AFC tests."""
+    # HE AP using SAE with management frame protection required
+    params = {
+        "country_code": "CA",
+        "hw_mode": "a",
+        "ssid": "AFC",
+        "ieee80211ax": "1",
+        "wpa": "2",
+        "rsn_pairwise": "CCMP",
+        "wpa_key_mgmt": "SAE",
+        "sae_pwe": "1",
+        "sae_password": "password",
+        "ieee80211w": "2",
+    }
+    # AFC configuration; afcd_sock is the socket path of the mock AFC daemon
+    params.update({
+        "afcd_sock": "/tmp/afcd.sock",
+        "afc_request_version": "1.4",
+        "afc_serial_number": "SN000",
+        "afc_cert_ids": "US_47_CFR_PART_15_SUBPART_E:CID000",
+        "afc_location_type": "0",
+        "afc_linear_polygon": "-121.98586998164663:37.38193354300452",
+        "afc_radial_polygon": "118.8:92.76,76.44:87.456,98.56:123.33",
+        "afc_major_axis": "150",
+        "afc_minor_axis": "150",
+        "afc_orientation": "0",
+        "afc_height": "15",
+        "afc_height_type": "AGL",
+        "afc_vertical_tolerance": "2",
+        "afc_freq_range": "5925:6425,6525:6875",
+        "afc_op_class": "131,132,133,134,136,137",
+    })
+    return params
+
+def test_afc_empty(dev, apdev):
+    """AFC server with no usable channel"""
+    try:
+        hapd = None
+        # The mock afcd replies with empty frequency and channel lists
+        afcd_thread = threading.Thread(target=run_afcd_server)
+        afcd_thread.start()
+
+        params = init_afc_dut_params()
+        params.update({"he_6ghz_reg_pwr_type": "1",
+                       "channel": "5",
+                       "op_class": "131"})
+        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
+        state = hapd.get_status_field("state")
+        if state != "COUNTRY_UPDATE":
+            raise Exception("Unexpected interface state - expected COUNTRY_UPDATE")
+    except Exception as e:
+        # Report missing AFC or 6 GHz support as a skip instead of a failure
+        if str(e) == "Failed to set hostapd parameter afcd_sock":
+            raise HwsimSkip("AFC is not supported")
+        if not he_6ghz_supported():
+            raise HwsimSkip("6 GHz frequency is not supported")
+        raise
+    finally:
+        afcd_thread.join()
+        clear_regdom(hapd, dev)
+
+def _test_afc(dev, apdev, freq_info, chan_info, channel, op_class, ccfs1):
+    """Start an AFC AP against a mock afcd and connect a station to it.
+
+    freq_info: availableFrequencyInfo list for the mock AFC response (or None)
+    chan_info: availableChannelInfo list for the mock AFC response (or None)
+    channel: configured hostapd channel
+    op_class: configured hostapd op_class; the station also has to list it
+              in its supported operating classes
+    ccfs1: value used for he_oper_centr_freq_seg0_idx (despite the parameter
+           name, this is the segment 0 center frequency index)
+
+    Raises HwsimSkip if AFC or 6 GHz operation is not supported.
+    """
+    # Imported explicitly instead of relying on "from utils import *"
+    # happening to export binascii.
+    import binascii
+
+    check_sae_capab(dev[0])
+    dev[0].cmd_execute(['iw', 'reg', 'set', 'CA'])
+    wait_regdom_changes(dev[0])
+
+    # Bound before the try block so that the finally block can always use
+    # them.
+    hapd = None
+    t = threading.Thread(target=run_afcd_server, args=(freq_info, chan_info))
+    t.start()
+
+    try:
+        params = init_afc_dut_params()
+        params["he_6ghz_reg_pwr_type"] = "1"
+        params["channel"] = str(channel)
+        params["op_class"] = str(op_class)
+        params["he_oper_centr_freq_seg0_idx"] = str(ccfs1)
+        hapd = hostapd.add_ap(apdev[0], params)
+
+        status = hapd.get_status()
+        logger.info("hostapd STATUS: " + str(status))
+        if hapd.get_status_field("ieee80211ax") != "1":
+            raise Exception("STATUS did not indicate ieee80211ax=1")
+
+        # The AP has to end up on a 6 GHz channel
+        freq = hapd.get_status_field("freq")
+        if int(freq) < 5955:
+            raise Exception("Unexpected frequency: " + freq)
+
+        dev[0].set("sae_pwe", "1")
+        dev[0].set("sae_groups", "")
+
+        dev[0].connect("AFC", key_mgmt="SAE", sae_password="password",
+                       ieee80211w="2", scan_freq=freq)
+        hwsim_utils.test_connectivity(dev[0], hapd)
+
+        sta = hapd.get_sta(dev[0].own_addr())
+        if 'supp_op_classes' not in sta:
+            raise Exception("supp_op_classes not indicated")
+        # One octet per operating class; "in" on bytes compares integers
+        supp_op_classes = binascii.unhexlify(sta['supp_op_classes'])
+        if op_class not in supp_op_classes:
+            raise Exception("STA did not indicate support for opclass %d" % op_class)
+
+        dev[0].request("DISCONNECT")
+        dev[0].wait_disconnected()
+        hapd.wait_sta_disconnect()
+        hapd.disable()
+    except Exception as e:
+        # Report missing AFC or 6 GHz support as a skip instead of a failure
+        if str(e) == "Failed to set hostapd parameter afcd_sock":
+            raise HwsimSkip("AFC is not supported")
+        if not he_6ghz_supported():
+            raise HwsimSkip("6 GHz frequency is not supported")
+        raise
+    finally:
+        t.join()
+        dev[0].set("sae_pwe", "0")
+        dev[0].cmd_execute(['iw', 'reg', 'set', '00'])
+        wait_regdom_changes(dev[0])
+        clear_regdom(hapd, dev)
+
+def afc_gen_freq_info(freq, max_psd):
+    """Build one availableFrequencyInfo entry spanning freq - 10 .. freq + 10.
+
+    freq: center of the frequency range
+    max_psd: value reported as maxPsd for the range
+    """
+    return {
+        "frequencyRange": {
+            "highFrequency": freq + 10,
+            "lowFrequency": freq - 10,
+        },
+        "maxPsd": max_psd,
+    }
+
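+# The configured channel (5) is not included in any of the AFC responses used
+# below, which makes it unusable and forces the AP to select a channel with ACS.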
+def test_afc_freq_20mhz(dev, apdev):
+    """AFC availableFrequencyInfo with 20 MHz channel width usable"""
+    # A single 20 MHz wide range centered at 6115 MHz
+    _test_afc(dev, apdev, freq_info=[afc_gen_freq_info(6115, 10.0)],
+              chan_info=None, channel=5, op_class=131, ccfs1=5)
+
+def test_afc_freq_40mhz(dev, apdev):
+    """AFC availableFrequencyInfo with 40 MHz channel width usable"""
+    # Two adjacent 20 MHz wide ranges
+    centers = (6115, 6135)
+    ranges = [afc_gen_freq_info(center, 10.0) for center in centers]
+    _test_afc(dev, apdev, freq_info=ranges, chan_info=None, channel=5,
+              op_class=132, ccfs1=3)
+
+def test_afc_freq_80mhz(dev, apdev):
+    """AFC availableFrequencyInfo with 80 MHz channel width usable"""
+    # Four adjacent 20 MHz wide ranges
+    centers = (6115, 6135, 6155, 6175)
+    ranges = [afc_gen_freq_info(center, 10.0) for center in centers]
+    _test_afc(dev, apdev, freq_info=ranges, chan_info=None, channel=5,
+              op_class=133, ccfs1=7)
+
+def test_afc_freq_160mhz(dev, apdev):
+    """AFC availableFrequencyInfo with 160 MHz channel width usable"""
+    # Eight adjacent 20 MHz wide ranges
+    centers = (6115, 6135, 6155, 6175, 6195, 6215, 6235, 6255)
+    ranges = [afc_gen_freq_info(center, 10.0) for center in centers]
+    _test_afc(dev, apdev, freq_info=ranges, chan_info=None, channel=5,
+              op_class=134, ccfs1=15)
+
+
+def afc_gen_chan_info(chan, op_class, eirp):
+    """Build one availableChannelInfo entry.
+
+    chan: list stored as channelCfi
+    op_class: value stored as globalOperatingClass
+    eirp: value repeated in maxEirp once per item of chan
+    """
+    max_eirp = [eirp] * len(chan)
+    return {
+        "channelCfi": chan,
+        "globalOperatingClass": op_class,
+        "maxEirp": max_eirp,
+    }
+
+def test_afc_chan_20mhz(dev, apdev):
+    """AFC availableChannelInfo with 20 MHz channel width usable"""
+    # Only channel 33 of operating class 131 is reported
+    _test_afc(dev, apdev, freq_info=None,
+              chan_info=[afc_gen_chan_info([33], 131, 23.0)],
+              channel=5, op_class=131, ccfs1=5)
+
+def test_afc_chan_40mhz(dev, apdev):
+    """AFC availableChannelInfo with 40 MHz channel width usable"""
+    # (channelCfi list, global operating class, max EIRP) per response entry
+    entries = (
+        ([33, 37], 131, 23.0),
+        ([35], 132, 26.0),
+    )
+    chan_info = [afc_gen_chan_info(*entry) for entry in entries]
+    _test_afc(dev, apdev, freq_info=None, chan_info=chan_info, channel=5,
+              op_class=132, ccfs1=3)
+
+def test_afc_chan_80mhz(dev, apdev):
+    """AFC availableChannelInfo with 80 MHz channel width usable"""
+    # (channelCfi list, global operating class, max EIRP) per response entry
+    entries = (
+        ([33, 37, 41, 45], 131, 23.0),
+        ([35, 43], 132, 26.0),
+        ([39], 133, 29.0),
+    )
+    chan_info = [afc_gen_chan_info(*entry) for entry in entries]
+    _test_afc(dev, apdev, freq_info=None, chan_info=chan_info, channel=5,
+              op_class=133, ccfs1=7)
+
+def test_afc_chan_160mhz(dev, apdev):
+    """AFC availableChannelInfo with 160 MHz channel width usable"""
+    # (channelCfi list, global operating class, max EIRP) per response entry
+    entries = (
+        ([33, 37, 41, 45, 49, 53, 57, 61], 131, 23.0),
+        ([35, 43, 51, 59], 132, 26.0),
+        ([39, 55], 133, 29.0),
+        ([47], 134, 32.0),
+    )
+    chan_info = [afc_gen_chan_info(*entry) for entry in entries]
+    _test_afc(dev, apdev, freq_info=None, chan_info=chan_info, channel=5,
+              op_class=134, ccfs1=15)
--
2.45.2
More information about the Hostap
mailing list