[PATCH net-next 24/36] rxrpc: Implement a mechanism to send an event notification to a call

David Howells dhowells at redhat.com
Thu Dec 1 16:18:27 PST 2022


Provide a means by which an event notification can be sent to a call such
that the I/O thread can process it rather than it being done in a separate
workqueue.  This will allow a lot of locking to be removed.

Signed-off-by: David Howells <dhowells at redhat.com>
cc: Marc Dionne <marc.dionne at auristor.com>
cc: linux-afs at lists.infradead.org
---

 include/trace/events/rxrpc.h |   52 ++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/ar-internal.h      |    5 +++-
 net/rxrpc/call_object.c      |   24 +++++++++++++++++++
 net/rxrpc/input.c            |    3 +-
 net/rxrpc/io_thread.c        |   20 +++++++++++++++-
 net/rxrpc/local_object.c     |    1 +
 6 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 44a9be9836f9..0b12d96c7921 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -16,6 +16,13 @@
 /*
  * Declare tracing information enums and their string mappings for display.
  */
+#define rxrpc_call_poke_traces \
+	EM(rxrpc_call_poke_error,		"Error")	\
+	EM(rxrpc_call_poke_idle,		"Idle")		\
+	EM(rxrpc_call_poke_start,		"Start")	\
+	EM(rxrpc_call_poke_timer,		"Timer")	\
+	E_(rxrpc_call_poke_timer_now,		"Timer-now")
+
 #define rxrpc_skb_traces \
 	EM(rxrpc_skb_eaten_by_unshare,		"ETN unshare  ") \
 	EM(rxrpc_skb_eaten_by_unshare_nomem,	"ETN unshar-nm") \
@@ -150,6 +157,7 @@
 	EM(rxrpc_call_get_input,		"GET input   ") \
 	EM(rxrpc_call_get_kernel_service,	"GET krnl-srv") \
 	EM(rxrpc_call_get_notify_socket,	"GET notify  ") \
+	EM(rxrpc_call_get_poke,			"GET poke    ") \
 	EM(rxrpc_call_get_recvmsg,		"GET recvmsg ") \
 	EM(rxrpc_call_get_release_sock,		"GET rel-sock") \
 	EM(rxrpc_call_get_sendmsg,		"GET sendmsg ") \
@@ -160,6 +168,7 @@
 	EM(rxrpc_call_put_discard_prealloc,	"PUT disc-pre") \
 	EM(rxrpc_call_put_input,		"PUT input   ") \
 	EM(rxrpc_call_put_kernel,		"PUT kernel  ") \
+	EM(rxrpc_call_put_poke,			"PUT poke    ") \
 	EM(rxrpc_call_put_recvmsg,		"PUT recvmsg ") \
 	EM(rxrpc_call_put_release_sock,		"PUT rls-sock") \
 	EM(rxrpc_call_put_release_sock_tba,	"PUT rls-sk-a") \
@@ -378,6 +387,7 @@
 #define E_(a, b) a
 
 enum rxrpc_bundle_trace		{ rxrpc_bundle_traces } __mode(byte);
+enum rxrpc_call_poke_trace	{ rxrpc_call_poke_traces } __mode(byte);
 enum rxrpc_call_trace		{ rxrpc_call_traces } __mode(byte);
 enum rxrpc_client_trace		{ rxrpc_client_traces } __mode(byte);
 enum rxrpc_congest_change	{ rxrpc_congest_changes } __mode(byte);
@@ -408,6 +418,7 @@ enum rxrpc_txqueue_trace	{ rxrpc_txqueue_traces } __mode(byte);
 #define E_(a, b) TRACE_DEFINE_ENUM(a);
 
 rxrpc_bundle_traces;
+rxrpc_call_poke_traces;
 rxrpc_call_traces;
 rxrpc_client_traces;
 rxrpc_congest_changes;
@@ -1747,6 +1758,47 @@ TRACE_EVENT(rxrpc_txbuf,
 		      __entry->ref)
 	    );
 
+TRACE_EVENT(rxrpc_poke_call,
+	    TP_PROTO(struct rxrpc_call *call, bool busy,
+		     enum rxrpc_call_poke_trace what),
+
+	    TP_ARGS(call, busy, what),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,		call_debug_id	)
+		    __field(bool,			busy		)
+		    __field(enum rxrpc_call_poke_trace,	what		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call_debug_id = call->debug_id;
+		    __entry->busy = busy;
+		    __entry->what = what;
+			   ),
+
+	    TP_printk("c=%08x %s%s",
+		      __entry->call_debug_id,
+		      __print_symbolic(__entry->what, rxrpc_call_poke_traces),
+		      __entry->busy ? "!" : "")
+	    );
+
+TRACE_EVENT(rxrpc_call_poked,
+	    TP_PROTO(struct rxrpc_call *call),
+
+	    TP_ARGS(call),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,		call_debug_id	)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call_debug_id = call->debug_id;
+			   ),
+
+	    TP_printk("c=%08x",
+		      __entry->call_debug_id)
+	    );
+
 #undef EM
 #undef E_
 #endif /* _TRACE_RXRPC_H */
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 654e9dab107c..a80655fa9dfb 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -292,6 +292,7 @@ struct rxrpc_local {
 	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
 	struct sk_buff_head	rx_queue;	/* Received packets */
+	struct list_head	call_attend_q;	/* Calls requiring immediate attention */
 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 	spinlock_t		lock;		/* access lock */
@@ -616,6 +617,7 @@ struct rxrpc_call {
 	struct list_head	recvmsg_link;	/* Link in rx->recvmsg_q */
 	struct list_head	sock_link;	/* Link in rx->sock_calls */
 	struct rb_node		sock_node;	/* Node in rx->calls */
+	struct list_head	attend_link;	/* Link in local->call_attend_q */
 	struct rxrpc_txbuf	*tx_pending;	/* Tx buffer being filled */
 	wait_queue_head_t	waitq;		/* Wait queue for channel or Tx */
 	s64			tx_total_len;	/* Total length left to be transmitted (or -1) */
@@ -843,6 +845,7 @@ extern const char *const rxrpc_call_states[];
 extern const char *const rxrpc_call_completions[];
 extern struct kmem_cache *rxrpc_call_jar;
 
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
 struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
@@ -951,7 +954,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 /*
  * input.c
  */
-void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
 void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
 				   struct rxrpc_call *);
 
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index f6d1b3a6f8c6..997641e3d1c8 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -45,6 +45,29 @@ static struct semaphore rxrpc_call_limiter =
 static struct semaphore rxrpc_kernel_call_limiter =
 	__SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
 
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
+{
+	struct rxrpc_local *local;
+	struct rxrpc_peer *peer = call->peer;
+	bool busy;
+
+	if (WARN_ON_ONCE(!peer))
+		return;
+	local = peer->local;
+
+	if (call->state < RXRPC_CALL_COMPLETE) {
+		spin_lock_bh(&local->lock);
+		busy = !list_empty(&call->attend_link);
+		trace_rxrpc_poke_call(call, busy, what);
+		if (!busy) {
+			rxrpc_get_call(call, rxrpc_call_get_poke);
+			list_add_tail(&call->attend_link, &local->call_attend_q);
+		}
+		spin_unlock_bh(&local->lock);
+		rxrpc_wake_up_io_thread(local);
+	}
+}
+
 static void rxrpc_call_timer_expired(struct timer_list *t)
 {
 	struct rxrpc_call *call = from_timer(call, t, timer);
@@ -137,6 +160,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	INIT_LIST_HEAD(&call->accept_link);
 	INIT_LIST_HEAD(&call->recvmsg_link);
 	INIT_LIST_HEAD(&call->sock_link);
+	INIT_LIST_HEAD(&call->attend_link);
 	INIT_LIST_HEAD(&call->tx_buffer);
 	skb_queue_head_init(&call->recvmsg_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 13c52145a926..036f02371051 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1017,8 +1017,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
 /*
  * Process an incoming call packet.
  */
-void rxrpc_input_call_packet(struct rxrpc_call *call,
-				    struct sk_buff *skb)
+void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	unsigned long timo;
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 416c6101cf78..cc249bc6b8cd 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -366,7 +366,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
 	/* Process a call packet; this either discards or passes on the ref
 	 * elsewhere.
 	 */
-	rxrpc_input_call_packet(call, skb);
+	rxrpc_input_call_event(call, skb);
 	goto out;
 
 discard:
@@ -413,6 +413,7 @@ int rxrpc_io_thread(void *data)
 {
 	struct sk_buff_head rx_queue;
 	struct rxrpc_local *local = data;
+	struct rxrpc_call *call;
 	struct sk_buff *skb;
 
 	skb_queue_head_init(&rx_queue);
@@ -422,6 +423,20 @@ int rxrpc_io_thread(void *data)
 	for (;;) {
 		rxrpc_inc_stat(local->rxnet, stat_io_loop);
 
+		/* Deal with calls that want immediate attention. */
+		if ((call = list_first_entry_or_null(&local->call_attend_q,
+						     struct rxrpc_call,
+						     attend_link))) {
+			spin_lock_bh(&local->lock);
+			list_del_init(&call->attend_link);
+			spin_unlock_bh(&local->lock);
+
+			trace_rxrpc_call_poked(call);
+			rxrpc_input_call_event(call, NULL);
+			rxrpc_put_call(call, rxrpc_call_put_poke);
+			continue;
+		}
+
 		/* Process received packets and errors. */
 		if ((skb = __skb_dequeue(&rx_queue))) {
 			switch (skb->mark) {
@@ -450,7 +465,8 @@ int rxrpc_io_thread(void *data)
 		}
 
 		set_current_state(TASK_INTERRUPTIBLE);
-		if (!skb_queue_empty(&local->rx_queue)) {
+		if (!skb_queue_empty(&local->rx_queue) ||
+		    !list_empty(&local->call_attend_q)) {
 			__set_current_state(TASK_RUNNING);
 			continue;
 		}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 6b4d77219f36..03f491cc23ef 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -104,6 +104,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
 		skb_queue_head_init(&local->reject_queue);
 		skb_queue_head_init(&local->event_queue);
 		skb_queue_head_init(&local->rx_queue);
+		INIT_LIST_HEAD(&local->call_attend_q);
 		local->client_bundles = RB_ROOT;
 		spin_lock_init(&local->client_bundles_lock);
 		spin_lock_init(&local->lock);





More information about the linux-afs mailing list