[PATCH net-next 36/36] rxrpc: Transmit ACKs at the point of generation

David Howells dhowells at redhat.com
Thu Dec 1 16:20:09 PST 2022


For ACKs generated inside the I/O thread, transmit the ACK at the point of
generation.  Where the ACK is generated outside of the I/O thread, it's
offloaded to the I/O thread to transmit it.

Signed-off-by: David Howells <dhowells at redhat.com>
cc: Marc Dionne <marc.dionne at auristor.com>
cc: linux-afs at lists.infradead.org
---

 include/trace/events/rxrpc.h |    3 ---
 net/rxrpc/ar-internal.h      |    5 +----
 net/rxrpc/call_event.c       |   17 ++---------------
 net/rxrpc/io_thread.c        |    5 -----
 net/rxrpc/local_object.c     |    2 --
 net/rxrpc/output.c           |   42 ++----------------------------------------
 net/rxrpc/recvmsg.c          |    3 ---
 net/rxrpc/sendmsg.c          |    2 --
 net/rxrpc/txbuf.c            |    1 -
 9 files changed, 5 insertions(+), 75 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index b41e913ae78a..049b52e7aa6a 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -63,7 +63,6 @@
 	EM(rxrpc_local_put_peer,		"PUT peer    ") \
 	EM(rxrpc_local_put_prealloc_conn,	"PUT conn-pre") \
 	EM(rxrpc_local_put_release_sock,	"PUT rel-sock") \
-	EM(rxrpc_local_see_tx_ack,		"SEE tx-ack  ") \
 	EM(rxrpc_local_stop,			"STOP        ") \
 	EM(rxrpc_local_stopped,			"STOPPED     ") \
 	EM(rxrpc_local_unuse_bind,		"UNU bind    ") \
@@ -156,7 +155,6 @@
 	EM(rxrpc_call_get_recvmsg,		"GET recvmsg ") \
 	EM(rxrpc_call_get_release_sock,		"GET rel-sock") \
 	EM(rxrpc_call_get_sendmsg,		"GET sendmsg ") \
-	EM(rxrpc_call_get_send_ack,		"GET send-ack") \
 	EM(rxrpc_call_get_userid,		"GET user-id ") \
 	EM(rxrpc_call_new_client,		"NEW client  ") \
 	EM(rxrpc_call_new_prealloc_service,	"NEW prealloc") \
@@ -168,7 +166,6 @@
 	EM(rxrpc_call_put_recvmsg,		"PUT recvmsg ") \
 	EM(rxrpc_call_put_release_sock,		"PUT rls-sock") \
 	EM(rxrpc_call_put_release_sock_tba,	"PUT rls-sk-a") \
-	EM(rxrpc_call_put_send_ack,		"PUT send-ack") \
 	EM(rxrpc_call_put_sendmsg,		"PUT sendmsg ") \
 	EM(rxrpc_call_put_unnotify,		"PUT unnotify") \
 	EM(rxrpc_call_put_userid_exists,	"PUT u-exists") \
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 2a4928249a64..e7dccab7b741 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -287,8 +287,6 @@ struct rxrpc_local {
 	struct hlist_node	link;
 	struct socket		*socket;	/* my UDP socket */
 	struct task_struct	*io_thread;
-	struct list_head	ack_tx_queue;	/* List of ACKs that need sending */
-	spinlock_t		ack_tx_lock;	/* ACK list lock */
 	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 	struct sk_buff_head	rx_queue;	/* Received packets */
@@ -762,7 +760,6 @@ struct rxrpc_txbuf {
 	struct rcu_head		rcu;
 	struct list_head	call_link;	/* Link in call->tx_sendmsg/tx_buffer */
 	struct list_head	tx_link;	/* Link in live Enc queue or Tx queue */
-	struct rxrpc_call	*call;		/* Call to which belongs */
 	ktime_t			last_sent;	/* Time at which last transmitted */
 	refcount_t		ref;
 	rxrpc_seq_t		seq;		/* Sequence number of this packet */
@@ -1047,7 +1044,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
 /*
  * output.c
  */
-void rxrpc_transmit_ack_packets(struct rxrpc_local *);
+int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
 int rxrpc_send_abort_packet(struct rxrpc_call *);
 int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
 void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index fd122e3726bd..b2cf448fb02c 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -69,7 +69,6 @@ void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
 void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
 		    rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
 {
-	struct rxrpc_local *local = call->conn->local;
 	struct rxrpc_txbuf *txb;
 
 	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
@@ -96,17 +95,9 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
 	txb->ack.reason		= ack_reason;
 	txb->ack.nAcks		= 0;
 
-	if (!rxrpc_try_get_call(call, rxrpc_call_get_send_ack)) {
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_nomem);
-		return;
-	}
-
-	spin_lock(&local->ack_tx_lock);
-	list_add_tail(&txb->tx_link, &local->ack_tx_queue);
-	spin_unlock(&local->ack_tx_lock);
 	trace_rxrpc_send_ack(call, why, ack_reason, serial);
-
-	rxrpc_wake_up_io_thread(local);
+	rxrpc_send_ack_packet(call, txb);
+	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
 }
 
 /*
@@ -294,10 +285,6 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
 
 		rxrpc_transmit_one(call, txb);
 
-		// TODO: Drain the transmission buffers.  Do this somewhere better
-		if (after(call->acks_hard_ack, call->tx_bottom + 16))
-			rxrpc_shrink_call_tx_buffer(call);
-
 		if (!rxrpc_tx_window_has_space(call))
 			break;
 	}
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 19aa315eddf5..d83ae3193032 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -447,11 +447,6 @@ int rxrpc_io_thread(void *data)
 			continue;
 		}
 
-		if (!list_empty(&local->ack_tx_queue)) {
-			rxrpc_transmit_ack_packets(local);
-			continue;
-		}
-
 		/* Process received packets and errors. */
 		if ((skb = __skb_dequeue(&rx_queue))) {
 			switch (skb->mark) {
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 1e994a83db2b..44222923c0d1 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -96,8 +96,6 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
 		atomic_set(&local->active_users, 1);
 		local->rxnet = rxnet;
 		INIT_HLIST_NODE(&local->link);
-		INIT_LIST_HEAD(&local->ack_tx_queue);
-		spin_lock_init(&local->ack_tx_lock);
 		init_rwsem(&local->defrag_sem);
 		skb_queue_head_init(&local->rx_queue);
 		INIT_LIST_HEAD(&local->call_attend_q);
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 8147a47d1702..3d8c9f830ee0 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -203,12 +203,11 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
 }
 
 /*
- * Send an ACK call packet.
+ * Transmit an ACK packet.
  */
-static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *txb)
+int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
 {
 	struct rxrpc_connection *conn;
-	struct rxrpc_call *call = txb->call;
 	struct msghdr msg;
 	struct kvec iov[1];
 	rxrpc_serial_t serial;
@@ -271,43 +270,6 @@ static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *
 	return ret;
 }
 
-/*
- * ACK transmitter for a local endpoint.  The UDP socket locks around each
- * transmission, so we can only transmit one packet at a time, ACK, DATA or
- * otherwise.
- */
-void rxrpc_transmit_ack_packets(struct rxrpc_local *local)
-{
-	LIST_HEAD(queue);
-	int ret;
-
-	rxrpc_see_local(local, rxrpc_local_see_tx_ack);
-
-	if (list_empty(&local->ack_tx_queue))
-		return;
-
-	spin_lock(&local->ack_tx_lock);
-	list_splice_tail_init(&local->ack_tx_queue, &queue);
-	spin_unlock(&local->ack_tx_lock);
-
-	while (!list_empty(&queue)) {
-		struct rxrpc_txbuf *txb =
-			list_entry(queue.next, struct rxrpc_txbuf, tx_link);
-
-		ret = rxrpc_send_ack_packet(local, txb);
-		if (ret < 0 && ret != -ECONNRESET) {
-			spin_lock(&local->ack_tx_lock);
-			list_splice_init(&queue, &local->ack_tx_queue);
-			spin_unlock(&local->ack_tx_lock);
-			break;
-		}
-
-		list_del_init(&txb->tx_link);
-		rxrpc_put_call(txb->call, rxrpc_call_put_send_ack);
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
-	}
-}
-
 /*
  * Send an ABORT call packet.
  */
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 3a8576e9daf3..36b25d003cf0 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -320,7 +320,6 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 				ret = ret2;
 				goto out;
 			}
-			rxrpc_transmit_ack_packets(call->peer->local);
 		} else {
 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
 					     rx_pkt_offset, rx_pkt_len, 0);
@@ -502,7 +501,6 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		if (ret == -EAGAIN)
 			ret = 0;
 
-		rxrpc_transmit_ack_packets(call->peer->local);
 		if (!skb_queue_empty(&call->recvmsg_queue))
 			rxrpc_notify_socket(call);
 		break;
@@ -632,7 +630,6 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 read_phase_complete:
 	ret = 1;
 out:
-	rxrpc_transmit_ack_packets(call->peer->local);
 	if (_service)
 		*_service = call->dest_srx.srx_service;
 	mutex_unlock(&call->user_mutex);
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 2c861c55ed70..9fa7e37f7155 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -276,8 +276,6 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 		rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);
 
 	do {
-		rxrpc_transmit_ack_packets(call->peer->local);
-
 		if (!txb) {
 			size_t remain, bufsize, chunk, offset;
 
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index a5054389dfbb..d2cf2aac3adb 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -26,7 +26,6 @@ struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
 		INIT_LIST_HEAD(&txb->call_link);
 		INIT_LIST_HEAD(&txb->tx_link);
 		refcount_set(&txb->ref, 1);
-		txb->call		= call;
 		txb->call_debug_id	= call->debug_id;
 		txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
 		txb->space		= sizeof(txb->data);





More information about the linux-afs mailing list