[openwrt/openwrt] unetmsg: add subscriber update callback to notify about publish events

LEDE Commits lede-commits at lists.infradead.org
Mon May 5 04:57:23 PDT 2025


nbd pushed a commit to openwrt/openwrt.git, branch main:
https://git.openwrt.org/06f44f69b6fe33b07d85c5b6ed0eb49a199a532e

commit 06f44f69b6fe33b07d85c5b6ed0eb49a199a532e
Author: Felix Fietkau <nbd at nbd.name>
AuthorDate: Mon May 5 13:55:42 2025 +0200

    unetmsg: add subscriber update callback to notify about publish events
    
    When services start publishing on a topic, this can be used to allow
    subscribers to query them.
    
    Signed-off-by: Felix Fietkau <nbd at nbd.name>
---
 .../files/usr/share/ucode/unetmsg/client.uc        | 36 +++++++++++++++++++---
 .../usr/share/ucode/unetmsg/unetmsgd-client.uc     |  8 ++++-
 .../usr/share/ucode/unetmsg/unetmsgd-remote.uc     |  6 +++-
 .../files/usr/share/ucode/unetmsg/unetmsgd.uc      | 30 +++++++++++++++---
 4 files changed, 70 insertions(+), 10 deletions(-)

diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
index 06c927297e..293763572f 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
@@ -23,7 +23,7 @@ function publish(name, request_cb)
 	this.channel.request("publish", { name });
 }
 
-function subscribe(name, message_cb)
+function subscribe(name, message_cb, update_cb)
 {
 	if (!this.channel)
 		this.connect();
@@ -31,8 +31,12 @@ function subscribe(name, message_cb)
 	if (type(name) == "string")
 		name = [ name ];
 
+	let cb = {
+		cb: message_cb,
+		update: update_cb
+	};
 	for (let cur in name)
-		this.cb_sub[cur] = message_cb;
+		this.cb_sub[cur] = cb;
 
 	if (!this.channel)
 		return;
@@ -109,6 +113,12 @@ function connect()
 const client_proto = {
 	connect, publish, subscribe, send, request,
 	close: function() {
+		for (let sub in this.sub_cb) {
+			if (!sub.timer)
+				continue;
+			sub.timer.cancel();
+			delete sub.timer;
+		}
 		if (this.channel)
 			this.channel.disconnect();
 		this.connect_timer.cancel();
@@ -119,11 +129,29 @@ const client_proto = {
 
 function handle_request(cl, req)
 {
-	let cb;
+	let data, cb;
 
 	switch (req.type) {
+	case "publish":
+		data = cl.cb_sub[req.args.name];
+		if (!data || data.timer)
+			break;
+
+		cb = data.update;
+		if (!cb)
+			return;
+
+		data.timer = uloop.timer(100, () => {
+			delete data.timer;
+			cb();
+		});
+		break;
 	case "message":
-		cb = cl.cb_sub[req.args.name];
+		data = cl.cb_sub[req.args.name];
+		if (!data)
+			break;
+
+		cb = data.cb;
 		if (cb)
 			return cb(req);
 		break;
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
index 8b42882152..6da745a770 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
@@ -40,6 +40,9 @@ function client_pubsub(kind, cl, names)
 		cl_list[name] = core.pubsub_add(kind, name, proto({
 			client: cl.id,
 		}, pubsub_proto));
+
+		if (kind == "publish")
+			core.handle_publish(cl_list[name], name);
 	}
 
 	return 0;
@@ -101,8 +104,11 @@ function client_disconnect(id)
 		return;
 
 	for (let kind in [ "publish", "subscribe" ])
-		for (let name, data in cl[kind])
+		for (let name, data in cl[kind]) {
+			if (kind == "publish")
+				core.handle_publish(data, name);
 			core.pubsub_del(kind, name, data);
+		}
 
 	delete clients[id];
 }
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
index edc034343b..18ee2a3684 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
@@ -96,8 +96,10 @@ function network_socket_handle_request(sock_data, req)
 		if (!name)
 			return;
 		if (args.enabled) {
-			if (list[name])
+			if (list[name]) {
+				core.handle_publish(null, name);
 				return 0;
+			}
 
 			let allowed = net.peers[host].allowed == null;
 			for (let cur in net.peers[host].allowed) {
@@ -114,10 +116,12 @@ function network_socket_handle_request(sock_data, req)
 				network: sock_data.network,
 				name: host,
 			}, pubsub_proto);
+			core.handle_publish(null, name);
 			list[name] = true;
 		} else {
 			if (!list[name])
 				return 0;
+			core.handle_publish(null, name);
 			delete core["remote_" + msgtype][name][host];
 			delete list[name];
 		}
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
index 393a6ea47a..b81acb908e 100644
--- a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
@@ -47,8 +47,8 @@ function new_handle(list, name, data)
 function pubsub_add(kind, name, data)
 {
 	let list = this[kind];
-	if (!length(list[name])) {
-		list[name] = {};
+	if (!length(list[name]) || kind == "publish") {
+		list[name] ??= {};
 		remote.pubsub_set(kind, name, true);
 	}
 	return new_handle(this[kind], name, data);
@@ -58,8 +58,8 @@ function pubsub_del(kind, name, data)
 {
 	let list = this[kind][name];
 	delete list[data._id];
-	if (!length(list))
-		remote.pubsub_set(kind, name, false);
+	if (!length(list) || kind == "publish")
+		remote.pubsub_set(kind, name, length(list) > 0);
 }
 
 function get_handles(handle, local, remote)
@@ -158,6 +158,27 @@ function handle_message(handle, data, remote)
 	return 0;
 }
 
+function handle_publish(handle, name)
+{
+	let local = this.subscribe[name];
+	let handles = get_handles(handle, local);
+
+	for (let cur in handles) {
+		if (!cur || !cur.get_channel)
+			continue;
+
+		let chan = cur.get_channel();
+		if (!chan)
+			continue;
+
+		chan.request({
+			method: "publish",
+			return: "ignore",
+			data: { name },
+		});
+	}
+}
+
 function add_acl(type, user, data)
 {
 	if (!data || !user)
@@ -199,6 +220,7 @@ const core_proto = {
 	pubsub_del,
 	handle_request,
 	handle_message,
+	handle_publish,
 	dbg: function(msg) {
 		if (this.debug_enabled)
 			warn(msg);




More information about the lede-commits mailing list