[PATCH 8/9] tests: Add support for wlantest for remote hwsim tests
Jonathan Afek
jonathan at wizery.com
Thu May 19 06:06:49 PDT 2016
Use a monitor interface given on the command line, one that is not
also acting as a station or a SoftAP, to run wlantest as a monitor
on the channel used by the test.
This makes all the tests that use wlantest available for execution
on real hardware on remote hosts.
Signed-off-by: Jonathan Afek <jonathanx.afek at intel.com>
---
tests/hwsim/run-tests.py | 3 +
tests/hwsim/test_ap_ciphers.py | 10 +--
tests/hwsim/test_ap_hs20.py | 3 +-
tests/hwsim/test_ap_pmf.py | 70 ++++++++++++---------
tests/hwsim/test_ap_qosmap.py | 1 +
tests/hwsim/test_ap_tdls.py | 49 +++++++--------
tests/hwsim/test_p2p_autogo.py | 3 +-
tests/hwsim/test_peerkey.py | 10 +--
tests/hwsim/test_wnm.py | 8 ++-
tests/hwsim/wlantest.py | 135 +++++++++++++++++++++++++++++++++--------
tests/remote/config.py | 2 +
tests/remote/hwsim_wrapper.py | 14 +++++
12 files changed, 217 insertions(+), 91 deletions(-)
diff --git a/tests/hwsim/run-tests.py b/tests/hwsim/run-tests.py
index fd81b02..0a5489f 100755
--- a/tests/hwsim/run-tests.py
+++ b/tests/hwsim/run-tests.py
@@ -503,6 +503,9 @@ def main():
del hapd
hapd = None
+ #use None here since this instance of Wlantest() will never be
+ #used for remote host hwsim tests on real hw.
+ Wlantest.setup(None)
wt = Wlantest()
rename_log(args.logdir, 'hwsim0.pcapng', name, wt)
rename_log(args.logdir, 'hwsim0', name, wt)
diff --git a/tests/hwsim/test_ap_ciphers.py b/tests/hwsim/test_ap_ciphers.py
index 6cde14a..bff8906 100644
--- a/tests/hwsim/test_ap_ciphers.py
+++ b/tests/hwsim/test_ap_ciphers.py
@@ -28,10 +28,6 @@ def check_cipher(dev, ap, cipher):
hwsim_utils.test_connectivity(dev, hapd)
def check_group_mgmt_cipher(dev, ap, cipher):
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
-
if cipher not in dev.get_capability("group_mgmt"):
raise HwsimSkip("Cipher %s not supported" % cipher)
params = { "ssid": "test-wpa2-psk-pmf",
@@ -42,6 +38,12 @@ def check_group_mgmt_cipher(dev, ap, cipher):
"rsn_pairwise": "CCMP",
"group_mgmt_cipher": cipher }
hapd = hostapd.add_ap(ap, params)
+
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev.connect("test-wpa2-psk-pmf", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256",
pairwise="CCMP", group="CCMP", scan_freq="2412")
diff --git a/tests/hwsim/test_ap_hs20.py b/tests/hwsim/test_ap_hs20.py
index a3b61b2..3a93544 100644
--- a/tests/hwsim/test_ap_hs20.py
+++ b/tests/hwsim/test_ap_hs20.py
@@ -298,7 +298,7 @@ def _test_ap_interworking_scan_filtering(dev, apdev):
ssid = "test-hs20-ap1"
params['ssid'] = ssid
params['hessid'] = bssid
- hostapd.add_ap(apdev[0], params)
+ hapd0 = hostapd.add_ap(apdev[0], params)
bssid2 = apdev[1]['bssid']
params = hs20_ap_params()
@@ -312,6 +312,7 @@ def _test_ap_interworking_scan_filtering(dev, apdev):
dev[0].hs20_enable()
+ Wlantest.setup(hapd0)
wt = Wlantest()
wt.flush()
diff --git a/tests/hwsim/test_ap_pmf.py b/tests/hwsim/test_ap_pmf.py
index f7c5f29..c9d7403 100644
--- a/tests/hwsim/test_ap_pmf.py
+++ b/tests/hwsim/test_ap_pmf.py
@@ -17,13 +17,14 @@ from wpasupplicant import WpaSupplicant
def test_ap_pmf_required(dev, apdev):
"""WPA2-PSK AP with PMF required"""
ssid = "test-pmf-required"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
key_mgmt = hapd.get_config()['key_mgmt']
if key_mgmt.split(' ')[0] != "WPA-PSK-SHA256":
raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
@@ -53,13 +54,14 @@ def test_ap_pmf_required(dev, apdev):
def test_ap_pmf_optional(dev, apdev):
"""WPA2-PSK AP with PMF optional"""
ssid = "test-pmf-optional"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@@ -75,13 +77,14 @@ def test_ap_pmf_optional(dev, apdev):
def test_ap_pmf_optional_2akm(dev, apdev):
"""WPA2-PSK AP with PMF optional (2 AKMs)"""
ssid = "test-pmf-optional-2akm"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@@ -101,11 +104,12 @@ def test_ap_pmf_optional_2akm(dev, apdev):
def test_ap_pmf_negative(dev, apdev):
"""WPA2-PSK AP without PMF (negative test)"""
ssid = "test-pmf-negative"
+ params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
+ hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
- params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
- hapd = hostapd.add_ap(apdev[0], params)
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@@ -123,13 +127,14 @@ def test_ap_pmf_negative(dev, apdev):
def test_ap_pmf_assoc_comeback(dev, apdev):
"""WPA2-PSK AP with PMF association comeback"""
ssid = "assoc-comeback"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@@ -146,13 +151,14 @@ def test_ap_pmf_assoc_comeback(dev, apdev):
def test_ap_pmf_assoc_comeback2(dev, apdev):
"""WPA2-PSK AP with PMF association comeback (using DROP_SA)"""
ssid = "assoc-comeback"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")
if "OK" not in dev[0].request("DROP_SA"):
@@ -167,9 +173,6 @@ def test_ap_pmf_sta_sa_query(dev, apdev):
"""WPA2-PSK AP with station using SA Query"""
ssid = "assoc-comeback"
addr = dev[0].own_addr()
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5", drv_params="use_monitor=1")
@@ -187,6 +190,11 @@ def test_ap_pmf_sta_sa_query(dev, apdev):
bssid = wpas.own_addr()
wpas.dump_monitor()
+ Wlantest.setup(wpas)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@@ -260,9 +268,6 @@ def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev):
"""WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames"""
ssid = "deauth-attack"
addr = dev[0].own_addr()
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5", drv_params="use_monitor=1")
@@ -279,6 +284,11 @@ def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev):
wpas.connect_network(id)
bssid = wpas.own_addr()
+ Wlantest.setup(wpas)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev[0].connect(ssid, psk="12345678", ieee80211w="1",
key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
scan_freq="2412")
@@ -348,13 +358,14 @@ def test_ap_pmf_optional_eap(dev, apdev):
def test_ap_pmf_required_sha1(dev, apdev):
"""WPA2-PSK AP with PMF required with SHA1 AKM"""
ssid = "test-pmf-required-sha1"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "2";
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
key_mgmt = hapd.get_config()['key_mgmt']
if key_mgmt.split(' ')[0] != "WPA-PSK":
raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
@@ -373,15 +384,16 @@ def test_ap_pmf_toggle(dev, apdev):
def _test_ap_pmf_toggle(dev, apdev):
ssid = "test-pmf-optional"
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
params["wpa_key_mgmt"] = "WPA-PSK";
params["ieee80211w"] = "1";
params["assoc_sa_query_max_timeout"] = "1"
params["assoc_sa_query_retry_timeout"] = "1"
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
bssid = apdev[0]['bssid']
addr = dev[0].own_addr()
dev[0].request("SET reassoc_same_bss_optim 1")
diff --git a/tests/hwsim/test_ap_qosmap.py b/tests/hwsim/test_ap_qosmap.py
index 0dd0cba..6084521 100644
--- a/tests/hwsim/test_ap_qosmap.py
+++ b/tests/hwsim/test_ap_qosmap.py
@@ -18,6 +18,7 @@ def check_qos_map(ap, hapd, dev, sta, dscp, tid, ap_tid=None):
if not ap_tid:
ap_tid = tid
bssid = ap['bssid']
+ Wlantest.setup(hapd)
wt = Wlantest()
wt.clear_sta_counters(bssid, sta)
hwsim_utils.test_connectivity(dev, hapd, dscp=dscp, config=False)
diff --git a/tests/hwsim/test_ap_tdls.py b/tests/hwsim/test_ap_tdls.py
index 297a928..138f21c 100644
--- a/tests/hwsim/test_ap_tdls.py
+++ b/tests/hwsim/test_ap_tdls.py
@@ -56,7 +56,8 @@ def connect_2sta_open(dev, hapd, scan_freq="2412"):
dev[1].connect("test-open", key_mgmt="NONE", scan_freq=scan_freq)
connectivity(dev, hapd)
-def wlantest_setup():
+def wlantest_setup(hapd):
+ Wlantest.setup(hapd)
wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
@@ -165,7 +166,7 @@ def check_tdls_link(sta0, sta1, connected=True):
def test_ap_tdls_discovery(dev, apdev):
"""WPA2-PSK AP and two stations using TDLS discovery"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("TDLS_DISCOVER " + dev[1].p2p_interface_addr())
time.sleep(0.2)
@@ -173,7 +174,7 @@ def test_ap_tdls_discovery(dev, apdev):
def test_ap_wpa2_tdls(dev, apdev):
"""WPA2-PSK AP and two stations using TDLS"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@@ -183,7 +184,7 @@ def test_ap_wpa2_tdls(dev, apdev):
def test_ap_wpa2_tdls_concurrent_init(dev, apdev):
"""Concurrent TDLS setup initiation"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x80")
setup_tdls(dev[1], dev[0], hapd, reverse=True)
@@ -191,7 +192,7 @@ def test_ap_wpa2_tdls_concurrent_init(dev, apdev):
def test_ap_wpa2_tdls_concurrent_init2(dev, apdev):
"""Concurrent TDLS setup initiation (reverse)"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x80")
setup_tdls(dev[0], dev[1], hapd)
@@ -199,7 +200,7 @@ def test_ap_wpa2_tdls_concurrent_init2(dev, apdev):
def test_ap_wpa2_tdls_decline_resp(dev, apdev):
"""Decline TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x200")
setup_tdls(dev[1], dev[0], hapd, expect_fail=True)
@@ -207,7 +208,7 @@ def test_ap_wpa2_tdls_decline_resp(dev, apdev):
def test_ap_wpa2_tdls_long_lifetime(dev, apdev):
"""TDLS with long TPK lifetime"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x40")
setup_tdls(dev[1], dev[0], hapd)
@@ -215,7 +216,7 @@ def test_ap_wpa2_tdls_long_lifetime(dev, apdev):
def test_ap_wpa2_tdls_long_frame(dev, apdev):
"""TDLS with long setup/teardown frames"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x1")
dev[1].request("SET tdls_testing 0x1")
@@ -226,7 +227,7 @@ def test_ap_wpa2_tdls_long_frame(dev, apdev):
def test_ap_wpa2_tdls_reneg(dev, apdev):
"""Renegotiate TDLS link"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[1], dev[0], hapd)
setup_tdls(dev[0], dev[1], hapd)
@@ -234,7 +235,7 @@ def test_ap_wpa2_tdls_reneg(dev, apdev):
def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev):
"""Incorrect TPK lifetime in TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x10")
setup_tdls(dev[0], dev[1], hapd, expect_fail=True)
@@ -242,7 +243,7 @@ def test_ap_wpa2_tdls_wrong_lifetime_resp(dev, apdev):
def test_ap_wpa2_tdls_diff_rsnie(dev, apdev):
"""TDLS with different RSN IEs"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x2")
setup_tdls(dev[1], dev[0], hapd)
@@ -251,7 +252,7 @@ def test_ap_wpa2_tdls_diff_rsnie(dev, apdev):
def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev):
"""Incorrect MIC in TDLS Setup Response"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[0].request("SET tdls_testing 0x800")
addr0 = dev[0].p2p_interface_addr()
@@ -261,7 +262,7 @@ def test_ap_wpa2_tdls_wrong_tpk_m2_mic(dev, apdev):
def test_ap_wpa2_tdls_wrong_tpk_m3_mic(dev, apdev):
"""Incorrect MIC in TDLS Setup Confirm"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
dev[1].request("SET tdls_testing 0x800")
addr0 = dev[0].p2p_interface_addr()
@@ -274,7 +275,7 @@ def test_ap_wpa_tdls(dev, apdev):
hapd = hostapd.add_ap(apdev[0],
hostapd.wpa_params(ssid="test-wpa-psk",
passphrase="12345678"))
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@@ -286,7 +287,7 @@ def test_ap_wpa_mixed_tdls(dev, apdev):
hapd = hostapd.add_ap(apdev[0],
hostapd.wpa_mixed_params(ssid="test-wpa-mixed-psk",
passphrase="12345678"))
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa_psk_mixed(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@@ -296,7 +297,7 @@ def test_ap_wep_tdls(dev, apdev):
"""WEP AP and two stations using TDLS"""
hapd = hostapd.add_ap(apdev[0],
{ "ssid": "test-wep", "wep_key0": '"hello"' })
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wep(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@@ -305,7 +306,7 @@ def test_ap_wep_tdls(dev, apdev):
def test_ap_open_tdls(dev, apdev):
"""Open AP and two stations using TDLS"""
hapd = hostapd.add_ap(apdev[0], { "ssid": "test-open" })
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@@ -321,7 +322,7 @@ def test_ap_wpa2_tdls_bssid_mismatch(dev, apdev):
params['bridge'] = 'ap-br0'
hapd = hostapd.add_ap(apdev[0], params)
hostapd.add_ap(apdev[1], params)
- wlantest_setup()
+ wlantest_setup(hapd)
subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
dev[0].connect(ssid, psk=passphrase, scan_freq="2412",
@@ -343,7 +344,7 @@ def test_ap_wpa2_tdls_bssid_mismatch(dev, apdev):
def test_ap_wpa2_tdls_responder_teardown(dev, apdev):
"""TDLS teardown from responder with WPA2-PSK AP"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd, responder=True)
@@ -362,7 +363,7 @@ def test_ap_open_tdls_vht(dev, apdev):
"vht_oper_centr_freq_seg0_idx": "0" }
try:
hapd = hostapd.add_ap(apdev[0], params)
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
setup_tdls(dev[0], dev[1], hapd)
teardown_tdls(dev[0], dev[1], hapd)
@@ -392,7 +393,7 @@ def test_ap_open_tdls_vht80(dev, apdev):
try:
hapd = None
hapd = hostapd.add_ap(apdev[0], params)
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "WIDTH=80 MHz" not in sig:
@@ -436,7 +437,7 @@ def test_ap_open_tdls_vht80plus80(dev, apdev):
try:
hapd = None
hapd = hostapd.add_ap(apdev[0], params)
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5180")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "FREQUENCY=5180" not in sig:
@@ -492,7 +493,7 @@ def test_ap_open_tdls_vht160(dev, apdev):
if "5490" in r and "DFS" in r:
raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
raise Exception("AP setup timed out")
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_open(dev, hapd, scan_freq="5520")
sig = dev[0].request("SIGNAL_POLL").splitlines()
if "WIDTH=160 MHz" not in sig:
@@ -539,7 +540,7 @@ def test_tdls_chan_switch(dev, apdev):
def test_ap_tdls_link_status(dev, apdev):
"""Check TDLS link status between two stations"""
hapd = start_ap_wpa2_psk(apdev[0])
- wlantest_setup()
+ wlantest_setup(hapd)
connect_2sta_wpa2_psk(dev, hapd)
check_tdls_link(dev[0], dev[1], connected=False)
setup_tdls(dev[0], dev[1], hapd)
diff --git a/tests/hwsim/test_p2p_autogo.py b/tests/hwsim/test_p2p_autogo.py
index 3875343..55c4c0f 100644
--- a/tests/hwsim/test_p2p_autogo.py
+++ b/tests/hwsim/test_p2p_autogo.py
@@ -231,7 +231,6 @@ def test_autogo_pbc(dev):
def test_autogo_tdls(dev):
"""P2P autonomous GO and two clients using TDLS"""
- wt = Wlantest()
go = dev[0]
logger.info("Start autonomous GO with fixed parameters " + go.ifname)
id = go.add_network()
@@ -241,6 +240,8 @@ def test_autogo_tdls(dev):
go.set_network(id, "disabled", "2")
res = go.p2p_start_go(persistent=id, freq="2462")
logger.debug("res: " + str(res))
+ Wlantest.setup(go, True)
+ wt = Wlantest()
wt.flush()
wt.add_passphrase("12345678")
connect_cli(go, dev[1], social=True, freq=2462)
diff --git a/tests/hwsim/test_peerkey.py b/tests/hwsim/test_peerkey.py
index b1e7b1f..30d9b60 100644
--- a/tests/hwsim/test_peerkey.py
+++ b/tests/hwsim/test_peerkey.py
@@ -50,15 +50,17 @@ def test_peerkey_unknown_peer(dev, apdev):
def test_peerkey_pairwise_mismatch(dev, apdev):
"""RSN TKIP+CCMP AP and PeerKey between two STAs using different ciphers"""
skip_with_fips(dev[0])
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
ssid = "test-peerkey"
passphrase = "12345678"
params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
params['peerkey'] = "1"
params['rsn_pairwise'] = "TKIP CCMP"
- hostapd.add_ap(apdev[0], params)
+ hapd = hostapd.add_ap(apdev[0], params)
+
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
dev[0].connect(ssid, psk=passphrase, scan_freq="2412", peerkey=True,
pairwise="CCMP")
diff --git a/tests/hwsim/test_wnm.py b/tests/hwsim/test_wnm.py
index 8048113..3788ed2 100644
--- a/tests/hwsim/test_wnm.py
+++ b/tests/hwsim/test_wnm.py
@@ -185,9 +185,6 @@ def test_wnm_sleep_mode_ap_oom(dev, apdev):
def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
"""WNM Sleep Mode - RSN with PMF"""
- wt = Wlantest()
- wt.flush()
- wt.add_passphrase("12345678")
params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
params["ieee80211w"] = "2";
@@ -197,6 +194,11 @@ def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
params["bss_transition"] = "1"
hapd = hostapd.add_ap(apdev[0], params)
+ Wlantest.setup(hapd)
+ wt = Wlantest()
+ wt.flush()
+ wt.add_passphrase("12345678")
+
dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
diff --git a/tests/hwsim/wlantest.py b/tests/hwsim/wlantest.py
index 5f6b4ac..ea450af 100644
--- a/tests/hwsim/wlantest.py
+++ b/tests/hwsim/wlantest.py
@@ -4,7 +4,9 @@
# This software may be distributed under the terms of the BSD license.
# See README for more details.
+import re
import os
+import posixpath
import time
import subprocess
import logging
@@ -13,44 +15,135 @@ import wpaspy
logger = logging.getLogger()
class Wlantest:
+
+ remote_host = None
+ setup_params = None
+ exe_thread = None
+ exe_res = []
+ monitor_mod = None
+ setup_done = False
+
+ @classmethod
+ def stop_remote_wlantest(cls):
+ if cls.exe_thread is None:
+ #non remote flow
+ return
+
+ cls.remote_host.execute(["killall", "-9", "wlantest"])
+ cls.remote_host.wait_execute_complete(cls.exe_thread, 5)
+ cls.exe_thread = None
+ cls.exe_res = []
+
+ @classmethod
+ def reset_remote_wlantest(cls):
+ cls.stop_remote_wlantest()
+ cls.remote_host = None
+ cls.setup_params = None
+ cls.exe_thread = None
+ cls.exe_res = []
+ cls.monitor_mod = None
+ cls.setup_done = False
+
+ @classmethod
+ def start_remote_wlantest(cls):
+ if cls.remote_host is None:
+ #non remote flow
+ return
+ if cls.exe_thread is not None:
+ raise Exception("can't start wlantest twice")
+
+ log_dir = cls.setup_params['log_dir']
+ ifaces = re.split('; | |, ', cls.remote_host.ifname)
+ ifname = ifaces[0]
+ exe = cls.setup_params["wlantest"]
+ tc_name = cls.setup_params["tc_name"]
+ base_log_name = tc_name + "_wlantest_" + \
+ cls.remote_host.name + "_" + ifname
+ log_file = posixpath.join(log_dir, base_log_name + ".log")
+ pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
+ cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname,
+ pcap_file, log_file)
+ cls.remote_host.add_log(log_file)
+ cls.remote_host.add_log(pcap_file)
+ cls.exe_thread = cls.remote_host.execute_run(cmd.split(), cls.exe_res)
+ #give wlantest a chance to start working
+ time.sleep(1)
+
+
+ @classmethod
+ def register_remote_wlantest(cls, host, setup_params, monitor_mod):
+ if cls.remote_host is not None:
+ raise Exception("can't register remote wlantest twice")
+ cls.remote_host = host
+ cls.setup_params = setup_params
+ cls.monitor_mod = monitor_mod
+ status, buf = host.execute(["which", setup_params['wlantest']])
+ if status != 0:
+ raise Exception(host.name + " - wlantest: " + buf)
+ status, buf = host.execute(["which", setup_params['wlantest_cli']])
+ if status != 0:
+ raise Exception(host.name + " - wlantest_cli: " + buf)
+
+ @classmethod
+ def chan_from_wpa(cls, wpa, is_p2p=False):
+ if cls.monitor_mod is None:
+ return
+ m = cls.monitor_mod
+ return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])
+
+ @classmethod
+ def setup(cls, wpa, is_p2p=False):
+ cls.chan_from_wpa(wpa, is_p2p)
+ cls.start_remote_wlantest()
+ cls.setup_done = True
+
def __init__(self):
+ if not self.setup_done:
+ raise Exception("can't create Wlantest inst before setup()")
if os.path.isfile('../../wlantest/wlantest_cli'):
self.wlantest_cli = '../../wlantest/wlantest_cli'
else:
self.wlantest_cli = 'wlantest_cli'
+ def cli_cmd(self, params):
+ if self.remote_host is not None:
+ exe = self.setup_params["wlantest_cli"]
+ ret = self.remote_host.execute([exe] + params)
+ if 0 != ret[0]:
+ raise Exception("wlantest_cli failed")
+ return ret[1]
+ else:
+ return subprocess.check_output([self.wlantest_cli] + params)
+
def flush(self):
- res = subprocess.check_output([self.wlantest_cli, "flush"])
+ res = self.cli_cmd(["flush"])
if "FAIL" in res:
raise Exception("wlantest_cli flush failed")
def relog(self):
- res = subprocess.check_output([self.wlantest_cli, "relog"])
+ res = self.cli_cmd(["relog"])
if "FAIL" in res:
raise Exception("wlantest_cli relog failed")
def add_passphrase(self, passphrase):
- res = subprocess.check_output([self.wlantest_cli, "add_passphrase",
- passphrase])
+ res = self.cli_cmd(["add_passphrase", passphrase])
if "FAIL" in res:
raise Exception("wlantest_cli add_passphrase failed")
def add_wepkey(self, key):
- res = subprocess.check_output([self.wlantest_cli, "add_wepkey", key])
+ res = self.cli_cmd(["add_wepkey", key])
if "FAIL" in res:
raise Exception("wlantest_cli add_key failed")
def info_bss(self, field, bssid):
- res = subprocess.check_output([self.wlantest_cli, "info_bss",
- field, bssid])
+ res = self.cli_cmd(["info_bss", field, bssid])
if "FAIL" in res:
raise Exception("Could not get BSS info from wlantest for " + bssid)
return res
def get_bss_counter(self, field, bssid):
try:
- res = subprocess.check_output([self.wlantest_cli, "get_bss_counter",
- field, bssid]);
+ res = self.cli_cmd(["get_bss_counter", field, bssid])
except Exception, e:
return 0
if "FAIL" in res:
@@ -58,36 +151,30 @@ class Wlantest:
return int(res)
def clear_bss_counters(self, bssid):
- subprocess.call([self.wlantest_cli, "clear_bss_counters", bssid],
- stdout=open('/dev/null', 'w'));
+ self.cli_cmd(["clear_bss_counters", bssid])
def info_sta(self, field, bssid, addr):
- res = subprocess.check_output([self.wlantest_cli, "info_sta",
- field, bssid, addr])
+ res = self.cli_cmd(["info_sta", field, bssid, addr])
if "FAIL" in res:
raise Exception("Could not get STA info from wlantest for " + addr)
return res
def get_sta_counter(self, field, bssid, addr):
- res = subprocess.check_output([self.wlantest_cli, "get_sta_counter",
- field, bssid, addr]);
+ res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def clear_sta_counters(self, bssid, addr):
- res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters",
- bssid, addr]);
+ res = self.cli_cmd(["clear_sta_counters", bssid, addr])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
def tdls_clear(self, bssid, addr1, addr2):
- res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters",
- bssid, addr1, addr2]);
+ self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])
def get_tdls_counter(self, field, bssid, addr1, addr2):
- res = subprocess.check_output([self.wlantest_cli, "get_tdls_counter",
- field, bssid, addr1, addr2]);
+ res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
@@ -139,15 +226,13 @@ class Wlantest:
raise Exception("Unexpected STA key_mgmt")
def get_tx_tid(self, bssid, addr, tid):
- res = subprocess.check_output([self.wlantest_cli, "get_tx_tid",
- bssid, addr, str(tid)]);
+ res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def get_rx_tid(self, bssid, addr, tid):
- res = subprocess.check_output([self.wlantest_cli, "get_rx_tid",
- bssid, addr, str(tid)]);
+ res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
diff --git a/tests/remote/config.py b/tests/remote/config.py
index 0826ea0..cf3b77f 100644
--- a/tests/remote/config.py
+++ b/tests/remote/config.py
@@ -20,6 +20,8 @@ setup_params = { "setup_hw" : "./tests/setup_hw.sh",
"hostapd" : "./tests/hostapd",
"wpa_supplicant" : "./tests/wpa_supplicant",
"iperf" : "iperf",
+ "wlantest" : "./tests/wlantest",
+ "wlantest_cli" : "./tests/wlantest_cli",
"country" : "US",
"log_dir" : "/tmp/",
"ipv4_test_net" : "192.168.12.0",
diff --git a/tests/remote/hwsim_wrapper.py b/tests/remote/hwsim_wrapper.py
index d70fe85..3b4c0cc 100644
--- a/tests/remote/hwsim_wrapper.py
+++ b/tests/remote/hwsim_wrapper.py
@@ -11,6 +11,7 @@ import config
import rutils
import monitor
import traceback
+import wlantest
import logging
logger = logging.getLogger()
@@ -45,6 +46,13 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test):
monitor.add(dut_host, monitors)
monitor.run(dut_host, setup_params)
+ monitor_hosts = monitor.create(devices, setup_params, refs, duts, monitors)
+ mon = None
+ if len(monitor_hosts) > 0:
+ mon = monitor_hosts[0]
+ wlantest.Wlantest.reset_remote_wlantest()
+ wlantest.Wlantest.register_remote_wlantest(mon, setup_params, monitor)
+
# run hostapd/wpa_supplicant
for ref_host in ref_hosts:
rutils.run_wpasupplicant(ref_host, setup_params)
@@ -83,6 +91,9 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test):
for dut_host in dut_hosts:
dut_host.execute(["killall", "hostapd"])
dut_host.get_logs(local_log_dir)
+ if mon is not None:
+ wlantest.Wlantest.reset_remote_wlantest()
+ mon.get_logs(local_log_dir)
return ""
except:
@@ -105,4 +116,7 @@ def run_hwsim_test(devices, setup_params, refs, duts, monitors, hwsim_test):
for dut_host in dut_hosts:
dut_host.execute(["killall", "hostapd"])
dut_host.get_logs(local_log_dir)
+ if mon is not None:
+ wlantest.Wlantest.reset_remote_wlantest()
+ mon.get_logs(local_log_dir)
raise
--
1.9.1
More information about the Hostap
mailing list