[PATCH] rxrpc: Fix call timer start racing with call destruction
Marc Dionne
marc.dionne at auristor.com
Wed Mar 30 07:31:16 PDT 2022
On Mon, Mar 28, 2022 at 1:59 PM David Howells <dhowells at redhat.com> wrote:
>
> The rxrpc_call struct has a timer used to handle various timed events
> relating to a call. This timer can get started from the packet input
> routines that are run in softirq mode with just the RCU read lock held.
> Unfortunately, because only the RCU read lock is held - and neither a ref nor
> other lock is taken - the call can start getting destroyed at the same time
> a packet comes in addressed to that call. This causes the timer - which
> was already stopped - to get restarted. Later, the timer dispatch code may
> then oops if the timer got deallocated first.
>
> Fix this by trying to take a ref on the rxrpc_call struct and, if
> successful, passing that ref along to the timer. If the timer was already
> running, the ref is discarded.
>
> The timer completion routine can then pass the ref along to the call's work
> item when it queues it. If the timer or work item were already
> queued/running, the extra ref is discarded.
>
> Fixes: a158bdd3247b ("rxrpc: Fix call timeouts")
> Reported-by: Marc Dionne <marc.dionne at auristor.com>
> Signed-off-by: David Howells <dhowells at redhat.com>
> ---
>
> include/trace/events/rxrpc.h | 8 +++++++-
> net/rxrpc/ar-internal.h | 15 +++++++--------
> net/rxrpc/call_event.c | 2 +-
> net/rxrpc/call_object.c | 40 +++++++++++++++++++++++++++++++++++-----
> 4 files changed, 50 insertions(+), 15 deletions(-)
>
> diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
> index e70c90116eda..4a3ab0ed6e06 100644
> --- a/include/trace/events/rxrpc.h
> +++ b/include/trace/events/rxrpc.h
> @@ -83,12 +83,15 @@ enum rxrpc_call_trace {
> rxrpc_call_error,
> rxrpc_call_got,
> rxrpc_call_got_kernel,
> + rxrpc_call_got_timer,
> rxrpc_call_got_userid,
> rxrpc_call_new_client,
> rxrpc_call_new_service,
> rxrpc_call_put,
> rxrpc_call_put_kernel,
> rxrpc_call_put_noqueue,
> + rxrpc_call_put_notimer,
> + rxrpc_call_put_timer,
> rxrpc_call_put_userid,
> rxrpc_call_queued,
> rxrpc_call_queued_ref,
> @@ -278,12 +281,15 @@ enum rxrpc_tx_point {
> EM(rxrpc_call_error, "*E*") \
> EM(rxrpc_call_got, "GOT") \
> EM(rxrpc_call_got_kernel, "Gke") \
> + EM(rxrpc_call_got_timer, "GTM") \
> EM(rxrpc_call_got_userid, "Gus") \
> EM(rxrpc_call_new_client, "NWc") \
> EM(rxrpc_call_new_service, "NWs") \
> EM(rxrpc_call_put, "PUT") \
> EM(rxrpc_call_put_kernel, "Pke") \
> - EM(rxrpc_call_put_noqueue, "PNQ") \
> + EM(rxrpc_call_put_noqueue, "PnQ") \
> + EM(rxrpc_call_put_notimer, "PnT") \
> + EM(rxrpc_call_put_timer, "PTM") \
> EM(rxrpc_call_put_userid, "Pus") \
> EM(rxrpc_call_queued, "QUE") \
> EM(rxrpc_call_queued_ref, "QUR") \
> diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
> index 7bd6f8a66a3e..969e532f77a9 100644
> --- a/net/rxrpc/ar-internal.h
> +++ b/net/rxrpc/ar-internal.h
> @@ -777,14 +777,12 @@ void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool, bool,
> enum rxrpc_propose_ack_trace);
> void rxrpc_process_call(struct work_struct *);
>
> -static inline void rxrpc_reduce_call_timer(struct rxrpc_call *call,
> - unsigned long expire_at,
> - unsigned long now,
> - enum rxrpc_timer_trace why)
> -{
> - trace_rxrpc_timer(call, why, now);
> - timer_reduce(&call->timer, expire_at);
> -}
> +void rxrpc_reduce_call_timer(struct rxrpc_call *call,
> + unsigned long expire_at,
> + unsigned long now,
> + enum rxrpc_timer_trace why);
> +
> +void rxrpc_delete_call_timer(struct rxrpc_call *call);
>
> /*
> * call_object.c
> @@ -808,6 +806,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
> bool __rxrpc_queue_call(struct rxrpc_call *);
> bool rxrpc_queue_call(struct rxrpc_call *);
> void rxrpc_see_call(struct rxrpc_call *);
> +bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op);
> void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
> void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace);
> void rxrpc_cleanup_call(struct rxrpc_call *);
> diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
> index df864e692267..22e05de5d1ca 100644
> --- a/net/rxrpc/call_event.c
> +++ b/net/rxrpc/call_event.c
> @@ -310,7 +310,7 @@ void rxrpc_process_call(struct work_struct *work)
> }
>
> if (call->state == RXRPC_CALL_COMPLETE) {
> - del_timer_sync(&call->timer);
> + rxrpc_delete_call_timer(call);
> goto out_put;
> }
>
> diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
> index 4eb91d958a48..043508fd8d8a 100644
> --- a/net/rxrpc/call_object.c
> +++ b/net/rxrpc/call_object.c
> @@ -53,10 +53,30 @@ static void rxrpc_call_timer_expired(struct timer_list *t)
>
> if (call->state < RXRPC_CALL_COMPLETE) {
> trace_rxrpc_timer(call, rxrpc_timer_expired, jiffies);
> - rxrpc_queue_call(call);
> + __rxrpc_queue_call(call);
> + } else {
> + rxrpc_put_call(call, rxrpc_call_put);
> + }
> +}
> +
> +void rxrpc_reduce_call_timer(struct rxrpc_call *call,
> + unsigned long expire_at,
> + unsigned long now,
> + enum rxrpc_timer_trace why)
> +{
> + if (rxrpc_try_get_call(call, rxrpc_call_got_timer)) {
> + trace_rxrpc_timer(call, why, now);
> + if (timer_reduce(&call->timer, expire_at))
> + rxrpc_put_call(call, rxrpc_call_put_notimer);
> }
> }
>
> +void rxrpc_delete_call_timer(struct rxrpc_call *call)
> +{
> + if (del_timer_sync(&call->timer))
> + rxrpc_put_call(call, rxrpc_call_put_timer);
> +}
> +
> static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
>
> /*
> @@ -463,6 +483,17 @@ void rxrpc_see_call(struct rxrpc_call *call)
> }
> }
>
> +bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
> +{
> + const void *here = __builtin_return_address(0);
> + int n = atomic_fetch_add_unless(&call->usage, 1, 0);
> +
> + if (n == 0)
> + return false;
> + trace_rxrpc_call(call->debug_id, op, n, here, NULL);
> + return true;
> +}
> +
> /*
> * Note the addition of a ref on a call.
> */
> @@ -510,8 +541,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
> spin_unlock_bh(&call->lock);
>
> rxrpc_put_call_slot(call);
> -
> - del_timer_sync(&call->timer);
> + rxrpc_delete_call_timer(call);
>
> /* Make sure we don't get any more notifications */
> write_lock_bh(&rx->recvmsg_lock);
> @@ -618,6 +648,8 @@ static void rxrpc_destroy_call(struct work_struct *work)
> struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
> struct rxrpc_net *rxnet = call->rxnet;
>
> + rxrpc_delete_call_timer(call);
> +
> rxrpc_put_connection(call->conn);
> rxrpc_put_peer(call->peer);
> kfree(call->rxtx_buffer);
> @@ -652,8 +684,6 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
>
> memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
>
> - del_timer_sync(&call->timer);
> -
> ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
> ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
Reviewed-by: Marc Dionne <marc.dionne at auristor.com>
With this I was able to complete 30 runs of a test (targeted subset of
xfstests) that used to fail every 4-5 runs, so you can also add:
Tested-by: Marc Dionne <marc.dionne at auristor.com>
Marc
More information about the linux-afs
mailing list