[PATCH v3 1/2] blk-mq: add async quiesce interface

Sagi Grimberg sagi at grimberg.me
Mon Jul 27 17:00:15 EDT 2020


>>>>>> +void blk_mq_quiesce_queue_async(struct request_queue *q)
>>>>>> +{
>>>>>> +	struct blk_mq_hw_ctx *hctx;
>>>>>> +	unsigned int i;
>>>>>> +
>>>>>> +	blk_mq_quiesce_queue_nowait(q);
>>>>>> +
>>>>>> +	queue_for_each_hw_ctx(q, hctx, i) {
>>>>>> +		init_completion(&hctx->rcu_sync.completion);
>>>>>> +		init_rcu_head(&hctx->rcu_sync.head);
>>>>>> +		if (hctx->flags & BLK_MQ_F_BLOCKING)
>>>>>> +			call_srcu(hctx->srcu, &hctx->rcu_sync.head,
>>>>>> +				wakeme_after_rcu);
>>>>>> +		else
>>>>>> +			call_rcu(&hctx->rcu_sync.head,
>>>>>> +				wakeme_after_rcu);
>>>>>> +	}
>>>>>
>>>>> It doesn't look necessary to do anything in the !BLK_MQ_F_BLOCKING case; a
>>>>> single synchronize_rcu() is OK for all hctxs during waiting.
>>>>
>>>> That's true, but I want a single interface for both. v2 had exactly
>>>> that, but I decided that this approach is better.
>>>
>>> Not sure one new interface is needed, and one simple way is to:
>>>
>>> 1) call blk_mq_quiesce_queue_nowait() for each request queue
>>>
>>> 2) wait in driver specific way
>>>
>>> Or just wondering why nvme doesn't use set->tag_list to retrieve NS,
>>> then you may add per-tagset APIs for the waiting.
>>
>> Because it puts assumptions on how quiesce works, which is something
>> I'd like to avoid because I think it's cleaner. What do others think?
>> Jens? Christoph?
> 
> I'd prefer to have it in a helper, and just have blk_mq_quiesce_queue()
> call that.

I agree with this approach as well.

Jens, this means that we use the call_rcu mechanism for non-blocking
hctxs as well, because the caller will call it for multiple request
queues (see patch 2), and we don't want to call synchronize_rcu for every
request queue serially; we want it to happen in parallel.

That leaves us with the patchset as it is, with the only change being to
convert the rcu_synchronize structure to be dynamically allocated on the
heap rather than keeping it statically allocated in the hctx.

This is how it looks:
--
diff --git a/block/blk-mq.c b/block/blk-mq.c
index abcf590f6238..d913924117d2 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -209,6 +209,52 @@ void blk_mq_quiesce_queue_nowait(struct 
request_queue *q)
  }
  EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_nowait);

+void blk_mq_quiesce_queue_async(struct request_queue *q)
+{
+       struct blk_mq_hw_ctx *hctx;
+       unsigned int i;
+       int rcu = false;
+
+       blk_mq_quiesce_queue_nowait(q);
+
+       queue_for_each_hw_ctx(q, hctx, i) {
+               hctx->rcu_sync = kmalloc(sizeof(*hctx->rcu_sync), 
GFP_KERNEL);
+               if (!hctx->rcu_sync) {
+                       /* fallback to serial rcu sync */
+                       if (hctx->flags & BLK_MQ_F_BLOCKING)
+                               synchronize_srcu(hctx->srcu);
+                       else
+                               rcu = true;
+               } else {
+                       init_completion(&hctx->rcu_sync->completion);
+                       init_rcu_head(&hctx->rcu_sync->head);
+                       if (hctx->flags & BLK_MQ_F_BLOCKING)
+                               call_srcu(hctx->srcu, &hctx->rcu_sync->head,
+                                       wakeme_after_rcu);
+                       else
+                               call_rcu(&hctx->rcu_sync->head,
+                                       wakeme_after_rcu);
+               }
+       }
+       if (rcu)
+               synchronize_rcu();
+}
+EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_async);
+
+void blk_mq_quiesce_queue_async_wait(struct request_queue *q)
+{
+       struct blk_mq_hw_ctx *hctx;
+       unsigned int i;
+
+       queue_for_each_hw_ctx(q, hctx, i) {
+               if (!hctx->rcu_sync)
+                       continue;
+               wait_for_completion(&hctx->rcu_sync->completion);
+               destroy_rcu_head(&hctx->rcu_sync->head);
+       }
+}
+EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_async_wait);
+
  /**
   * blk_mq_quiesce_queue() - wait until all ongoing dispatches have 
finished
   * @q: request queue.
diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h
index 23230c1d031e..7213ce56bb31 100644
--- a/include/linux/blk-mq.h
+++ b/include/linux/blk-mq.h
@@ -5,6 +5,7 @@
  #include <linux/blkdev.h>
  #include <linux/sbitmap.h>
  #include <linux/srcu.h>
+#include <linux/rcupdate_wait.h>

  struct blk_mq_tags;
  struct blk_flush_queue;
@@ -170,6 +171,7 @@ struct blk_mq_hw_ctx {
          */
         struct list_head        hctx_list;

+       struct rcu_synchronize  *rcu_sync;
         /**
          * @srcu: Sleepable RCU. Use as lock when type of the hardware 
queue is
          * blocking (BLK_MQ_F_BLOCKING). Must be the last member - see also
@@ -532,6 +534,8 @@ int blk_mq_map_queues(struct blk_mq_queue_map *qmap);
  void blk_mq_update_nr_hw_queues(struct blk_mq_tag_set *set, int 
nr_hw_queues);

  void blk_mq_quiesce_queue_nowait(struct request_queue *q);
+void blk_mq_quiesce_queue_async(struct request_queue *q);
+void blk_mq_quiesce_queue_async_wait(struct request_queue *q);

  unsigned int blk_mq_rq_cpu(struct request *rq);
--

and in nvme:
--
diff --git a/drivers/nvme/host/core.c b/drivers/nvme/host/core.c
index 05aa568a60af..e8cc728dee46 100644
--- a/drivers/nvme/host/core.c
+++ b/drivers/nvme/host/core.c
@@ -4561,7 +4561,9 @@ void nvme_stop_queues(struct nvme_ctrl *ctrl)

         down_read(&ctrl->namespaces_rwsem);
         list_for_each_entry(ns, &ctrl->namespaces, list)
-               blk_mq_quiesce_queue(ns->queue);
+               blk_mq_quiesce_queue_async(ns->queue);
+       list_for_each_entry(ns, &ctrl->namespaces, list)
+               blk_mq_quiesce_queue_async_wait(ns->queue);
         up_read(&ctrl->namespaces_rwsem);
  }
  EXPORT_SYMBOL_GPL(nvme_stop_queues);
--

Agreed on this?



More information about the Linux-nvme mailing list