[PATCH 14/17] nvmet-tcp: reference counting for queues
Sagi Grimberg
sagi at grimberg.me
Sun Aug 13 07:01:22 PDT 2023
On 8/11/23 15:17, Hannes Reinecke wrote:
> The 'queue' structure is referenced from various places and
> used as an argument of asynchronous functions, so it's really
> hard to figure out if the queue is still valid when the
> asynchronous function triggers.
> So add reference counting to validate the queue structure.
>
> Signed-off-by: Hannes Reinecke <hare at suse.de>
> ---
> drivers/nvme/target/tcp.c | 74 ++++++++++++++++++++++++++++++---------
> 1 file changed, 57 insertions(+), 17 deletions(-)
>
> diff --git a/drivers/nvme/target/tcp.c b/drivers/nvme/target/tcp.c
> index f19ea9d923fd..84b726dfc1c4 100644
> --- a/drivers/nvme/target/tcp.c
> +++ b/drivers/nvme/target/tcp.c
> @@ -127,6 +127,7 @@ enum nvmet_tcp_queue_state {
> };
>
> struct nvmet_tcp_queue {
> + struct kref kref;
> struct socket *sock;
> struct nvmet_tcp_port *port;
> struct work_struct io_work;
> @@ -192,6 +193,9 @@ static struct workqueue_struct *nvmet_tcp_wq;
> static const struct nvmet_fabrics_ops nvmet_tcp_ops;
> static void nvmet_tcp_free_cmd(struct nvmet_tcp_cmd *c);
> static void nvmet_tcp_free_cmd_buffers(struct nvmet_tcp_cmd *cmd);
> +static int nvmet_tcp_get_queue(struct nvmet_tcp_queue *queue);
> +static void nvmet_tcp_put_queue(struct nvmet_tcp_queue *queue);
> +static void nvmet_tcp_data_ready(struct sock *sk);
>
> static inline u16 nvmet_tcp_cmd_tag(struct nvmet_tcp_queue *queue,
> struct nvmet_tcp_cmd *cmd)
> @@ -1437,11 +1441,21 @@ static void nvmet_tcp_restore_socket_callbacks(struct nvmet_tcp_queue *queue)
> struct socket *sock = queue->sock;
>
> write_lock_bh(&sock->sk->sk_callback_lock);
> + /*
> + * Check if nvmet_tcp_set_queue_sock() has been called;
> + * if not, the queue reference has not been taken
> + * and we would get a refcount error on exit.
> + */
> + if (sock->sk->sk_data_ready != nvmet_tcp_data_ready) {
> + write_unlock_bh(&sock->sk->sk_callback_lock);
> + return;
> + }
This is really ugly, I think (keying off the sk_data_ready pointer to
infer whether we took a reference).
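If we keep this at all, a less fragile shape would be to record
explicitly whether nvmet_tcp_set_queue_sock() installed the callbacks,
instead of sniffing the function pointer. Illustration only, not
tested; 'callbacks_set' is a hypothetical new flag, not something in
the driver today:

/*
 * Sketch: 'callbacks_set' would be a new bool in struct
 * nvmet_tcp_queue, set under sk_callback_lock when
 * nvmet_tcp_set_queue_sock() installs the callbacks and takes its
 * queue reference.
 */
static void nvmet_tcp_restore_socket_callbacks(struct nvmet_tcp_queue *queue)
{
	struct socket *sock = queue->sock;

	write_lock_bh(&sock->sk->sk_callback_lock);
	if (!queue->callbacks_set) {
		/* set_queue_sock() never ran, no reference to drop */
		write_unlock_bh(&sock->sk->sk_callback_lock);
		return;
	}
	queue->callbacks_set = false;
	sock->sk->sk_data_ready = queue->data_ready;
	sock->sk->sk_state_change = queue->state_change;
	sock->sk->sk_write_space = queue->write_space;
	sock->sk->sk_user_data = NULL;
	write_unlock_bh(&sock->sk->sk_callback_lock);
	nvmet_tcp_put_queue(queue);
}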
> sock->sk->sk_data_ready = queue->data_ready;
> sock->sk->sk_state_change = queue->state_change;
> sock->sk->sk_write_space = queue->write_space;
> sock->sk->sk_user_data = NULL;
> write_unlock_bh(&sock->sk->sk_callback_lock);
> + nvmet_tcp_put_queue(queue);
> }
>
> static void nvmet_tcp_uninit_data_in_cmds(struct nvmet_tcp_queue *queue)
> @@ -1474,6 +1488,30 @@ static void nvmet_tcp_free_cmd_data_in_buffers(struct nvmet_tcp_queue *queue)
> nvmet_tcp_free_cmd_buffers(&queue->connect);
> }
>
> +static void nvmet_tcp_release_queue_final(struct kref *kref)
> +{
> + struct nvmet_tcp_queue *queue = container_of(kref, struct nvmet_tcp_queue, kref);
> +
> + WARN_ON(queue->state != NVMET_TCP_Q_DISCONNECTING);
> + nvmet_tcp_free_cmds(queue);
> + ida_free(&nvmet_tcp_queue_ida, queue->idx);
> + /* ->sock will be released by fput() */
> + fput(queue->sock->file);
> + kfree(queue);
> +}
> +
> +static int nvmet_tcp_get_queue(struct nvmet_tcp_queue *queue)
> +{
> + if (!queue)
> + return 0;
> + return kref_get_unless_zero(&queue->kref);
> +}
> +
> +static void nvmet_tcp_put_queue(struct nvmet_tcp_queue *queue)
> +{
> + kref_put(&queue->kref, nvmet_tcp_release_queue_final);
> +}
> +
> static void nvmet_tcp_release_queue_work(struct work_struct *w)
> {
> struct page *page;
> @@ -1493,15 +1531,11 @@ static void nvmet_tcp_release_queue_work(struct work_struct *w)
> nvmet_sq_destroy(&queue->nvme_sq);
> cancel_work_sync(&queue->io_work);
> nvmet_tcp_free_cmd_data_in_buffers(queue);
> - /* ->sock will be released by fput() */
> - fput(queue->sock->file);
> - nvmet_tcp_free_cmds(queue);
> if (queue->hdr_digest || queue->data_digest)
> nvmet_tcp_free_crypto(queue);
> - ida_free(&nvmet_tcp_queue_ida, queue->idx);
> page = virt_to_head_page(queue->pf_cache.va);
> __page_frag_cache_drain(page, queue->pf_cache.pagecnt_bias);
> - kfree(queue);
> + nvmet_tcp_put_queue(queue);
What made you pick these teardown steps vs. the others to run
before/after the final reference is dropped?
> }
>
> static void nvmet_tcp_data_ready(struct sock *sk)
> @@ -1512,8 +1546,10 @@ static void nvmet_tcp_data_ready(struct sock *sk)
>
> read_lock_bh(&sk->sk_callback_lock);
> queue = sk->sk_user_data;
> - if (likely(queue))
> + if (likely(nvmet_tcp_get_queue(queue))) {
> queue_work_on(queue_cpu(queue), nvmet_tcp_wq, &queue->io_work);
> + nvmet_tcp_put_queue(queue);
> + }
No... Why?
The shutdown code should serialize perfectly without this. Why add
a kref to the normal I/O path?
I thought we'd simply move release_work to do a kref_put, and take
an extra reference when we fire the tls handshake, something like
the sketch below...
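Untested sketch of what I mean; nvmet_tcp_tls_handshake stands in for
whatever the handshake entry point from the earlier patches in this
series ends up being called:

/*
 * The socket callbacks stay exactly as they are today.  release_work
 * drops the base reference taken at kref_init() time, and only the
 * tls handshake (whose done callback can fire after the queue was
 * released) pins the queue with an extra reference.
 */
static void nvmet_tcp_release_queue_work(struct work_struct *w)
{
	struct nvmet_tcp_queue *queue =
		container_of(w, struct nvmet_tcp_queue, release_work);

	/* ... existing teardown unchanged ... */
	nvmet_tcp_put_queue(queue);	/* instead of kfree(queue) */
}

/* where we kick off the handshake: */
	nvmet_tcp_get_queue(queue);
	ret = nvmet_tcp_tls_handshake(queue);
	if (ret)
		nvmet_tcp_put_queue(queue);
/* ... and the handshake done callback does the matching put */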
> read_unlock_bh(&sk->sk_callback_lock);
> }
>
> @@ -1523,18 +1559,16 @@ static void nvmet_tcp_write_space(struct sock *sk)
>
> read_lock_bh(&sk->sk_callback_lock);
> queue = sk->sk_user_data;
> - if (unlikely(!queue))
> + if (unlikely(!nvmet_tcp_get_queue(queue)))
> goto out;
>
> if (unlikely(queue->state == NVMET_TCP_Q_CONNECTING)) {
> queue->write_space(sk);
> - goto out;
> - }
> -
> - if (sk_stream_is_writeable(sk)) {
> + } else if (sk_stream_is_writeable(sk)) {
> clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
> queue_work_on(queue_cpu(queue), nvmet_tcp_wq, &queue->io_work);
> }
> + nvmet_tcp_put_queue(queue);
Same here...
Why not simply confine the kref get/put to the tls handshake path,
where there is actually a race (again, see the sketch above)?
> out:
> read_unlock_bh(&sk->sk_callback_lock);
> }
> @@ -1545,7 +1579,7 @@ static void nvmet_tcp_state_change(struct sock *sk)
>
> read_lock_bh(&sk->sk_callback_lock);
> queue = sk->sk_user_data;
> - if (!queue)
> + if (!nvmet_tcp_get_queue(queue))
> goto done;
>
> switch (sk->sk_state) {
> @@ -1562,6 +1596,7 @@ static void nvmet_tcp_state_change(struct sock *sk)
> pr_warn("queue %d unhandled state %d\n",
> queue->idx, sk->sk_state);
> }
> + nvmet_tcp_put_queue(queue);
> done:
> read_unlock_bh(&sk->sk_callback_lock);
> }
> @@ -1582,6 +1617,9 @@ static int nvmet_tcp_set_queue_sock(struct nvmet_tcp_queue *queue)
> if (ret < 0)
> return ret;
>
> + if (unlikely(!nvmet_tcp_get_queue(queue)))
> + return -ECONNRESET;
> +
Can this actually fail? kref_init() ran just before this in
nvmet_tcp_alloc_queue(), so I don't see how the refcount could
already be zero here.
> /*
> * Cleanup whatever is sitting in the TCP transmit queue on socket
> * close. This is done to prevent stale data from being sent should
> @@ -1603,6 +1641,7 @@ static int nvmet_tcp_set_queue_sock(struct nvmet_tcp_queue *queue)
> * If the socket is already closing, don't even start
> * consuming it
> */
> + nvmet_tcp_put_queue(queue);
> ret = -ENOTCONN;
> } else {
> sock->sk->sk_user_data = queue;
> @@ -1636,6 +1675,7 @@ static void nvmet_tcp_alloc_queue(struct nvmet_tcp_port *port,
>
> INIT_WORK(&queue->release_work, nvmet_tcp_release_queue_work);
> INIT_WORK(&queue->io_work, nvmet_tcp_io_work);
> + kref_init(&queue->kref);
> queue->sock = newsock;
> queue->port = port;
> queue->nr_cmds = 0;
> @@ -1672,15 +1712,15 @@ static void nvmet_tcp_alloc_queue(struct nvmet_tcp_port *port,
> mutex_unlock(&nvmet_tcp_queue_mutex);
>
> ret = nvmet_tcp_set_queue_sock(queue);
> - if (ret)
> - goto out_destroy_sq;
> + if (!ret)
> + return;
>
> - return;
> -out_destroy_sq:
> + queue->state = NVMET_TCP_Q_DISCONNECTING;
> mutex_lock(&nvmet_tcp_queue_mutex);
> list_del_init(&queue->queue_list);
> mutex_unlock(&nvmet_tcp_queue_mutex);
> - nvmet_sq_destroy(&queue->nvme_sq);
> + nvmet_tcp_put_queue(queue);
> + return;
> out_free_connect:
> nvmet_tcp_free_cmd(&queue->connect);
> out_ida_remove: