[PATCH] nvme-rdma: parallelize I/O queue allocation and startup

Surabhi Gogte sgogte at purestorage.com
Thu May 28 17:13:54 PDT 2026


Refactor nvme rdma I/O queue setup to use parallel work queues,
combining allocation and startup into a single parallel operation per
queue. This reduces connection and reconnection setup time when there
are delays in establishing connections, which is especially important
for high-core-count hosts.

Key changes:
- Use a dedicated nvme_setup_wq for per-queue setup work instead of
  nvme_wq or nvme_reset_wq. Since reconnect and reset paths run as
  workers on those queues, calling flush_work() on setup items queued
  to the same workqueue would trigger lockdep's recursive-flush
  detection and risk deadlock under worker-pool exhaustion.
- Add setup_work and queue_idx fields to nvme_rdma_queue to enable
  per-queue workqueue dispatch; add qsetup_err atomic to
  nvme_rdma_ctrl for collecting errors across parallel workers.
- Refactor nvme_rdma_alloc_queue() to accept a pre-initialized queue
  pointer instead of (ctrl, idx, queue_size), updating all call sites
  including the admin queue path.
- Remove nvme_rdma_alloc_io_queues() and nvme_rdma_start_io_queues();
  their logic is folded into nvme_rdma_setup_io_queues() and
  nvme_rdma_configure_io_queues().
- Move queue count negotiation (nvme_set_queue_count,
  nvmf_set_io_queues) from the removed nvme_rdma_alloc_io_queues()
  into nvme_rdma_configure_io_queues().
- Introduce nvme_rdma_setup_io_queues() to dispatch alloc+start per
  queue in parallel and collect errors atomically.

Testing on a 64-core host with 64 IO-queues shows
nvme-rdma connection time reduced from ~1.4s to 416ms.

Signed-off-by: Surabhi Gogte <sgogte at purestorage.com>
---
 drivers/nvme/host/core.c |  15 ++++-
 drivers/nvme/host/nvme.h |   1 +
 drivers/nvme/host/rdma.c | 127 ++++++++++++++++++++++-----------------
 3 files changed, 86 insertions(+), 57 deletions(-)

diff --git a/drivers/nvme/host/core.c b/drivers/nvme/host/core.c
index dc388e24caad..f1247a61c797 100644
--- a/drivers/nvme/host/core.c
+++ b/drivers/nvme/host/core.c
@@ -107,7 +107,8 @@ MODULE_PARM_DESC(disable_pi_offsets,
 	"disable protection information if it has an offset");
 
 /*
- * nvme_wq - hosts nvme related works that are not reset or delete
+ * nvme_wq - hosts nvme related works that are not queue setup, reset or delete
+ * nvme_setup_wq - hosts nvme queue setup works
  * nvme_reset_wq - hosts nvme reset works
  * nvme_delete_wq - hosts nvme delete works
  *
@@ -126,6 +127,9 @@ EXPORT_SYMBOL_GPL(nvme_reset_wq);
 struct workqueue_struct *nvme_delete_wq;
 EXPORT_SYMBOL_GPL(nvme_delete_wq);
 
+struct workqueue_struct *nvme_setup_wq;
+EXPORT_SYMBOL_GPL(nvme_setup_wq);
+
 static LIST_HEAD(nvme_subsystems);
 DEFINE_MUTEX(nvme_subsystems_lock);
 
@@ -5415,10 +5419,14 @@ static int __init nvme_core_init(void)
 	if (!nvme_delete_wq)
 		goto destroy_reset_wq;
 
+	nvme_setup_wq = alloc_workqueue("nvme-setup-wq", wq_flags, 0);
+	if (!nvme_setup_wq)
+		goto destroy_delete_wq;
+
 	result = alloc_chrdev_region(&nvme_ctrl_base_chr_devt, 0,
 			NVME_MINORS, "nvme");
 	if (result < 0)
-		goto destroy_delete_wq;
+		goto destroy_setup_wq;
 
 	result = class_register(&nvme_class);
 	if (result)
@@ -5452,6 +5460,8 @@ static int __init nvme_core_init(void)
 	class_unregister(&nvme_class);
 unregister_chrdev:
 	unregister_chrdev_region(nvme_ctrl_base_chr_devt, NVME_MINORS);
+destroy_setup_wq:
+	destroy_workqueue(nvme_setup_wq);
 destroy_delete_wq:
 	destroy_workqueue(nvme_delete_wq);
 destroy_reset_wq:
@@ -5470,6 +5480,7 @@ static void __exit nvme_core_exit(void)
 	class_unregister(&nvme_class);
 	unregister_chrdev_region(nvme_ns_chr_devt, NVME_MINORS);
 	unregister_chrdev_region(nvme_ctrl_base_chr_devt, NVME_MINORS);
+	destroy_workqueue(nvme_setup_wq);
 	destroy_workqueue(nvme_delete_wq);
 	destroy_workqueue(nvme_reset_wq);
 	destroy_workqueue(nvme_wq);
diff --git a/drivers/nvme/host/nvme.h b/drivers/nvme/host/nvme.h
index ccd5e05dac98..9cc49f5ad8af 100644
--- a/drivers/nvme/host/nvme.h
+++ b/drivers/nvme/host/nvme.h
@@ -49,6 +49,7 @@ extern unsigned int admin_timeout;
 extern struct workqueue_struct *nvme_wq;
 extern struct workqueue_struct *nvme_reset_wq;
 extern struct workqueue_struct *nvme_delete_wq;
+extern struct workqueue_struct *nvme_setup_wq;
 extern struct mutex nvme_subsystems_lock;
 
 /*
diff --git a/drivers/nvme/host/rdma.c b/drivers/nvme/host/rdma.c
index f77c960f7632..b2db43bd6e84 100644
--- a/drivers/nvme/host/rdma.c
+++ b/drivers/nvme/host/rdma.c
@@ -98,6 +98,8 @@ struct nvme_rdma_queue {
 	bool			pi_support;
 	int			cq_size;
 	struct mutex		queue_lock;
+	struct work_struct	setup_work;
+	int			queue_idx;
 };
 
 struct nvme_rdma_ctrl {
@@ -125,6 +127,7 @@ struct nvme_rdma_ctrl {
 	struct nvme_ctrl	ctrl;
 	bool			use_inline_data;
 	u32			io_queues[HCTX_MAX_TYPES];
+	atomic_t		qsetup_err;
 };
 
 static inline struct nvme_rdma_ctrl *to_rdma_ctrl(struct nvme_ctrl *ctrl)
@@ -566,16 +569,14 @@ static int nvme_rdma_create_queue_ib(struct nvme_rdma_queue *queue)
 	return ret;
 }
 
-static int nvme_rdma_alloc_queue(struct nvme_rdma_ctrl *ctrl,
-		int idx, size_t queue_size)
+static int nvme_rdma_alloc_queue(struct nvme_rdma_queue *queue)
 {
-	struct nvme_rdma_queue *queue;
+	struct nvme_rdma_ctrl *ctrl = queue->ctrl;
+	int idx = nvme_rdma_queue_idx(queue);
 	struct sockaddr *src_addr = NULL;
 	int ret;
 
-	queue = &ctrl->queues[idx];
 	mutex_init(&queue->queue_lock);
-	queue->ctrl = ctrl;
 	if (idx && ctrl->ctrl.max_integrity_segments)
 		queue->pi_support = true;
 	else
@@ -587,8 +588,6 @@ static int nvme_rdma_alloc_queue(struct nvme_rdma_ctrl *ctrl,
 	else
 		queue->cmnd_capsule_len = sizeof(struct nvme_command);
 
-	queue->queue_size = queue_size;
-
 	queue->cm_id = rdma_create_id(&init_net, nvme_rdma_cm_handler, queue,
 			RDMA_PS_TCP, IB_QPT_RC);
 	if (IS_ERR(queue->cm_id)) {
@@ -694,59 +693,59 @@ static int nvme_rdma_start_queue(struct nvme_rdma_ctrl *ctrl, int idx)
 	return ret;
 }
 
-static int nvme_rdma_start_io_queues(struct nvme_rdma_ctrl *ctrl,
-				     int first, int last)
+static void nvme_rdma_setup_queue_work(struct work_struct *work)
 {
-	int i, ret = 0;
+	struct nvme_rdma_queue *queue =
+		container_of(work, struct nvme_rdma_queue, setup_work);
+	int ret;
 
-	for (i = first; i < last; i++) {
-		ret = nvme_rdma_start_queue(ctrl, i);
-		if (ret)
-			goto out_stop_queues;
-	}
+	ret = nvme_rdma_alloc_queue(queue);
+	if (ret)
+		goto out_err;
 
-	return 0;
+	ret = nvme_rdma_start_queue(queue->ctrl, queue->queue_idx);
+	if (ret)
+		goto out_err;
 
-out_stop_queues:
-	for (i--; i >= first; i--)
-		nvme_rdma_stop_queue(&ctrl->queues[i]);
-	return ret;
+	return;
+
+out_err:
+	atomic_cmpxchg(&queue->ctrl->qsetup_err, 0, ret);
 }
 
-static int nvme_rdma_alloc_io_queues(struct nvme_rdma_ctrl *ctrl)
+static int nvme_rdma_setup_io_queues(struct nvme_rdma_ctrl *ctrl, int first,
+				     int last, size_t queue_size)
 {
-	struct nvmf_ctrl_options *opts = ctrl->ctrl.opts;
-	unsigned int nr_io_queues;
-	int i, ret;
+	int nr_queues = last - first;
+	int ret, i;
 
-	nr_io_queues = nvmf_nr_io_queues(opts);
-	ret = nvme_set_queue_count(&ctrl->ctrl, &nr_io_queues);
-	if (ret)
-		return ret;
+	atomic_set(&ctrl->qsetup_err, 0);
 
-	if (nr_io_queues == 0) {
-		dev_err(ctrl->ctrl.device,
-			"unable to set any I/O queues\n");
-		return -ENOMEM;
-	}
-
-	ctrl->ctrl.queue_count = nr_io_queues + 1;
-	dev_info(ctrl->ctrl.device,
-		"creating %d I/O queues.\n", nr_io_queues);
+	for (i = 0; i < nr_queues; i++) {
+		struct nvme_rdma_queue *queue = &ctrl->queues[first + i];
 
-	nvmf_set_io_queues(opts, nr_io_queues, ctrl->io_queues);
-	for (i = 1; i < ctrl->ctrl.queue_count; i++) {
-		ret = nvme_rdma_alloc_queue(ctrl, i,
-				ctrl->ctrl.sqsize + 1);
-		if (ret)
-			goto out_free_queues;
+		queue->ctrl = ctrl;
+		queue->queue_idx = first + i;
+		queue->queue_size = queue_size;
+		INIT_WORK(&queue->setup_work, nvme_rdma_setup_queue_work);
+		queue_work(nvme_setup_wq, &queue->setup_work);
 	}
 
-	return 0;
+	for (i = 0; i < nr_queues; i++)
+		flush_work(&ctrl->queues[first + i].setup_work);
 
-out_free_queues:
-	for (i--; i >= 1; i--)
-		nvme_rdma_free_queue(&ctrl->queues[i]);
+	ret = atomic_read(&ctrl->qsetup_err);
+	if (ret) {
+		for (i = 0; i < nr_queues; i++) {
+			struct nvme_rdma_queue *queue =
+				&ctrl->queues[first + i];
+
+			if (test_bit(NVME_RDMA_Q_LIVE, &queue->flags))
+				nvme_rdma_stop_queue(queue);
+			if (test_bit(NVME_RDMA_Q_ALLOCATED, &queue->flags))
+				nvme_rdma_free_queue(queue);
+		}
+	}
 
 	return ret;
 }
@@ -783,7 +782,9 @@ static int nvme_rdma_configure_admin_queue(struct nvme_rdma_ctrl *ctrl,
 	bool pi_capable = false;
 	int error;
 
-	error = nvme_rdma_alloc_queue(ctrl, 0, NVME_AQ_DEPTH);
+	ctrl->queues[0].ctrl = ctrl;
+	ctrl->queues[0].queue_size = NVME_AQ_DEPTH;
+	error = nvme_rdma_alloc_queue(&ctrl->queues[0]);
 	if (error)
 		return error;
 
@@ -864,11 +865,22 @@ static int nvme_rdma_configure_admin_queue(struct nvme_rdma_ctrl *ctrl,
 static int nvme_rdma_configure_io_queues(struct nvme_rdma_ctrl *ctrl, bool new)
 {
 	int ret, nr_queues;
+	unsigned int nr_io_queues;
 
-	ret = nvme_rdma_alloc_io_queues(ctrl);
+	nr_io_queues = nvmf_nr_io_queues(ctrl->ctrl.opts);
+	ret = nvme_set_queue_count(&ctrl->ctrl, &nr_io_queues);
 	if (ret)
 		return ret;
 
+	if (nr_io_queues == 0) {
+		dev_err(ctrl->ctrl.device, "unable to set any I/O queues\n");
+		return -ENOMEM;
+	}
+
+	ctrl->ctrl.queue_count = nr_io_queues + 1;
+	dev_info(ctrl->ctrl.device, "creating %d I/O queues.\n", nr_io_queues);
+	nvmf_set_io_queues(ctrl->ctrl.opts, nr_io_queues, ctrl->io_queues);
+
 	if (new) {
 		ret = nvme_rdma_alloc_tag_set(&ctrl->ctrl);
 		if (ret)
@@ -881,7 +893,9 @@ static int nvme_rdma_configure_io_queues(struct nvme_rdma_ctrl *ctrl, bool new)
 	 * queue number might have changed.
 	 */
 	nr_queues = min(ctrl->tag_set.nr_hw_queues + 1, ctrl->ctrl.queue_count);
-	ret = nvme_rdma_start_io_queues(ctrl, 1, nr_queues);
+	ret = nvme_rdma_setup_io_queues(ctrl, 1, nr_queues,
+					ctrl->ctrl.sqsize + 1);
+
 	if (ret)
 		goto out_cleanup_tagset;
 
@@ -905,12 +919,15 @@ static int nvme_rdma_configure_io_queues(struct nvme_rdma_ctrl *ctrl, bool new)
 
 	/*
 	 * If the number of queues has increased (reconnect case)
-	 * start all new queues now.
+	 * setup all new queues now.
 	 */
-	ret = nvme_rdma_start_io_queues(ctrl, nr_queues,
-					ctrl->tag_set.nr_hw_queues + 1);
-	if (ret)
-		goto out_wait_freeze_timed_out;
+	if (ctrl->tag_set.nr_hw_queues + 1 > nr_queues) {
+		ret = nvme_rdma_setup_io_queues(ctrl, nr_queues,
+						ctrl->tag_set.nr_hw_queues + 1,
+						ctrl->ctrl.sqsize + 1);
+		if (ret)
+			goto out_wait_freeze_timed_out;
+	}
 
 	return 0;
 
-- 
2.54.0




More information about the Linux-nvme mailing list