[PATCH net-next 06/32] rxrpc: Convert call->state_lock to a spinlock

David Howells dhowells at redhat.com
Tue Dec 6 07:59:25 PST 2022


Convert call->state_lock to a spinlock and use a barrier on it when setting
the completion state.  The only readers can then be made to read it
locklessly.

Signed-off-by: David Howells <dhowells at redhat.com>
cc: Marc Dionne <marc.dionne at auristor.com>
cc: linux-afs at lists.infradead.org
---

 net/rxrpc/ar-internal.h |    2 +-
 net/rxrpc/call_object.c |    2 +-
 net/rxrpc/conn_client.c |    4 ++--
 net/rxrpc/conn_event.c  |    4 ++--
 net/rxrpc/input.c       |    6 +++---
 net/rxrpc/recvmsg.c     |   23 ++++++++++++-----------
 net/rxrpc/sendmsg.c     |   17 ++++++-----------
 7 files changed, 27 insertions(+), 31 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 423f2e1eddb3..755395d1f2ca 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -622,7 +622,7 @@ struct rxrpc_call {
 	unsigned long		flags;
 	unsigned long		events;
 	spinlock_t		notify_lock;	/* Kernel notification lock */
-	rwlock_t		state_lock;	/* lock for state transition */
+	spinlock_t		state_lock;	/* lock for state transition */
 	u32			abort_code;	/* Local/remote abort code */
 	int			error;		/* Local error incurred */
 	enum rxrpc_call_state	state;		/* current state of call */
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 36cc868b8922..07abf12e99bb 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -162,7 +162,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	init_waitqueue_head(&call->waitq);
 	spin_lock_init(&call->notify_lock);
 	spin_lock_init(&call->tx_lock);
-	rwlock_init(&call->state_lock);
+	spin_lock_init(&call->state_lock);
 	refcount_set(&call->ref, 1);
 	call->debug_id = debug_id;
 	call->tx_total_len = -1;
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 87efa0373aed..ec8913de42c9 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -555,9 +555,9 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
 
 	trace_rxrpc_connect_call(call);
 
-	write_lock(&call->state_lock);
+	spin_lock(&call->state_lock);
 	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
-	write_unlock(&call->state_lock);
+	spin_unlock(&call->state_lock);
 
 	/* Paired with the read barrier in rxrpc_connect_call().  This orders
 	 * cid and epoch in the connection wrt to call_id without the need to
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index dfd29882126f..f05d58636307 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -265,12 +265,12 @@ static void rxrpc_call_is_secure(struct rxrpc_call *call)
 {
 	_enter("%p", call);
 	if (call) {
-		write_lock(&call->state_lock);
+		spin_lock(&call->state_lock);
 		if (call->state == RXRPC_CALL_SERVER_SECURING) {
 			call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
 			rxrpc_notify_socket(call);
 		}
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 	}
 }
 
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index dbd92f09c2ca..3b2e8e7d2e0f 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -257,7 +257,7 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
 
 	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));
 
-	write_lock(&call->state_lock);
+	spin_lock(&call->state_lock);
 
 	state = call->state;
 	switch (state) {
@@ -278,7 +278,7 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
 		goto bad_state;
 	}
 
-	write_unlock(&call->state_lock);
+	spin_unlock(&call->state_lock);
 	if (state == RXRPC_CALL_CLIENT_AWAIT_REPLY)
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
 	else
@@ -287,7 +287,7 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
 	return true;
 
 bad_state:
-	write_unlock(&call->state_lock);
+	spin_unlock(&call->state_lock);
 	kdebug("end_tx %s", rxrpc_call_states[call->state]);
 	rxrpc_proto_abort(abort_why, call, call->tx_top);
 	return false;
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 0cde2b477711..a9c9b2a8a27a 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -70,7 +70,8 @@ bool __rxrpc_set_call_completion(struct rxrpc_call *call,
 		call->abort_code = abort_code;
 		call->error = error;
 		call->completion = compl;
-		call->state = RXRPC_CALL_COMPLETE;
+		/* Allow reader of completion state to operate locklessly */
+		smp_store_release(&call->state, RXRPC_CALL_COMPLETE);
 		trace_rxrpc_call_complete(call);
 		wake_up(&call->waitq);
 		rxrpc_notify_socket(call);
@@ -87,9 +88,9 @@ bool rxrpc_set_call_completion(struct rxrpc_call *call,
 	bool ret = false;
 
 	if (call->state < RXRPC_CALL_COMPLETE) {
-		write_lock(&call->state_lock);
+		spin_lock(&call->state_lock);
 		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 	}
 	return ret;
 }
@@ -107,9 +108,9 @@ bool rxrpc_call_completed(struct rxrpc_call *call)
 	bool ret = false;
 
 	if (call->state < RXRPC_CALL_COMPLETE) {
-		write_lock(&call->state_lock);
+		spin_lock(&call->state_lock);
 		ret = __rxrpc_call_completed(call);
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 	}
 	return ret;
 }
@@ -131,9 +132,9 @@ bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
 {
 	bool ret;
 
-	write_lock(&call->state_lock);
+	spin_lock(&call->state_lock);
 	ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
-	write_unlock(&call->state_lock);
+	spin_unlock(&call->state_lock);
 	return ret;
 }
 
@@ -193,23 +194,23 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
 		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
 
-	write_lock(&call->state_lock);
+	spin_lock(&call->state_lock);
 
 	switch (call->state) {
 	case RXRPC_CALL_CLIENT_RECV_REPLY:
 		__rxrpc_call_completed(call);
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 		break;
 
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
 		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
 		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 		rxrpc_propose_delay_ACK(call, serial,
 					rxrpc_propose_ack_processing_op);
 		break;
 	default:
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 		break;
 	}
 }
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index cde1e65f16b4..816c1b083a69 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -195,7 +195,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 
 	if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
 		_debug("________awaiting reply/ACK__________");
-		write_lock(&call->state_lock);
+		spin_lock(&call->state_lock);
 		switch (call->state) {
 		case RXRPC_CALL_CLIENT_SEND_REQUEST:
 			call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
@@ -218,7 +218,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 		default:
 			break;
 		}
-		write_unlock(&call->state_lock);
+		spin_unlock(&call->state_lock);
 	}
 
 	if (poke)
@@ -354,12 +354,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 
 success:
 	ret = copied;
-	if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
-		read_lock(&call->state_lock);
-		if (call->error < 0)
-			ret = call->error;
-		read_unlock(&call->state_lock);
-	}
+	if (smp_load_acquire(&call->state) == RXRPC_CALL_COMPLETE &&
+	    call->error < 0)
+		ret = call->error;
 out:
 	call->tx_pending = txb;
 	_leave(" = %d", ret);
@@ -715,7 +712,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
 	_debug("CALL %d USR %lx ST %d on CONN %p",
 	       call->debug_id, call->user_call_ID, call->state, call->conn);
 
-	switch (READ_ONCE(call->state)) {
+	switch (smp_load_acquire(&call->state)) {
 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
 	case RXRPC_CALL_SERVER_SEND_REPLY:
@@ -723,9 +720,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
 				      notify_end_tx, &dropped_lock);
 		break;
 	case RXRPC_CALL_COMPLETE:
-		read_lock(&call->state_lock);
 		ret = call->error;
-		read_unlock(&call->state_lock);
 		break;
 	default:
 		/* Request phase complete for this client call */





More information about the linux-afs mailing list