[openwrt/openwrt] unetmsg: add subscriber update callback to notify about publish events
LEDE Commits
lede-commits at lists.infradead.org
Mon May 5 04:57:23 PDT 2025
nbd pushed a commit to openwrt/openwrt.git, branch main:
https://git.openwrt.org/06f44f69b6fe33b07d85c5b6ed0eb49a199a532e
commit 06f44f69b6fe33b07d85c5b6ed0eb49a199a532e
Author: Felix Fietkau <nbd at nbd.name>
AuthorDate: Mon May 5 13:55:42 2025 +0200
unetmsg: add subscriber update callback to notify about publish events
When services start publishing on a topic, this can be used to allow
subscribers to query them.
Signed-off-by: Felix Fietkau <nbd at nbd.name>
---
.../files/usr/share/ucode/unetmsg/client.uc | 36 +++++++++++++++++++---
.../usr/share/ucode/unetmsg/unetmsgd-client.uc | 8 ++++-
.../usr/share/ucode/unetmsg/unetmsgd-remote.uc | 6 +++-
.../files/usr/share/ucode/unetmsg/unetmsgd.uc | 30 +++++++++++++++---
4 files changed, 70 insertions(+), 10 deletions(-)
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
index 06c927297e..293763572f 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
@@ -23,7 +23,7 @@ function publish(name, request_cb)
this.channel.request("publish", { name });
}
-function subscribe(name, message_cb)
+function subscribe(name, message_cb, update_cb)
{
if (!this.channel)
this.connect();
@@ -31,8 +31,12 @@ function subscribe(name, message_cb)
if (type(name) == "string")
name = [ name ];
+ let cb = {
+ cb: message_cb,
+ update: update_cb
+ };
for (let cur in name)
- this.cb_sub[cur] = message_cb;
+ this.cb_sub[cur] = cb;
if (!this.channel)
return;
@@ -109,6 +113,12 @@ function connect()
const client_proto = {
connect, publish, subscribe, send, request,
close: function() {
+ for (let sub in this.sub_cb) {
+ if (!sub.timer)
+ continue;
+ sub.timer.cancel();
+ delete sub.timer;
+ }
if (this.channel)
this.channel.disconnect();
this.connect_timer.cancel();
@@ -119,11 +129,29 @@ const client_proto = {
function handle_request(cl, req)
{
- let cb;
+ let data, cb;
switch (req.type) {
+ case "publish":
+ data = cl.cb_sub[req.args.name];
+ if (!data || data.timer)
+ break;
+
+ cb = data.update;
+ if (!cb)
+ return;
+
+ data.timer = uloop.timer(100, () => {
+ delete data.timer;
+ cb();
+ });
+ break;
case "message":
- cb = cl.cb_sub[req.args.name];
+ data = cl.cb_sub[req.args.name];
+ if (!data)
+ break;
+
+ cb = data.cb;
if (cb)
return cb(req);
break;
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
index 8b42882152..6da745a770 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
@@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names)
cl_list[name] = core.pubsub_add(kind, name, proto({
client: cl.id,
}, pubsub_proto));
+
+ if (kind == "publish")
+ core.handle_publish(cl_list[name], name);
}
return 0;
@@ -101,8 +104,11 @@ function client_disconnect(id)
return;
for (let kind in [ "publish", "subscribe" ])
- for (let name, data in cl[kind])
+ for (let name, data in cl[kind]) {
+ if (kind == "publish")
+ core.handle_publish(data, name);
core.pubsub_del(kind, name, data);
+ }
delete clients[id];
}
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
index edc034343b..18ee2a3684 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
@@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req)
if (!name)
return;
if (args.enabled) {
- if (list[name])
+ if (list[name]) {
+ core.handle_publish(null, name);
return 0;
+ }
let allowed = net.peers[host].allowed == null;
for (let cur in net.peers[host].allowed) {
@@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req)
network: sock_data.network,
name: host,
}, pubsub_proto);
+ core.handle_publish(null, name);
list[name] = true;
} else {
if (!list[name])
return 0;
+ core.handle_publish(null, name);
delete core["remote_" + msgtype][name][host];
delete list[name];
}
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
index 393a6ea47a..b81acb908e 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
@@ -47,8 +47,8 @@ function new_handle(list, name, data)
function pubsub_add(kind, name, data)
{
let list = this[kind];
- if (!length(list[name])) {
- list[name] = {};
+ if (!length(list[name]) || kind == "publish") {
+ list[name] ??= {};
remote.pubsub_set(kind, name, true);
}
return new_handle(this[kind], name, data);
@@ -58,8 +58,8 @@ function pubsub_del(kind, name, data)
{
let list = this[kind][name];
delete list[data._id];
- if (!length(list))
- remote.pubsub_set(kind, name, false);
+ if (!length(list) || kind == "publish")
+ remote.pubsub_set(kind, name, length(list) > 0);
}
function get_handles(handle, local, remote)
@@ -158,6 +158,27 @@ function handle_message(handle, data, remote)
return 0;
}
+function handle_publish(handle, name)
+{
+ let local = this.subscribe[name];
+ let handles = get_handles(handle, local);
+
+ for (let cur in handles) {
+ if (!cur || !cur.get_channel)
+ continue;
+
+ let chan = cur.get_channel();
+ if (!chan)
+ continue;
+
+ chan.request({
+ method: "publish",
+ return: "ignore",
+ data: { name },
+ });
+ }
+}
+
function add_acl(type, user, data)
{
if (!data || !user)
@@ -199,6 +220,7 @@ const core_proto = {
pubsub_del,
handle_request,
handle_message,
+ handle_publish,
dbg: function(msg) {
if (this.debug_enabled)
warn(msg);
More information about the lede-commits
mailing list