[PATCH] Support plug qdisc - queue traffic until explicit release

Shriram Rajagopalan rshriram at cs.ubc.ca
Sun Feb 12 17:53:10 EST 2012


The plug qdisc supports two operations - plug and unplug. When the
qdisc receives a plug ("buffer") command via netlink request,
packets arriving henceforth are buffered until a corresponding unplug
command is received. Depending on the type of unplug ("release_one"
or "release_indefinite"), the queue can be unplugged indefinitely or
selectively.

The plug qdisc allows a user to implement network output buffering
(aka output commit), used commonly in checkpoint based fault tolerance
systems. It also supports a general purpose queue plug/unplug
functionality.

The associated kernel module is available in David Miller's net-next
tree, commit: c3059be16c9ef29c05f0876a9df5fea21f29724f

This patch introduces userspace tools and API, to control the qdisc
via netlink messages.

Signed-off-by: Shriram Rajagopalan <rshriram at cs.ubc.ca>
---
 include/Makefile.am                |    1 +
 include/linux/pkt_sched.h          |   21 ++++
 include/netlink-types.h            |    6 +
 include/netlink/route/qdisc/plug.h |   30 ++++++
 lib/Makefile.am                    |    4 +-
 lib/cli/qdisc/plug.c               |  113 +++++++++++++++++++++++
 lib/route/qdisc/plug.c             |  177 ++++++++++++++++++++++++++++++++++++
 7 files changed, 351 insertions(+), 1 deletions(-)
 create mode 100644 include/netlink/route/qdisc/plug.h
 create mode 100644 lib/cli/qdisc/plug.c
 create mode 100644 lib/route/qdisc/plug.c

diff --git a/include/Makefile.am b/include/Makefile.am
index db6862d..2ba0ece 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -54,6 +54,7 @@ nobase_libnlinclude_HEADERS = \
 	netlink/route/qdisc/red.h \
 	netlink/route/qdisc/sfq.h \
 	netlink/route/qdisc/tbf.h \
+	netlink/route/qdisc/plug.h \
 	netlink/route/addr.h \
 	netlink/route/class.h \
 	netlink/route/classifier.h \
diff --git a/include/linux/pkt_sched.h b/include/linux/pkt_sched.h
index c533670..7ccc1fd 100644
--- a/include/linux/pkt_sched.h
+++ b/include/linux/pkt_sched.h
@@ -127,6 +127,27 @@ struct tc_multiq_qopt {
 	__u16	max_bands;		/* Maximum number of queues */
 };
 
+/* PLUG section */
+
+#define TCQ_PLUG_BUFFER                0
+#define TCQ_PLUG_RELEASE_ONE           1
+#define TCQ_PLUG_RELEASE_INDEFINITE    2
+#define TCQ_PLUG_LIMIT                 3
+
+struct tc_plug_qopt {
+        /* TCQ_PLUG_BUFFER: Insert a plug into the queue and
+         *   buffer any incoming packets.
+         * TCQ_PLUG_RELEASE_ONE: Dequeue packets from queue head
+         *   to beginning of the next plug.
+         * TCQ_PLUG_RELEASE_INDEFINITE: Dequeue all packets from queue.
+         *   Stop buffering packets until the next TCQ_PLUG_BUFFER
+         *   command is received (just act as a pass-thru queue).
+         * TCQ_PLUG_LIMIT: Increase/decrease queue size.
+         */
+        int             action;
+        __u32           limit;
+};
+
 /* TBF section */
 
 struct tc_tbf_qopt {
diff --git a/include/netlink-types.h b/include/netlink-types.h
index 82481b7..5ced836 100644
--- a/include/netlink-types.h
+++ b/include/netlink-types.h
@@ -669,6 +669,12 @@ struct rtnl_red
 	uint32_t	qr_mask;
 };
 
+struct rtnl_plug
+{
+	int             action;	/* TCQ_PLUG_* command copied into tc_plug_qopt */
+	uint32_t        limit;	/* queue limit copied into tc_plug_qopt */
+};
+
 struct flnl_request
 {
 	NLHDR_COMMON
diff --git a/include/netlink/route/qdisc/plug.h b/include/netlink/route/qdisc/plug.h
new file mode 100644
index 0000000..ffb1a04
--- /dev/null
+++ b/include/netlink/route/qdisc/plug.h
@@ -0,0 +1,30 @@
+/*
+ * netlink/route/qdisc/plug.h	PLUG Qdisc
+ *
+ *	This library is free software; you can redistribute it and/or
+ *	modify it under the terms of the GNU Lesser General Public
+ *	License as published by the Free Software Foundation version 2.1
+ *	of the License.
+ *
+ * Copyright (c) 2012 Shriram Rajagopalan <rshriram at cs.ubc.ca>
+ */
+
+#ifndef NETLINK_PLUG_H_
+#define NETLINK_PLUG_H_
+
+#include <netlink/netlink.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern int	rtnl_qdisc_plug_set_limit(struct rtnl_qdisc *, int);
+extern int	rtnl_qdisc_plug_buffer(struct rtnl_qdisc *);
+extern int	rtnl_qdisc_plug_release_one(struct rtnl_qdisc *);
+extern int	rtnl_qdisc_plug_release_indefinite(struct rtnl_qdisc *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/Makefile.am b/lib/Makefile.am
index c2668ef..aee8d0f 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -72,7 +72,7 @@ libnl_route_3_la_SOURCES = \
 	route/qdisc/blackhole.c route/qdisc/cbq.c route/qdisc/dsmark.c \
 	route/qdisc/fifo.c route/qdisc/htb.c route/qdisc/netem.c \
 	route/qdisc/prio.c route/qdisc/red.c route/qdisc/sfq.c \
-	route/qdisc/tbf.c \
+	route/qdisc/tbf.c route/qdisc/plug.c \
 	\
 	fib_lookup/lookup.c fib_lookup/request.c \
 	\
@@ -101,6 +101,7 @@ nobase_pkglib_LTLIBRARIES = \
 	cli/qdisc/htb.la \
 	cli/qdisc/blackhole.la \
 	cli/qdisc/pfifo.la \
+	cli/qdisc/plug.la \
 	cli/qdisc/bfifo.la \
 	cli/cls/basic.la \
 	cli/cls/cgroup.la
@@ -108,6 +109,7 @@ nobase_pkglib_LTLIBRARIES = \
 cli_qdisc_htb_la_LDFLAGS = -module -avoid-version
 cli_qdisc_blackhole_la_LDFLAGS = -module -avoid-version
 cli_qdisc_pfifo_la_LDFLAGS = -module -avoid-version
+cli_qdisc_plug_la_LDFLAGS = -module -avoid-version
 cli_qdisc_bfifo_la_LDFLAGS = -module -avoid-version
 cli_cls_basic_la_LDFLAGS = -module -avoid-version
 cli_cls_cgroup_la_LDFLAGS = -module -avoid-version
diff --git a/lib/cli/qdisc/plug.c b/lib/cli/qdisc/plug.c
new file mode 100644
index 0000000..2b8d5d6
--- /dev/null
+++ b/lib/cli/qdisc/plug.c
@@ -0,0 +1,113 @@
+
+/*
+ * src/lib/cli/qdisc/plug.c     	plug module for CLI lib
+ *
+ *	This library is free software; you can redistribute it and/or
+ *	modify it under the terms of the GNU Lesser General Public
+ *	License as published by the Free Software Foundation version 2.1
+ *	of the License.
+ *
+ * Copyright (c) 2012 Shriram Rajagopalan <rshriram at cs.ubc.ca>
+ */
+
+#include <netlink/cli/utils.h>
+#include <netlink/cli/tc.h>
+#include <netlink/route/qdisc/plug.h>
+
+/* Print the plug module's option summary and usage examples to stdout. */
+static void print_usage(void)
+{
+	printf(
+"Usage: nl-qdisc-add [...] plug [OPTIONS]...\n"
+"\nOPTIONS\n"
+"     --help                Show this help text.\n"
+"     --limit               Maximum queue length in bytes.\n"
+"     --buffer              create a new buffer(plug) and queue incoming traffic into it.\n"
+"     --release-one         release traffic from previous buffer.\n"
+"     --release-indefinite  stop buffering and release all (buffered and new) packets.\n"
+"\n"
+"EXAMPLE\n"
+"    # Attach plug qdisc with 32KB queue size to ifb0\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root plug --limit=32768\n"
+"    # Plug network traffic arriving at ifb0\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n"
+"    # Unplug traffic arriving at ifb0 indefinitely\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-indefinite\n\n"
+"    # If operating in output buffering mode:\n"
+"    # at time t=t0, create a new output buffer b0 to hold network output\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n\n"
+"    # at time t=t1, take a checkpoint c0, create a new output buffer b1\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n"
+"    # at time t=t1+r, after c0 is committed, release b0\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-one\n\n"
+"    # at time t=t2, take a checkpoint c1, create a new output buffer b2\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n"
+"    # at time t=t2+r, after c1 is committed, release b1\n"
+"    nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-one\n");
+}
+
+/*
+ * Apply the plug-specific command line options to the qdisc in the order
+ * given. Each option sets the pending plug action, so the last one wins;
+ * --help prints the usage text and stops parsing.
+ */
+static void plug_parse_argv(struct rtnl_tc *tc, int argc, char **argv)
+{
+	enum {
+		ARG_LIMIT = 257,
+		ARG_BUFFER,
+		ARG_RELEASE_ONE,
+		ARG_RELEASE_INDEFINITE,
+	};
+	static struct option long_opts[] = {
+		{ "help", 0, 0, 'h' },
+		{ "limit", 1, 0, ARG_LIMIT },
+		{ "buffer", 0, 0, ARG_BUFFER },
+		{ "release-one", 0, 0, ARG_RELEASE_ONE },
+		{ "release-indefinite", 0, 0, ARG_RELEASE_INDEFINITE },
+		{ 0, 0, 0, 0 }
+	};
+	struct rtnl_qdisc *qdisc = (struct rtnl_qdisc *) tc;
+	int opt, longidx = 0;
+
+	while ((opt = getopt_long(argc, argv, "h", long_opts, &longidx)) != -1) {
+		switch (opt) {
+		case 'h':
+			print_usage();
+			return;
+
+		case ARG_LIMIT:
+			rtnl_qdisc_plug_set_limit(qdisc, nl_cli_parse_u32(optarg));
+			break;
+
+		case ARG_BUFFER:
+			rtnl_qdisc_plug_buffer(qdisc);
+			break;
+
+		case ARG_RELEASE_ONE:
+			rtnl_qdisc_plug_release_one(qdisc);
+			break;
+
+		case ARG_RELEASE_INDEFINITE:
+			rtnl_qdisc_plug_release_indefinite(qdisc);
+			break;
+		}
+	}
+}
+
+static struct nl_cli_tc_module plug_module =
+{
+	.tm_name		= "plug",
+	.tm_type		= RTNL_TC_TYPE_QDISC,
+	.tm_parse_argv		= plug_parse_argv,
+};
+
+static void __init plug_init(void)
+{
+	nl_cli_tc_register(&plug_module);
+}
+
+static void __exit plug_exit(void)
+{
+	nl_cli_tc_unregister(&plug_module);
+}
diff --git a/lib/route/qdisc/plug.c b/lib/route/qdisc/plug.c
new file mode 100644
index 0000000..a99b9be
--- /dev/null
+++ b/lib/route/qdisc/plug.c
@@ -0,0 +1,177 @@
+/*
+ * lib/route/qdisc/plug.c		PLUG Qdisc
+ *
+ *	This library is free software; you can redistribute it and/or
+ *	modify it under the terms of the GNU Lesser General Public
+ *	License as published by the Free Software Foundation version 2.1
+ *	of the License.
+ *
+ * Copyright (c) 2012 Shriram Rajagopalan <rshriram at cs.ubc.ca>
+ */
+
+/**
+ * @ingroup qdisc
+ * @defgroup qdisc_plug Plug/Unplug Traffic (PLUG)
+ * @brief
+ *
+ * Queue traffic until an explicit release command.
+ *
+ * There are two ways to use this qdisc:
+ * 1. A simple "instantaneous" plug/unplug operation, by issuing an alternating
+ *    sequence of TCQ_PLUG_BUFFER & TCQ_PLUG_RELEASE_INDEFINITE commands.
+ *
+ * 2. For network output buffering (a.k.a output commit) functionality.
+ *    Output commit property is commonly used by applications using checkpoint
+ *    based fault-tolerance to ensure that the checkpoint from which a system
+ *    is being restored is consistent w.r.t. the outside world.
+ *
+ *    Consider for e.g. Remus - a Virtual Machine checkpointing system,
+ *    wherein a VM is checkpointed, say every 50ms. The checkpoint is replicated
+ *    asynchronously to the backup host, while the VM continues executing the
+ *    next epoch speculatively.
+ *
+ *    The following is a typical sequence of output buffer operations:
+ *       1. At epoch i, start_buffer(i)
+ *       2. At end of epoch i (i.e. after 50ms):
+ *          2.1 Stop VM and take checkpoint(i).
+ *          2.2 start_buffer(i+1) and Resume VM
+ *       3. While speculatively executing epoch(i+1), asynchronously replicate
+ *          checkpoint(i) to backup host.
+ *       4. When checkpoint_ack(i) is received from backup, release_buffer(i)
+ *    Thus, this Qdisc would receive the following sequence of commands:
+ *       TCQ_PLUG_BUFFER (epoch i)
+ *       .. TCQ_PLUG_BUFFER (epoch i+1)
+ *       ....TCQ_PLUG_RELEASE_ONE (epoch i)
+ *       ......TCQ_PLUG_BUFFER (epoch i+2)
+ *       ........
+ *
+ *
+ * State of the queue, when used for network output buffering:
+ *
+ *                 plug(i+1)            plug(i)          head
+ * ------------------+--------------------+---------------->
+ *                   |                    |
+ *                   |                    |
+ * pkts_current_epoch| pkts_last_epoch    |pkts_to_release
+ * ----------------->|<--------+--------->|+--------------->
+ *                   v                    v
+ *
+ *
+ * @{
+ */
+
+#include <netlink-local.h>
+#include <netlink-tc.h>
+#include <netlink/netlink.h>
+#include <netlink/utils.h>
+#include <netlink/route/tc-api.h>
+#include <netlink/route/qdisc/plug.h>
+
+/* Append the qdisc's plug action and limit to msg as a struct tc_plug_qopt. */
+static int plug_msg_fill(struct rtnl_tc *tc, void *data, struct nl_msg *msg)
+{
+	const struct rtnl_plug *cfg = data;
+	struct tc_plug_qopt qopt;
+
+	if (cfg == NULL)
+		return -NLE_INVAL;
+
+	qopt.limit  = cfg->limit;
+	qopt.action = cfg->action;
+	return nlmsg_append(msg, &qopt, sizeof(qopt), NL_DONTPAD);
+}
+
+/**
+ * @name Attribute Modification
+ * @{
+ */
+
+/**
+ * Insert a plug into the qdisc and buffer any incoming network traffic.
+ * @arg qdisc		PLUG qdisc to be modified.
+ * @return 0 on success or -NLE_NOMEM if no plug data is available.
+ */
+int rtnl_qdisc_plug_buffer(struct rtnl_qdisc *qdisc)
+{
+	struct rtnl_plug *plug = rtnl_tc_data(TC_CAST(qdisc));
+
+	if (plug == NULL)
+		return -NLE_NOMEM;
+
+	plug->action = TCQ_PLUG_BUFFER;
+	return 0;
+}
+
+/**
+ * Unplug the qdisc, releasing packets from the queue head up to the
+ * last complete buffer, while new traffic continues to be buffered.
+ * @arg qdisc		PLUG qdisc to be modified.
+ * @return 0 on success or -NLE_NOMEM if no plug data is available.
+ */
+int rtnl_qdisc_plug_release_one(struct rtnl_qdisc *qdisc)
+{
+	struct rtnl_plug *plug = rtnl_tc_data(TC_CAST(qdisc));
+
+	if (plug == NULL)
+		return -NLE_NOMEM;
+
+	plug->action = TCQ_PLUG_RELEASE_ONE;
+	return 0;
+}
+
+/**
+ * Indefinitely unplug the qdisc, releasing all packets. Traffic is not
+ * buffered again until the next buffer command is issued.
+ * @arg qdisc		PLUG qdisc to be modified.
+ * @return 0 on success or -NLE_NOMEM if no plug data is available.
+ */
+int rtnl_qdisc_plug_release_indefinite(struct rtnl_qdisc *qdisc)
+{
+	struct rtnl_plug *plug = rtnl_tc_data(TC_CAST(qdisc));
+
+	if (plug == NULL)
+		return -NLE_NOMEM;
+
+	plug->action = TCQ_PLUG_RELEASE_INDEFINITE;
+	return 0;
+}
+
+/**
+ * Set limit of PLUG qdisc; this also selects the TCQ_PLUG_LIMIT action.
+ * @arg qdisc		PLUG qdisc to be modified.
+ * @arg limit		New limit.
+ * @return 0 on success or -NLE_NOMEM if no plug data is available.
+ */
+int rtnl_qdisc_plug_set_limit(struct rtnl_qdisc *qdisc, int limit)
+{
+	struct rtnl_plug *plug = rtnl_tc_data(TC_CAST(qdisc));
+
+	if (plug == NULL)
+		return -NLE_NOMEM;
+
+	/* The limit travels with its own action, replacing any earlier one. */
+	plug->limit  = limit;
+	plug->action = TCQ_PLUG_LIMIT;
+	return 0;
+}
+
+/** @} */
+
+static struct rtnl_tc_ops plug_ops = {
+	.to_kind		= "plug",
+	.to_type		= RTNL_TC_TYPE_QDISC,
+	.to_size		= sizeof(struct rtnl_plug),
+	.to_msg_fill		= plug_msg_fill,
+};
+
+static void __init plug_init(void)
+{
+	rtnl_tc_register(&plug_ops);
+}
+
+static void __exit plug_exit(void)
+{
+	rtnl_tc_unregister(&plug_ops);
+}
+
+/** @} */
-- 
1.7.0.4




More information about the libnl mailing list