[PATCH v2 1/5] tests: add RemoteCtrl class
Janusz Dziedzic
janusz.dziedzic at gmail.com
Thu May 30 12:19:00 PDT 2024
Signed-off-by: Janusz Dziedzic <janusz.dziedzic at gmail.com>
---
tests/hwsim/remotectrl.py | 92 +++++++++++++++++++++++++++++++++++++++
1 file changed, 92 insertions(+)
create mode 100644 tests/hwsim/remotectrl.py
diff --git a/tests/hwsim/remotectrl.py b/tests/hwsim/remotectrl.py
new file mode 100644
index 000000000..d48e338cf
--- /dev/null
+++ b/tests/hwsim/remotectrl.py
@@ -0,0 +1,92 @@
+#!/usr/bin/python
+#
+# wpa_supplicant/hostapd control interface using Python
+# Copyright (c) 2024, Jouni Malinen <j at w1.fi>
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+from wpaspy import Ctrl
+import remotehost
+
+class RemoteCtrl(Ctrl):
+ def __init__(self, path, port=9877, hostname=None, ifname=None):
+ self.started = False
+ self.attached = False
+ self.path = path
+ self.port = port
+ self.ifname = ifname
+ self.hostname = hostname
+ self.proc = None
+
+ self.host = remotehost.Host(hostname)
+ self.started = True
+
+ def __del__(self):
+ self.close()
+
+ def close(self):
+ if self.attached:
+ try:
+ self.detach()
+ except Exception as e:
+ # Need to ignore this allow the socket to be closed
+ self.attached = False
+ pass
+
+ if self.host and self.started:
+ self.started = False
+
+ def request(self, cmd, timeout=10):
+ if self.host:
+ cmd = '\'' + cmd + '\''
+ if self.ifname:
+ _cmd = ['wpa_cli', '-p', self.path, '-i', self.ifname, "raw " + cmd]
+ else:
+ _cmd = ['wpa_cli', '-g', self.path, "raw " + cmd]
+ status, buf = self.host.execute(_cmd)
+ return buf
+
+ def attach(self):
+ if self.attached:
+ return
+
+ if self.host:
+ if self.ifname:
+ _cmd = [ "wpa_cli", "-p", self.path, "-i", self.ifname ]
+ else:
+ _cmd = [ "wpa_cli", '-g', self.path]
+ self.proc = self.host.proc_run(_cmd)
+ self.attached = True
+
+ def detach(self):
+ if not self.attached:
+ return
+
+ if self.hostname and self.proc:
+ self.request("DETACH")
+ self.request("QUIT")
+ self.host.proc_stop(self.proc)
+ self.attached = False
+ self.proc = None
+
+ def terminate(self):
+ if self.attached:
+ try:
+ self.detach()
+ except Exception as e:
+ # Need to ignore this to allow the socket to be closed
+ self.attached = False
+ self.request("TERMINATE")
+ self.close()
+
+ def pending(self, timeout=0):
+ if self.host and self.proc:
+ return self.host.proc_pending(self.proc, timeout=timeout)
+ return False
+
+ def recv(self):
+ if self.host and self.proc:
+ res = self.host.proc_read(self.proc)
+ return res
+ return ""
--
2.25.1
More information about the Hostap
mailing list