[PATCH] af_rxrpc accept race (was: RxRPC: 3 issues found in my example code)

Tim Smith tim at electronghost.co.uk
Thu May 1 13:52:06 PDT 2014


On Monday 28 April 2014 13:47:42 Leon Liu wrote:
> Hi Tim,
> 
> Thank you so much for your very quick response and patch!
> 
> I've tried the patch. It did fix issue #1. I know it may be a temporary
> fix; however, I observed two new issues:
> 
> 1. The fix dramatically increases the probability of hitting issue #2
> (rxrpc stack returns "local error 103, rx busy"). The probability changes
> from 1/100000 to 1/5000. You have to turn rxrpc logging off to reproduce
> the issue. Occasionally, this can be fixed by re-insmod'ing af-rxrpc.ko on
> the sender.
> 2. After hitting issue #2 many times, the receiver kernel hangs. This was
> reproduced 3 times.
> 
> I can run the test again once you have a formal fix, to see if I can still
> reproduce these two additional issues.

I suspect the reason for hitting your original issue #2 is simply that
everything got quicker and there is a different problem on the RX side. I'll
take a peek at that next and see what I can see.

I don't think fixing the accept race on the server side (your issue #1) will 
help with the other.

David: two patches against your rxrpc branch. See what you think.

The first removes the acceptor work queue.

The second removes what appears to be a redundant check, along with 
lock/unlock and associated exit path.

-- 
Tim Smith <tim at electronghost.co.uk>
The Rebel Alliance is due to start selling shares in its Special Operations
Division.
-------------- next part --------------
af_rxrpc: Race in connection setup

When the first few packets of the call arrive very close together, sending the
first via a workqueue creates a race where the second packet can be processed
before the first, causing it to be rejected because the call has not yet been
set up.

Signed-off-by: Tim Smith <tim at electronghost.co.uk>
---
 net/rxrpc/ar-accept.c   |   46 +++++++++++-----------------------------------
 net/rxrpc/ar-input.c    |    3 +--
 net/rxrpc/ar-internal.h |    3 +--
 net/rxrpc/ar-local.c    |    2 --
 4 files changed, 13 insertions(+), 41 deletions(-)

diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index 6d79310..707cf88 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -67,13 +67,12 @@ static int rxrpc_busy(struct rxrpc_local *local, struct sockaddr_rxrpc *srx,
 }
 
 /*
- * accept an incoming call that needs peer, transport and/or connection setting
- * up
+ * Set up a peer, transport and/or connection for an incoming call.
  */
-static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
-				      struct rxrpc_sock *rx,
-				      struct sk_buff *skb,
-				      struct sockaddr_rxrpc *srx)
+static int rxrpc_setup_incoming_call(struct rxrpc_local *local,
+				     struct rxrpc_sock *rx,
+				     struct sk_buff *skb,
+				     struct sockaddr_rxrpc *srx)
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_transport *trans;
@@ -204,38 +203,15 @@ error_nofree:
  * accept incoming calls that need peer, transport and/or connection setting up
  * - the packets we get are all incoming client DATA packets that have seq == 1
  */
-void rxrpc_accept_incoming_calls(struct work_struct *work)
+void rxrpc_accept_incoming_call(struct rxrpc_local *local, struct sk_buff *skb)
 {
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, acceptor);
 	struct rxrpc_skb_priv *sp;
 	struct sockaddr_rxrpc srx;
 	struct rxrpc_sock *rx;
-	struct sk_buff *skb;
 	__be16 service_id;
 	int ret;
 
 	_enter("%d", local->debug_id);
-
-	read_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
-	if (!local) {
-		_leave(" [local dead]");
-		return;
-	}
-
-process_next_packet:
-	skb = skb_dequeue(&local->accept_queue);
-	if (!skb) {
-		rxrpc_put_local(local);
-		_leave("\n");
-		return;
-	}
-
 	_net("incoming call skb %p", skb);
 
 	sp = rxrpc_skb(skb);
@@ -274,7 +250,7 @@ found_service:
 	sock_hold(&rx->sk);
 	read_unlock_bh(&local->services_lock);
 
-	ret = rxrpc_accept_incoming_call(local, rx, skb, &srx);
+	ret = rxrpc_setup_incoming_call(local, rx, skb, &srx);
 	if (ret < 0)
 		sk_acceptq_removed(&rx->sk);
 	sock_put(&rx->sk);
@@ -282,7 +258,7 @@ found_service:
 	case -ECONNRESET: /* old calls are ignored */
 	case -ECONNABORTED: /* aborted calls are reaborted or ignored */
 	case 0:
-		goto process_next_packet;
+		return;
 	case -ECONNREFUSED:
 		goto invalid_service;
 	case -EBUSY:
@@ -298,18 +274,18 @@ backlog_full:
 busy:
 	rxrpc_busy(local, &srx, &sp->hdr);
 	rxrpc_free_skb(skb);
-	goto process_next_packet;
+	return;
 
 invalid_service:
 	skb->priority = RX_INVALID_OPERATION;
 	rxrpc_reject_packet(local, skb);
-	goto process_next_packet;
+	return;
 
 	/* can't change connection security type mid-flow */
 security_mismatch:
 	skb->priority = RX_PROTOCOL_ERROR;
 	rxrpc_reject_packet(local, skb);
-	goto process_next_packet;
+	return;
 }
 
 /*
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 7374264..91ebd3b 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -747,8 +747,7 @@ cant_route_call:
 	    sp->hdr.type == RXRPC_PACKET_TYPE_DATA) {
 		if (sp->hdr.seq == cpu_to_be32(1)) {
 			_debug("first packet");
-			skb_queue_tail(&local->accept_queue, skb);
-			rxrpc_queue_work(&local->acceptor);
+			rxrpc_accept_incoming_call(local, skb);
 			rxrpc_put_local(local);
 			_leave(" [incoming]");
 			return;
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index c831d44..43a6918 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -150,7 +150,6 @@ struct rxrpc_security {
 struct rxrpc_local {
 	struct socket		*socket;	/* my UDP socket */
 	struct work_struct	destroyer;	/* endpoint destroyer */
-	struct work_struct	acceptor;	/* incoming call processor */
 	struct work_struct	rejecter;	/* packet reject writer */
 	struct list_head	services;	/* services listening on this endpoint */
 	struct list_head	link;		/* link in endpoint list */
@@ -437,7 +436,7 @@ extern struct workqueue_struct *rxrpc_workqueue;
 /*
  * ar-accept.c
  */
-void rxrpc_accept_incoming_calls(struct work_struct *);
+void rxrpc_accept_incoming_call(struct rxrpc_local *local, struct sk_buff *skb);
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long);
 int rxrpc_reject_call(struct rxrpc_sock *);
 
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
index 87f7135..71ce382 100644
--- a/net/rxrpc/ar-local.c
+++ b/net/rxrpc/ar-local.c
@@ -35,7 +35,6 @@ struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
 	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
 	if (local) {
 		INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
-		INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
 		INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
 		INIT_LIST_HEAD(&local->services);
 		INIT_LIST_HEAD(&local->link);
@@ -262,7 +261,6 @@ static void rxrpc_destroy_local(struct work_struct *work)
 	downgrade_write(&rxrpc_local_sem);
 
 	ASSERT(list_empty(&local->services));
-	ASSERT(!work_pending(&local->acceptor));
 	ASSERT(!work_pending(&local->rejecter));
 
 	/* finish cleaning up the local descriptor */
-------------- next part --------------
af_rxrpc: Remove unnecessary sk_state == RXRPC_CLOSE check

rxrpc_setup_incoming_call() cannot be called with an rxrpc_sock in state 
RXRPC_CLOSE, so the check and exit path can be removed.

Signed-off-by: Tim Smith <tim at electronghost.co.uk>
---
 net/rxrpc/ar-accept.c |   18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index 707cf88..ca35a3f 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -128,10 +128,6 @@ static int rxrpc_setup_incoming_call(struct rxrpc_local *local,
 	}
 
 	/* attach the call to the socket */
-	read_lock_bh(&local->services_lock);
-	if (rx->sk.sk_state == RXRPC_CLOSE)
-		goto invalid_service;
-
 	write_lock(&rx->call_lock);
 	if (!test_and_set_bit(RXRPC_CALL_INIT_ACCEPT, &call->flags)) {
 		rxrpc_get_call(call);
@@ -173,25 +169,11 @@ static int rxrpc_setup_incoming_call(struct rxrpc_local *local,
 	rxrpc_fast_process_packet(call, skb);
 
 	_debug("done");
-	read_unlock_bh(&local->services_lock);
 	rxrpc_free_skb(notification);
 	rxrpc_put_call(call);
 	_leave(" = 0");
 	return 0;
 
-invalid_service:
-	_debug("invalid");
-	read_unlock_bh(&local->services_lock);
-
-	read_lock_bh(&call->state_lock);
-	if (!test_bit(RXRPC_CALL_RELEASE, &call->flags) &&
-	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) {
-		rxrpc_get_call(call);
-		rxrpc_queue_call(call);
-	}
-	read_unlock_bh(&call->state_lock);
-	rxrpc_put_call(call);
-	ret = -ECONNREFUSED;
 error:
 	rxrpc_free_skb(notification);
 error_nofree:
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 836 bytes
Desc: This is a digitally signed message part.
URL: <http://lists.infradead.org/pipermail/linux-afs/attachments/20140501/de272935/attachment.sig>


More information about the linux-afs mailing list