[PATCH] um: vector: Replace locks guarding queue depth with atomics

Anton Ivanov anton.ivanov at cambridgegreys.com
Thu Jul 4 02:45:21 PDT 2024



On 04/07/2024 10:17, Johannes Berg wrote:
> Hi Anton,
> 
> Thanks for taking a look! Also thanks for the other explanation.
> 
> On Thu, 2024-07-04 at 08:21 +0100, anton.ivanov at cambridgegreys.com
> wrote:
>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>
>> UML vector drivers use ring buffer structures which map
>> preallocated skbs onto mmsg vectors for use with sendmmsg
>> and recvmmsg. They are designed around a single consumer,
>> single producer pattern allowing simultaneous enqueue and
>> dequeue.
> 
> Right, I didn't grok that from just the code yesterday :)
> 
>> Lock debugging with preemption showed possible races when
>> locking the queue depth. This patch addresses this by
>> removing extra locks while making queue depth inc/dec and
>> access atomic.
> 
> I don't think this is fully correct in SMP, and I think we should fix it
> even if we don't have SMP (yet?)
> 
> See, the other thing about spinlocks is that they serve as barriers, and
> I don't think the atomics you use have "enough" of them (per the docs in
> atomic_t.txt)?
> 
> If you have concurrent producers and consumers, you really want the
> producer to actually have fully prepared the data before it's visible to
> the consumer, which means you need to have a full barrier before
> incrementing queue_length.

Indeed. I will add that for v2.
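
For illustration, here is a minimal userspace sketch of that ordering. It
assumes C11 <stdatomic.h> as a stand-in for the kernel's atomic_t API, with a
seq_cst fence playing the role of smp_mb__before_atomic(). struct ring,
ring_enqueue() and ring_dequeue() are hypothetical names, not the driver's,
and the acquire/release on the consumer side is simply the conservative C11
choice, not a statement about what the driver needs:

```c
#include <stdatomic.h>

#define RING_SIZE 8 /* hypothetical ring size, not the driver's */

struct ring {
	int slots[RING_SIZE];
	int head;         /* only touched by the consumer */
	int tail;         /* only touched by the producer */
	atomic_int depth; /* shared by both sides */
};

/* Producer: write the slot first, then publish it by bumping depth. */
static int ring_enqueue(struct ring *r, int value)
{
	/* acquire pairs with the consumer's release when it frees a slot */
	if (atomic_load_explicit(&r->depth, memory_order_acquire) == RING_SIZE)
		return -1; /* full */

	r->slots[r->tail] = value;
	r->tail = (r->tail + 1) % RING_SIZE;

	/* userspace analogue of smp_mb__before_atomic(): the slot write
	 * must be visible before the depth increment is
	 */
	atomic_thread_fence(memory_order_seq_cst);
	atomic_fetch_add_explicit(&r->depth, 1, memory_order_relaxed);
	return 0;
}

/* Consumer: only read a slot after observing a non-zero depth. */
static int ring_dequeue(struct ring *r, int *value)
{
	if (atomic_load_explicit(&r->depth, memory_order_acquire) == 0)
		return -1; /* empty */

	*value = r->slots[r->head];
	r->head = (r->head + 1) % RING_SIZE;

	/* release: the slot read completes before the slot is handed back */
	atomic_fetch_sub_explicit(&r->depth, 1, memory_order_release);
	return 0;
}
```

The point is only the order on the producer side: data, then barrier, then the
depth increment.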

> 
> But you're now incrementing it by just atomic_add(), and atomic_t.txt
> says:
> 
>   - RMW operations that have no return value are unordered;
> 
> So I think the producer should have
> 
> 	/*
> 	 * adding to queue_length makes the prepared buffers
> 	 * visible to the consumer, ensure they're actually
> 	 * completely written/visible before
> 	 */
> 	smp_mb__before_atomic();
> 	atomic_add(...);
> 
> Or so?
> 
> 
>> +	atomic_sub(advance, &qi->queue_depth);
>>   
>> -	if (qi->queue_depth == 0) {
>> -		qi->head = 0;
>> -		qi->tail = 0;
>> -	}
>> -	queue_depth = qi->queue_depth;
>> -	spin_unlock(&qi->tail_lock);
>> -	return queue_depth;
>> +	return atomic_read(&qi->queue_depth);
> 
> Or just use atomic_sub_return()? Though that also implies a barrier,
> which I think isn't needed on the consumer side.

Let me have a look at it once again. IIRC you need only producer barriers.
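
For reference, the single read-modify-write form would look roughly like this
in userspace C11. This is a sketch only: depth_sub_return() is a hypothetical
helper, and the relaxed ordering encodes the assumption that the consumer side
needs no barrier, which still has to be verified. The kernel's
atomic_sub_return() is fully ordered, so this is closer to its _relaxed
variant:

```c
#include <stdatomic.h>

/* Subtract and return the new depth in one atomic step, so the value
 * returned is exactly the one this call produced. A separate sub followed
 * by a read could instead observe a later update from the other side.
 */
static int depth_sub_return(atomic_int *depth, int advance)
{
	/* fetch_sub returns the old value, so subtract again for the new one */
	return atomic_fetch_sub_explicit(depth, advance,
					 memory_order_relaxed) - advance;
}
```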

> 
> However I think it's clearer to just remove the return value and make
> this function void.
> 
>>   static int vector_advancetail(struct vector_queue *qi, int advance)
>>   {
>> -	int queue_depth;
>> -
>>   	qi->tail =
>>   		(qi->tail + advance)
>>   			% qi->max_depth;
>> -	spin_lock(&qi->head_lock);
>> -	qi->queue_depth += advance;
>> -	queue_depth = qi->queue_depth;
>> -	spin_unlock(&qi->head_lock);
>> -	return queue_depth;
>> +	atomic_add(advance, &qi->queue_depth);
>> +	return atomic_read(&qi->queue_depth);
>>   }
> 
> Or maybe here instead of the barrier use atomic_add_return() which
> implies a full barrier on both sides, but I don't think you need that,
> the barrier before is enough?
> 
> 
>> @@ -411,61 +386,58 @@ static int vector_send(struct vector_queue *qi)
>>   	int result = 0, send_len, queue_depth = qi->max_depth;
>>   
>>   	if (spin_trylock(&qi->head_lock)) {
> [...]
>> +		/* update queue_depth to current value */
>> +		queue_depth = atomic_read(&qi->queue_depth);
>> +		while (queue_depth > 0) {
> 
> I think it'd be clearer to write this as
> 
> 	while ((queue_depth = atomic_read(...))) {
> 
> and simply not modify the queue_depth to the return value of
> consume_vector_skbs() here:
> 
> [...]
> 
>> +			if (result > 0) {
>> +				queue_depth =
>> +					consume_vector_skbs(qi, result);
>> +				/* This is equivalent to an TX IRQ.
>> +				 * Restart the upper layers to feed us
>> +				 * more packets.
>>   				 */
>> -				if (result != send_len) {
>> -					vp->estats.tx_restart_queue++;
>> -					break;
>> -				}
>> +				if (result > vp->estats.tx_queue_max)
>> +					vp->estats.tx_queue_max = result;
>> +				vp->estats.tx_queue_running_average =
>> +					(vp->estats.tx_queue_running_average + result) >> 1;
>> +			}
>> +			netif_wake_queue(qi->dev);
>> +			/* if TX is busy, break out of the send loop,
>> +			 *  poll write IRQ will reschedule xmit for us
>> +			 */
>> +			if (result != send_len) {
>> +				vp->estats.tx_restart_queue++;
>> +				break;
>>   			}
>>   		}
> 
> The loop doesn't need queue_depth until it goes back to the beginning
> anyway.
> 
> (it probably also never even executes twice unless you actually have SMP
> or preemption?)

It does. This happens when half of the vector is at the end of the array used
to imitate a ring buffer and the other half is at the beginning. That is quite
a common condition, actually.
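
A small sketch of the wrap-around arithmetic, to show why a wrapped vector
takes two passes. contiguous_run() and sends_needed() are hypothetical helpers
written for this example, not functions from the driver:

```c
/* Number of queued entries that can be sent in one call, i.e. the run
 * from head up to either the end of the queue or the end of the array.
 */
static int contiguous_run(int head, int queue_depth, int max_depth)
{
	int to_end = max_depth - head;

	return queue_depth < to_end ? queue_depth : to_end;
}

/* Count the passes needed to drain queue_depth entries starting at head. */
static int sends_needed(int head, int queue_depth, int max_depth)
{
	int sends = 0;

	while (queue_depth > 0) {
		int run = contiguous_run(head, queue_depth, max_depth);

		head = (head + run) % max_depth;
		queue_depth -= run;
		sends++;
	}
	return sends;
}
```

With max_depth 8, head 6 and 4 entries queued, the first pass covers slots 6-7
and the second covers slots 0-1.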

There is an extra issue there: the stats. I need to double-check the locking
used when they are fetched.

> 
>> @@ -675,11 +647,13 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>>   	struct vector_private *vp = netdev_priv(qi->dev);
>>   	struct mmsghdr *mmsg_vector = qi->mmsg_vector;
>>   	void **skbuff_vector = qi->skbuff_vector;
>> -	int i;
>> +	int i, queue_depth;
>> +
>> +	queue_depth = atomic_read(&qi->queue_depth);
>>   
>> -	if (qi->queue_depth == 0)
>> +	if (queue_depth == 0)
>>   		return;
>> -	for (i = 0; i < qi->queue_depth; i++) {
>> +	for (i = 0; i < queue_depth; i++) {
>>   		/* it is OK if allocation fails - recvmmsg with NULL data in
>>   		 * iov argument still performs an RX, just drops the packet
>>   		 * This allows us stop faffing around with a "drop buffer"
>> @@ -689,7 +663,7 @@ static void prep_queue_for_rx(struct vector_queue *qi)
>>   		skbuff_vector++;
>>   		mmsg_vector++;
>>   	}
>> -	qi->queue_depth = 0;
>> +	atomic_set(&qi->queue_depth, 0);
> 
> atomic_read() and then atomic_set() rather than atomic_sub() seems
> questionable, but you didn't have locks here before so maybe this is
> somehow logically never in parallel? But it is done in NAPI polling via
> vector_mmsg_rx(), so not sure I understand.
> 
>> @@ -987,7 +961,7 @@ static int vector_mmsg_rx(struct vector_private *vp, int budget)
>>   	 * many do we need to prep the next time prep_queue_for_rx() is called.
>>   	 */
>>   
>> -	qi->queue_depth = packet_count;
>> +	atomic_add(packet_count, &qi->queue_depth);
>>   
>>   	for (i = 0; i < packet_count; i++) {
>>   		skb = (*skbuff_vector);
> 
> especially since here you _did_ convert to atomic_add().
> 
> johannes
> 

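On the atomic_read() followed by atomic_set() question: if the two sides could
ever run in parallel, an add landing between the read and the set would be
lost. A userspace C11 sketch of the difference follows. Both helpers are
hypothetical, and the "concurrent" add is simulated inline so the example is
deterministic. In the kernel the exchange form would presumably map to
atomic_xchg():

```c
#include <stdatomic.h>

/* Read, then set to zero. concurrent_add simulates the other side adding
 * entries inside the window between the two operations; those are lost.
 */
static int drain_read_then_set(atomic_int *depth, int concurrent_add)
{
	int seen = atomic_load(depth);

	atomic_fetch_add(depth, concurrent_add); /* simulated interleaving */
	atomic_store(depth, 0);
	return seen;
}

/* Take the current depth and zero it in one atomic step; there is no
 * window in which a concurrent add can be overwritten.
 */
static int drain_exchange(atomic_int *depth)
{
	return atomic_exchange(depth, 0);
}
```

If the RX path is logically never entered in parallel, the plain read and set
is fine and this distinction does not matter.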
-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/


