[PATCH] af_rxrpc accept race (was: RxRPC: 3 issues found in my example code)

Tim Smith tim at electronghost.co.uk
Tue May 20 14:42:31 PDT 2014


On Monday 12 May 2014 17:39:45 Tim Smith wrote:
> On Monday 12 May 2014 13:50:13 David Howells wrote:
> > Tim Smith <tim at electronghost.co.uk> wrote:
> > > -			skb_queue_tail(&local->accept_queue, skb);
> > > -			rxrpc_queue_work(&local->acceptor);
> > > +			rxrpc_accept_incoming_call(local, skb);
> > 
> > The main problem I have with this is that you're putting a lot of work
> > into
> > the UDP driver's sk_data_ready callback.  Some of that work involves
> > memory
> > allocation with GFP_NOIO or GFP_NOFS - which may not be appropriate for
> > the
> > context.
> 
> In which case going the other way will be necessary and sending the
> subsequent packets through the work queue. Well, that or adding a lock back
> in.
> 
> I will try the former.

OK, the attached sends all the packets through a work queue, not just the 
first packet of a connection. I've run >200000 large requests through this and 
with all the debug turned off I got 12 "Busy" responses (which turn up as 
"RXRPC_LOCAL_ERROR: Software caused connection abort").

When I turn the server debug *on* I get *LOTS* of busy responses. Further 
investigation indicates that the -EBUSY is coming back through line 479 of ar-
call.c in rxrpc_incoming_call(), which means there is another race between 
processing the incoming packet for the new connection, and transmitting the 
ACK for the end of the last one (which is when the call state gets updated).

That will want separate thinking-about. I think.

-- 
Tim Smith <tim at electronghost.co.uk>
Boba Fett is actually a small, green, scaly humanoid of about 25cm height. 
He controls his Mandalorian Battle Armour via a complicated system of
levers, pulleys, and bits of string.
-------------- next part --------------
af_rxrpc: Race in connection setup

From: Tim Smith <tim at electronghost.co.uk>

When the first few packets of the call arrive very close together,  
sending the first via a work queue and the rest directly via 
rxrpc_post_packet_to_call() creates a race where the second packet 
can be processed before the first, causing it to be rejected because 
the call has not yet been set up. The solution is to send all packets 
via the work queue.

Signed-off-by: Tim Smith <tim at electronghost.co.uk>
---
 net/rxrpc/ar-accept.c   |   46 ++++------------
 net/rxrpc/ar-input.c    |  135 +++++++++++++++++++++++++++++------------------
 net/rxrpc/ar-internal.h |    7 +-
 net/rxrpc/ar-local.c    |    8 +--
 4 files changed, 101 insertions(+), 95 deletions(-)

diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index 6d79310..707cf88 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -67,13 +67,12 @@ static int rxrpc_busy(struct rxrpc_local *local, struct sockaddr_rxrpc *srx,
 }
 
 /*
- * accept an incoming call that needs peer, transport and/or connection setting
- * up
+ * Set up a peer, transport and/or connection for an incoming call.
  */
-static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
-				      struct rxrpc_sock *rx,
-				      struct sk_buff *skb,
-				      struct sockaddr_rxrpc *srx)
+static int rxrpc_setup_incoming_call(struct rxrpc_local *local,
+				     struct rxrpc_sock *rx,
+				     struct sk_buff *skb,
+				     struct sockaddr_rxrpc *srx)
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_transport *trans;
@@ -204,38 +203,15 @@ error_nofree:
  * accept incoming calls that need peer, transport and/or connection setting up
  * - the packets we get are all incoming client DATA packets that have seq == 1
  */
-void rxrpc_accept_incoming_calls(struct work_struct *work)
+void rxrpc_accept_incoming_call(struct rxrpc_local *local, struct sk_buff *skb)
 {
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, acceptor);
 	struct rxrpc_skb_priv *sp;
 	struct sockaddr_rxrpc srx;
 	struct rxrpc_sock *rx;
-	struct sk_buff *skb;
 	__be16 service_id;
 	int ret;
 
 	_enter("%d", local->debug_id);
-
-	read_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
-	if (!local) {
-		_leave(" [local dead]");
-		return;
-	}
-
-process_next_packet:
-	skb = skb_dequeue(&local->accept_queue);
-	if (!skb) {
-		rxrpc_put_local(local);
-		_leave("\n");
-		return;
-	}
-
 	_net("incoming call skb %p", skb);
 
 	sp = rxrpc_skb(skb);
@@ -274,7 +250,7 @@ found_service:
 	sock_hold(&rx->sk);
 	read_unlock_bh(&local->services_lock);
 
-	ret = rxrpc_accept_incoming_call(local, rx, skb, &srx);
+	ret = rxrpc_setup_incoming_call(local, rx, skb, &srx);
 	if (ret < 0)
 		sk_acceptq_removed(&rx->sk);
 	sock_put(&rx->sk);
@@ -282,7 +258,7 @@ found_service:
 	case -ECONNRESET: /* old calls are ignored */
 	case -ECONNABORTED: /* aborted calls are reaborted or ignored */
 	case 0:
-		goto process_next_packet;
+		return;
 	case -ECONNREFUSED:
 		goto invalid_service;
 	case -EBUSY:
@@ -298,18 +274,18 @@ backlog_full:
 busy:
 	rxrpc_busy(local, &srx, &sp->hdr);
 	rxrpc_free_skb(skb);
-	goto process_next_packet;
+	return;
 
 invalid_service:
 	skb->priority = RX_INVALID_OPERATION;
 	rxrpc_reject_packet(local, skb);
-	goto process_next_packet;
+	return;
 
 	/* can't change connection security type mid-flow */
 security_mismatch:
 	skb->priority = RX_PROTOCOL_ERROR;
 	rxrpc_reject_packet(local, skb);
-	goto process_next_packet;
+	return;
 }
 
 /*
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 7374264..2fcb90b 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -710,65 +710,94 @@ void rxrpc_data_ready(struct sock *sk, int count)
 	    (sp->hdr.callNumber == 0 || sp->hdr.seq == 0))
 		goto bad_message;
 
-	if (sp->hdr.callNumber == 0) {
-		/* This is a connection-level packet. These should be
-		 * fairly rare, so the extra overhead of looking them up the
-		 * old-fashioned way doesn't really hurt */
-		struct rxrpc_connection *conn;
-
-		conn = rxrpc_conn_from_local(local, skb, sp);
-		if (!conn)
-			goto cant_route_call;
-
-		_debug("CONN %p {%d}", conn, conn->debug_id);
-		rxrpc_post_packet_to_conn(conn, skb);
-		rxrpc_put_connection(conn);
-	} else {
-		struct rxrpc_call *call;
-		u8 in_clientflag = 0;
-
-		if (sp->hdr.flags & RXRPC_CLIENT_INITIATED)
-			in_clientflag = RXRPC_CLIENT_INITIATED;
-		call = rxrpc_find_call_hash(in_clientflag, sp->hdr.cid,
-					    sp->hdr.callNumber, sp->hdr.epoch,
-					    sp->hdr.serviceId, local, AF_INET,
-					    (u8 *)&ip_hdr(skb)->saddr);
-		if (call)
-			rxrpc_post_packet_to_call(call, skb);
-		else
-			goto cant_route_call;
-	}
+	/* Anything which is not a bad_message can go onto the input queue to
+	 * be dealt with by that. */
+	skb_queue_tail(&local->input_queue, skb);
+	rxrpc_queue_work(&local->inputq);
 	rxrpc_put_local(local);
 	return;
 
-cant_route_call:
-	_debug("can't route call");
-	if (sp->hdr.flags & RXRPC_CLIENT_INITIATED &&
-	    sp->hdr.type == RXRPC_PACKET_TYPE_DATA) {
-		if (sp->hdr.seq == cpu_to_be32(1)) {
-			_debug("first packet");
-			skb_queue_tail(&local->accept_queue, skb);
-			rxrpc_queue_work(&local->acceptor);
-			rxrpc_put_local(local);
-			_leave(" [incoming]");
-			return;
-		}
-		skb->priority = RX_INVALID_OPERATION;
-	} else {
-		skb->priority = RX_CALL_DEAD;
-	}
-
-	if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) {
-		_debug("reject type %d",sp->hdr.type);
-		rxrpc_reject_packet(local, skb);
-	}
-	rxrpc_put_local(local);
-	_leave(" [no call]");
-	return;
-
 bad_message:
 	skb->priority = RX_PROTOCOL_ERROR;
 	rxrpc_reject_packet(local, skb);
 	rxrpc_put_local(local);
 	_leave(" [badmsg]");
 }
+
+/*
+ * Work queue handler for data recieved on the local endpoint
+ */
+void rxrpc_input_queue(struct work_struct *work)
+{
+	struct rxrpc_local *local =
+		container_of(work, struct rxrpc_local, inputq);
+	struct sk_buff *skb;
+	struct rxrpc_skb_priv *sp;
+
+	_enter("%d", local->debug_id);
+
+	read_lock_bh(&rxrpc_local_lock);
+	if (atomic_read(&local->usage) > 0)
+		rxrpc_get_local(local);
+	else
+		local = NULL;
+	read_unlock_bh(&rxrpc_local_lock);
+	if (!local) {
+		_leave(" [local dead]");
+		return;
+	}
+
+	while ((skb = skb_dequeue(&local->input_queue))) {
+		sp = rxrpc_skb(skb);
+		
+		if (sp->hdr.callNumber == 0) {
+			/* This is a connection-level packet. These should be
+			* fairly rare, so the extra overhead of looking them up the
+			* old-fashioned way doesn't really hurt */
+			struct rxrpc_connection *conn;
+
+			if ((conn = rxrpc_conn_from_local(local, skb, sp))) {
+				_debug("CONN %p {%d}", conn, conn->debug_id);
+				rxrpc_post_packet_to_conn(conn, skb);
+				rxrpc_put_connection(conn);
+				continue;
+			}
+		} else {
+			struct rxrpc_call *call;
+			u8 in_clientflag = 0;
+
+			if (sp->hdr.flags & RXRPC_CLIENT_INITIATED)
+				in_clientflag = RXRPC_CLIENT_INITIATED;
+			call = rxrpc_find_call_hash(in_clientflag, sp->hdr.cid,
+						sp->hdr.callNumber, sp->hdr.epoch,
+						sp->hdr.serviceId, local, AF_INET,
+						(u8 *)&ip_hdr(skb)->saddr);
+			if (call) {
+				rxrpc_post_packet_to_call(call, skb);
+				continue;
+			}
+		}
+		
+		_debug("can't route call");
+		if (sp->hdr.flags & RXRPC_CLIENT_INITIATED &&
+		sp->hdr.type == RXRPC_PACKET_TYPE_DATA) {
+			if (sp->hdr.seq == cpu_to_be32(1)) {
+				_debug("first packet");
+				rxrpc_accept_incoming_call(local, skb);
+				_leave(" [incoming]");
+				continue;
+			}
+			skb->priority = RX_INVALID_OPERATION;
+		} else {
+			skb->priority = RX_CALL_DEAD;
+		}
+
+		if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) {
+			_debug("reject type %d",sp->hdr.type);
+			rxrpc_reject_packet(local, skb);
+		}
+	}
+	rxrpc_put_local(local);
+	_leave("\n");
+	return;
+}
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index c831d44..f48b40d 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -150,12 +150,12 @@ struct rxrpc_security {
 struct rxrpc_local {
 	struct socket		*socket;	/* my UDP socket */
 	struct work_struct	destroyer;	/* endpoint destroyer */
-	struct work_struct	acceptor;	/* incoming call processor */
+	struct work_struct	inputq;		/* incoming packet processor */
 	struct work_struct	rejecter;	/* packet reject writer */
 	struct list_head	services;	/* services listening on this endpoint */
 	struct list_head	link;		/* link in endpoint list */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
-	struct sk_buff_head	accept_queue;	/* incoming calls awaiting acceptance */
+	struct sk_buff_head	input_queue;	/* incoming items awaiting processing */
 	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 	spinlock_t		lock;		/* access lock */
 	rwlock_t		services_lock;	/* lock for services list */
@@ -437,7 +437,7 @@ extern struct workqueue_struct *rxrpc_workqueue;
 /*
  * ar-accept.c
  */
-void rxrpc_accept_incoming_calls(struct work_struct *);
+void rxrpc_accept_incoming_call(struct rxrpc_local *local, struct sk_buff *skb);
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long);
 int rxrpc_reject_call(struct rxrpc_sock *);
 
@@ -521,6 +521,7 @@ extern const char *rxrpc_pkts[];
 void rxrpc_data_ready(struct sock *, int);
 int rxrpc_queue_rcv_skb(struct rxrpc_call *, struct sk_buff *, bool, bool);
 void rxrpc_fast_process_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_queue(struct work_struct *);
 
 /*
  * ar-local.c
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
index 87f7135..93bf036 100644
--- a/net/rxrpc/ar-local.c
+++ b/net/rxrpc/ar-local.c
@@ -35,12 +35,12 @@ struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
 	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
 	if (local) {
 		INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
-		INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
+		INIT_WORK(&local->inputq, &rxrpc_input_queue);
 		INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
 		INIT_LIST_HEAD(&local->services);
 		INIT_LIST_HEAD(&local->link);
 		init_rwsem(&local->defrag_sem);
-		skb_queue_head_init(&local->accept_queue);
+		skb_queue_head_init(&local->input_queue);
 		skb_queue_head_init(&local->reject_queue);
 		spin_lock_init(&local->lock);
 		rwlock_init(&local->services_lock);
@@ -262,11 +262,11 @@ static void rxrpc_destroy_local(struct work_struct *work)
 	downgrade_write(&rxrpc_local_sem);
 
 	ASSERT(list_empty(&local->services));
-	ASSERT(!work_pending(&local->acceptor));
+	ASSERT(!work_pending(&local->inputq));
 	ASSERT(!work_pending(&local->rejecter));
 
 	/* finish cleaning up the local descriptor */
-	rxrpc_purge_queue(&local->accept_queue);
+	rxrpc_purge_queue(&local->input_queue);
 	rxrpc_purge_queue(&local->reject_queue);
 	kernel_sock_shutdown(local->socket, SHUT_RDWR);
 	sock_release(local->socket);
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 836 bytes
Desc: This is a digitally signed message part.
URL: <http://lists.infradead.org/pipermail/linux-afs/attachments/20140520/a0cb9808/attachment.sig>


More information about the linux-afs mailing list