[PATCH] um: vector: Replace locks guarding queue depth with atomics

Johannes Berg johannes at sipsolutions.net
Thu Jul 4 02:17:18 PDT 2024


Hi Anton,

Thanks for taking a look! Also thanks for the other explanation.

On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov at cambridgegreys.com
wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> 
> UML vector drivers use ring buffer structures which map
> preallocated skbs onto mmsg vectors for use with sendmmsg
> and recvmmsg. They are designed around a single consumer,
> single producer pattern allowing simultaneous enqueue and
> dequeue.

Right, I didn't grok that from just the code yesterday :)

> Lock debugging with preemption showed possible races when
> locking the queue depth. This patch addresses this by
> removing extra locks while making queue depth inc/dec and
> access atomic.

I don't think this is fully correct in SMP, and I think we should fix it
even if we don't have SMP (yet?)

See, the other thing about spinlocks is that they serve as barriers, and
I don't think the atomics you use have "enough" of them (per the docs in
atomic_t.txt)?

If you have concurrent producers and consumers, you really want the
producer to actually have fully prepared the data before it's visible to
the consumer, which means you need to have a full barrier before
incrementing queue_length.

But you're now incrementing it by just atomic_add(), and atomic_t.txt
says:

 - RMW operations that have no return value are unordered;

So I think the producer should have

	/*
	 * adding to queue_length makes the prepared buffers
	 * visible to the consumer, ensure they're actually
	 * completely written/visible before
	 */
	smp_mb__before_atomic();
	atomic_add(...);

Or so?


> +	atomic_sub(advance, &qi->queue_depth);
>  
> -	if (qi->queue_depth == 0) {
> -		qi->head = 0;
> -		qi->tail = 0;
> -	}
> -	queue_depth = qi->queue_depth;
> -	spin_unlock(&qi->tail_lock);
> -	return queue_depth;
> +	return atomic_read(&qi->queue_depth);

Or just use atomic_sub_return()? Though that also implies a barrier,
which I think isn't needed on the consumer side.

However I think it's clearer to just remove the return value and make
this function void.

>  static int vector_advancetail(struct vector_queue *qi, int advance)
>  {
> -	int queue_depth;
> -
>  	qi->tail =
>  		(qi->tail + advance)
>  			% qi->max_depth;
> -	spin_lock(&qi->head_lock);
> -	qi->queue_depth += advance;
> -	queue_depth = qi->queue_depth;
> -	spin_unlock(&qi->head_lock);
> -	return queue_depth;
> +	atomic_add(advance, &qi->queue_depth);
> +	return atomic_read(&qi->queue_depth);
>  }

Or maybe here instead of the barrier use atomic_add_return() which
implies a full barrier on both sides, but I don't think you need that,
the barrier before is enough?


> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi)
>  	int result = 0, send_len, queue_depth = qi->max_depth;
>  
>  	if (spin_trylock(&qi->head_lock)) {
[...]
> +		/* update queue_depth to current value */
> +		queue_depth = atomic_read(&qi->queue_depth);
> +		while (queue_depth > 0) {

I think it'd be clearer to write this as

	while ((queue_depth = atomic_read(...))) {

and simply not modify the queue_depth to the return value of
consume_vector_skbs() here:

[...]

> +			if (result > 0) {
> +				queue_depth =
> +					consume_vector_skbs(qi, result);
> +				/* This is equivalent to an TX IRQ.
> +				 * Restart the upper layers to feed us
> +				 * more packets.
>  				 */
> -				if (result != send_len) {
> -					vp->estats.tx_restart_queue++;
> -					break;
> -				}
> +				if (result > vp->estats.tx_queue_max)
> +					vp->estats.tx_queue_max = result;
> +				vp->estats.tx_queue_running_average =
> +					(vp->estats.tx_queue_running_average + result) >> 1;
> +			}
> +			netif_wake_queue(qi->dev);
> +			/* if TX is busy, break out of the send loop,
> +			 *  poll write IRQ will reschedule xmit for us
> +			 */
> +			if (result != send_len) {
> +				vp->estats.tx_restart_queue++;
> +				break;
>  			}
>  		}

The loop doesn't need queue_depth until it goes back to the beginning
anyway.

(it probably also never even executes twice unless you actually have SMP
or preemption?)

> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>  	struct vector_private *vp = netdev_priv(qi->dev);
>  	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
>  	void **skbuff_vector = qi->skbuff_vector;
> -	int i;
> +	int i, queue_depth;
> +
> +	queue_depth = atomic_read(&qi->queue_depth);
>  
> -	if (qi->queue_depth == 0)
> +	if (queue_depth == 0)
>  		return;
> -	for (i = 0; i < qi->queue_depth; i++) {
> +	for (i = 0; i < queue_depth; i++) {
>  		/* it is OK if allocation fails - recvmmsg with NULL data in
>  		 * iov argument still performs an RX, just drops the packet
>  		 * This allows us stop faffing around with a "drop buffer"
> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>  		skbuff_vector++;
>  		mmsg_vector++;
>  	}
> -	qi->queue_depth = 0;
> +	atomic_set(&qi->queue_depth, 0);

atomic_read() and then atomic_set() rather than atomic_sub() seems
questionable, but you didn't have locks here before so maybe this is
somehow logically never in parallel? But it is done in NAPI polling via
vector_mmsg_rx(), so not sure I understand.

> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
>  	 * many do we need to prep the next time prep_queue_for_rx() is called.
>  	 */
>  
> -	qi->queue_depth = packet_count;
> +	atomic_add(packet_count, &qi->queue_depth);
>  
>  	for (i = 0; i < packet_count; i++) {
>  		skb = (*skbuff_vector);

especially since here you _did_ convert to atomic_add().

johannes



More information about the linux-um mailing list