[openwrt/openwrt] unetmsg: add unet pub/sub message broker based on ubus

LEDE Commits lede-commits at lists.infradead.org
Mon Mar 17 05:19:45 PDT 2025


nbd pushed a commit to openwrt/openwrt.git, branch main:
https://git.openwrt.org/77f8a70f65cee3c266f78de0d187f1a5f30ec711

commit 77f8a70f65cee3c266f78de0d187f1a5f30ec711
Author: Felix Fietkau <nbd at nbd.name>
AuthorDate: Fri Mar 7 18:20:23 2025 +0100

    unetmsg: add unet pub/sub message broker based on ubus
    
    This service automatically establishes connections to any hosts that are members
    of the same unet network, and allows publish/subscribe exchanges via ubus channels.
    
    Signed-off-by: Felix Fietkau <nbd at nbd.name>
---
 package/network/services/unetd/files/unet.uc       |  24 +-
 package/network/services/unetmsg/Makefile          |  35 ++
 .../services/unetmsg/files/etc/init.d/unetmsg      |  14 +
 .../services/unetmsg/files/usr/sbin/unetmsgd       |  68 +++
 .../files/usr/share/ucode/unetmsg/client.uc        | 159 +++++++
 .../usr/share/ucode/unetmsg/unetmsgd-client.uc     | 127 +++++
 .../usr/share/ucode/unetmsg/unetmsgd-remote.uc     | 530 +++++++++++++++++++++
 .../files/usr/share/ucode/unetmsg/unetmsgd.uc      | 228 +++++++++
 .../unetmsg/files/usr/share/ucode/unetmsg/utils.uc |  40 ++
 9 files changed, 1223 insertions(+), 2 deletions(-)

diff --git a/package/network/services/unetd/files/unet.uc b/package/network/services/unetd/files/unet.uc
index ad45d4b839..1b7d021829 100644
--- a/package/network/services/unetd/files/unet.uc
+++ b/package/network/services/unetd/files/unet.uc
@@ -1024,13 +1024,23 @@ const host_editor = {
 
 const UnetHostEdit = editor.new(host_editor);
 
-function is_vxlan_service(ctx, argv, named, spec)
+function has_service_type(ctx, named, name)
 {
 	let type = named.type;
 	if (ctx.data.edit)
 		type ??= ctx.data.edit.type;
 
-	return type == "vxlan";
+	return type == name;
+}
+
+function is_vxlan_service(ctx, argv, named, spec)
+{
+	return has_service_type(ctx, named, "vxlan");
+}
+
+function is_unetmsg_service(ctx, argv, named, spec)
+{
+	return has_service_type(ctx, named, "unetmsg");
 }
 
 function get_config_object(ctx, spec, obj, argv)
@@ -1106,6 +1116,16 @@ const service_editor = {
 				value: (ctx) => keys(ctx.data.netdata.json.hosts)
 			}
 		},
+		"unetmsg-allowed": {
+			help: "Allowed topics for this unetmsg service group",
+			attribute: "allowed",
+			available: is_unetmsg_service,
+			get_object: get_config_object,
+			multiple: true,
+			args: {
+				type: "string"
+			}
+		},
 	}
 };
 
diff --git a/package/network/services/unetmsg/Makefile b/package/network/services/unetmsg/Makefile
new file mode 100644
index 0000000000..eff2089d6b
--- /dev/null
+++ b/package/network/services/unetmsg/Makefile
@@ -0,0 +1,35 @@
+#
+# Copyright (C) 2025 OpenWrt.org
+#
+# This is free software, licensed under the GNU General Public License v2.
+# See /LICENSE for more information.
+#
+
+include $(TOPDIR)/rules.mk
+
+PKG_NAME:=unetmsg
+PKG_RELEASE:=$(AUTORELEASE)
+
+PKG_LICENSE:=GPL-2.0
+PKG_MAINTAINER:=Felix Fietkau <nbd at nbd.name>
+
+include $(INCLUDE_DIR)/package.mk
+
+define Package/unetmsg
+  SECTION:=utils
+  CATEGORY:=Utilities
+  TITLE:=unet network pub/sub message broker
+  DEPENDS:=+ucode +ucode-mod-socket \
+	+ucode-mod-ubus +ucode-mod-uloop \
+	+ucode-mod-fs
+endef
+
+define Build/Compile
+	:
+endef
+
+define Package/unetmsg/install
+	$(CP) ./files/* $(1)/
+endef
+
+$(eval $(call BuildPackage,unetmsg))
diff --git a/package/network/services/unetmsg/files/etc/init.d/unetmsg b/package/network/services/unetmsg/files/etc/init.d/unetmsg
new file mode 100755
index 0000000000..0960be366d
--- /dev/null
+++ b/package/network/services/unetmsg/files/etc/init.d/unetmsg
@@ -0,0 +1,14 @@
+#!/bin/sh /etc/rc.common
+# Copyright (c) 2021 OpenWrt.org
+
+START=50
+
+USE_PROCD=1
+PROG=/usr/sbin/unetmsgd
+
+start_service() {
+	procd_open_instance
+	procd_set_param command "$PROG"
+	procd_set_param respawn
+	procd_close_instance
+}
diff --git a/package/network/services/unetmsg/files/usr/sbin/unetmsgd b/package/network/services/unetmsg/files/usr/sbin/unetmsgd
new file mode 100755
index 0000000000..aa21053ca9
--- /dev/null
+++ b/package/network/services/unetmsg/files/usr/sbin/unetmsgd
@@ -0,0 +1,68 @@
+#!/usr/bin/env ucode
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd at nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import * as uloop from "uloop";
+import * as unetmsg_core from "unetmsg.unetmsgd";
+
+uloop.init();
+let ubus = libubus.connect();
+if (!ubus) {
+	warn(`Failed to connect to ubus\n`);
+	exit(1);
+}
+
+let core = unetmsg_core.init(ubus, true);
+
+function update_acl() {
+	let data = ubus.call(libubus.SYSTEM_OBJECT_ACL, "query"); // re-read the ACL list from ubus
+	core.acl_set(data ? data.acl : []); // no reply from the query: install an empty ACL set instead of throwing on data.acl
+}
+
+let obj = ubus.publish("unetmsg", {
+	channel: {
+		args: {},
+		call: function(req) {
+			if (!core.client.new(req))
+				return libubus.STATUS_INVALID_ARGUMENT;
+
+			return 0;
+		}
+	},
+	list: {
+		args: {
+			name: "",
+		},
+		call: function(req) {
+			let ret = [];
+			for (let name in core.publish)
+				if (req.args.name == null || wildcard(name, req.args.name))
+					push(ret, name);
+
+			return {
+				id: sort(ret),
+			};
+		},
+	},
+	request: {
+		args: {
+			name: "",
+			type: "",
+			data: {},
+		},
+		call: function(req) {
+			try {
+				core.handle_request(null, req, req.args, true);
+			} catch (e) {
+				core.exception(e);
+			}
+		}
+	}
+});
+
+ubus.subscriber("ubus.acl.sequence", () => update_acl());
+update_acl();
+uloop.run();
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
new file mode 100644
index 0000000000..06c927297e
--- /dev/null
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/client.uc
@@ -0,0 +1,159 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd at nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import * as uloop from "uloop";
+
+function publish(name, request_cb)
+{
+	if (!this.channel)
+		this.connect();
+
+	if (type(name) == "string")
+		name = [ name ];
+
+	for (let cur in name)
+		this.cb_pub[cur] = request_cb;
+
+	if (!this.channel)
+		return;
+
+	this.channel.request("publish", { name });
+}
+
+function subscribe(name, message_cb)
+{
+	if (!this.channel)
+		this.connect();
+
+	if (type(name) == "string")
+		name = [ name ];
+
+	for (let cur in name)
+		this.cb_sub[cur] = message_cb;
+
+	if (!this.channel)
+		return;
+
+	this.channel.request("subscribe", { name });
+}
+
+function send(name, type, data)
+{
+	// Like publish()/subscribe()/request(): connect on demand, and drop the message if there is still no channel.
+	if (!this.channel)
+		this.connect();
+	if (!this.channel)
+		return;
+	let msg = { name, type, data };
+	this.channel.request({ method: "message", return: "ignore", data: msg });
+}
+
+function default_complete_cb()
+{
+}
+
+function request(name, type, data, data_cb, complete_cb)
+{
+	if (!this.channel)
+		this.connect();
+
+	if (!this.channel)
+		return;
+
+	let req = this.channel.defer({
+		method: "request",
+		data: {
+			name, type, data
+		},
+		data_cb,
+		cb: complete_cb
+	});
+
+	if (!complete_cb)
+		req.await();
+}
+
+function connect()
+{
+	if (this.channel)
+		return;
+
+	let cl = this;
+	let res = cl.ubus.call({
+		object: "unetmsg",
+		method: "channel",
+		fd_cb: (fd) => {
+			cl.channel = libubus.open_channel(fd, cl.request_cb, cl.disconnect_cb, cl.timeout);
+		}
+	});
+
+	if (!this.channel) {
+		this.connect_timer.set(1000);
+		return;
+	}
+
+	if (length(this.cb_pub) > 0)
+		this.channel.request("publish", {
+			name: keys(this.cb_pub)
+		});
+
+	if (length(this.cb_sub) > 0)
+		this.channel.request("subscribe", {
+			name: keys(this.cb_sub)
+		});
+}
+
+const client_proto = {
+	connect, publish, subscribe, send, request,
+	close: function() {
+		if (this.channel)
+			this.channel.disconnect();
+		this.connect_timer.cancel();
+		for (let name in keys(this))
+			delete this[name];
+	}
+};
+
+function handle_request(cl, req)
+{
+	let cb;
+
+	switch (req.type) {
+	case "message":
+		cb = cl.cb_sub[req.args.name];
+		if (cb)
+			return cb(req);
+		break;
+	case "request":
+		cb = cl.cb_pub[req.args.name];
+		if (cb)
+			return cb(req);
+	}
+	return 0;
+}
+
+export function open(ubus_conn, timeout)
+{
+	let cl = proto({
+		cb_sub: {},
+		cb_pub: {},
+		ubus: ubus_conn,
+		timeout,
+	}, client_proto);
+
+	cl.request_cb = (req) => {
+		return handle_request(cl, req);
+	};
+
+	cl.disconnect_cb = () => {
+		cl.channel = null;
+		cl.connect_timer.set(100);
+	};
+
+	cl.connect_timer = uloop.timer(1, () => cl.connect());
+
+	return cl;
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
new file mode 100644
index 0000000000..cc971b562b
--- /dev/null
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-client.uc
@@ -0,0 +1,127 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd at nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import { gen_id } from "./utils.uc";
+
+let core;
+let clients = {};
+
+const pubsub_proto = {
+	get_channel: function() {
+		let cl = clients[this.client];
+		if (!cl)
+			return;
+
+		return cl.chan;
+	}
+};
+
+function client_pubsub(kind, cl, names)
+{
+	if (type(names) != "array")
+		return libubus.STATUS_INVALID_ARGUMENT;
+
+	for (let cur in names) {
+		if (type(cur) != "string")
+			return libubus.STATUS_INVALID_ARGUMENT;
+	}
+
+	if (!core.acl_check(kind, cl.acl, names))
+		return libubus.STATUS_PERMISSION_DENIED;
+
+	let cl_list = cl[kind];
+	for (let name in names) {
+		if (cl_list[name])
+			continue;
+
+		cl_list[name] = core.pubsub_add(kind, name, proto({
+			client: cl.id,
+		}, pubsub_proto));
+	}
+
+	return 0;
+}
+
+function prepare_data(args)
+{
+	return {
+		name: args.name,
+		type: args.type,
+		data: args.data,
+	};
+}
+
+function client_request(cl, req)
+{
+	let args = req.args;
+	let name = args.name;
+
+	if (type(name) != "string" || type(args.type) != "string" || type(args.data) != "object")
+		return libubus.STATUS_INVALID_ARGUMENT;
+
+	let data = prepare_data(req.args);
+	let handle;
+	switch (req.type) {
+	case "message":
+		handle = cl.publish[name];
+	    if (!handle)
+			return libubus.STATUS_INVALID_ARGUMENT;
+		return core.handle_message(handle, data, true);
+	case "request":
+		handle = cl.subscribe[name];
+	    if (!handle)
+			return libubus.STATUS_INVALID_ARGUMENT;
+		return core.handle_request(handle, req, data, true);
+	}
+}
+
+function client_cb(cl, req)
+{
+	let args = req.args;
+	switch (req.type) {
+	case "publish":
+	case "subscribe":
+		return client_pubsub(req.type, cl, args.name);
+	case "message":
+	case "request":
+		return client_request(cl, req);
+	}
+}
+
+function client_disconnect(id)
+{
+	let cl = clients[id];
+	if (!cl)
+		return;
+
+	for (let kind in [ "publish", "subscribe" ])
+		for (let name, data in cl[kind])
+			core.pubsub_del(kind, name, data);
+
+	delete clients[id];
+}
+
+export function new(req)
+{
+	let id = gen_id();
+	let acl = req.info.acl;
+	let client = {
+		id, acl,
+		publish: {},
+		subscribe: {},
+	};
+	let cb = (req) => client_cb(client, req);
+	let disconnect_cb = () => client_disconnect(id);
+	client.chan = req.new_channel(cb, disconnect_cb);
+	clients[id] = client;
+
+	return client;
+};
+
+export function set_core(_core)
+{
+	core = _core;
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
new file mode 100644
index 0000000000..c12f4abce8
--- /dev/null
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd-remote.uc
@@ -0,0 +1,530 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd at nbd.name>
+ */
+'use strict';
+import * as libubus from "ubus";
+import * as uloop from "uloop";
+import * as socket from "socket";
+import { gen_id, is_equal } from "./utils.uc";
+
+let core, ubus;
+let local_id = gen_id();
+let ev_listener, sub;
+
+let networks = {};
+
+const USYNC_PORT = 51818;
+
+const pubsub_proto = {
+	get_channel: function() {
+		let net = networks[this.network];
+		if (!net)
+			return;
+		// A remote handle is reached through the outgoing (tx) channel to its host.
+		let sock_data = net.tx_channels[this.name];
+		if (!sock_data)
+			return;
+		// Returns nothing above when the network or the connection to the host is gone.
+		return sock_data.channel;
+	},
+	get_response_data: function(data) {
+		data.network = this.network;
+		data.host = this.name;
+		return data;
+	}
+};
+
+function network_socket_close(data)
+{
+	if (!data)
+		return;
+
+	if (data.timer)
+		data.timer.cancel();
+	data.channel.disconnect();
+	data.socket.close();
+}
+
+function network_rx_socket_close(data)
+{
+	if (!data)
+		return;
+
+	core.dbg(`Incoming connection from ${data.name} closed\n`);
+	let net = networks[data.network];
+	if (net && net.rx_channels[data.name] == data)
+		delete net.rx_channels[data.name];
+
+	for (let name, sub in core.remote_subscribe)
+		delete sub[data.name];
+
+	for (let name, sub in core.remote_publish)
+		delete sub[data.name];
+
+	network_socket_close(data);
+}
+
+function network_tx_socket_close(data)
+{
+	if (!data)
+		return;
+
+	core.dbg(`Outgoing connection to ${data.name} closed\n`);
+	let net = networks[data.network];
+	if (net && net.tx_channels[data.name] == data)
+		delete net.tx_channels[data.name];
+
+	network_socket_close(data);
+}
+
+function network_socket_handle_request(sock_data, req)
+{
+	let net = networks[sock_data.network];
+	if (!net)
+		return;
+	// Handles one request received from an authenticated peer on an incoming channel.
+	let msgtype = req.type;
+	let host = sock_data.name;
+	let network = sock_data.network;
+	let args = { ...req.args, host, network };
+	switch (msgtype) {
+	case "publish":
+	case "subscribe":
+		let list = sock_data[msgtype];
+		let name = args.name;
+		if (type(name) != "string" || !name)
+			return;
+		if (args.enabled) {
+			if (list[name])
+				return 0;
+			// The peer may already be gone from net.peers while its channel is still open; treat that as "nothing allowed".
+			let peer = net.peers[host], allowed;
+			for (let cur in (peer ? peer.allowed : [])) {
+				if (!wildcard(name, cur))
+					continue;
+				allowed = true;
+				break;
+			}
+			if (!allowed)
+				return 0;
+			// Register a remote handle for this host under the topic name.
+			core["remote_" + msgtype][name] ??= {};
+			core["remote_" + msgtype][name][host] = proto({
+				network: sock_data.network,
+				name: host,
+			}, pubsub_proto);
+			list[name] = true;
+		} else {
+			if (!list[name])
+				return 0;
+			delete core["remote_" + msgtype][name][host];
+			delete list[name];
+		}
+		break;
+	case "request":
+		return core.handle_request(null, req, args);
+	case "message":
+		core.handle_message(null, args);
+		return 0;
+	}
+
+	return 0;
+}
+
+function network_auth_token(net, host, id)
+{
+	let auth_data = ubus.call("unetd", "token_create", {
+		network: net,
+		target: host,
+		data: { id }
+	});
+
+	if (!auth_data)
+		return;
+
+	return auth_data.token;
+}
+
+function network_auth_valid(host, id, token)
+{
+	if (!token)
+		return;
+
+	let data = ubus.call("unetd", "token_parse", { token });
+	if (!data)
+		return;
+
+	if (data.host != host)
+		return;
+
+	if (data.user != "root")
+		return;
+
+	data = data.data;
+	if (data.id != id)
+		return;
+
+	return true;
+}
+
+
+function network_check_auth(sock_data, info)
+{
+	if (!network_auth_valid(sock_data.name, sock_data.id, info.token))
+		return;
+
+	let net = networks[sock_data.network];
+	if (!net)
+		return;
+
+	if (!net.peers[sock_data.name])
+		return;
+
+	network_rx_socket_close(net.rx_channels[sock_data.name]);
+	if (sock_data.timer)
+		sock_data.timer.cancel();
+	sock_data.auth = true;
+	net.rx_channels[sock_data.name] = sock_data;
+	core.dbg(`Incoming connection from ${sock_data.name} established\n`);
+	if (!net.tx_channels[sock_data.name])
+		net.timer.set(100);
+}
+
+function network_accept(net, sock, addr)
+{
+	let src = addr.address;
+	let name;
+
+	for (let cur_name, data in net.peers)
+		if (data.address == src)
+			name = cur_name;
+
+	if (!name) {
+		core.dbg(`No peer found for address ${src}\n`);
+		sock.close();
+		return;
+	}
+
+	let sock_data = {
+		network: net.name,
+		socket: sock,
+		publish: {},
+		subscribe: {},
+		name,
+	};
+
+	let cb = (req) => {
+		if (!sock_data.auth) {
+			if (req.type == "hello") {
+				sock_data.id = req.args.id;
+				return;
+			}
+			if (req.type == "auth")
+				network_check_auth(sock_data, req.args);
+
+			if (!sock_data.auth) {
+				warn(`Auth failed\n`);
+				network_rx_socket_close(sock_data);
+				return 0;
+			}
+
+			let token = network_auth_token(net.name, name, req.args.id);
+			if (!token) {
+				warn(`Failed to generate auth reply token\n`);
+				return 0;
+			}
+
+			req.reply({ token }, -1);
+
+			return 0;
+		}
+
+		return network_socket_handle_request(sock_data, req);
+	};
+
+	let disconnect_cb = (req) => {
+		network_rx_socket_close(sock_data);
+	};
+
+	sock_data.id = gen_id();
+	sock_data.timer = uloop.timer(10 * 1000, () => {
+		network_socket_close(sock_data);
+	});
+	sock_data.channel = libubus.open_channel(sock, cb, disconnect_cb);
+	sock_data.channel.request({
+		method: "hello",
+		data: { id: sock_data.id },
+		return: "ignore",
+	});
+}
+
+function network_open_channel(net, name, peer)
+{
+	network_tx_socket_close(net.tx_channels[name]);
+	// Replaces any previous outgoing channel to this peer with a fresh connection attempt.
+	let sock_data = {
+		network: net.name,
+		name
+	};
+
+	let addr = socket.sockaddr({
+		address: peer.address,
+		port: USYNC_PORT
+	});
+	if (!addr)
+		return;
+
+	let sock = socket.create(socket.AF_INET6, socket.SOCK_STREAM | socket.SOCK_NONBLOCK);
+	if (!sock)
+		return;
+
+	core.dbg(`Try to connect to ${name}\n`);
+	sock.connect(addr);
+	let auth_data_cb = (msg) => {
+		if (!network_auth_valid(sock_data.name, sock_data.id, msg.token))
+			return;
+
+		sock_data.auth = true;
+		core.dbg(`Outgoing connection to ${name} established\n`);
+		// Announce all local publish/subscribe names to the newly authenticated peer.
+		for (let kind in [ "publish", "subscribe" ])
+			for (let name in core[kind])
+				sock_data.channel.request({
+					method: kind,
+					data: { name, enabled: true },
+					return: "ignore",
+				});
+	};
+	let auth_cb = () => {
+		if (!sock_data.auth)
+			network_tx_socket_close(sock_data);
+	};
+
+	let cb = (req) => {
+		if (sock_data.auth)
+			return 0;
+		// Before authentication the only acceptable message is the peer's "hello".
+		if (req.type != "hello") {
+			network_tx_socket_close(sock_data);
+			return 0;
+		}
+
+		let token = network_auth_token(net.name, name, req.args.id);
+		if (!token) {
+			network_tx_socket_close(sock_data);
+			return 0;
+		}
+
+		sock_data.request = sock_data.channel.defer({
+			method: "auth",
+			data: { token },
+			data_cb: auth_data_cb,
+			cb: auth_cb,
+		});
+
+		return 0;
+	};
+
+	let disconnect_cb = (req) => {
+		let net = networks[sock_data.network];
+		// The network may have been removed by network_close() while this channel was still open.
+		if (net && net.tx_channels[sock_data.name] == sock_data)
+			delete net.rx_channels[sock_data.name]; // NOTE(review): drops the rx entry without closing it - confirm intent
+
+		network_tx_socket_close(sock_data);
+	};
+
+	sock_data.socket = sock;
+	sock_data.channel = libubus.open_channel(sock, cb, disconnect_cb);
+	net.tx_channels[name] = sock_data;
+}
+
+function network_connect_peers(net)
+{
+	let n_pending = 0;
+
+	for (let name, data in net.peers) {
+		let chan = net.tx_channels[name];
+		if (chan && chan.auth)
+			continue;
+
+		network_open_channel(net, name, data);
+		n_pending++;
+	}
+
+	for (let name, sock_data in net.tx_channels)
+		if (!net.peers[name])
+			network_tx_socket_close(sock_data);
+
+	for (let name, sock_data in net.rx_channels)
+		if (!net.peers[name])
+			network_rx_socket_close(sock_data);
+
+	if (n_pending)
+		net.timer.set(10 * 1000);
+}
+
+function network_open(name, info)
+{
+	let net = info;
+
+	net.socket = socket.listen(net.local_address, USYNC_PORT, {
+		family: socket.AF_INET6,
+		socktype: socket.SOCK_STREAM,
+		flags: socket.AI_NUMERICHOST,
+	}, null, true);
+
+	if (!net.socket) {
+		warn(`Failed to open socket: ${socket.error()}\n`);
+		return;
+	}
+
+	net.name = name;
+	net.rx_channels = {};
+	net.tx_channels = {};
+
+	net.socket.setopt(socket.SOL_TCP, socket.TCP_USER_TIMEOUT, 30 * 1000);
+
+	let cb = () => {
+		let addr = {};
+		let sock = net.socket.accept(addr);
+		if (sock)
+			network_accept(net, sock, addr);
+	};
+
+	net.handle = uloop.handle(net.socket.fileno(), cb, uloop.ULOOP_READ);
+	net.timer = uloop.timer(100, () => network_connect_peers(net));
+
+	networks[name] = net;
+}
+
+function network_close(name)
+{
+	let net = networks[name];
+	net.timer.cancel();
+	net.handle.delete();
+	net.socket.close();
+	delete networks[name];
+}
+
+function network_update(name, info)
+{
+	let net = networks[name];
+	if (!net)
+		return;
+
+	if (net.local_host != info.local_host ||
+	    net.local_address != info.local_address) {
+		network_close(name);
+		network_open(name, info);
+		return;
+	}
+
+	for (let name, peer in net.peers) {
+		let allowed;
+		if (info.peers[name])
+			allowed = info.peers[name].allowed;
+		if (is_equal(peer.allowed, allowed))
+			continue;
+		network_rx_socket_close(net.rx_channels[name]);
+		network_tx_socket_close(net.tx_channels[name]);
+	}
+	net.peers = info.peers;
+	net.timer.set(100);
+}
+
+function unetd_network_check_peers(info)
+{
+	let services = [];
+
+	for (let name, data in info.services) {
+		if (data.type == "unetmsg")
+			push(services, data);
+	}
+
+	if (!length(services))
+		return;
+
+	services = filter(services, (v) => index(v.members, info.local_host) >= 0);
+	for (let name in keys(info.peers)) {
+		let allowed = [];
+
+		for (let data in services) {
+			if (index(data.members, name) < 0)
+				continue;
+
+			let cur_allowed = [ "*" ];
+			if (data.config && data.config.allowed)
+				cur_allowed = data.config.allowed;
+
+			for (let cur in cur_allowed)
+				if (index(allowed, cur) < 0)
+					push(allowed, cur);
+		}
+
+		if (!length(allowed))
+			delete info.peers[name];
+		else
+			info.peers[name].allowed = allowed;
+	}
+}
+
+function unetd_network_update()
+{
+	let data = ubus.call("unetd", "network_get");
+	if (!data || !data.networks)
+		return;
+	// Open or refresh every network that unetd reports with a local host.
+	for (let name, info in data.networks) {
+		if (!info.local_host)
+			continue;
+
+		unetd_network_check_peers(info);
+
+		if (networks[name])
+			network_update(name, info);
+		else
+			network_open(name, info);
+	}
+	// Close networks unetd no longer reports; iterate over keys() because network_close() deletes from networks.
+	for (let name in keys(networks))
+		if (!data.networks[name])
+			network_close(name);
+}
+
+function unetd_cb(msg)
+{
+	if (msg.type == "network_update")
+		unetd_network_update();
+	return 0;
+}
+
+export function pubsub_set(kind, name, enabled)
+{
+	for (let net_name, net in networks) {
+		for (let host_name, chan in net.tx_channels) {
+			if (!chan.auth)
+				continue;
+
+			chan.channel.request({
+				method: kind,
+				data: { name, enabled },
+				return: "ignore",
+			});
+		}
+	}
+};
+
+export function init(_core)
+{
+	core = _core;
+	ubus = core.ubus;
+	sub = ubus.subscriber(unetd_cb);
+	unetd_network_update();
+	ev_listener = ubus.listener("ubus.object.add", (event, msg) => {
+		if (msg.path == "unetd")
+			sub.subscribe(msg.path);
+	});
+	sub.subscribe("unetd");
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
new file mode 100644
index 0000000000..68b0600fe9
--- /dev/null
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/unetmsgd.uc
@@ -0,0 +1,228 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd at nbd.name>
+ */
+'use strict';
+import * as client from "./unetmsgd-client.uc";
+import * as remote from "./unetmsgd-remote.uc";
+import { gen_id } from "./utils.uc";
+
+function __acl_check(list, name)
+{
+	for (let cur in list)
+		if (wildcard(name, cur, true))
+			return true;
+}
+
+function acl_check(acl_type, info, names)
+{
+	let acl = this.acl;
+
+	if (info.user == "root")
+		return true;
+
+	let list = acl[acl_type][info.user] ?? [];
+	if (info.group) {
+		let list2 = acl[acl_type][":" + info.group];
+		if (list2)
+			list = [ ...list, ...list2 ];
+	}
+
+	for (let name in names)
+		if (!__acl_check(list, name))
+			return;
+
+	return true;
+}
+
+function new_handle(list, name, data)
+{
+	let id = gen_id();
+	data._id = id;
+	list[name] ??= {};
+	list[name][id] = data;
+	return data;
+}
+
+function pubsub_add(kind, name, data)
+{
+	let list = this[kind];
+	if (!length(list[name])) {
+		list[name] = {};
+		remote.pubsub_set(kind, name, true);
+	}
+	return new_handle(this[kind], name, data);
+}
+
+function pubsub_del(kind, name, data)
+{
+	let list = this[kind][name];
+	delete list[data._id];
+	if (!length(list))
+		remote.pubsub_set(kind, name, false);
+}
+
+function get_handles(handle, local, remote)
+{
+	let handles = [];
+	// Collect local handles, leaving out the originating handle and any handle of the same client.
+	for (let cur_id, cur in local) {
+		if (handle) {
+			if (handle._id == cur_id)
+				continue;
+			if (handle.client && handle.client == cur.client)
+				continue;
+		}
+
+		push(handles, cur);
+	}
+
+	if (!remote)
+		return handles;
+	// Remote handles are appended unfiltered.
+	for (let cur_id, cur in remote)
+		push(handles, cur);
+
+	return handles;
+}
+
+function handle_request(handle, req, data, remote)
+{
+	let name = data.name;
+	let local = this.publish[name];
+	if (remote)
+		remote = this.remote_publish[name];
+	let handles = get_handles(handle, local, remote);
+	// Fan the request out to every handle registered for data.name (remote ones only if requested).
+	let context = {
+		pending: length(handles),
+		req
+	};
+
+	if (!context.pending)
+		return 0;
+	// Keep the request open; the final reply is sent once every handle has completed.
+	req.defer();
+	let cb = (ret) => {
+		if (--context.pending > 0)
+			return;
+		req.reply();
+	};
+
+	for (let cur in handles) {
+		let cur_handle = cur;
+		let chan;
+		if (cur_handle && cur_handle.get_channel)
+			chan = cur_handle.get_channel();
+		if (!chan) {
+			// Unusable handles are still counted in context.pending; complete them here so the request can finish.
+			cb();
+			continue;
+		}
+		// The closure uses the per-iteration copy rather than the loop variable.
+		let data_cb = (msg) => {
+			if (cur_handle.get_response_data)
+				msg = cur_handle.get_response_data(msg);
+			req.reply(msg, -1);
+		};
+
+		chan.defer({
+			method: "request",
+			data, cb, data_cb
+		});
+	}
+}
+
+function handle_message(handle, data, remote)
+{
+	let name = data.name;
+	let local = this.subscribe[name];
+	if (remote)
+		remote = this.remote_subscribe[name];
+	let handles = get_handles(handle, local, remote);
+	for (let cur in handles) {
+		if (!cur || !cur.get_channel)
+			continue;
+
+		let chan = cur.get_channel();
+		if (!chan)
+			continue;
+
+		chan.request({
+			method: "message",
+			return: "ignore",
+			data,
+		});
+	}
+	return 0;
+}
+
+function add_acl(type, user, data)
+{
+	if (!data || !user)
+		return;
+	// Merge the entries of data into type[user], skipping ones that are already present.
+	type[user] ??= [];
+	let list = type[user];
+	for (let cur in data)
+		if (index(list, cur) < 0)
+			push(list, cur);
+}
+
+function acl_set(acl_data)
+{
+	let acl = this.acl = {
+		publish: {},
+		subscribe: {},
+	};
+
+	for (let cur in acl_data) {
+		if (cur.obj != "unetmsg" || !cur.acl)
+			continue;
+
+		if (cur.group)
+			cur.group = ":" + cur.group;
+
+		for (let user in [ cur.user, cur.group ]) {
+			add_acl(acl.publish, user, cur.acl.publish);
+			add_acl(acl.subscribe, user, cur.acl.publish);
+			add_acl(acl.subscribe, user, cur.acl.subscribe);
+		}
+	}
+};
+
+const core_proto = {
+	acl_check,
+	acl_set,
+	pubsub_add,
+	pubsub_del,
+	handle_request,
+	handle_message,
+	dbg: function(msg) {
+		if (this.debug_enabled)
+			warn(msg);
+	},
+	exception: function(e) {
+		this.dbg(`Exception: ${e}\n${e.stacktrace[0].context}`);
+	}
+};
+
+export function init(ubus, debug_enabled)
+{
+	let data = proto({
+		clients: {},
+		publish: {},
+		subscribe: {},
+		remote_publish: {},
+		remote_subscribe: {},
+		client,
+		remote,
+		ubus,
+		debug_enabled
+	}, core_proto);
+
+	client.set_core(data);
+	remote.init(data);
+
+	return data;
+};
diff --git a/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/utils.uc b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/utils.uc
new file mode 100644
index 0000000000..3bad212e41
--- /dev/null
+++ b/package/network/services/unetmsg/files/usr/share/ucode/unetmsg/utils.uc
@@ -0,0 +1,40 @@
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Copyright (C) 2025 Felix Fietkau <nbd at nbd.name>
+ */
+'use strict';
+import { open } from "fs";
+
+export function is_equal(val1, val2) {
+	let t1 = type(val1);
+
+	if (t1 != type(val2))
+		return false;
+
+	if (t1 == "array") {
+		if (length(val1) != length(val2))
+			return false;
+
+		for (let i = 0; i < length(val1); i++)
+			if (!is_equal(val1[i], val2[i]))
+				return false;
+
+		return true;
+	} else if (t1 == "object") {
+		for (let key in val1)
+			if (!is_equal(val1[key], val2[key]))
+				return false;
+		for (let key in val2)
+			if (val1[key] == null)
+				return false;
+		return true;
+	} else {
+		return val1 == val2;
+	}
+};
+
+export function gen_id()
+{
+	let id = open("/dev/urandom").read(12);
+	return join("", map(split(id, ""), (v) => sprintf("%02x", ord(v))));
+};




More information about the lede-commits mailing list