[PATCH v2 12/16] monitor: add parent/child messaging and "notify" message exchange
mwilck at suse.com
mwilck at suse.com
Sat Mar 6 00:36:55 GMT 2021
From: Martin Wilck <mwilck at suse.com>
Persistent discovery controllers are set up in forked children,
possibly using recursion via referrals in do_discover(). The simplest
way to keep the parent's connection registry up to date is to communicate
freshly created controllers directly from child to parent.
To make this work, a callback function is passed to do_discover(),
which (if non-null) will be called after setting up a discovery
controller to initiate the message exchange. The callback is then
passed down to connect_ctrl() for recursive discoveries (referrals).
---
fabrics.c | 29 ++--
fabrics.h | 9 +-
monitor.c | 405 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
monitor.h | 6 +
4 files changed, 435 insertions(+), 14 deletions(-)
diff --git a/fabrics.c b/fabrics.c
index b195d0e..a9e28d8 100644
--- a/fabrics.c
+++ b/fabrics.c
@@ -50,6 +50,7 @@
#include "common.h"
#include "util/log.h"
#include "util/cleanup.h"
+#include "monitor.h"
#ifdef HAVE_SYSTEMD
#include <systemd/sd-id128.h>
@@ -1068,7 +1069,8 @@ free_addrinfo:
return ret;
}
-static int connect_ctrl(struct nvmf_disc_rsp_page_entry *e)
+static int connect_ctrl(struct nvmf_disc_rsp_page_entry *e,
+ const struct monitor_callbacks *monitor)
{
char argstr[BUF_SIZE], *p;
const char *transport;
@@ -1254,7 +1256,7 @@ retry:
flags = validate_output_format(fabrics_cfg.output_format);
if (flags < 0)
flags = NORMAL;
- ret = do_discover(argstr, true, flags);
+ ret = do_discover(argstr, true, flags, monitor);
} else
ret = add_ctrl(argstr);
if (ret == -EINVAL && e->treq & NVMF_TREQ_DISABLE_SQFLOW) {
@@ -1305,7 +1307,8 @@ static bool should_connect(struct nvmf_disc_rsp_page_entry *entry)
return !strncmp(fabrics_cfg.traddr, entry->traddr, len);
}
-static int connect_ctrls(struct nvmf_disc_rsp_page_hdr *log, int numrec)
+static int connect_ctrls(struct nvmf_disc_rsp_page_hdr *log, int numrec,
+ const struct monitor_callbacks *monitor)
{
int i;
int instance;
@@ -1315,7 +1318,7 @@ static int connect_ctrls(struct nvmf_disc_rsp_page_hdr *log, int numrec)
if (!should_connect(&log->entries[i]))
continue;
- instance = connect_ctrl(&log->entries[i]);
+ instance = connect_ctrl(&log->entries[i], monitor);
/* clean success */
if (instance >= 0)
@@ -1355,7 +1358,8 @@ static void nvmf_get_host_identifiers(int ctrl_instance)
static DEFINE_CLEANUP_FUNC(cleanup_log, struct nvmf_disc_rsp_page_hdr *, free);
-int do_discover(char *argstr, bool connect, enum nvme_print_flags flags)
+int do_discover(char *argstr, bool connect, enum nvme_print_flags flags,
+ const struct monitor_callbacks *monitor)
{
struct nvmf_disc_rsp_page_hdr *log __cleanup__(cleanup_log) = NULL;
char *dev_name;
@@ -1384,6 +1388,9 @@ int do_discover(char *argstr, bool connect, enum nvme_print_flags flags)
}
if (instance < 0)
return instance;
+ else if (monitor && monitor->notify &&
+ (fabrics_cfg.device || fabrics_cfg.persistent))
+ monitor->notify(argstr, instance);
if (asprintf(&dev_name, "/dev/nvme%d", instance) < 0)
return -errno;
@@ -1391,6 +1398,7 @@ int do_discover(char *argstr, bool connect, enum nvme_print_flags flags)
free(dev_name);
if (fabrics_cfg.persistent)
msg(LOG_NOTICE, "Persistent device: nvme%d\n", instance);
+
if (!fabrics_cfg.device && !fabrics_cfg.persistent) {
err = remove_ctrl(instance);
if (err)
@@ -1400,7 +1408,7 @@ int do_discover(char *argstr, bool connect, enum nvme_print_flags flags)
switch (ret) {
case DISC_OK:
if (connect)
- ret = connect_ctrls(log, numrec);
+ ret = connect_ctrls(log, numrec, monitor);
else if (fabrics_cfg.raw || flags == BINARY)
save_discovery_log(log, numrec);
else if (flags == JSON)
@@ -1440,7 +1448,8 @@ int do_discover(char *argstr, bool connect, enum nvme_print_flags flags)
static OPT_ARGS(discover_opts);
-int discover_from_conf_file(const char *desc, char *argstr, bool connect)
+int discover_from_conf_file(const char *desc, char *argstr, bool connect,
+ const struct monitor_callbacks *monitor)
{
FILE *f;
char line[256], *ptr, *all_args, *args, **argv;
@@ -1507,7 +1516,7 @@ int discover_from_conf_file(const char *desc, char *argstr, bool connect)
goto free_and_continue;
}
- err = do_discover(argstr, connect, flags);
+ err = do_discover(argstr, connect, flags, monitor);
if (err)
ret = err;
@@ -1580,7 +1589,7 @@ int fabrics_discover(const char *desc, int argc, char **argv, bool connect)
fabrics_cfg.nqn = NVME_DISC_SUBSYS_NAME;
if (!fabrics_cfg.transport && !fabrics_cfg.traddr) {
- ret = discover_from_conf_file(desc, argstr, connect);
+ ret = discover_from_conf_file(desc, argstr, connect, NULL);
} else {
set_discovery_kato(&fabrics_cfg);
@@ -1597,7 +1606,7 @@ int fabrics_discover(const char *desc, int argc, char **argv, bool connect)
if (ret)
goto out;
- ret = do_discover(argstr, connect, flags);
+ ret = do_discover(argstr, connect, flags, NULL);
}
out:
diff --git a/fabrics.h b/fabrics.h
index 128f251..f2f19d1 100644
--- a/fabrics.h
+++ b/fabrics.h
@@ -50,11 +50,16 @@ extern const char *const trtypes[];
#define FILE_NVMF_DISC "discovery.conf"
#define PATH_NVMF_DISC PATH_NVMF_CFG_DIR "/" FILE_NVMF_DISC
+struct monitor_callbacks;
+
int build_options(char *argstr, int max_len, bool discover);
-int do_discover(char *argstr, bool connect, enum nvme_print_flags flags);
+int do_discover(char *argstr, bool connect, enum nvme_print_flags flags,
+ const struct monitor_callbacks *);
int ctrl_instance(const char *device);
char *parse_conn_arg(const char *conargs, const char delim, const char *field);
int remove_ctrl(int instance);
-int discover_from_conf_file(const char *desc, char *argstr, bool connect);
+int discover_from_conf_file(const char *desc, char *argstr, bool connect,
+ const struct monitor_callbacks *);
+
#endif
diff --git a/monitor.c b/monitor.c
index a1229a7..7f08772 100644
--- a/monitor.c
+++ b/monitor.c
@@ -20,6 +20,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
+#include <fcntl.h>
#include <inttypes.h>
#include <libudev.h>
#include <signal.h>
@@ -30,6 +31,8 @@
#include <sys/wait.h>
#include <sys/epoll.h>
#include <sys/inotify.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include "nvme-status.h"
#include "nvme.h"
@@ -43,6 +46,13 @@
#include "util/log.h"
#include "event/event.h"
+#define MSG_SIZE 1024 /* size in bytes of the buffers used for parent/child messages */
+#define SOCK_PATH "nvme-monitor" /* socket name of the parent; children append ".<pid>" for their own name */
+static const struct sockaddr_un monitor_sa = { /* address the monitor parent binds to and children send to */
+ .sun_family = AF_UNIX,
+ .sun_path = "\0" SOCK_PATH /* leading NUL byte: not a filesystem path (Linux abstract socket namespace) */
+};
+
static struct monitor_config {
bool autoconnect;
bool keep_ctrls;
@@ -161,6 +171,395 @@ static int child_reset_signals(void)
return -err;
}
+
+/*
+ * Send a request to the monitor parent and wait for its response.
+ *
+ * @buf:  holds the request on entry and the response on return
+ * @size: total capacity of @buf in bytes
+ * @len:  number of request bytes in @buf to send
+ *
+ * A datagram socket is created and bound to a per-process name built from
+ * SOCK_PATH and the PID of the caller, which gives the parent an address
+ * to reply to. The socket is left in blocking mode, so recv() waits until
+ * a datagram arrives.
+ *
+ * Returns the length of the response on success, a negative errno value
+ * if a system call failed, or -EOVERFLOW if the response did not fit.
+ */
+static ssize_t monitor_child_message(char *buf, size_t size, size_t len)
+{
+	int fd __cleanup__(cleanup_fd) = -1;
+	struct sockaddr_un clt_addr = { .sun_family = AF_UNIX, };
+	ssize_t rc;
+	int err;
+
+	fd = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (fd == -1) {
+		/* save errno before logging; the log call may overwrite it */
+		err = errno;
+		msg(LOG_ERR, "failed to create socket: %m\n");
+		return -err;
+	}
+
+	/* sun_path[0] stays NUL; the name proper starts at index 1 */
+	snprintf(&clt_addr.sun_path[1], sizeof(clt_addr.sun_path) - 1,
+		 SOCK_PATH ".%ld", (long)getpid());
+
+	if (bind(fd, (struct sockaddr *)&clt_addr, sizeof(clt_addr)) == -1) {
+		err = errno;
+		msg(LOG_ERR, "failed in bind(): %m\n");
+		return -err;
+	}
+
+	rc = sendto(fd, buf, len, 0,
+		    (struct sockaddr *)&monitor_sa, sizeof(monitor_sa));
+	if (rc == -1) {
+		err = errno;
+		msg(LOG_ERR, "failed to send client message: %m\n");
+		return -err;
+	}
+	msg(LOG_DEBUG, "sent %zd bytes to server\n", rc);
+
+	/* the request is no longer needed; reuse the buffer for the response */
+	memset(buf, 0, size);
+	/* MSG_TRUNC makes recv() report the full datagram length */
+	rc = recv(fd, buf, size, MSG_TRUNC);
+	if (rc == -1) {
+		err = errno;
+		msg(LOG_ERR, "failed to receive response: %m\n");
+		return -err;
+	}
+	/* a response filling the whole buffer would leave no terminating NUL */
+	if ((size_t)rc >= size) {
+		msg(LOG_ERR, "response truncated: %zu bytes missing\n",
+		    (size_t)rc - (size - 1));
+		return -EOVERFLOW;
+	}
+
+	return rc;
+}
+
+/*
+ * snprintf() wrapper that turns truncation into an error.
+ * Evaluates to the snprintf() return value if that is negative or if the
+ * output (including the terminating NUL) fit into "size" bytes, and to
+ * -EOVERFLOW otherwise.
+ */
+#define safe_snprintf(var, size, format, args...)			\
+({									\
+	size_t __cap = size;						\
+	int __n = snprintf(var, __cap, format, ##args);			\
+									\
+	(__n >= 0 && (size_t)__n >= __cap) ? -EOVERFLOW : __n;		\
+})
+
+/*
+ * Monitor parent <-> child message exchange protocol
+ *
+ * Every exchange consists of a single message sent from child (discovery
+ * process) to parent (monitor main program) and a single response from
+ * the parent to the child.
+ *
+ * "New discovery controller" exchange:
+ * - The child sends a MON_MSG_NEW message to the parent after establishing
+ * the connection to a new persistent discovery controller.
+ * Payload: the instance number and the connection parameter string
+ * as sent to /dev/nvme-fabrics.
+ * This exchange is initiated in notify_new_discovery(), which is passed
+ * as "notify" callback for do_discover().
+ * - parent responds with MON_MSG_ACK (or MON_MSG_ERR if an error occurs).
+ */
+
+static const char monitor_magic[] = "NVMM"; /* every message starts with these MAGIC_LEN bytes */
+enum { /* message opcodes; the values index monitor_opcode[] */
+ MON_MSG_ACK = 0,
+ MON_MSG_ERR,
+ MON_MSG_NEW,
+ __MAX_MON_MSG__,
+};
+
+enum { /* lengths of the header fields, in bytes */
+ MAGIC_LEN = 4,
+ OPCODE_LEN = 4,
+ HEADER_LEN = MAGIC_LEN + OPCODE_LEN,
+};
+
+static const char *const monitor_opcode[] = { /* text form of each opcode, OPCODE_LEN characters each */
+ [MON_MSG_ACK] = "ACK ",
+ [MON_MSG_ERR] = "ERR ",
+ [MON_MSG_NEW] = "NEW ",
+};
+
+/*
+ * Clear @buf (@len bytes) and write a message header for @opcode into it:
+ * the magic string followed by the text form of the opcode.
+ * Returns the result of safe_snprintf(): the number of characters written,
+ * or a negative value on failure or truncation.
+ */
+static int monitor_msg_hdr(char *buf, size_t len, int opcode)
+{
+	const char *op_text = monitor_opcode[opcode];
+	int hdr_len;
+
+	memset(buf, 0, len);
+	hdr_len = safe_snprintf(buf, len, "%s%s", monitor_magic, op_text);
+
+	return hdr_len;
+}
+
+/*
+ * Validate the header of the message in @buf (@len bytes).
+ * The message must be at least HEADER_LEN bytes long, start with the magic
+ * string and carry one of the opcode strings from monitor_opcode[].
+ * On success, the opcode is stored in @opcode and HEADER_LEN is returned;
+ * otherwise -EINVAL is returned and @opcode is left untouched.
+ */
+static int monitor_check_hdr(const char *buf, size_t len, int *opcode)
+{
+	const char *op_field;
+	int idx = 0;
+
+	if (len < HEADER_LEN) {
+		msg(LOG_ERR, "short packet\n");
+		return -EINVAL;
+	}
+
+	if (memcmp(buf, monitor_magic, MAGIC_LEN)) {
+		msg(LOG_ERR, "bad magic\n");
+		return -EINVAL;
+	}
+
+	/* the opcode text immediately follows the magic */
+	op_field = buf + MAGIC_LEN;
+	while (idx < ARRAY_SIZE(monitor_opcode) &&
+	       memcmp(op_field, monitor_opcode[idx], OPCODE_LEN) != 0)
+		idx++;
+
+	if (idx == ARRAY_SIZE(monitor_opcode)) {
+		msg(LOG_ERR, "invalid opcode\n");
+		return -EINVAL;
+	}
+
+	*opcode = idx;
+	return HEADER_LEN;
+}
+
+static int monitor_ack_msg(char *buf, size_t len) /* build an ACK header in buf; returns what monitor_msg_hdr() returns */
+{
+ return monitor_msg_hdr(buf, len, MON_MSG_ACK);
+}
+
+static __attribute__((unused)) /* NOTE(review): handle_child_msg() calls this function, so the attribute looks stale */
+int monitor_err_msg(char *buf, size_t len) /* build an ERR header in buf; returns what monitor_msg_hdr() returns */
+{
+ return monitor_msg_hdr(buf, len, MON_MSG_ERR);
+}
+
+/*
+ * Check whether the message in @buf (@len bytes) is a valid response to a
+ * request with opcode @req_opcode, and log the verdict.
+ * Returns the header length if the response is acceptable, the error from
+ * monitor_check_hdr() if the header is invalid, and -EINVAL otherwise.
+ */
+static int monitor_check_resp(const char *buf, size_t len, int req_opcode)
+{
+	int resp_opcode;
+	int hdr_len = monitor_check_hdr(buf, len, &resp_opcode);
+	size_t payload_len;
+	bool ok;
+
+	if (hdr_len < 0)
+		return hdr_len;
+
+	payload_len = len - hdr_len;
+
+	/* only NEW requests exist; their sole valid reply is a bare ACK */
+	ok = req_opcode == MON_MSG_NEW &&
+	     resp_opcode == MON_MSG_ACK &&
+	     payload_len == 0;
+
+	msg(ok ? LOG_DEBUG : LOG_ERR,
+	    "%s response: %s => %s, len=%zu\n",
+	    ok ? "good" : "bad",
+	    monitor_opcode[req_opcode], monitor_opcode[resp_opcode],
+	    payload_len);
+
+	return ok ? hdr_len : -EINVAL;
+}
+
+/*
+ * "notify" callback for do_discover(): report a new discovery controller
+ * to the monitor parent.
+ * Builds a MON_MSG_NEW message whose payload is "<instance> <argstr>",
+ * sends it to the parent and checks the response. Failures are logged by
+ * this function or its helpers; nothing is returned to the caller.
+ */
+static void notify_new_discovery(const char *argstr, int instance)
+{
+	char buf[MSG_SIZE];
+	size_t used;
+	ssize_t ret;
+
+	ret = monitor_msg_hdr(buf, sizeof(buf), MON_MSG_NEW);
+	if (ret < 0) {
+		msg(LOG_ERR, "failed to create msghdr: %s\n", strerror(-ret));
+		return;
+	}
+	used = ret;
+
+	/* append the payload right behind the header */
+	ret = safe_snprintf(buf + used, sizeof(buf) - used, "%d %s",
+			    instance, argstr);
+	if (ret < 0) {
+		msg(LOG_ERR, "failed to create msg: %s\n", strerror(-ret));
+		return;
+	}
+	used += ret;
+
+	ret = monitor_child_message(buf, sizeof(buf), used);
+	if (ret >= 0)
+		monitor_check_resp(buf, ret, MON_MSG_NEW);
+}
+
+static const struct monitor_callbacks discover_callbacks = { /* callbacks the monitor passes to do_discover() and discover_from_conf_file() */
+ .notify = notify_new_discovery,
+};
+
+struct comm_event { /* state of the parent's socket for messages from children */
+ struct event e; /* embedded event; parent_comm_cb() gets back to the comm_event via container_of() */
+ struct sockaddr_un addr; /* sender of the last received message; the response is sent there */
+ char message[MSG_SIZE]; /* holds the received request, then the response built in place */
+ int msglen; /* length of the response in message[], set by handle_child_msg() */
+};
+
+/*
+ * Handle a MON_MSG_NEW message, whose payload is "<instance> <conn args>".
+ *
+ * @buf:  message buffer; the payload is parsed as a NUL-terminated string
+ * @size: capacity of @buf (not used by this function)
+ * @len:  points to the number of bytes received
+ * @ofs:  offset of the payload in @buf, i.e. the length of the header
+ *
+ * The connection described by the payload is added to the connection
+ * registry (or looked up, if it exists already), and the instance number
+ * of its discovery controller is recorded.
+ *
+ * Returns the opcode of the response to send to the child: MON_MSG_ACK on
+ * success, MON_MSG_ERR if the message is malformed or the connection could
+ * not be registered.
+ */
+static int handle_child_msg_new(char *buf, size_t size, ssize_t *len, ssize_t ofs)
+{
+	int rc, instance, n = 0;
+	struct nvme_connection *co = NULL;
+
+	if (*len - ofs < 2) {
+		/* *len is an ssize_t, so the matching conversion is %zd */
+		msg(LOG_ERR, "short packet (len=%zd)\n", *len);
+		return MON_MSG_ERR;
+	}
+	buf += ofs;
+	/* %n yields the offset of the connection string behind the number */
+	if (sscanf(buf, "%d %n", &instance, &n) != 1) {
+		msg(LOG_ERR, "no instance number found\n");
+		return MON_MSG_ERR;
+	}
+	buf += n;
+
+	rc = conndb_add_disc_ctrl(buf, &co);
+	if (rc != 0 && rc != -EEXIST) {
+		msg(LOG_ERR, "failed to add connection: %s\n", strerror(-rc));
+		/* the protocol answers errors with MON_MSG_ERR, not an ACK */
+		return MON_MSG_ERR;
+	}
+
+	if (co->discovery_instance != instance) {
+		co->discovery_instance = instance;
+		conn_msg(LOG_INFO, co,
+			 "discovery instance set to %d\n", instance);
+	} else
+		conn_msg(LOG_DEBUG, co, "discovery instance unchanged\n");
+
+	return MON_MSG_ACK;
+}
+
+/*
+ * Process a message received from a child and build the response.
+ *
+ * @comm: communication state; comm->message holds the request on entry and
+ *        the response on return, comm->msglen the length of the response
+ * @len:  number of bytes received
+ *
+ * The header is validated against the number of bytes actually received,
+ * then the message is dispatched by opcode. The handler result selects the
+ * response, which is built in place in comm->message.
+ *
+ * Returns a negative value if the response could not be created, else the
+ * non-negative result of the function that built the response.
+ */
+static int handle_child_msg(struct comm_event *comm, ssize_t len)
+{
+	ssize_t rc, ofs;
+	int opcode = MON_MSG_ERR;
+	char *buf = comm->message;
+
+	msg(LOG_DEBUG, "got message from %s: %s\n",
+	    &comm->addr.sun_path[1], buf);
+
+	/*
+	 * Check the header against the received length, not the capacity of
+	 * the buffer, or short packets would pass with stale buffer contents.
+	 */
+	if (len < 0)
+		ofs = -EINVAL;
+	else
+		ofs = monitor_check_hdr(buf, (size_t)len, &opcode);
+
+	if (ofs < 0)
+		rc = MON_MSG_ERR;
+	else {
+		switch (opcode) {
+		case MON_MSG_NEW:
+			rc = handle_child_msg_new(comm->message,
+						  sizeof(comm->message),
+						  &len, ofs);
+			break;
+		case MON_MSG_ACK:
+		case MON_MSG_ERR:
+			/* these opcodes are responses; children must not send them */
+			msg(LOG_ERR, "unexpected message: %s\n", monitor_opcode[opcode]);
+			rc = MON_MSG_ERR;
+			break;
+		default:
+			msg(LOG_ERR, "bogus message\n");
+			rc = MON_MSG_ERR;
+			break;
+		}
+	}
+
+	/* build the response in place, overwriting the request */
+	switch (rc) {
+	case MON_MSG_ACK:
+		if ((rc = monitor_ack_msg(comm->message, sizeof(comm->message))) > 0)
+			len = rc;
+		break;
+	case MON_MSG_ERR:
+		if ((rc = monitor_err_msg(comm->message, sizeof(comm->message))) > 0)
+			len = rc;
+		break;
+	default:
+		/* other messages must be filled in by handlers above */
+		break;
+	}
+	if (rc < 0)
+		msg(LOG_ERR, "failed to create response\n");
+	else {
+		comm->msglen = len;
+		msg(LOG_DEBUG, "response (%zd): %s\n", len, comm->message);
+	}
+	return rc;
+}
+
+/*
+ * Event callback for the parent's communication socket.
+ *
+ * The socket alternates between two states:
+ *  - EPOLLIN: receive a request from a child, process it with
+ *    handle_child_msg(), then wait for EPOLLOUT;
+ *  - EPOLLOUT: send the prepared response to the address the request came
+ *    from, then wait for EPOLLIN again.
+ *
+ * Returns EVENTCB_CLEANUP on EPOLLHUP and if sendto() or event_modify()
+ * fails, EVENTCB_CONTINUE in all other cases. Requests that cannot be
+ * received or processed are dropped without sending a response.
+ */
+static int parent_comm_cb(struct event *evt, uint32_t events)
+{
+	struct comm_event *comm = container_of(evt, struct comm_event, e);
+	ssize_t rc;
+
+	if (events & EPOLLHUP) {
+		msg(LOG_WARNING, "socket disconnect\n");
+		return EVENTCB_CLEANUP;
+
+	} else if (events & EPOLLOUT) {
+		rc = sendto(evt->fd, comm->message, comm->msglen, 0,
+			    (struct sockaddr *)&comm->addr, sizeof(comm->addr));
+		if (rc == -1) {
+			msg(LOG_ERR, "sendto: %m\n");
+			return EVENTCB_CLEANUP;
+		}
+		evt->ep.events = EPOLLIN|EPOLLHUP;
+
+	} else if (events & EPOLLIN) {
+		socklen_t len;
+
+		memset(&comm->addr, 0, sizeof(comm->addr));
+		len = sizeof(comm->addr);
+		/* MSG_TRUNC makes recvfrom() report the full datagram length */
+		rc = recvfrom(evt->fd, comm->message, sizeof(comm->message),
+			      MSG_TRUNC, (struct sockaddr*)&comm->addr, &len);
+		if (rc <= 0) {
+			msg(LOG_ERR, "error receiving child message: %m\n");
+			return EVENTCB_CONTINUE;
+		} else if ((size_t)rc >= sizeof(comm->message)) {
+			msg(LOG_ERR, "child message truncated: %zu bytes missing\n",
+			    (size_t)rc - (sizeof(comm->message) - 1));
+			return EVENTCB_CONTINUE;
+		}
+		/*
+		 * The message is parsed as a string, but the buffer may still
+		 * hold bytes of an earlier, longer (e.g. truncated) datagram.
+		 * rc < sizeof(comm->message) holds here, so this is in bounds.
+		 */
+		comm->message[rc] = '\0';
+
+		if (handle_child_msg(comm, rc) < 0)
+			return EVENTCB_CONTINUE;
+
+		evt->ep.events = EPOLLOUT|EPOLLHUP;
+	}
+
+	/* activate the event mask selected above */
+	if ((rc = event_modify(evt)) < 0) {
+		msg(LOG_ERR, "event_modify: %s\n", strerror(-rc));
+		return EVENTCB_CLEANUP;
+	}
+
+	return EVENTCB_CONTINUE;
+}
+
+/*
+ * Set O_NONBLOCK and FD_CLOEXEC on @fd, keeping the other flags as they are.
+ * Returns 0 on success, or the negative errno value of the fcntl() call
+ * that failed.
+ */
+static int set_socketflags(int fd)
+{
+	int flags, err;
+
+	if ((flags = fcntl(fd, F_GETFL, 0)) == -1) {
+		/* save errno before logging; the log call may overwrite it */
+		err = errno;
+		msg(LOG_ERR, "F_GETFL failed: %m\n");
+		return -err;
+	}
+	if (fcntl(fd, F_SETFL, flags|O_NONBLOCK) == -1) {
+		err = errno;
+		msg(LOG_ERR, "F_SETFL failed: %m\n");
+		return -err;
+	}
+	if ((flags = fcntl(fd, F_GETFD, 0)) == -1) {
+		err = errno;
+		msg(LOG_ERR, "F_GETFD failed: %m\n");
+		return -err;
+	}
+	if (fcntl(fd, F_SETFD, flags|FD_CLOEXEC) == -1) {
+		err = errno;
+		msg(LOG_ERR, "F_SETFD failed: %m\n");
+		return -err;
+	}
+	return 0;
+}
+
+static DEFINE_CLEANUP_FUNC(cleanup_comm, struct comm_event *, free);
+
+/*
+ * Create the socket on which the monitor parent receives messages from its
+ * children, and register it with the dispatcher @dsp.
+ * The socket is a non-blocking, close-on-exec datagram socket bound to
+ * monitor_sa; parent_comm_cb() is installed as its event callback.
+ * Errors are logged (except for allocation failure) and leave the monitor
+ * without this event; nothing is returned.
+ */
+static void add_parent_comm_event(struct dispatcher *dsp)
+{
+	struct comm_event *comm __cleanup__(cleanup_comm) = NULL;
+	int fd __cleanup__(cleanup_fd) = -1;
+	int rc;
+
+	fd = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (fd == -1) {
+		msg(LOG_ERR, "failed to create socket: %m\n");
+		return;
+	}
+
+	if (set_socketflags(fd) < 0)
+		return;
+
+	rc = bind(fd, (struct sockaddr *)&monitor_sa, sizeof(monitor_sa));
+	if (rc == -1) {
+		msg(LOG_ERR, "bind() failed: %m\n");
+		return;
+	}
+
+	comm = calloc(1, sizeof(*comm));
+	if (comm == NULL)
+		return;
+
+	comm->e = EVENT_ON_HEAP(parent_comm_cb, fd, EPOLLIN);
+
+	rc = event_add(dsp, &comm->e);
+	if (rc < 0) {
+		msg(LOG_ERR, "failed to add child communication event: %s\n",
+		    strerror(-rc));
+		return;
+	}
+
+	/*
+	 * The event was added; reset the variables so that the cleanup
+	 * handlers no longer see the socket and the comm_event.
+	 * NOTE(review): presumably the dispatcher owns both from here on.
+	 */
+	comm = NULL;
+	fd = -1;
+}
+
static void monitor_handle_nvme_add(struct udev_device *ud)
{
const char *syspath = udev_device_get_syspath(ud);
@@ -311,7 +710,7 @@ static int monitor_discovery(const char *transport, const char *traddr,
rc = build_options(argstr, sizeof(argstr), true);
msg(LOG_DEBUG, "%s\n", argstr);
- rc = do_discover(argstr, mon_cfg.autoconnect, NORMAL);
+ rc = do_discover(argstr, mon_cfg.autoconnect, NORMAL, &discover_callbacks);
free(device);
exit(-rc);
@@ -634,7 +1033,8 @@ static int monitor_discover_from_conf_file(void)
fabrics_cfg.persistent = true;
rc = discover_from_conf_file("Discover NVMeoF subsystems from " PATH_NVMF_DISC,
- argstr, mon_cfg.autoconnect);
+ argstr, mon_cfg.autoconnect,
+ &discover_callbacks);
exit(-rc);
/* not reached */
@@ -848,6 +1248,7 @@ int aen_monitor(const char *desc, int argc, char **argv)
}
add_inotify_event(mon_dsp);
+ add_parent_comm_event(mon_dsp);
conndb_init_from_sysfs();
ret = event_loop(mon_dsp, &wait_mask, handle_epoll_err);
diff --git a/monitor.h b/monitor.h
index e79d3a6..01ae4de 100644
--- a/monitor.h
+++ b/monitor.h
@@ -1,6 +1,12 @@
#ifndef _MONITOR_H
#define _MONITOR_H
+typedef void (*disc_notify_cb)(const char *argstr, int instance); /* receives the connect argument string and the controller instance number */
+
+struct monitor_callbacks { /* optional hooks passed to do_discover(); a NULL pointer or NULL member disables them */
+ disc_notify_cb notify; /* called by do_discover() once it has a controller instance, if a device was given or the connection is persistent */
+};
+
extern int aen_monitor(const char *desc, int argc, char **argv);
#endif
--
2.29.2
More information about the Linux-nvme
mailing list