[PATCH v5 1/2] blk-mq: add tagset quiesce interface

Ming Lei ming.lei at redhat.com
Tue Jul 28 06:10:42 EDT 2020


On Tue, Jul 28, 2020 at 02:43:06AM -0700, Sagi Grimberg wrote:
> 
> > > > > > I like the tagset based interface.  But the idea of doing a per-hctx
> > > > > > allocation and wait doesn't seem very scalable.
> > > > > > 
> > > > > > Paul, do you have any good idea for an interface that waits on
> > > > > > multiple srcu heads?  As far as I can tell we could just have a single
> > > > > > global completion and counter, and each call_srcu would just
> > > > > > decrement it and then the final one would do the
> > > > > > wakeup.  It would just
> > > > > > be great to figure out a way to keep the struct rcu_synchronize and
> > > > > > counter on stack to avoid an allocation.
> > > > > > 
> > > > > > But if we can't do with an on-stack object I'd much rather just embed
> > > > > > the rcu_head in the hw_ctx.
> > > > > 
> > > > > I think we can do that, please see the following patch which
> > > > > is against Sagi's V5:
> > > > 
> > > > I don't think you can send a single rcu_head to multiple
> > > > call_srcu calls.
> > > 
> > > OK, then one variant is to put the rcu_head into blk_mq_hw_ctx, and put
> > > rcu_synchronize into blk_mq_tag_set.
> > 
> > I can cook up a spin,
> 
> Nope.. spoke too soon, the rcu_head needs to be in a context that has
> access to the counter (which is what you called blk_mq_srcu_sync).
> Do you also want to add a pointer to the hctx? That is almost as big as
> rcu_synchronize...

We can just put the rcu_head into the hctx and put the count & completion
into the tag_set; the tag_set can then be retrieved via the hctx, something
like the following patch:

diff --git a/block/blk-mq.c b/block/blk-mq.c
index c3856377b961..129665da4dbd 100644
--- a/block/blk-mq.c
+++ b/block/blk-mq.c
@@ -27,6 +27,7 @@
 #include <linux/crash_dump.h>
 #include <linux/prefetch.h>
 #include <linux/blk-crypto.h>
+#include <linux/rcupdate_wait.h>
 
 #include <trace/events/block.h>
 
@@ -209,6 +210,34 @@ void blk_mq_quiesce_queue_nowait(struct request_queue *q)
 }
 EXPORT_SYMBOL_GPL(blk_mq_quiesce_queue_nowait);
 
+static void blk_mq_wakeme_after_rcu(struct rcu_head *head)
+{
+
+	struct blk_mq_srcu_struct *srcu = container_of(head,
+			struct blk_mq_srcu_struct, head);
+	struct blk_mq_hw_ctx *hctx = (void *)srcu -
+		sizeof(struct blk_mq_hw_ctx);
+	struct blk_mq_tag_set *set = hctx->queue->tag_set;
+
+	if (atomic_dec_and_test(&set->quiesce_count))
+		complete(&set->quiesce_completion);
+}
+
+static void blk_mq_quiesce_blocking_queue_async(struct request_queue *q)
+{
+	struct blk_mq_hw_ctx *hctx;
+	unsigned int i;
+
+	blk_mq_quiesce_queue_nowait(q);
+
+	queue_for_each_hw_ctx(q, hctx, i) {
+		WARN_ON_ONCE(!(hctx->flags & BLK_MQ_F_BLOCKING));
+		init_rcu_head(&hctx->srcu->head);
+		call_srcu(&hctx->srcu->srcu, &hctx->srcu->head,
+				blk_mq_wakeme_after_rcu);
+	}
+}
+
 /**
  * blk_mq_quiesce_queue() - wait until all ongoing dispatches have finished
  * @q: request queue.
@@ -228,7 +257,7 @@ void blk_mq_quiesce_queue(struct request_queue *q)
 
 	queue_for_each_hw_ctx(q, hctx, i) {
 		if (hctx->flags & BLK_MQ_F_BLOCKING)
-			synchronize_srcu(hctx->srcu);
+			synchronize_srcu(&hctx->srcu->srcu);
 		else
 			rcu = true;
 	}
@@ -700,23 +729,23 @@ void blk_mq_complete_request(struct request *rq)
 EXPORT_SYMBOL(blk_mq_complete_request);
 
 static void hctx_unlock(struct blk_mq_hw_ctx *hctx, int srcu_idx)
-	__releases(hctx->srcu)
+	__releases(&hctx->srcu->srcu)
 {
 	if (!(hctx->flags & BLK_MQ_F_BLOCKING))
 		rcu_read_unlock();
 	else
-		srcu_read_unlock(hctx->srcu, srcu_idx);
+		srcu_read_unlock(&hctx->srcu->srcu, srcu_idx);
 }
 
 static void hctx_lock(struct blk_mq_hw_ctx *hctx, int *srcu_idx)
-	__acquires(hctx->srcu)
+	__acquires(&hctx->srcu->srcu)
 {
 	if (!(hctx->flags & BLK_MQ_F_BLOCKING)) {
 		/* shut up gcc false positive */
 		*srcu_idx = 0;
 		rcu_read_lock();
 	} else
-		*srcu_idx = srcu_read_lock(hctx->srcu);
+		*srcu_idx = srcu_read_lock(&hctx->srcu->srcu);
 }
 
 /**
@@ -2599,7 +2628,7 @@ static int blk_mq_hw_ctx_size(struct blk_mq_tag_set *tag_set)
 		     sizeof(struct blk_mq_hw_ctx));
 
 	if (tag_set->flags & BLK_MQ_F_BLOCKING)
-		hw_ctx_size += sizeof(struct srcu_struct);
+		hw_ctx_size += sizeof(struct blk_mq_srcu_struct);
 
 	return hw_ctx_size;
 }
@@ -2684,7 +2713,7 @@ blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set,
 		goto free_bitmap;
 
 	if (hctx->flags & BLK_MQ_F_BLOCKING)
-		init_srcu_struct(hctx->srcu);
+		init_srcu_struct(&hctx->srcu->srcu);
 	blk_mq_hctx_kobj_init(hctx);
 
 	return hctx;
@@ -2880,6 +2909,43 @@ static void queue_set_hctx_shared(struct request_queue *q, bool shared)
 	}
 }
 
+void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set)
+{
+	struct request_queue *q;
+
+	mutex_lock(&set->tag_list_lock);
+	if (set->flags & BLK_MQ_F_BLOCKING) {
+		int count = 0;
+
+		list_for_each_entry(q, &set->tag_list, tag_set_list)
+			count++;
+
+		atomic_set(&set->quiesce_count, count);
+		init_completion(&set->quiesce_completion);
+
+		list_for_each_entry(q, &set->tag_list, tag_set_list)
+			blk_mq_quiesce_blocking_queue_async(q);
+		wait_for_completion(&set->quiesce_completion);
+	} else {
+		list_for_each_entry(q, &set->tag_list, tag_set_list)
+			blk_mq_quiesce_queue_nowait(q);
+		synchronize_rcu();
+	}
+	mutex_unlock(&set->tag_list_lock);
+}
+EXPORT_SYMBOL_GPL(blk_mq_quiesce_tagset);
+
+void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set)
+{
+	struct request_queue *q;
+
+	mutex_lock(&set->tag_list_lock);
+	list_for_each_entry(q, &set->tag_list, tag_set_list)
+		blk_mq_unquiesce_queue(q);
+	mutex_unlock(&set->tag_list_lock);
+}
+EXPORT_SYMBOL_GPL(blk_mq_unquiesce_tagset);
+
 static void blk_mq_update_tag_set_depth(struct blk_mq_tag_set *set,
 					bool shared)
 {
diff --git a/include/linux/blk-mq.h b/include/linux/blk-mq.h
index 23230c1d031e..9ef7fdb809a7 100644
--- a/include/linux/blk-mq.h
+++ b/include/linux/blk-mq.h
@@ -9,6 +9,11 @@
 struct blk_mq_tags;
 struct blk_flush_queue;
 
+struct blk_mq_srcu_struct {
+	struct srcu_struct srcu;
+	struct rcu_head    head;
+};
+
 /**
  * struct blk_mq_hw_ctx - State for a hardware queue facing the hardware
  * block device
@@ -175,7 +180,7 @@ struct blk_mq_hw_ctx {
 	 * blocking (BLK_MQ_F_BLOCKING). Must be the last member - see also
 	 * blk_mq_hw_ctx_size().
 	 */
-	struct srcu_struct	srcu[];
+	struct blk_mq_srcu_struct	srcu[];
 };
 
 /**
@@ -254,6 +259,9 @@ struct blk_mq_tag_set {
 
 	struct mutex		tag_list_lock;
 	struct list_head	tag_list;
+
+	struct completion	quiesce_completion;
+	atomic_t		quiesce_count;
 };
 
 /**
@@ -532,6 +540,8 @@ int blk_mq_map_queues(struct blk_mq_queue_map *qmap);
 void blk_mq_update_nr_hw_queues(struct blk_mq_tag_set *set, int nr_hw_queues);
 
 void blk_mq_quiesce_queue_nowait(struct request_queue *q);
+void blk_mq_quiesce_tagset(struct blk_mq_tag_set *set);
+void blk_mq_unquiesce_tagset(struct blk_mq_tag_set *set);
 
 unsigned int blk_mq_rq_cpu(struct request *rq);
 

-- 
Ming




More information about the Linux-nvme mailing list