[PATCH net-next 31/32] rxrpc: Move client call connection to the I/O thread
David Howells
dhowells at redhat.com
Tue Dec 6 08:02:56 PST 2022
Move the connection setup of client calls to the I/O thread so that a whole
load of locking and barrierage can be eliminated. This necessitates the
app thread waiting for connection to complete before it can begin
encrypting data.
Signed-off-by: David Howells <dhowells at redhat.com>
cc: Marc Dionne <marc.dionne at auristor.com>
cc: linux-afs at lists.infradead.org
---
include/trace/events/rxrpc.h | 5
net/rxrpc/ar-internal.h | 22 +-
net/rxrpc/call_object.c | 58 ++++-
net/rxrpc/call_state.c | 2
net/rxrpc/conn_client.c | 525 ++++++++++--------------------------------
net/rxrpc/conn_event.c | 50 +---
net/rxrpc/conn_object.c | 19 +-
net/rxrpc/conn_service.c | 1
net/rxrpc/io_thread.c | 13 +
net/rxrpc/local_object.c | 6
net/rxrpc/proc.c | 1
net/rxrpc/rxkad.c | 21 +-
net/rxrpc/security.c | 34 +--
net/rxrpc/sendmsg.c | 64 +++++
14 files changed, 295 insertions(+), 526 deletions(-)
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index a4c5bc481cfc..3e228aa2d2b1 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -225,7 +225,6 @@
EM(rxrpc_conn_put_call, "PUT call ") \
EM(rxrpc_conn_put_call_input, "PUT inp-call") \
EM(rxrpc_conn_put_conn_input, "PUT inp-conn") \
- EM(rxrpc_conn_put_discard, "PUT discard ") \
EM(rxrpc_conn_put_discard_idle, "PUT disc-idl") \
EM(rxrpc_conn_put_local_dead, "PUT loc-dead") \
EM(rxrpc_conn_put_noreuse, "PUT noreuse ") \
@@ -247,12 +246,11 @@
EM(rxrpc_client_chan_activate, "ChActv") \
EM(rxrpc_client_chan_disconnect, "ChDisc") \
EM(rxrpc_client_chan_pass, "ChPass") \
- EM(rxrpc_client_chan_wait_failed, "ChWtFl") \
EM(rxrpc_client_cleanup, "Clean ") \
EM(rxrpc_client_discard, "Discar") \
- EM(rxrpc_client_duplicate, "Duplic") \
EM(rxrpc_client_exposed, "Expose") \
EM(rxrpc_client_replace, "Replac") \
+ EM(rxrpc_client_queue_new_call, "Q-Call") \
EM(rxrpc_client_to_active, "->Actv") \
E_(rxrpc_client_to_idle, "->Idle")
@@ -280,6 +278,7 @@
EM(rxrpc_call_put_sendmsg, "PUT sendmsg ") \
EM(rxrpc_call_put_unnotify, "PUT unnotify") \
EM(rxrpc_call_put_userid_exists, "PUT u-exists") \
+ EM(rxrpc_call_put_userid, "PUT user-id ") \
EM(rxrpc_call_see_accept, "SEE accept ") \
EM(rxrpc_call_see_activate_client, "SEE act-clnt") \
EM(rxrpc_call_see_connect_failed, "SEE con-fail") \
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index e79263927a6d..04f6253ae169 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -293,7 +293,6 @@ struct rxrpc_local {
struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */
bool kill_all_client_conns;
- spinlock_t client_conn_cache_lock; /* Lock for ->*_client_conns */
struct list_head idle_client_conns;
struct timer_list client_conn_reap_timer;
unsigned long client_conn_flags;
@@ -305,7 +304,8 @@ struct rxrpc_local {
bool dead;
bool service_closed; /* Service socket closed */
struct idr conn_ids; /* List of connection IDs */
- spinlock_t conn_lock; /* Lock for client connection pool */
+ struct list_head new_client_calls; /* Newly created client calls need connection */
+ spinlock_t client_call_lock; /* Lock for ->new_client_calls */
struct sockaddr_rxrpc srx; /* local address */
};
@@ -386,7 +386,6 @@ enum rxrpc_call_completion {
* Bits in the connection flags.
*/
enum rxrpc_conn_flag {
- RXRPC_CONN_HAS_IDR, /* Has a client conn ID assigned */
RXRPC_CONN_IN_SERVICE_CONNS, /* Conn is in peer->service_conns */
RXRPC_CONN_DONT_REUSE, /* Don't reuse this connection */
RXRPC_CONN_PROBING_FOR_UPGRADE, /* Probing for service upgrade */
@@ -414,6 +413,7 @@ enum rxrpc_conn_event {
*/
enum rxrpc_conn_proto_state {
RXRPC_CONN_UNUSED, /* Connection not yet attempted */
+ RXRPC_CONN_CLIENT_UNSECURED, /* Client connection needs security init */
RXRPC_CONN_CLIENT, /* Client connection */
RXRPC_CONN_SERVICE_PREALLOC, /* Service connection preallocation */
RXRPC_CONN_SERVICE_UNSECURED, /* Service unsecured connection */
@@ -437,11 +437,9 @@ struct rxrpc_bundle {
u32 security_level; /* Security level selected */
u16 service_id; /* Service ID for this connection */
bool try_upgrade; /* True if the bundle is attempting upgrade */
- bool alloc_conn; /* True if someone's getting a conn */
bool exclusive; /* T if conn is exclusive */
bool upgrade; /* T if service ID can be upgraded */
- short alloc_error; /* Error from last conn allocation */
- spinlock_t channel_lock;
+ unsigned short alloc_error; /* Error from last conn allocation */
struct rb_node local_node; /* Node in local->client_conns */
struct list_head waiting_calls; /* Calls waiting for channels */
unsigned long avail_chans; /* Mask of available channels */
@@ -469,7 +467,7 @@ struct rxrpc_connection {
unsigned char act_chans; /* Mask of active channels */
struct rxrpc_channel {
unsigned long final_ack_at; /* Time at which to issue final ACK */
- struct rxrpc_call __rcu *call; /* Active call */
+ struct rxrpc_call *call; /* Active call */
unsigned int call_debug_id; /* call->debug_id */
u32 call_id; /* ID of current call */
u32 call_counter; /* Call ID counter */
@@ -490,6 +488,7 @@ struct rxrpc_connection {
struct list_head link; /* link in master connection list */
struct sk_buff_head rx_queue; /* received conn-level packets */
+ struct mutex security_lock; /* Lock for security management */
const struct rxrpc_security *security; /* applied security module */
union {
struct {
@@ -620,7 +619,7 @@ struct rxrpc_call {
struct work_struct destroyer; /* In-process-context destroyer */
rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */
struct list_head link; /* link in master call list */
- struct list_head chan_wait_link; /* Link in conn->bundle->waiting_calls */
+ struct list_head wait_link; /* Link in local->new_client_calls */
struct hlist_node error_link; /* link in error distribution list */
struct list_head accept_link; /* Link in rx->acceptq */
struct list_head recvmsg_link; /* Link in rx->recvmsg_q */
@@ -867,6 +866,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
struct sockaddr_rxrpc *,
struct rxrpc_call_params *, gfp_t,
unsigned int);
+void rxrpc_start_call_timer(struct rxrpc_call *);
void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
struct sk_buff *);
void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
@@ -901,6 +901,7 @@ static inline void rxrpc_set_call_state(struct rxrpc_call *call,
{
/* Order write of completion info before write of ->state. */
smp_store_release(&call->_state, state);
+ wake_up(&call->waitq);
}
static inline enum rxrpc_call_state __rxrpc_call_state(const struct rxrpc_call *call)
@@ -936,10 +937,11 @@ extern unsigned int rxrpc_reap_client_connections;
extern unsigned long rxrpc_conn_idle_client_expiry;
extern unsigned long rxrpc_conn_idle_client_fast_expiry;
-void rxrpc_destroy_client_conn_ids(struct rxrpc_local *);
+void rxrpc_purge_client_connections(struct rxrpc_local *);
struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace);
void rxrpc_put_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace);
-int rxrpc_connect_call(struct rxrpc_call *, gfp_t);
+int rxrpc_look_up_bundle(struct rxrpc_call *, gfp_t);
+void rxrpc_connect_client_calls(struct rxrpc_local *);
void rxrpc_expose_client_call(struct rxrpc_call *);
void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *);
void rxrpc_deactivate_bundle(struct rxrpc_bundle *);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 88b9c4f5b122..6fed91fa848f 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -150,7 +150,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
INIT_WORK(&call->destroyer, rxrpc_destroy_call);
INIT_LIST_HEAD(&call->link);
- INIT_LIST_HEAD(&call->chan_wait_link);
+ INIT_LIST_HEAD(&call->wait_link);
INIT_LIST_HEAD(&call->accept_link);
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
@@ -243,7 +243,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
/*
* Initiate the call ack/resend/expiry timer.
*/
-static void rxrpc_start_call_timer(struct rxrpc_call *call)
+void rxrpc_start_call_timer(struct rxrpc_call *call)
{
unsigned long now = jiffies;
unsigned long j = now + MAX_JIFFY_OFFSET;
@@ -287,6 +287,39 @@ static void rxrpc_put_call_slot(struct rxrpc_call *call)
up(limiter);
}
+/*
+ * Start the process of connecting a call. We obtain a peer and a connection
+ * bundle, but the actual association of a call with a connection is offloaded
+ * to the I/O thread to simplify locking.
+ */
+static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
+{
+ struct rxrpc_local *local = call->local;
+ int ret = 0;
+
+ _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
+
+ call->peer = rxrpc_lookup_peer(local, &call->dest_srx, gfp);
+ if (!call->peer)
+ goto error;
+
+ ret = rxrpc_look_up_bundle(call, gfp);
+ if (ret < 0)
+ goto error;
+
+ trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call);
+ rxrpc_get_call(call, rxrpc_call_get_io_thread);
+ spin_lock(&local->client_call_lock);
+ list_add_tail(&call->wait_link, &local->new_client_calls);
+ spin_unlock(&local->client_call_lock);
+ rxrpc_wake_up_io_thread(local);
+ return 0;
+
+error:
+ __set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
+ return ret;
+}
+
/*
* Set up a call for the given parameters.
* - Called with the socket lock held, which it must release.
@@ -370,10 +403,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
if (ret < 0)
goto error_attached_to_socket;
- rxrpc_see_call(call, rxrpc_call_see_connected);
-
- rxrpc_start_call_timer(call);
-
_leave(" = %p [new]", call);
return call;
@@ -388,22 +417,20 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, -EEXIST);
trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0,
rxrpc_call_see_userid_exists);
- rxrpc_release_call(rx, call);
mutex_unlock(&call->user_mutex);
rxrpc_put_call(call, rxrpc_call_put_userid_exists);
_leave(" = -EEXIST");
return ERR_PTR(-EEXIST);
/* We got an error, but the call is attached to the socket and is in
- * need of release. However, we might now race with recvmsg() when
- * completing the call queues it. Return 0 from sys_sendmsg() and
+ * need of release. However, we might now race with recvmsg() when it
+ * completion notifies the socket. Return 0 from sys_sendmsg() and
* leave the error to recvmsg() to deal with.
*/
error_attached_to_socket:
trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), ret,
rxrpc_call_see_connect_failed);
- set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
- rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret);
+ rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
_leave(" = c=%08x [err]", call->debug_id);
return call;
}
@@ -458,7 +485,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
chan = sp->hdr.cid & RXRPC_CHANNELMASK;
conn->channels[chan].call_counter = call->call_id;
conn->channels[chan].call_id = call->call_id;
- rcu_assign_pointer(conn->channels[chan].call, call);
+ conn->channels[chan].call = call;
spin_unlock(&conn->state_lock);
spin_lock(&conn->peer->lock);
@@ -518,7 +545,7 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
struct rxrpc_connection *conn = call->conn;
- bool put = false;
+ bool put = false, putu = false;
_enter("{%d,%d}", call->debug_id, refcount_read(&call->ref));
@@ -553,7 +580,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
rb_erase(&call->sock_node, &rx->calls);
memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
- rxrpc_put_call(call, rxrpc_call_put_userid_exists);
+ putu = true;
}
list_del(&call->sock_link);
@@ -561,6 +588,9 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
+ if (putu)
+ rxrpc_put_call(call, rxrpc_call_put_userid);
+
_leave("");
}
diff --git a/net/rxrpc/call_state.c b/net/rxrpc/call_state.c
index c1f131618ac4..59a5588805ac 100644
--- a/net/rxrpc/call_state.c
+++ b/net/rxrpc/call_state.c
@@ -65,5 +65,5 @@ void rxrpc_prefail_call(struct rxrpc_call *call, enum rxrpc_call_completion comp
call->completion = compl;
call->_state = RXRPC_CALL_COMPLETE;
trace_rxrpc_call_complete(call);
- __set_bit(RXRPC_CALL_RELEASED, &call->flags);
+ WARN_ON_ONCE(__test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags));
}
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 6b6b2f72c0ed..6268e9bb9649 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -39,61 +39,19 @@ static void rxrpc_activate_bundle(struct rxrpc_bundle *bundle)
atomic_inc(&bundle->active);
}
-/*
- * Get a connection ID and epoch for a client connection from the global pool.
- * The connection struct pointer is then recorded in the idr radix tree. The
- * epoch doesn't change until the client is rebooted (or, at least, unless the
- * module is unloaded).
- */
-static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
- gfp_t gfp)
-{
- struct rxrpc_local *local = conn->local;
- int id;
-
- _enter("");
-
- idr_preload(gfp);
- spin_lock(&local->conn_lock);
-
- id = idr_alloc_cyclic(&local->conn_ids, conn,
- 1, 0x40000000, GFP_NOWAIT);
- if (id < 0)
- goto error;
-
- spin_unlock(&local->conn_lock);
- idr_preload_end();
-
- conn->proto.epoch = local->rxnet->epoch;
- conn->proto.cid = id << RXRPC_CIDSHIFT;
- set_bit(RXRPC_CONN_HAS_IDR, &conn->flags);
- _leave(" [CID %x]", conn->proto.cid);
- return 0;
-
-error:
- spin_unlock(&local->conn_lock);
- idr_preload_end();
- _leave(" = %d", id);
- return id;
-}
-
/*
* Release a connection ID for a client connection.
*/
static void rxrpc_put_client_connection_id(struct rxrpc_local *local,
struct rxrpc_connection *conn)
{
- if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
- spin_lock(&local->conn_lock);
- idr_remove(&local->conn_ids, conn->proto.cid >> RXRPC_CIDSHIFT);
- spin_unlock(&local->conn_lock);
- }
+ idr_remove(&local->conn_ids, conn->proto.cid >> RXRPC_CIDSHIFT);
}
/*
* Destroy the client connection ID tree.
*/
-void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local)
+static void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local)
{
struct rxrpc_connection *conn;
int id;
@@ -129,7 +87,6 @@ static struct rxrpc_bundle *rxrpc_alloc_bundle(struct rxrpc_call *call,
bundle->security_level = call->security_level;
refcount_set(&bundle->ref, 1);
atomic_set(&bundle->active, 1);
- spin_lock_init(&bundle->channel_lock);
INIT_LIST_HEAD(&bundle->waiting_calls);
trace_rxrpc_bundle(bundle->debug_id, 1, rxrpc_bundle_new);
}
@@ -169,69 +126,68 @@ void rxrpc_put_bundle(struct rxrpc_bundle *bundle, enum rxrpc_bundle_trace why)
}
}
+/*
+ * Get rid of outstanding client connection preallocations when a local
+ * endpoint is destroyed.
+ */
+void rxrpc_purge_client_connections(struct rxrpc_local *local)
+{
+ rxrpc_destroy_client_conn_ids(local);
+}
+
/*
* Allocate a client connection.
*/
static struct rxrpc_connection *
-rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
+rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle)
{
struct rxrpc_connection *conn;
- struct rxrpc_net *rxnet = bundle->local->rxnet;
- int ret;
+ struct rxrpc_local *local = bundle->local;
+ struct rxrpc_net *rxnet = local->rxnet;
+ int id;
_enter("");
- conn = rxrpc_alloc_connection(rxnet, gfp);
- if (!conn) {
- _leave(" = -ENOMEM");
+ conn = rxrpc_alloc_connection(rxnet, GFP_ATOMIC | __GFP_NOWARN);
+ if (!conn)
return ERR_PTR(-ENOMEM);
+
+ id = idr_alloc_cyclic(&local->conn_ids, conn, 1, 0x40000000,
+ GFP_ATOMIC | __GFP_NOWARN);
+ if (id < 0) {
+ kfree(conn);
+ return ERR_PTR(id);
}
refcount_set(&conn->ref, 1);
- conn->bundle = bundle;
- conn->local = bundle->local;
- conn->peer = bundle->peer;
- conn->key = bundle->key;
+ conn->proto.cid = id << RXRPC_CIDSHIFT;
+ conn->proto.epoch = local->rxnet->epoch;
+ conn->out_clientflag = RXRPC_CLIENT_INITIATED;
+ conn->bundle = rxrpc_get_bundle(bundle, rxrpc_bundle_get_client_conn);
+ conn->local = rxrpc_get_local(bundle->local, rxrpc_local_get_client_conn);
+ conn->peer = rxrpc_get_peer(bundle->peer, rxrpc_peer_get_client_conn);
+ conn->key = key_get(bundle->key);
+ conn->security = bundle->security;
conn->exclusive = bundle->exclusive;
conn->upgrade = bundle->upgrade;
conn->orig_service_id = bundle->service_id;
conn->security_level = bundle->security_level;
- conn->out_clientflag = RXRPC_CLIENT_INITIATED;
- conn->state = RXRPC_CONN_CLIENT;
+ conn->state = RXRPC_CONN_CLIENT_UNSECURED;
conn->service_id = conn->orig_service_id;
- ret = rxrpc_get_client_connection_id(conn, gfp);
- if (ret < 0)
- goto error_0;
-
- ret = rxrpc_init_client_conn_security(conn);
- if (ret < 0)
- goto error_1;
+ if (conn->security == &rxrpc_no_security)
+ conn->state = RXRPC_CONN_CLIENT;
atomic_inc(&rxnet->nr_conns);
write_lock(&rxnet->conn_lock);
list_add_tail(&conn->proc_link, &rxnet->conn_proc_list);
write_unlock(&rxnet->conn_lock);
- rxrpc_get_bundle(bundle, rxrpc_bundle_get_client_conn);
- rxrpc_get_peer(conn->peer, rxrpc_peer_get_client_conn);
- rxrpc_get_local(conn->local, rxrpc_local_get_client_conn);
- key_get(conn->key);
-
- trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref),
- rxrpc_conn_new_client);
+ rxrpc_see_connection(conn, rxrpc_conn_new_client);
atomic_inc(&rxnet->nr_client_conns);
trace_rxrpc_client(conn, -1, rxrpc_client_alloc);
- _leave(" = %p", conn);
return conn;
-
-error_1:
- rxrpc_put_client_connection_id(bundle->local, conn);
-error_0:
- kfree(conn);
- _leave(" = %d", ret);
- return ERR_PTR(ret);
}
/*
@@ -249,7 +205,8 @@ static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
goto dont_reuse;
- if (conn->state != RXRPC_CONN_CLIENT ||
+ if ((conn->state != RXRPC_CONN_CLIENT_UNSECURED &&
+ conn->state != RXRPC_CONN_CLIENT) ||
conn->proto.epoch != rxnet->epoch)
goto mark_dont_reuse;
@@ -280,7 +237,7 @@ static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
* Look up the conn bundle that matches the connection parameters, adding it if
* it doesn't yet exist.
*/
-static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp)
+int rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp)
{
static atomic_t rxrpc_bundle_id;
struct rxrpc_bundle *bundle, *candidate;
@@ -295,7 +252,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
if (test_bit(RXRPC_CALL_EXCLUSIVE, &call->flags)) {
call->bundle = rxrpc_alloc_bundle(call, gfp);
- return call->bundle;
+ return call->bundle ? 0 : -ENOMEM;
}
/* First, see if the bundle is already there. */
@@ -324,7 +281,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
/* It wasn't. We need to add one. */
candidate = rxrpc_alloc_bundle(call, gfp);
if (!candidate)
- return ERR_PTR(-ENOMEM);
+ return -ENOMEM;
_debug("search 2");
spin_lock(&local->client_bundles_lock);
@@ -355,7 +312,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
call->bundle = rxrpc_get_bundle(candidate, rxrpc_bundle_get_client_call);
spin_unlock(&local->client_bundles_lock);
_leave(" = B=%u [new]", call->bundle->debug_id);
- return call->bundle;
+ return 0;
found_bundle_free:
rxrpc_free_bundle(candidate);
@@ -364,160 +321,77 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t
rxrpc_activate_bundle(bundle);
spin_unlock(&local->client_bundles_lock);
_leave(" = B=%u [found]", call->bundle->debug_id);
- return call->bundle;
-}
-
-/*
- * Create or find a client bundle to use for a call.
- *
- * If we return with a connection, the call will be on its waiting list. It's
- * left to the caller to assign a channel and wake up the call.
- */
-static struct rxrpc_bundle *rxrpc_prep_call(struct rxrpc_call *call, gfp_t gfp)
-{
- struct rxrpc_bundle *bundle;
-
- _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
-
- call->peer = rxrpc_lookup_peer(call->local, &call->dest_srx, gfp);
- if (!call->peer)
- goto error;
-
- call->tx_last_sent = ktime_get_real();
- call->cong_ssthresh = call->peer->cong_ssthresh;
- if (call->cong_cwnd >= call->cong_ssthresh)
- call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
- else
- call->cong_mode = RXRPC_CALL_SLOW_START;
-
- /* Find the client connection bundle. */
- bundle = rxrpc_look_up_bundle(call, gfp);
- if (!bundle)
- goto error;
-
- /* Get this call queued. Someone else may activate it whilst we're
- * lining up a new connection, but that's fine.
- */
- spin_lock(&bundle->channel_lock);
- list_add_tail(&call->chan_wait_link, &bundle->waiting_calls);
- spin_unlock(&bundle->channel_lock);
-
- _leave(" = [B=%x]", bundle->debug_id);
- return bundle;
-
-error:
- _leave(" = -ENOMEM");
- return ERR_PTR(-ENOMEM);
+ return 0;
}
/*
* Allocate a new connection and add it into a bundle.
*/
-static void rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, gfp_t gfp)
- __releases(bundle->channel_lock)
+static bool rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle,
+ unsigned int slot)
{
- struct rxrpc_connection *candidate = NULL, *old = NULL;
- bool conflict;
- int i;
-
- _enter("");
-
- conflict = bundle->alloc_conn;
- if (!conflict)
- bundle->alloc_conn = true;
- spin_unlock(&bundle->channel_lock);
- if (conflict) {
- _leave(" [conf]");
- return;
- }
-
- candidate = rxrpc_alloc_client_connection(bundle, gfp);
-
- spin_lock(&bundle->channel_lock);
- bundle->alloc_conn = false;
-
- if (IS_ERR(candidate)) {
- bundle->alloc_error = PTR_ERR(candidate);
- spin_unlock(&bundle->channel_lock);
- _leave(" [err %ld]", PTR_ERR(candidate));
- return;
- }
-
- bundle->alloc_error = 0;
+ struct rxrpc_connection *conn, *old;
+ unsigned int shift = slot * RXRPC_MAXCALLS;
+ unsigned int i;
- for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
- unsigned int shift = i * RXRPC_MAXCALLS;
- int j;
-
- old = bundle->conns[i];
- if (!rxrpc_may_reuse_conn(old)) {
- if (old)
- trace_rxrpc_client(old, -1, rxrpc_client_replace);
- candidate->bundle_shift = shift;
- rxrpc_activate_bundle(bundle);
- bundle->conns[i] = candidate;
- for (j = 0; j < RXRPC_MAXCALLS; j++)
- set_bit(shift + j, &bundle->avail_chans);
- candidate = NULL;
- break;
- }
-
- old = NULL;
+ old = bundle->conns[slot];
+ if (old) {
+ bundle->conns[slot] = NULL;
+ trace_rxrpc_client(old, -1, rxrpc_client_replace);
+ rxrpc_put_connection(old, rxrpc_conn_put_noreuse);
}
- spin_unlock(&bundle->channel_lock);
-
- if (candidate) {
- _debug("discard C=%x", candidate->debug_id);
- trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate);
- rxrpc_put_connection(candidate, rxrpc_conn_put_discard);
+ conn = rxrpc_alloc_client_connection(bundle);
+ if (IS_ERR(conn)) {
+ bundle->alloc_error = PTR_ERR(conn);
+ return false;
}
- rxrpc_put_connection(old, rxrpc_conn_put_noreuse);
- _leave("");
+ rxrpc_activate_bundle(bundle);
+ conn->bundle_shift = shift;
+ bundle->conns[slot] = conn;
+ for (i = 0; i < RXRPC_MAXCALLS; i++)
+ set_bit(shift + i, &bundle->avail_chans);
+ return true;
}
/*
* Add a connection to a bundle if there are no usable connections or we have
* connections waiting for extra capacity.
*/
-static void rxrpc_maybe_add_conn(struct rxrpc_bundle *bundle, gfp_t gfp)
+static bool rxrpc_bundle_has_space(struct rxrpc_bundle *bundle)
{
- struct rxrpc_call *call;
- int i, usable;
+ int slot = -1, i, usable;
_enter("");
- spin_lock(&bundle->channel_lock);
+ bundle->alloc_error = 0;
/* See if there are any usable connections. */
usable = 0;
- for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
+ for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
if (rxrpc_may_reuse_conn(bundle->conns[i]))
usable++;
-
- if (!usable && !list_empty(&bundle->waiting_calls)) {
- call = list_first_entry(&bundle->waiting_calls,
- struct rxrpc_call, chan_wait_link);
- if (test_bit(RXRPC_CALL_UPGRADE, &call->flags))
- bundle->try_upgrade = true;
+ else if (slot == -1)
+ slot = i;
}
+ if (!usable && bundle->upgrade)
+ bundle->try_upgrade = true;
+
if (!usable)
goto alloc_conn;
if (!bundle->avail_chans &&
!bundle->try_upgrade &&
- !list_empty(&bundle->waiting_calls) &&
usable < ARRAY_SIZE(bundle->conns))
goto alloc_conn;
- spin_unlock(&bundle->channel_lock);
_leave("");
- return;
+ return usable;
alloc_conn:
- return rxrpc_add_conn_to_bundle(bundle, gfp);
+ return slot >= 0 ? rxrpc_add_conn_to_bundle(bundle, slot) : false;
}
/*
@@ -531,11 +405,13 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
struct rxrpc_channel *chan = &conn->channels[channel];
struct rxrpc_bundle *bundle = conn->bundle;
struct rxrpc_call *call = list_entry(bundle->waiting_calls.next,
- struct rxrpc_call, chan_wait_link);
+ struct rxrpc_call, wait_link);
u32 call_id = chan->call_counter + 1;
_enter("C=%x,%u", conn->debug_id, channel);
+ list_del_init(&call->wait_link);
+
trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
/* Cancel the final ACK on the previous call if it hasn't been sent yet
@@ -545,65 +421,50 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
clear_bit(conn->bundle_shift + channel, &bundle->avail_chans);
rxrpc_see_call(call, rxrpc_call_see_activate_client);
- list_del_init(&call->chan_wait_link);
call->conn = rxrpc_get_connection(conn, rxrpc_conn_get_activate_call);
call->cid = conn->proto.cid | channel;
call->call_id = call_id;
call->dest_srx.srx_service = conn->service_id;
-
- trace_rxrpc_connect_call(call);
-
- rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST);
-
- /* Paired with the read barrier in rxrpc_connect_call(). This orders
- * cid and epoch in the connection wrt to call_id without the need to
- * take the channel_lock.
- *
- * We provisionally assign a callNumber at this point, but we don't
- * confirm it until the call is about to be exposed.
- *
- * TODO: Pair with a barrier in the data_ready handler when that looks
- * at the call ID through a connection channel.
- */
- smp_wmb();
+ call->cong_ssthresh = call->peer->cong_ssthresh;
+ if (call->cong_cwnd >= call->cong_ssthresh)
+ call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
+ else
+ call->cong_mode = RXRPC_CALL_SLOW_START;
chan->call_id = call_id;
chan->call_debug_id = call->debug_id;
- rcu_assign_pointer(chan->call, call);
+ chan->call = call;
+
+ rxrpc_see_call(call, rxrpc_call_see_connected);
+ trace_rxrpc_connect_call(call);
+ call->tx_last_sent = ktime_get_real();
+ rxrpc_start_call_timer(call);
+ rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST);
wake_up(&call->waitq);
}
/*
* Remove a connection from the idle list if it's on it.
*/
-static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
+static void rxrpc_unidle_conn(struct rxrpc_connection *conn)
{
- struct rxrpc_local *local = bundle->local;
- bool drop_ref;
-
if (!list_empty(&conn->cache_link)) {
- drop_ref = false;
- spin_lock(&local->client_conn_cache_lock);
- if (!list_empty(&conn->cache_link)) {
- list_del_init(&conn->cache_link);
- drop_ref = true;
- }
- spin_unlock(&local->client_conn_cache_lock);
- if (drop_ref)
- rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
+ list_del_init(&conn->cache_link);
+ rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
}
}
/*
- * Assign channels and callNumbers to waiting calls with channel_lock
- * held by caller.
+ * Assign channels and callNumbers to waiting calls.
*/
-static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
+static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
{
struct rxrpc_connection *conn;
unsigned long avail, mask;
unsigned int channel, slot;
+ trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);
+
if (bundle->try_upgrade)
mask = 1;
else
@@ -623,7 +484,7 @@ static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
if (bundle->try_upgrade)
set_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags);
- rxrpc_unidle_conn(bundle, conn);
+ rxrpc_unidle_conn(conn);
channel &= (RXRPC_MAXCALLS - 1);
conn->act_chans |= 1 << channel;
@@ -632,125 +493,24 @@ static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
}
/*
- * Assign channels and callNumbers to waiting calls.
+ * Connect waiting channels (called from the I/O thread).
*/
-static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
-{
- _enter("B=%x", bundle->debug_id);
-
- trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);
-
- if (!bundle->avail_chans)
- return;
-
- spin_lock(&bundle->channel_lock);
- rxrpc_activate_channels_locked(bundle);
- spin_unlock(&bundle->channel_lock);
- _leave("");
-}
-
-/*
- * Wait for a callNumber and a channel to be granted to a call.
- */
-static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle,
- struct rxrpc_call *call, gfp_t gfp)
+void rxrpc_connect_client_calls(struct rxrpc_local *local)
{
- DECLARE_WAITQUEUE(myself, current);
- int ret = 0;
-
- _enter("%d", call->debug_id);
-
- if (!gfpflags_allow_blocking(gfp)) {
- rxrpc_maybe_add_conn(bundle, gfp);
- rxrpc_activate_channels(bundle);
- ret = bundle->alloc_error ?: -EAGAIN;
- goto out;
- }
-
- add_wait_queue_exclusive(&call->waitq, &myself);
- for (;;) {
- rxrpc_maybe_add_conn(bundle, gfp);
- rxrpc_activate_channels(bundle);
- ret = bundle->alloc_error;
- if (ret < 0)
- break;
-
- switch (call->interruptibility) {
- case RXRPC_INTERRUPTIBLE:
- case RXRPC_PREINTERRUPTIBLE:
- set_current_state(TASK_INTERRUPTIBLE);
- break;
- case RXRPC_UNINTERRUPTIBLE:
- default:
- set_current_state(TASK_UNINTERRUPTIBLE);
- break;
- }
- if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
- break;
- if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
- call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
- signal_pending(current)) {
- ret = -ERESTARTSYS;
- break;
- }
- schedule();
- }
- remove_wait_queue(&call->waitq, &myself);
- __set_current_state(TASK_RUNNING);
-
-out:
- _leave(" = %d", ret);
- return ret;
-}
-
-/*
- * find a connection for a call
- * - called in process context with IRQs enabled
- */
-int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
-{
- struct rxrpc_bundle *bundle;
- int ret = 0;
-
- _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
-
- rxrpc_get_call(call, rxrpc_call_get_io_thread);
-
- bundle = rxrpc_prep_call(call, gfp);
- if (IS_ERR(bundle)) {
- rxrpc_put_call(call, rxrpc_call_get_io_thread);
- ret = PTR_ERR(bundle);
- goto out;
- }
-
- if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_CONN) {
- ret = rxrpc_wait_for_channel(bundle, call, gfp);
- if (ret < 0)
- goto wait_failed;
- }
-
-granted_channel:
- /* Paired with the write barrier in rxrpc_activate_one_channel(). */
- smp_rmb();
+ struct rxrpc_call *call;
-out:
- _leave(" = %d", ret);
- return ret;
+ while ((call = list_first_entry_or_null(&local->new_client_calls,
+ struct rxrpc_call, wait_link))
+ ) {
+ struct rxrpc_bundle *bundle = call->bundle;
-wait_failed:
- spin_lock(&bundle->channel_lock);
- list_del_init(&call->chan_wait_link);
- spin_unlock(&bundle->channel_lock);
+ spin_lock(&local->client_call_lock);
+ list_move_tail(&call->wait_link, &bundle->waiting_calls);
+ spin_unlock(&local->client_call_lock);
- if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) {
- ret = 0;
- goto granted_channel;
+ if (rxrpc_bundle_has_space(bundle))
+ rxrpc_activate_channels(bundle);
}
-
- trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed);
- rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
- rxrpc_disconnect_client_call(bundle, call);
- goto out;
}
/*
@@ -808,7 +568,6 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
_enter("c=%x", call->debug_id);
- spin_lock(&bundle->channel_lock);
set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
/* Calls that have never actually been assigned a channel can simply be
@@ -819,7 +578,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
_debug("call is waiting");
ASSERTCMP(call->call_id, ==, 0);
ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
- list_del_init(&call->chan_wait_link);
+ list_del_init(&call->wait_link);
goto out;
}
@@ -828,10 +587,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
chan = &conn->channels[channel];
trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect);
- if (rcu_access_pointer(chan->call) != call) {
- spin_unlock(&bundle->channel_lock);
- BUG();
- }
+ BUG_ON(chan->call != call);
may_reuse = rxrpc_may_reuse_conn(conn);
@@ -852,9 +608,8 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
trace_rxrpc_client(conn, channel, rxrpc_client_to_active);
bundle->try_upgrade = false;
if (may_reuse)
- rxrpc_activate_channels_locked(bundle);
+ rxrpc_activate_channels(bundle);
}
-
}
/* See if we can pass the channel directly to another call. */
@@ -879,7 +634,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
}
/* Deactivate the channel. */
- rcu_assign_pointer(chan->call, NULL);
+ chan->call = NULL;
set_bit(conn->bundle_shift + channel, &conn->bundle->avail_chans);
conn->act_chans &= ~(1 << channel);
@@ -892,15 +647,12 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
conn->idle_timestamp = jiffies;
rxrpc_get_connection(conn, rxrpc_conn_get_idle);
- spin_lock(&local->client_conn_cache_lock);
list_move_tail(&conn->cache_link, &local->idle_client_conns);
- spin_unlock(&local->client_conn_cache_lock);
rxrpc_set_client_reap_timer(local);
}
out:
- spin_unlock(&bundle->channel_lock);
rxrpc_put_call(call, rxrpc_call_put_io_thread);
_leave("");
return;
@@ -913,7 +665,6 @@ static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
{
struct rxrpc_bundle *bundle = conn->bundle;
unsigned int bindex;
- bool need_drop = false;
int i;
_enter("C=%x", conn->debug_id);
@@ -921,18 +672,13 @@ static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
rxrpc_process_delayed_final_acks(conn, true);
- spin_lock(&bundle->channel_lock);
bindex = conn->bundle_shift / RXRPC_MAXCALLS;
if (bundle->conns[bindex] == conn) {
_debug("clear slot %u", bindex);
bundle->conns[bindex] = NULL;
for (i = 0; i < RXRPC_MAXCALLS; i++)
clear_bit(conn->bundle_shift + i, &bundle->avail_chans);
- need_drop = true;
- }
- spin_unlock(&bundle->channel_lock);
-
- if (need_drop) {
+ rxrpc_put_client_connection_id(bundle->local, conn);
rxrpc_deactivate_bundle(bundle);
rxrpc_put_connection(conn, rxrpc_conn_put_unbundle);
}
@@ -994,24 +740,16 @@ void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
_enter("");
- if (list_empty(&local->idle_client_conns)) {
- _leave(" [empty]");
- return;
- }
-
/* We keep an estimate of what the number of conns ought to be after
* we've discarded some so that we don't overdo the discarding.
*/
nr_conns = atomic_read(&local->rxnet->nr_client_conns);
next:
- spin_lock(&local->client_conn_cache_lock);
-
- if (list_empty(&local->idle_client_conns))
- goto out;
-
- conn = list_entry(local->idle_client_conns.next,
- struct rxrpc_connection, cache_link);
+ conn = list_first_entry_or_null(&local->idle_client_conns,
+ struct rxrpc_connection, cache_link);
+ if (!conn)
+ return;
if (!local->kill_all_client_conns) {
/* If the number of connections is over the reap limit, we
@@ -1036,8 +774,6 @@ void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
trace_rxrpc_client(conn, -1, rxrpc_client_discard);
list_del_init(&conn->cache_link);
- spin_unlock(&local->client_conn_cache_lock);
-
rxrpc_unbundle_conn(conn);
/* Drop the ->cache_link ref */
rxrpc_put_connection(conn, rxrpc_conn_put_discard_idle);
@@ -1057,8 +793,6 @@ void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
if (!local->kill_all_client_conns)
timer_reduce(&local->client_conn_reap_timer, conn_expires_at);
-out:
- spin_unlock(&local->client_conn_cache_lock);
_leave("");
}
@@ -1067,34 +801,19 @@ void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
*/
void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
{
- struct rxrpc_connection *conn, *tmp;
- LIST_HEAD(graveyard);
+ struct rxrpc_connection *conn;
_enter("");
- spin_lock(&local->client_conn_cache_lock);
local->kill_all_client_conns = true;
- spin_unlock(&local->client_conn_cache_lock);
del_timer_sync(&local->client_conn_reap_timer);
- spin_lock(&local->client_conn_cache_lock);
-
- list_for_each_entry_safe(conn, tmp, &local->idle_client_conns,
- cache_link) {
- if (conn->local == local) {
- atomic_dec(&conn->active);
- trace_rxrpc_client(conn, -1, rxrpc_client_discard);
- list_move(&conn->cache_link, &graveyard);
- }
- }
-
- spin_unlock(&local->client_conn_cache_lock);
-
- while (!list_empty(&graveyard)) {
- conn = list_entry(graveyard.next,
- struct rxrpc_connection, cache_link);
+ while ((conn = list_first_entry_or_null(&local->idle_client_conns,
+ struct rxrpc_connection, cache_link))) {
list_del_init(&conn->cache_link);
+ atomic_dec(&conn->active);
+ trace_rxrpc_client(conn, -1, rxrpc_client_discard);
rxrpc_unbundle_conn(conn);
rxrpc_put_connection(conn, rxrpc_conn_put_local_dead);
}
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 8009d7e62ae6..c8e7e1081020 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -100,9 +100,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
/* If the last call got moved on whilst we were waiting to run, just
* ignore this packet.
*/
- call_id = READ_ONCE(chan->last_call);
- /* Sync with __rxrpc_disconnect_call() */
- smp_rmb();
+ call_id = chan->last_call;
if (skb && call_id != sp->hdr.callNumber)
return;
@@ -119,9 +117,12 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
iov[2].iov_base = &ack_info;
iov[2].iov_len = sizeof(ack_info);
+ serial = atomic_inc_return(&conn->serial);
+
pkt.whdr.epoch = htonl(conn->proto.epoch);
pkt.whdr.cid = htonl(conn->proto.cid | channel);
pkt.whdr.callNumber = htonl(call_id);
+ pkt.whdr.serial = htonl(serial);
pkt.whdr.seq = 0;
pkt.whdr.type = chan->last_type;
pkt.whdr.flags = conn->out_clientflag;
@@ -158,31 +159,15 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
iov[0].iov_len += sizeof(pkt.ack);
len += sizeof(pkt.ack) + 3 + sizeof(ack_info);
ioc = 3;
- break;
-
- default:
- return;
- }
-
- /* Resync with __rxrpc_disconnect_call() and check that the last call
- * didn't get advanced whilst we were filling out the packets.
- */
- smp_rmb();
- if (READ_ONCE(chan->last_call) != call_id)
- return;
-
- serial = atomic_inc_return(&conn->serial);
- pkt.whdr.serial = htonl(serial);
- switch (chan->last_type) {
- case RXRPC_PACKET_TYPE_ABORT:
- break;
- case RXRPC_PACKET_TYPE_ACK:
trace_rxrpc_tx_ack(chan->call_debug_id, serial,
ntohl(pkt.ack.firstPacket),
ntohl(pkt.ack.serial),
pkt.ack.reason, 0);
break;
+
+ default:
+ return;
}
ret = kernel_sendmsg(conn->local->socket, &msg, iov, ioc, len);
@@ -207,20 +192,14 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn)
_enter("{%d},%x", conn->debug_id, conn->abort_code);
- spin_lock(&conn->bundle->channel_lock);
-
for (i = 0; i < RXRPC_MAXCALLS; i++) {
- call = rcu_dereference_protected(
- conn->channels[i].call,
- lockdep_is_held(&conn->bundle->channel_lock));
- if (call)
+ if ((call = conn->channels[i].call))
rxrpc_set_call_completion(call,
conn->completion,
conn->abort_code,
conn->error);
}
- spin_unlock(&conn->bundle->channel_lock);
_leave("");
}
@@ -316,9 +295,7 @@ void rxrpc_process_delayed_final_acks(struct rxrpc_connection *conn, bool force)
if (!test_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags))
continue;
- smp_rmb(); /* vs rxrpc_disconnect_client_call */
- ack_at = READ_ONCE(chan->final_ack_at);
-
+ ack_at = chan->final_ack_at;
if (time_before(j, ack_at) && !force) {
if (time_before(ack_at, next_j)) {
next_j = ack_at;
@@ -446,15 +423,8 @@ void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
if (conn->state != RXRPC_CONN_SERVICE)
break;
- spin_lock(&conn->bundle->channel_lock);
-
for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
- rxrpc_call_is_secure(
- rcu_dereference_protected(
- conn->channels[loop].call,
- lockdep_is_held(&conn->bundle->channel_lock)));
-
- spin_unlock(&conn->bundle->channel_lock);
+ rxrpc_call_is_secure(conn->channels[loop].call);
break;
}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index ed591a1f82c2..c420a5e0cdd6 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -67,6 +67,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
INIT_WORK(&conn->destructor, rxrpc_clean_up_connection);
INIT_LIST_HEAD(&conn->proc_link);
INIT_LIST_HEAD(&conn->link);
+ mutex_init(&conn->security_lock);
skb_queue_head_init(&conn->rx_queue);
conn->rxnet = rxnet;
conn->security = &rxrpc_no_security;
@@ -157,7 +158,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
_enter("%d,%x", conn->debug_id, call->cid);
- if (rcu_access_pointer(chan->call) == call) {
+ if (chan->call == call) {
/* Save the result of the call so that we can repeat it if necessary
* through the channel, whilst disposing of the actual call record.
*/
@@ -177,12 +178,9 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
break;
}
- /* Sync with rxrpc_conn_retransmit(). */
- smp_wmb();
chan->last_call = chan->call_id;
chan->call_id = chan->call_counter;
-
- rcu_assign_pointer(chan->call, NULL);
+ chan->call = NULL;
}
_leave("");
@@ -207,10 +205,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
if (rxrpc_is_client_call(call))
return rxrpc_disconnect_client_call(conn->bundle, call);
- spin_lock(&conn->bundle->channel_lock);
__rxrpc_disconnect_call(conn, call);
- spin_unlock(&conn->bundle->channel_lock);
-
set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
conn->idle_timestamp = jiffies;
if (atomic_dec_and_test(&conn->active))
@@ -311,10 +306,10 @@ static void rxrpc_clean_up_connection(struct work_struct *work)
container_of(work, struct rxrpc_connection, destructor);
struct rxrpc_net *rxnet = conn->rxnet;
- ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
- !rcu_access_pointer(conn->channels[1].call) &&
- !rcu_access_pointer(conn->channels[2].call) &&
- !rcu_access_pointer(conn->channels[3].call));
+ ASSERT(!conn->channels[0].call &&
+ !conn->channels[1].call &&
+ !conn->channels[2].call &&
+ !conn->channels[3].call);
ASSERT(list_empty(&conn->cache_link));
del_timer_sync(&conn->timer);
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 2a55a88b2a5b..f30323de82bd 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -11,7 +11,6 @@
static struct rxrpc_bundle rxrpc_service_dummy_bundle = {
.ref = REFCOUNT_INIT(1),
.debug_id = UINT_MAX,
- .channel_lock = __SPIN_LOCK_UNLOCKED(&rxrpc_service_dummy_bundle.channel_lock),
};
/*
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 6153ea162153..4990ab410b4d 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -376,10 +376,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
return just_discard;
}
- rcu_read_lock();
- call = rxrpc_try_get_call(rcu_dereference(chan->call),
- rxrpc_call_get_input);
- rcu_read_unlock();
+ call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input);
if (sp->hdr.callNumber > chan->call_id) {
if (rxrpc_to_client(sp))
@@ -443,6 +440,9 @@ int rxrpc_io_thread(void *data)
&local->client_conn_flags))
rxrpc_discard_expired_client_conns(local);
+ if (!list_empty(&local->new_client_calls))
+ rxrpc_connect_client_calls(local);
+
/* Deal with calls that want immediate attention. */
if ((call = list_first_entry_or_null(&local->call_attend_q,
struct rxrpc_call,
@@ -506,7 +506,10 @@ int rxrpc_io_thread(void *data)
set_current_state(TASK_INTERRUPTIBLE);
if (!skb_queue_empty(&local->rx_queue) ||
!list_empty(&local->call_attend_q) ||
- !list_empty(&local->conn_attend_q)) {
+ !list_empty(&local->conn_attend_q) ||
+ !list_empty(&local->new_client_calls) ||
+ test_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
+ &local->client_conn_flags)) {
__set_current_state(TASK_RUNNING);
continue;
}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 6a463e90cd8b..ca3e89684d8b 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -118,7 +118,6 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
local->kill_all_client_conns = false;
- spin_lock_init(&local->client_conn_cache_lock);
INIT_LIST_HEAD(&local->idle_client_conns);
timer_setup(&local->client_conn_reap_timer,
rxrpc_client_conn_reap_timeout, 0);
@@ -134,7 +133,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
if (tmp == 0)
tmp = 1;
idr_set_cursor(&local->conn_ids, tmp);
- spin_lock_init(&local->conn_lock);
+ INIT_LIST_HEAD(&local->new_client_calls);
+ spin_lock_init(&local->client_call_lock);
trace_rxrpc_local(local->debug_id, rxrpc_local_new, 1, 1);
}
@@ -437,7 +437,7 @@ void rxrpc_destroy_local(struct rxrpc_local *local)
rxrpc_purge_queue(&local->rx_delay_queue);
#endif
rxrpc_purge_queue(&local->rx_queue);
- rxrpc_destroy_client_conn_ids(local);
+ rxrpc_purge_client_connections(local);
}
/*
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index e5cbe88daaa5..fdb0e11a583e 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -12,6 +12,7 @@
static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
[RXRPC_CONN_UNUSED] = "Unused ",
+ [RXRPC_CONN_CLIENT_UNSECURED] = "ClUnsec ",
[RXRPC_CONN_CLIENT] = "Client ",
[RXRPC_CONN_SERVICE_PREALLOC] = "SvPrealc",
[RXRPC_CONN_SERVICE_UNSECURED] = "SvUnsec ",
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index 9abf6bb56b65..82fee7c47481 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -1122,36 +1122,31 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
goto protocol_error_free;
}
- spin_lock(&conn->bundle->channel_lock);
for (i = 0; i < RXRPC_MAXCALLS; i++) {
- struct rxrpc_call *call;
u32 call_id = ntohl(response->encrypted.call_id[i]);
+ u32 counter = READ_ONCE(conn->channels[i].call_counter);
if (call_id > INT_MAX) {
rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO,
rxkad_abort_resp_bad_callid);
- goto protocol_error_unlock;
+ goto protocol_error_free;
}
- if (call_id < conn->channels[i].call_counter) {
+ if (call_id < counter) {
rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO,
rxkad_abort_resp_call_ctr);
- goto protocol_error_unlock;
+ goto protocol_error_free;
}
- if (call_id > conn->channels[i].call_counter) {
- call = rcu_dereference_protected(
- conn->channels[i].call,
- lockdep_is_held(&conn->bundle->channel_lock));
- if (call && !__rxrpc_call_is_complete(call)) {
+ if (call_id > counter) {
+ if (conn->channels[i].call) {
rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO,
rxkad_abort_resp_call_state);
- goto protocol_error_unlock;
+ goto protocol_error_free;
}
conn->channels[i].call_counter = call_id;
}
}
- spin_unlock(&conn->bundle->channel_lock);
if (ntohl(response->encrypted.inc_nonce) != conn->rxkad.nonce + 1) {
rxrpc_abort_conn(conn, skb, RXKADOUTOFSEQUENCE, -EPROTO,
@@ -1179,8 +1174,6 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
_leave(" = 0");
return 0;
-protocol_error_unlock:
- spin_unlock(&conn->bundle->channel_lock);
protocol_error_free:
kfree(ticket);
protocol_error:
diff --git a/net/rxrpc/security.c b/net/rxrpc/security.c
index 78af14694618..fcb0846406c0 100644
--- a/net/rxrpc/security.c
+++ b/net/rxrpc/security.c
@@ -97,38 +97,32 @@ int rxrpc_init_client_call_security(struct rxrpc_call *call)
*/
int rxrpc_init_client_conn_security(struct rxrpc_connection *conn)
{
- const struct rxrpc_security *sec;
struct rxrpc_key_token *token;
struct key *key = conn->key;
- int ret;
+ int ret = 0;
_enter("{%d},{%x}", conn->debug_id, key_serial(key));
- if (!key)
- return 0;
-
- ret = key_validate(key);
- if (ret < 0)
- return ret;
-
for (token = key->payload.data[0]; token; token = token->next) {
- sec = rxrpc_security_lookup(token->security_index);
- if (sec)
+ if (token->security_index == conn->security->security_index)
goto found;
}
return -EKEYREJECTED;
found:
- conn->security = sec;
-
- ret = conn->security->init_connection_security(conn, token);
- if (ret < 0) {
- conn->security = &rxrpc_no_security;
- return ret;
+ mutex_lock(&conn->security_lock);
+ if (conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
+ ret = conn->security->init_connection_security(conn, token);
+ if (ret == 0) {
+ spin_lock(&conn->state_lock);
+ if (conn->state == RXRPC_CONN_CLIENT_UNSECURED)
+ smp_store_release(&conn->state,
+ RXRPC_CONN_CLIENT);
+ spin_unlock(&conn->state_lock);
+ }
}
-
- _leave(" = 0");
- return 0;
+ mutex_unlock(&conn->security_lock);
+ return ret;
}
/*
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index b0182de63226..7c24e681b9bf 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -37,6 +37,60 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
return false;
}
+/*
+ * Wait for a call to become connected. Interruption here doesn't cause the
+ * call to be aborted.
+ */
+static int rxrpc_wait_to_be_connected(struct rxrpc_call *call, long *timeo)
+{
+ DECLARE_WAITQUEUE(myself, current);
+ int ret = 0;
+
+ _enter("%d", call->debug_id);
+
+ if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
+ return call->error;
+
+ add_wait_queue_exclusive(&call->waitq, &myself);
+
+ for (;;) {
+ ret = call->error;
+ if (ret < 0)
+ break;
+
+ switch (call->interruptibility) {
+ case RXRPC_INTERRUPTIBLE:
+ case RXRPC_PREINTERRUPTIBLE:
+ set_current_state(TASK_INTERRUPTIBLE);
+ break;
+ case RXRPC_UNINTERRUPTIBLE:
+ default:
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ break;
+ }
+ if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) {
+ ret = call->error;
+ break;
+ }
+ if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
+ call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
+ signal_pending(current)) {
+ ret = sock_intr_errno(*timeo);
+ break;
+ }
+ *timeo = schedule_timeout(*timeo);
+ }
+
+ remove_wait_queue(&call->waitq, &myself);
+ __set_current_state(TASK_RUNNING);
+
+ if (ret == 0 && rxrpc_call_is_complete(call))
+ ret = call->error;
+
+ _leave(" = %d", ret);
+ return ret;
+}
+
/*
* Return true if there's sufficient Tx queue space.
*/
@@ -238,6 +292,16 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
+ ret = rxrpc_wait_to_be_connected(call, &timeo);
+ if (ret < 0)
+ return ret;
+
+ if (call->conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
+ ret = rxrpc_init_client_conn_security(call->conn);
+ if (ret < 0)
+ return ret;
+ }
+
/* this should be in poll */
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
More information about the linux-afs
mailing list