[PATCH net-next 23/32] rxrpc: Wrap accesses to get call state to put the barrier in one place

David Howells dhowells at redhat.com
Tue Dec 6 08:01:50 PST 2022


Wrap reads of a call's state from outside of the I/O thread in a single
helper so that the barrier needed to order the state read with respect to
the error code and abort code lives in just that one place.

Signed-off-by: David Howells <dhowells at redhat.com>
cc: Marc Dionne <marc.dionne at auristor.com>
cc: linux-afs at lists.infradead.org
---

 net/rxrpc/af_rxrpc.c    |    2 +-
 net/rxrpc/ar-internal.h |   16 ++++++++++++++++
 net/rxrpc/recvmsg.c     |   12 ++++++------
 net/rxrpc/sendmsg.c     |   22 +++++++++++-----------
 4 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 7446b7bd5490..74304f81ade7 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -379,7 +379,7 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
 bool rxrpc_kernel_check_life(const struct socket *sock,
 			     const struct rxrpc_call *call)
 {
-	return call->state != RXRPC_CALL_COMPLETE;
+	return !rxrpc_call_is_complete(call);
 }
 EXPORT_SYMBOL(rxrpc_kernel_check_life);
 
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index a95e161bd980..6a5552274dca 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -896,6 +896,22 @@ bool rxrpc_call_completed(struct rxrpc_call *);
 bool __rxrpc_abort_call(struct rxrpc_call *, rxrpc_seq_t, u32, int, enum rxrpc_abort_reason);
 bool rxrpc_abort_call(struct rxrpc_call *, rxrpc_seq_t, u32, int, enum rxrpc_abort_reason);
 
+static inline enum rxrpc_call_state rxrpc_call_state(const struct rxrpc_call *call)
+{
+	/* Order the read of ->state before reads of ->error and the abort code. */
+	return smp_load_acquire(&call->state);
+}
+
+static inline bool rxrpc_call_is_complete(const struct rxrpc_call *call)
+{
+	return rxrpc_call_state(call) == RXRPC_CALL_COMPLETE;
+}
+
+static inline bool rxrpc_call_has_failed(const struct rxrpc_call *call)
+{
+	return rxrpc_call_is_complete(call) && call->completion != RXRPC_CALL_SUCCEEDED;
+}
+
 /*
  * conn_client.c
  */
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index c163c103aa1d..44576d16cdb0 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -89,7 +89,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
 		break;
 	default:
-		pr_err("Invalid terminal call state %u\n", call->state);
+		pr_err("Invalid terminal call state %u\n", call->completion);
 		BUG();
 		break;
 	}
@@ -111,7 +111,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 
 	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
 
-	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
+	if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
 		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
 
 	spin_lock(&call->state_lock);
@@ -210,7 +210,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 	rx_pkt_offset = call->rx_pkt_offset;
 	rx_pkt_len = call->rx_pkt_len;
 
-	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
+	if (rxrpc_call_state(call) >= RXRPC_CALL_SERVER_ACK_REQUEST) {
 		seq = call->ackr_window - 1;
 		ret = 1;
 		goto done;
@@ -414,7 +414,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		msg->msg_namelen = len;
 	}
 
-	switch (READ_ONCE(call->state)) {
+	switch (rxrpc_call_state(call)) {
 	case RXRPC_CALL_CLIENT_RECV_REPLY:
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
@@ -434,7 +434,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 	if (ret < 0)
 		goto error_unlock_call;
 
-	if (call->state == RXRPC_CALL_COMPLETE) {
+	if (rxrpc_call_is_complete(call)) {
 		ret = rxrpc_recvmsg_term(call, msg);
 		if (ret < 0)
 			goto error_unlock_call;
@@ -514,7 +514,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 
 	mutex_lock(&call->user_mutex);
 
-	switch (READ_ONCE(call->state)) {
+	switch (rxrpc_call_state(call)) {
 	case RXRPC_CALL_CLIENT_RECV_REPLY:
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 51c676bf03a8..1f047ec27316 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -25,7 +25,7 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
 {
 	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
 
-	if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
+	if (!call->send_abort && !rxrpc_call_is_complete(call)) {
 		call->send_abort_why = why;
 		call->send_abort_err = error;
 		call->send_abort_seq = 0;
@@ -59,7 +59,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
 		if (rxrpc_check_tx_space(call, NULL))
 			return 0;
 
-		if (call->state >= RXRPC_CALL_COMPLETE)
+		if (rxrpc_call_is_complete(call))
 			return call->error;
 
 		if (signal_pending(current))
@@ -94,7 +94,7 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
 		if (rxrpc_check_tx_space(call, &tx_win))
 			return 0;
 
-		if (call->state >= RXRPC_CALL_COMPLETE)
+		if (rxrpc_call_is_complete(call))
 			return call->error;
 
 		if (timeout == 0 &&
@@ -123,7 +123,7 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
 		if (rxrpc_check_tx_space(call, NULL))
 			return 0;
 
-		if (call->state >= RXRPC_CALL_COMPLETE)
+		if (rxrpc_call_is_complete(call))
 			return call->error;
 
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
@@ -272,7 +272,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 	ret = -EPIPE;
 	if (sk->sk_shutdown & SEND_SHUTDOWN)
 		goto maybe_error;
-	state = READ_ONCE(call->state);
+	state = rxrpc_call_state(call);
 	ret = -ESHUTDOWN;
 	if (state >= RXRPC_CALL_COMPLETE)
 		goto maybe_error;
@@ -349,7 +349,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 
 		/* check for the far side aborting the call or a network error
 		 * occurring */
-		if (call->state == RXRPC_CALL_COMPLETE)
+		if (rxrpc_call_is_complete(call))
 			goto call_terminated;
 
 		/* add the packet to the send queue if it's now full */
@@ -374,7 +374,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 
 success:
 	ret = copied;
-	if (smp_load_acquire(&call->state) == RXRPC_CALL_COMPLETE &&
+	if (rxrpc_call_is_complete(call) &&
 	    call->error < 0)
 		ret = call->error;
 out:
@@ -614,10 +614,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
 			return PTR_ERR(call);
 		/* ... and we have the call lock. */
 		ret = 0;
-		if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
+		if (rxrpc_call_is_complete(call))
 			goto out_put_unlock;
 	} else {
-		switch (READ_ONCE(call->state)) {
+		switch (rxrpc_call_state(call)) {
 		case RXRPC_CALL_UNINITIALISED:
 		case RXRPC_CALL_CLIENT_AWAIT_CONN:
 		case RXRPC_CALL_SERVER_PREALLOC:
@@ -671,7 +671,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
 		break;
 	}
 
-	state = READ_ONCE(call->state);
+	state = rxrpc_call_state(call);
 	_debug("CALL %d USR %lx ST %d on CONN %p",
 	       call->debug_id, call->user_call_ID, state, call->conn);
 
@@ -731,7 +731,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
 	_debug("CALL %d USR %lx ST %d on CONN %p",
 	       call->debug_id, call->user_call_ID, call->state, call->conn);
 
-	switch (smp_load_acquire(&call->state)) {
+	switch (rxrpc_call_state(call)) {
 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
 	case RXRPC_CALL_SERVER_SEND_REPLY:





More information about the linux-afs mailing list