[PATCH] Support plug qdisc - queue traffic until explicit release
Shriram Rajagopalan
rshriram at cs.ubc.ca
Sun Feb 12 17:53:10 EST 2012
The plug qdisc supports two operations - plug and unplug. When the
qdisc receives a plug ("buffer") command via netlink request,
packets arriving henceforth are buffered until a corresponding unplug
command is received. Depending on the type of unplug ("release_one"
or "release_indefinite"), the queue can be unplugged indefinitely or
selectively.
The plug qdisc allows a user to implement network output buffering
(aka output commit), used commonly in checkpoint based fault tolerance
systems. It also supports a general purpose queue plug/unplug
functionality.
The associated kernel module is available in David Miller's net-next
tree, commit: c3059be16c9ef29c05f0876a9df5fea21f29724f
This patch introduces userspace tools and API, to control the qdisc
via netlink messages.
Signed-off-by: Shriram Rajagopalan <rshriram at cs.ubc.ca>
---
include/Makefile.am | 1 +
include/linux/pkt_sched.h | 21 ++++
include/netlink-types.h | 6 +
include/netlink/route/qdisc/plug.h | 30 ++++++
lib/Makefile.am | 4 +-
lib/cli/qdisc/plug.c | 113 +++++++++++++++++++++++
lib/route/qdisc/plug.c | 177 ++++++++++++++++++++++++++++++++++++
7 files changed, 351 insertions(+), 1 deletions(-)
create mode 100644 include/netlink/route/qdisc/plug.h
create mode 100644 lib/cli/qdisc/plug.c
create mode 100644 lib/route/qdisc/plug.c
diff --git a/include/Makefile.am b/include/Makefile.am
index db6862d..2ba0ece 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -54,6 +54,7 @@ nobase_libnlinclude_HEADERS = \
netlink/route/qdisc/red.h \
netlink/route/qdisc/sfq.h \
netlink/route/qdisc/tbf.h \
+ netlink/route/qdisc/plug.h \
netlink/route/addr.h \
netlink/route/class.h \
netlink/route/classifier.h \
diff --git a/include/linux/pkt_sched.h b/include/linux/pkt_sched.h
index c533670..7ccc1fd 100644
--- a/include/linux/pkt_sched.h
+++ b/include/linux/pkt_sched.h
@@ -127,6 +127,27 @@ struct tc_multiq_qopt {
__u16 max_bands; /* Maximum number of queues */
};
+/* PLUG section */
+
+#define TCQ_PLUG_BUFFER 0
+#define TCQ_PLUG_RELEASE_ONE 1
+#define TCQ_PLUG_RELEASE_INDEFINITE 2
+#define TCQ_PLUG_LIMIT 3
+
+struct tc_plug_qopt {
+ /* TCQ_PLUG_BUFFER: Inset a plug into the queue and
+ * buffer any incoming packets
+ * TCQ_PLUG_RELEASE_ONE: Dequeue packets from queue head
+ * to beginning of the next plug.
+ * TCQ_PLUG_RELEASE_INDEFINITE: Dequeue all packets from queue.
+ * Stop buffering packets until the next TCQ_PLUG_BUFFER
+ * command is received (just act as a pass-thru queue).
+ * TCQ_PLUG_LIMIT: Increase/decrease queue size
+ */
+ int action;
+ __u32 limit;
+};
+
/* TBF section */
struct tc_tbf_qopt {
diff --git a/include/netlink-types.h b/include/netlink-types.h
index 82481b7..5ced836 100644
--- a/include/netlink-types.h
+++ b/include/netlink-types.h
@@ -669,6 +669,12 @@ struct rtnl_red
uint32_t qr_mask;
};
+struct rtnl_plug
+{
+ int action;
+ uint32_t limit;
+};
+
struct flnl_request
{
NLHDR_COMMON
diff --git a/include/netlink/route/qdisc/plug.h b/include/netlink/route/qdisc/plug.h
new file mode 100644
index 0000000..ffb1a04
--- /dev/null
+++ b/include/netlink/route/qdisc/plug.h
@@ -0,0 +1,30 @@
+/*
+ * netlink/route/qdisc/plug.c PLUG Qdisc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation version 2.1
+ * of the License.
+ *
+ * Copyright (c) 2012 Shriram Rajagopalan <rshriram at cs.ubc.ca>
+ */
+
+#ifndef NETLINK_PLUG_H_
+#define NETLINK_PLUG_H_
+
+#include <netlink/netlink.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern int rtnl_qdisc_plug_set_limit(struct rtnl_qdisc *, int);
+extern int rtnl_qdisc_plug_buffer(struct rtnl_qdisc *);
+extern int rtnl_qdisc_plug_release_one(struct rtnl_qdisc *);
+extern int rtnl_qdisc_plug_release_indefinite(struct rtnl_qdisc *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/Makefile.am b/lib/Makefile.am
index c2668ef..aee8d0f 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -72,7 +72,7 @@ libnl_route_3_la_SOURCES = \
route/qdisc/blackhole.c route/qdisc/cbq.c route/qdisc/dsmark.c \
route/qdisc/fifo.c route/qdisc/htb.c route/qdisc/netem.c \
route/qdisc/prio.c route/qdisc/red.c route/qdisc/sfq.c \
- route/qdisc/tbf.c \
+ route/qdisc/tbf.c route/qdisc/plug.c \
\
fib_lookup/lookup.c fib_lookup/request.c \
\
@@ -101,6 +101,7 @@ nobase_pkglib_LTLIBRARIES = \
cli/qdisc/htb.la \
cli/qdisc/blackhole.la \
cli/qdisc/pfifo.la \
+ cli/qdisc/plug.la \
cli/qdisc/bfifo.la \
cli/cls/basic.la \
cli/cls/cgroup.la
@@ -108,6 +109,7 @@ nobase_pkglib_LTLIBRARIES = \
cli_qdisc_htb_la_LDFLAGS = -module -avoid-version
cli_qdisc_blackhole_la_LDFLAGS = -module -avoid-version
cli_qdisc_pfifo_la_LDFLAGS = -module -avoid-version
+cli_qdisc_plug_la_LDFLAGS = -module -avoid-version
cli_qdisc_bfifo_la_LDFLAGS = -module -avoid-version
cli_cls_basic_la_LDFLAGS = -module -avoid-version
cli_cls_cgroup_la_LDFLAGS = -module -avoid-version
diff --git a/lib/cli/qdisc/plug.c b/lib/cli/qdisc/plug.c
new file mode 100644
index 0000000..2b8d5d6
--- /dev/null
+++ b/lib/cli/qdisc/plug.c
@@ -0,0 +1,113 @@
+
+/*
+ * src/lib/cli/qdisc/plug.c plug module for CLI lib
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation version 2.1
+ * of the License.
+ *
+ * Copyright (c) 2012 Shriram Rajagopalan <rshriram at cs.ubc.ca>
+ */
+
+#include <netlink/cli/utils.h>
+#include <netlink/cli/tc.h>
+#include <netlink/route/qdisc/plug.h>
+
+static void print_usage(void)
+{
+ printf(
+"Usage: nl-qdisc-add [...] plug [OPTIONS]...\n"
+"\n"
+"OPTIONS\n"
+" --help Show this help text.\n"
+" --limit Maximum queue length in bytes.\n"
+" --buffer create a new buffer(plug) and queue incoming traffic into it.\n"
+" --release-one release traffic from previous buffer.\n"
+" --release-indefinite stop buffering and release all (buffered and new) packets.\n"
+"\n"
+"EXAMPLE"
+" # Attach plug qdisc with 32KB queue size to ifb0\n"
+" nl-qdisc-add --dev=ifb0 --parent=root plug --limit=32768\n"
+" # Plug network traffic arriving at ifb0\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n"
+" # Unplug traffic arriving at ifb0 indefinitely\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-indefinite\n\n"
+" # If operating in output buffering mode:\n"
+" # at time t=t0, create a new output buffer b0 to hold network output\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n\n"
+" # at time t=t1, take a checkpoint c0, create a new output buffer b1\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n"
+" # at time t=t1+r, after c0 is committed, release b0\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-one\n\n"
+" # at time t=t2, take a checkpoint c1, create a new output buffer b2\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --buffer\n"
+" # at time t=t2+r, after c1 is committed, release b1\n"
+" nl-qdisc-add --dev=ifb0 --parent=root --update plug --release-one\n");
+}
+
+static void plug_parse_argv(struct rtnl_tc *tc, int argc, char **argv)
+{
+ struct rtnl_qdisc *qdisc = (struct rtnl_qdisc *) tc;
+
+ for (;;) {
+ int c, optidx = 0;
+ enum {
+ ARG_LIMIT = 257,
+ ARG_BUFFER = 258,
+ ARG_RELEASE_ONE = 259,
+ ARG_RELEASE_INDEFINITE = 260,
+ };
+ static struct option long_opts[] = {
+ { "help", 0, 0, 'h' },
+ { "limit", 1, 0, ARG_LIMIT },
+ { "buffer", 0, 0, ARG_BUFFER },
+ { "release-one", 0, 0, ARG_RELEASE_ONE },
+ { "release-indefinite", 0, 0, ARG_RELEASE_INDEFINITE },
+ { 0, 0, 0, 0 }
+ };
+
+ c = getopt_long(argc, argv, "h", long_opts, &optidx);
+ if (c == -1)
+ break;
+
+ switch (c) {
+ case 'h':
+ print_usage();
+ return;
+
+ case ARG_LIMIT:
+ rtnl_qdisc_plug_set_limit(qdisc, nl_cli_parse_u32(optarg));
+ break;
+
+ case ARG_BUFFER:
+ rtnl_qdisc_plug_buffer(qdisc);
+ break;
+
+ case ARG_RELEASE_ONE:
+ rtnl_qdisc_plug_release_one(qdisc);
+ break;
+
+ case ARG_RELEASE_INDEFINITE:
+ rtnl_qdisc_plug_release_indefinite(qdisc);
+ break;
+ }
+ }
+}
+
+static struct nl_cli_tc_module plug_module =
+{
+ .tm_name = "plug",
+ .tm_type = RTNL_TC_TYPE_QDISC,
+ .tm_parse_argv = plug_parse_argv,
+};
+
+static void __init plug_init(void)
+{
+ nl_cli_tc_register(&plug_module);
+}
+
+static void __exit plug_exit(void)
+{
+ nl_cli_tc_unregister(&plug_module);
+}
diff --git a/lib/route/qdisc/plug.c b/lib/route/qdisc/plug.c
new file mode 100644
index 0000000..a99b9be
--- /dev/null
+++ b/lib/route/qdisc/plug.c
@@ -0,0 +1,177 @@
+/*
+ * lib/route/qdisc/plug.c PLUG Qdisc
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation version 2.1
+ * of the License.
+ *
+ * Copyright (c) 2012 Shriram Rajagopalan <rshriram at cs.ubc.ca>
+ */
+
+/**
+ * @ingroup qdisc
+ * @defgroup qdisc_plug Plug/Unplug Traffic (PLUG)
+ * @brief
+ *
+ * Queue traffic until an explicit release command.
+ *
+ * There are two ways to use this qdisc:
+ * 1. A simple "instantaneous" plug/unplug operation, by issuing an alternating
+ * sequence of TCQ_PLUG_BUFFER & TCQ_PLUG_RELEASE_INDEFINITE commands.
+ *
+ * 2. For network output buffering (a.k.a output commit) functionality.
+ * Output commit property is commonly used by applications using checkpoint
+ * based fault-tolerance to ensure that the checkpoint from which a system
+ * is being restored is consistent w.r.t outside world.
+ *
+ * Consider for e.g. Remus - a Virtual Machine checkpointing system,
+ * wherein a VM is checkpointed, say every 50ms. The checkpoint is replicated
+ * asynchronously to the backup host, while the VM continues executing the
+ * next epoch speculatively.
+ *
+ * The following is a typical sequence of output buffer operations:
+ * 1.At epoch i, start_buffer(i)
+ * 2. At end of epoch i (i.e. after 50ms):
+ * 2.1 Stop VM and take checkpoint(i).
+ * 2.2 start_buffer(i+1) and Resume VM
+ * 3. While speculatively executing epoch(i+1), asynchronously replicate
+ * checkpoint(i) to backup host.
+ * 4. When checkpoint_ack(i) is received from backup, release_buffer(i)
+ * Thus, this Qdisc would receive the following sequence of commands:
+ * TCQ_PLUG_BUFFER (epoch i)
+ * .. TCQ_PLUG_BUFFER (epoch i+1)
+ * ....TCQ_PLUG_RELEASE_ONE (epoch i)
+ * ......TCQ_PLUG_BUFFER (epoch i+2)
+ * ........
+ *
+ *
+ * State of the queue, when used for network output buffering:
+ *
+ * plug(i+1) plug(i) head
+ * ------------------+--------------------+---------------->
+ * | |
+ * | |
+ * pkts_current_epoch| pkts_last_epoch |pkts_to_release
+ * ----------------->|<--------+--------->|+--------------->
+ * v v
+ *
+ *
+ * @{
+ */
+
+#include <netlink-local.h>
+#include <netlink-tc.h>
+#include <netlink/netlink.h>
+#include <netlink/utils.h>
+#include <netlink/route/tc-api.h>
+#include <netlink/route/qdisc/plug.h>
+
+static int plug_msg_fill(struct rtnl_tc *tc, void *data, struct nl_msg *msg)
+{
+ struct rtnl_plug *plug = data;
+ struct tc_plug_qopt opts;
+
+ if (!plug)
+ return -NLE_INVAL;
+
+ opts.action = plug->action;
+ opts.limit = plug->limit;
+
+ return nlmsg_append(msg, &opts, sizeof(opts), NL_DONTPAD);
+}
+
+/**
+ * @name Attribute Modification
+ * @{
+ */
+
+/**
+ * Insert a plug into the qdisc and buffer any incoming
+ * network traffic.
+ * @arg qdisc PLUG qdisc to be modified.
+ */
+int rtnl_qdisc_plug_buffer(struct rtnl_qdisc *qdisc)
+{
+ struct rtnl_plug *plug;
+
+ if (!(plug = rtnl_tc_data(TC_CAST(qdisc))))
+ return -NLE_NOMEM;
+
+ plug->action = TCQ_PLUG_BUFFER;
+ return 0;
+}
+
+/**
+ * Unplug the qdisc, releasing packets from queue head
+ * to the last complete buffer, while new traffic
+ * continues to be buffered.
+ * @arg qdisc PLUG qdisc to be modified.
+ */
+int rtnl_qdisc_plug_release_one(struct rtnl_qdisc *qdisc)
+{
+ struct rtnl_plug *plug;
+
+ if (!(plug = rtnl_tc_data(TC_CAST(qdisc))))
+ return -NLE_NOMEM;
+
+ plug->action = TCQ_PLUG_RELEASE_ONE;
+ return 0;
+}
+
+/**
+ * Indefinitely unplug the qdisc, releasing all packets.
+ * Network traffic will not be buffered until the next
+ * buffer command is issued.
+ * @arg qdisc PLUG qdisc to be modified.
+ */
+int rtnl_qdisc_plug_release_indefinite(struct rtnl_qdisc *qdisc)
+{
+ struct rtnl_plug *plug;
+
+ if (!(plug = rtnl_tc_data(TC_CAST(qdisc))))
+ return -NLE_NOMEM;
+
+ plug->action = TCQ_PLUG_RELEASE_INDEFINITE;
+ return 0;
+}
+
+/**
+ * Set limit of PLUG qdisc.
+ * @arg qdisc PLUG qdisc to be modified.
+ * @arg limit New limit.
+ * @return 0 on success or a negative error code.
+ */
+int rtnl_qdisc_plug_set_limit(struct rtnl_qdisc *qdisc, int limit)
+{
+ struct rtnl_plug *plug;
+
+ if (!(plug = rtnl_tc_data(TC_CAST(qdisc))))
+ return -NLE_NOMEM;
+
+ plug->action = TCQ_PLUG_LIMIT;
+ plug->limit = limit;
+
+ return 0;
+}
+
+/** @} */
+
+static struct rtnl_tc_ops plug_ops = {
+ .to_kind = "plug",
+ .to_type = RTNL_TC_TYPE_QDISC,
+ .to_size = sizeof(struct rtnl_plug),
+ .to_msg_fill = plug_msg_fill,
+};
+
+static void __init plug_init(void)
+{
+ rtnl_tc_register(&plug_ops);
+}
+
+static void __exit plug_exit(void)
+{
+ rtnl_tc_unregister(&plug_ops);
+}
+
+/** @} */
--
1.7.0.4
More information about the libnl
mailing list