author		David Howells <dhowells@redhat.com>	2016-08-24 02:30:52 -0400
committer	David Howells <dhowells@redhat.com>	2016-08-24 10:17:14 -0400
commit		45025bceef17ed5d5ed3006b63c85cf289f79dc8 (patch)
tree		242a154cebc7e8420fda34ceefdd47d0491be810
parent		4d028b2c82991e2f9ae89ad90aeaaeb713495043 (diff)
rxrpc: Improve management and caching of client connection objects
Improve the management and caching of client rxrpc connection objects.
From this point, client connections will be managed separately from
service connections because AF_RXRPC controls the creation and re-use of
client connections but doesn't have that luxury with service connections.

Further, there will be limits on the numbers of client connections that
may be live on a machine.  No direct restriction will be placed on the
number of client calls, excepting that each client connection can support
a maximum of four concurrent calls.

Note that, for a number of reasons, we don't want to simply discard a
client connection as soon as the last call is apparently finished:

 (1) Security is negotiated per-connection and the context is then shared
     between all calls on that connection.  The context can be negotiated
     again if the connection lapses, but that involves holding up calls
     whilst at least two packets are exchanged and various crypto bits are
     performed - so we'd ideally like to cache it for a little while at
     least.

 (2) If a packet goes astray, we will need to retransmit a final ACK or
     ABORT packet.  To make this work, we need to keep around the
     connection details for a little while.

 (3) The locally held structures represent some amount of setup time, to
     be weighed against their occupation of memory when idle.

To this end, the client connection cache is managed by a state machine on
each connection.  There are five states:

 (1) INACTIVE - The connection is not held in any list and may not have
     been exposed to the world.  If it has been previously exposed, it was
     discarded from the idle list after expiring.

 (2) WAITING - The connection is waiting for the number of client conns to
     drop below the maximum capacity.  Calls may be in progress upon it
     from when it was active and got culled.

     The connection is on the rxrpc_waiting_client_conns list which is
     kept in to-be-granted order.  Culled conns with waiters go to the
     back of the queue just like new conns.

 (3) ACTIVE - The connection has at least one call in progress upon it, it
     may freely grant available channels to new calls and calls may be
     waiting on it for channels to become available.

     The connection is on the rxrpc_active_client_conns list which is kept
     in activation order for culling purposes.

 (4) CULLED - The connection got summarily culled to try and free up
     capacity.  Calls currently in progress on the connection are allowed
     to continue, but new calls will have to wait.  There can be no
     waiters in this state - the conn would have to go to the WAITING
     state instead.

 (5) IDLE - The connection has no calls in progress upon it and must have
     been exposed to the world (ie. the EXPOSED flag must be set).  When
     it expires, the EXPOSED flag is cleared and the connection
     transitions to the INACTIVE state.

     The connection is on the rxrpc_idle_client_conns list which is kept
     in order of how soon they'll expire.

A connection in the ACTIVE or CULLED state must have at least one active
call upon it; if in the WAITING state it may have active calls upon it;
other states may not have active calls.

As long as a connection remains active and doesn't get culled, it may
continue to process calls - even if there are connections on the wait
queue.  This simplifies things a bit and reduces the amount of checking we
need to do.

There are a couple of flags of relevance to the cache:

 (1) EXPOSED - The connection ID got exposed to the world.  If this flag
     is set, an extra ref is added to the connection preventing it from
     being reaped when it has no calls outstanding.  This flag is cleared
     and the ref dropped when a conn is discarded from the idle list.

 (2) DONT_REUSE - The connection should be discarded as soon as possible
     and should not be reused.

This commit also provides a number of new settings:

 (*) /proc/net/rxrpc/max_client_conns

     The maximum number of live client connections.  Above this number,
     new connections get added to the wait list and must wait for an
     active conn to be culled.  Culled connections can be reused, but they
     will go to the back of the wait list and have to wait.

 (*) /proc/net/rxrpc/reap_client_conns

     If the number of desired connections exceeds the maximum above, the
     active connection list will be culled until there are only this many
     left in it.

 (*) /proc/net/rxrpc/idle_conn_expiry

     The normal expiry time for a client connection, provided there are
     fewer than reap_client_conns of them around.

 (*) /proc/net/rxrpc/idle_conn_fast_expiry

     The expedited expiry time, used when there are more than
     reap_client_conns of them around.

Note that I combined the Tx wait queue with the channel grant wait queue
to save space as only one of these should be in use at once.

Note also that, for the moment, the service connection cache still uses
the old connection management code.

Signed-off-by: David Howells <dhowells@redhat.com>
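
[Editorial note: the channel accounting behind the four-calls-per-connection
limit is easy to demonstrate in miniature.  The following standalone C sketch
mimics what rxrpc_activate_channels() in the patch does with ~active_chans and
__ffs(); grab_channel() here is an illustrative stand-in, not a kernel API.]

#include <stdio.h>
#include <strings.h>	/* ffs() */

#define RXRPC_MAXCALLS 4
#define ACTIVE_CHANS_MASK ((1 << RXRPC_MAXCALLS) - 1)

/* Grab the lowest free channel, or return -1 if all four are busy -
 * the case in which a call has to sit on the connection's wait list.
 */
static int grab_channel(unsigned char *active_chans)
{
	unsigned char mask = ~*active_chans & ACTIVE_CHANS_MASK;

	if (!mask)
		return -1;		/* all four channels in use */
	*active_chans |= mask & -mask;	/* claim the lowest clear bit */
	return ffs(mask) - 1;		/* channel index 0..3 */
}

int main(void)
{
	unsigned char active_chans = 0;
	int ch;

	while ((ch = grab_channel(&active_chans)) >= 0)
		printf("granted channel %d (mask now %#x)\n", ch, active_chans);
	printf("no free channel: call must wait\n");
	return 0;
}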
-rw-r--r--	net/rxrpc/ar-internal.h		56
-rw-r--r--	net/rxrpc/call_event.c		4
-rw-r--r--	net/rxrpc/call_object.c		5
-rw-r--r--	net/rxrpc/conn_client.c		910
-rw-r--r--	net/rxrpc/conn_object.c		71
-rw-r--r--	net/rxrpc/conn_service.c	5
-rw-r--r--	net/rxrpc/output.c		6
-rw-r--r--	net/rxrpc/sysctl.c		33
8 files changed, 933 insertions, 157 deletions
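
[Editorial note: the net/rxrpc/output.c and net/rxrpc/sysctl.c parts of this
diff are not reproduced below.  For orientation only, the four tunables listed
above would be wired up through sysctl table entries of roughly the following
shape - a sketch inferred from the commit message, not the verbatim patch; the
handler choices are assumptions.]

static struct ctl_table rxrpc_client_conn_sysctls_sketch[] = {
	/* Values exported via /proc/net/rxrpc/ - names per the commit message */
	{
		.procname	= "max_client_conns",
		.data		= &rxrpc_max_client_connections,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
	},
	{
		.procname	= "reap_client_conns",
		.data		= &rxrpc_reap_client_connections,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
	},
	{
		.procname	= "idle_conn_expiry",
		.data		= &rxrpc_conn_idle_client_expiry,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_ms_jiffies,	/* stored in jiffies */
	},
	{
		.procname	= "idle_conn_fast_expiry",
		.data		= &rxrpc_conn_idle_client_fast_expiry,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_ms_jiffies,
	},
	{ }
};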
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 2efbfba87cbe..c761124961cc 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -255,6 +255,9 @@ enum rxrpc_conn_flag {
 	RXRPC_CONN_HAS_IDR,		/* Has a client conn ID assigned */
 	RXRPC_CONN_IN_SERVICE_CONNS,	/* Conn is in peer->service_conns */
 	RXRPC_CONN_IN_CLIENT_CONNS,	/* Conn is in local->client_conns */
+	RXRPC_CONN_EXPOSED,		/* Conn has extra ref for exposure */
+	RXRPC_CONN_DONT_REUSE,		/* Don't reuse this connection */
+	RXRPC_CONN_COUNTED,		/* Counted by rxrpc_nr_client_conns */
 };
 
 /*
@@ -265,6 +268,17 @@ enum rxrpc_conn_event {
 };
 
 /*
+ * The connection cache state.
+ */
+enum rxrpc_conn_cache_state {
+	RXRPC_CONN_CLIENT_INACTIVE,	/* Conn is not yet listed */
+	RXRPC_CONN_CLIENT_WAITING,	/* Conn is on wait list, waiting for capacity */
+	RXRPC_CONN_CLIENT_ACTIVE,	/* Conn is on active list, doing calls */
+	RXRPC_CONN_CLIENT_CULLED,	/* Conn is culled and delisted, doing calls */
+	RXRPC_CONN_CLIENT_IDLE,		/* Conn is on idle list, doing mostly nothing */
+};
+
+/*
  * The connection protocol state.
  */
 enum rxrpc_conn_proto_state {
@@ -276,6 +290,7 @@ enum rxrpc_conn_proto_state {
 	RXRPC_CONN_REMOTELY_ABORTED,	/* Conn aborted by peer */
 	RXRPC_CONN_LOCALLY_ABORTED,	/* Conn aborted locally */
 	RXRPC_CONN_NETWORK_ERROR,	/* Conn terminated by network error */
+	RXRPC_CONN_LOCAL_ERROR,		/* Conn terminated by local error */
 	RXRPC_CONN__NR_STATES
 };
 
@@ -288,8 +303,14 @@ struct rxrpc_connection {
 	struct rxrpc_conn_proto	proto;
 	struct rxrpc_conn_parameters params;
 
-	spinlock_t		channel_lock;
+	atomic_t		usage;
+	struct rcu_head		rcu;
+	struct list_head	cache_link;
 
+	spinlock_t		channel_lock;
+	unsigned char		active_chans;	/* Mask of active channels */
+#define RXRPC_ACTIVE_CHANS_MASK	((1 << RXRPC_MAXCALLS) - 1)
+	struct list_head	waiting_calls;	/* Calls waiting for channels */
 	struct rxrpc_channel {
 		struct rxrpc_call __rcu	*call;		/* Active call */
 		u32			call_id;	/* ID of current call */
@@ -302,9 +323,7 @@ struct rxrpc_connection {
 			u32	last_abort;
 		};
 	} channels[RXRPC_MAXCALLS];
-	wait_queue_head_t	channel_wq;	/* queue to wait for channel to become available */
 
-	struct rcu_head		rcu;
 	struct work_struct	processor;	/* connection event processor */
 	union {
 		struct rb_node	client_node;	/* Node in local->client_conns */
@@ -321,7 +340,7 @@ struct rxrpc_connection {
 	unsigned long		events;
 	unsigned long		idle_timestamp;	/* Time at which last became idle */
 	spinlock_t		state_lock;	/* state-change lock */
-	atomic_t		usage;
+	enum rxrpc_conn_cache_state cache_state : 8;
 	enum rxrpc_conn_proto_state state : 8;	/* current state of connection */
 	u32			local_abort;	/* local abort code */
 	u32			remote_abort;	/* remote abort code */
@@ -329,7 +348,6 @@ struct rxrpc_connection {
 	int			debug_id;	/* debug ID for printks */
 	atomic_t		serial;		/* packet serial number counter */
 	unsigned int		hi_serial;	/* highest serial number received */
-	atomic_t		avail_chans;	/* number of channels available */
 	u8			size_align;	/* data size alignment (for security) */
 	u8			header_size;	/* rxrpc + security header size */
 	u8			security_size;	/* security header size */
@@ -351,6 +369,7 @@ enum rxrpc_call_flag {
 	RXRPC_CALL_HAS_USERID,		/* has a user ID attached */
 	RXRPC_CALL_EXPECT_OOS,		/* expect out of sequence packets */
 	RXRPC_CALL_IS_SERVICE,		/* Call is service call */
+	RXRPC_CALL_EXPOSED,		/* The call was exposed to the world */
 };
 
 /*
@@ -417,13 +436,14 @@ struct rxrpc_call {
 	struct work_struct	destroyer;	/* call destroyer */
 	struct work_struct	processor;	/* packet processor and ACK generator */
 	struct list_head	link;		/* link in master call list */
+	struct list_head	chan_wait_link;	/* Link in conn->waiting_calls */
 	struct hlist_node	error_link;	/* link in error distribution list */
 	struct list_head	accept_link;	/* calls awaiting acceptance */
 	struct rb_node		sock_node;	/* node in socket call tree */
 	struct sk_buff_head	rx_queue;	/* received packets */
 	struct sk_buff_head	rx_oos_queue;	/* packets received out of sequence */
 	struct sk_buff		*tx_pending;	/* Tx socket buffer being filled */
-	wait_queue_head_t	tx_waitq;	/* wait for Tx window space to become available */
+	wait_queue_head_t	waitq;		/* Wait queue for channel or Tx */
 	__be32			crypto_buf[2];	/* Temporary packet crypto buffer */
 	unsigned long		user_call_ID;	/* user-defined call ID */
 	unsigned long		creation_jif;	/* time of call creation */
@@ -546,12 +566,19 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call)
 /*
  * conn_client.c
  */
+extern unsigned int rxrpc_max_client_connections;
+extern unsigned int rxrpc_reap_client_connections;
+extern unsigned int rxrpc_conn_idle_client_expiry;
+extern unsigned int rxrpc_conn_idle_client_fast_expiry;
 extern struct idr rxrpc_client_conn_ids;
 
 void rxrpc_destroy_client_conn_ids(void);
 int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
 		       struct sockaddr_rxrpc *, gfp_t);
-void rxrpc_unpublish_client_conn(struct rxrpc_connection *);
+void rxrpc_expose_client_call(struct rxrpc_call *);
+void rxrpc_disconnect_client_call(struct rxrpc_call *);
+void rxrpc_put_client_conn(struct rxrpc_connection *);
+void __exit rxrpc_destroy_all_client_connections(void);
 
 /*
  * conn_event.c
@@ -572,8 +599,9 @@ int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *, struct sk_buff *);
 struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
 struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
 						   struct sk_buff *);
-void __rxrpc_disconnect_call(struct rxrpc_call *);
+void __rxrpc_disconnect_call(struct rxrpc_connection *, struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
+void rxrpc_kill_connection(struct rxrpc_connection *);
 void __rxrpc_put_connection(struct rxrpc_connection *);
 void __exit rxrpc_destroy_all_connections(void);
 
@@ -600,8 +628,16 @@ struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *con
 
 static inline void rxrpc_put_connection(struct rxrpc_connection *conn)
 {
-	if (conn && atomic_dec_return(&conn->usage) == 1)
-		__rxrpc_put_connection(conn);
+	if (!conn)
+		return;
+
+	if (rxrpc_conn_is_client(conn)) {
+		if (atomic_dec_and_test(&conn->usage))
+			rxrpc_put_client_conn(conn);
+	} else {
+		if (atomic_dec_return(&conn->usage) == 1)
+			__rxrpc_put_connection(conn);
+	}
 }
 
 
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 3d1961d82325..5292bcfd8816 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -193,6 +193,8 @@ static void rxrpc_resend(struct rxrpc_call *call)
 			stop = true;
 			sp->resend_at = jiffies + 3;
 		} else {
+			if (rxrpc_is_client_call(call))
+				rxrpc_expose_client_call(call);
 			sp->resend_at =
 				jiffies + rxrpc_resend_timeout;
 		}
@@ -378,7 +380,7 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 		call->acks_hard++;
 	}
 
-	wake_up(&call->tx_waitq);
+	wake_up(&call->waitq);
 }
 
 /*
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index f23432591a0f..e7cbcc4a87cf 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -127,10 +127,11 @@ static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
 	INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
 	INIT_WORK(&call->processor, &rxrpc_process_call);
 	INIT_LIST_HEAD(&call->link);
+	INIT_LIST_HEAD(&call->chan_wait_link);
 	INIT_LIST_HEAD(&call->accept_link);
 	skb_queue_head_init(&call->rx_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
-	init_waitqueue_head(&call->tx_waitq);
+	init_waitqueue_head(&call->waitq);
 	spin_lock_init(&call->lock);
 	rwlock_init(&call->state_lock);
 	atomic_set(&call->usage, 1);
@@ -358,7 +359,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 		       call->debug_id, rxrpc_call_states[call->state]);
 
 	if (call->state >= RXRPC_CALL_COMPLETE) {
-		__rxrpc_disconnect_call(call);
+		__rxrpc_disconnect_call(conn, call);
 	} else {
 		spin_unlock(&conn->channel_lock);
 		kmem_cache_free(rxrpc_call_jar, candidate);
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 6e1099ed1dbd..349402b08e5a 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -7,6 +7,68 @@
  * modify it under the terms of the GNU General Public Licence
  * as published by the Free Software Foundation; either version
  * 2 of the Licence, or (at your option) any later version.
+ *
+ *
+ * Client connections need to be cached for a little while after they've made a
+ * call so as to handle retransmitted DATA packets in case the server didn't
+ * receive the final ACK or terminating ABORT we sent it.
+ *
+ * Client connections can be in one of a number of cache states:
+ *
+ *  (1) INACTIVE - The connection is not held in any list and may not have been
+ *      exposed to the world.  If it has been previously exposed, it was
+ *      discarded from the idle list after expiring.
+ *
+ *  (2) WAITING - The connection is waiting for the number of client conns to
+ *      drop below the maximum capacity.  Calls may be in progress upon it from
+ *      when it was active and got culled.
+ *
+ *	The connection is on the rxrpc_waiting_client_conns list which is kept
+ *	in to-be-granted order.  Culled conns with waiters go to the back of
+ *	the queue just like new conns.
+ *
+ *  (3) ACTIVE - The connection has at least one call in progress upon it, it
+ *      may freely grant available channels to new calls and calls may be
+ *      waiting on it for channels to become available.
+ *
+ *	The connection is on the rxrpc_active_client_conns list which is kept
+ *	in activation order for culling purposes.
+ *
+ *	rxrpc_nr_active_client_conns is held incremented also.
+ *
+ *  (4) CULLED - The connection got summarily culled to try and free up
+ *      capacity.  Calls currently in progress on the connection are allowed to
+ *      continue, but new calls will have to wait.  There can be no waiters in
+ *      this state - the conn would have to go to the WAITING state instead.
+ *
+ *  (5) IDLE - The connection has no calls in progress upon it and must have
+ *      been exposed to the world (ie. the EXPOSED flag must be set).  When it
+ *      expires, the EXPOSED flag is cleared and the connection transitions to
+ *      the INACTIVE state.
+ *
+ *	The connection is on the rxrpc_idle_client_conns list which is kept in
+ *	order of how soon they'll expire.
+ *
+ * There are flags of relevance to the cache:
+ *
+ *  (1) EXPOSED - The connection ID got exposed to the world.  If this flag is
+ *      set, an extra ref is added to the connection preventing it from being
+ *      reaped when it has no calls outstanding.  This flag is cleared and the
+ *      ref dropped when a conn is discarded from the idle list.
+ *
+ *	This allows us to move terminal call state retransmission to the
+ *	connection and to discard the call immediately we think it is done
+ *	with.  It also give us a chance to reuse the connection.
+ *
+ *  (2) DONT_REUSE - The connection should be discarded as soon as possible and
+ *      should not be reused.  This is set when an exclusive connection is used
+ *      or a call ID counter overflows.
+ *
+ * The caching state may only be changed if the cache lock is held.
+ *
+ * There are two idle client connection expiry durations.  If the total number
+ * of connections is below the reap threshold, we use the normal duration; if
+ * it's above, we use the fast duration.
  */
 
 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
@@ -16,22 +78,37 @@
 #include <linux/timer.h>
 #include "ar-internal.h"
 
+__read_mostly unsigned int rxrpc_max_client_connections = 1000;
+__read_mostly unsigned int rxrpc_reap_client_connections = 900;
+__read_mostly unsigned int rxrpc_conn_idle_client_expiry = 2 * 60 * HZ;
+__read_mostly unsigned int rxrpc_conn_idle_client_fast_expiry = 2 * HZ;
+
+static unsigned int rxrpc_nr_client_conns;
+static unsigned int rxrpc_nr_active_client_conns;
+static __read_mostly bool rxrpc_kill_all_client_conns;
+
+static DEFINE_SPINLOCK(rxrpc_client_conn_cache_lock);
+static DEFINE_SPINLOCK(rxrpc_client_conn_discard_mutex);
+static LIST_HEAD(rxrpc_waiting_client_conns);
+static LIST_HEAD(rxrpc_active_client_conns);
+static LIST_HEAD(rxrpc_idle_client_conns);
+
 /*
  * We use machine-unique IDs for our client connections.
  */
 DEFINE_IDR(rxrpc_client_conn_ids);
 static DEFINE_SPINLOCK(rxrpc_conn_id_lock);
 
+static void rxrpc_cull_active_client_conns(void);
+static void rxrpc_discard_expired_client_conns(struct work_struct *);
+
+static DECLARE_DELAYED_WORK(rxrpc_client_conn_reap,
+			    rxrpc_discard_expired_client_conns);
+
 /*
  * Get a connection ID and epoch for a client connection from the global pool.
  * The connection struct pointer is then recorded in the idr radix tree.  The
  * epoch is changed if this wraps.
- *
- * TODO: The IDR tree gets very expensive on memory if the connection IDs are
- * widely scattered throughout the number space, so we shall need to retire
- * connections that have, say, an ID more than four times the maximum number of
- * client conns away from the current allocation point to try and keep the IDs
- * concentrated.  We will also need to retire connections from an old epoch.
  */
 static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
 					  gfp_t gfp)
@@ -114,8 +191,7 @@ void rxrpc_destroy_client_conn_ids(void)
 }
 
 /*
- * Allocate a client connection.  The caller must take care to clear any
- * padding bytes in *cp.
+ * Allocate a client connection.
  */
 static struct rxrpc_connection *
 rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
@@ -131,6 +207,10 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 		return ERR_PTR(-ENOMEM);
 	}
 
+	atomic_set(&conn->usage, 1);
+	if (conn->params.exclusive)
+		__set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
+
 	conn->params		= *cp;
 	conn->out_clientflag	= RXRPC_CLIENT_INITIATED;
 	conn->state		= RXRPC_CONN_CLIENT;
@@ -148,7 +228,6 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 		goto error_2;
 
 	write_lock(&rxrpc_connection_lock);
-	list_add_tail(&conn->link, &rxrpc_connections);
 	list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
 	write_unlock(&rxrpc_connection_lock);
 
@@ -171,32 +250,68 @@ error_0:
 }
 
 /*
- * find a connection for a call
- * - called in process context with IRQs enabled
+ * Determine if a connection may be reused.
  */
-int rxrpc_connect_call(struct rxrpc_call *call,
-		       struct rxrpc_conn_parameters *cp,
-		       struct sockaddr_rxrpc *srx,
-		       gfp_t gfp)
+static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
+{
+	int id_cursor, id, distance, limit;
+
+	if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
+		goto dont_reuse;
+
+	if (conn->proto.epoch != rxrpc_epoch)
+		goto mark_dont_reuse;
+
+	/* The IDR tree gets very expensive on memory if the connection IDs are
+	 * widely scattered throughout the number space, so we shall want to
+	 * kill off connections that, say, have an ID more than about four
+	 * times the maximum number of client conns away from the current
+	 * allocation point to try and keep the IDs concentrated.
+	 */
+	id_cursor = READ_ONCE(rxrpc_client_conn_ids.cur);
+	id = conn->proto.cid >> RXRPC_CIDSHIFT;
+	distance = id - id_cursor;
+	if (distance < 0)
+		distance = -distance;
+	limit = round_up(rxrpc_max_client_connections, IDR_SIZE) * 4;
+	if (distance > limit)
+		goto mark_dont_reuse;
+
+	return true;
+
+mark_dont_reuse:
+	set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
+dont_reuse:
+	return false;
+}
+
+/*
+ * Create or find a client connection to use for a call.
+ *
+ * If we return with a connection, the call will be on its waiting list.  It's
+ * left to the caller to assign a channel and wake up the call.
+ */
+static int rxrpc_get_client_conn(struct rxrpc_call *call,
+				 struct rxrpc_conn_parameters *cp,
+				 struct sockaddr_rxrpc *srx,
+				 gfp_t gfp)
 {
 	struct rxrpc_connection *conn, *candidate = NULL;
 	struct rxrpc_local *local = cp->local;
 	struct rb_node *p, **pp, *parent;
 	long diff;
-	int chan;
-
-	DECLARE_WAITQUEUE(myself, current);
+	int ret = -ENOMEM;
 
 	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
 
 	cp->peer = rxrpc_lookup_peer(cp->local, srx, gfp);
 	if (!cp->peer)
-		return -ENOMEM;
+		goto error;
 
+	/* If the connection is not meant to be exclusive, search the available
+	 * connections to see if the connection we want to use already exists.
+	 */
 	if (!cp->exclusive) {
-		/* Search for a existing client connection unless this is going
-		 * to be a connection that's used exclusively for a single call.
-		 */
 		_debug("search 1");
 		spin_lock(&local->client_conns_lock);
 		p = local->client_conns.rb_node;
@@ -207,39 +322,55 @@ int rxrpc_connect_call(struct rxrpc_call *call,
 			diff = (cmp(peer) ?:
 				cmp(key) ?:
 				cmp(security_level));
-			if (diff < 0)
+#undef cmp
+			if (diff < 0) {
 				p = p->rb_left;
-			else if (diff > 0)
+			} else if (diff > 0) {
 				p = p->rb_right;
-			else
-				goto found_extant_conn;
+			} else {
+				if (rxrpc_may_reuse_conn(conn) &&
+				    rxrpc_get_connection_maybe(conn))
+					goto found_extant_conn;
+				/* The connection needs replacing.  It's better
+				 * to effect that when we have something to
+				 * replace it with so that we don't have to
+				 * rebalance the tree twice.
+				 */
+				break;
+			}
 		}
 		spin_unlock(&local->client_conns_lock);
 	}
 
-	/* We didn't find a connection or we want an exclusive one. */
-	_debug("get new conn");
+	/* There wasn't a connection yet or we need an exclusive connection.
+	 * We need to create a candidate and then potentially redo the search
+	 * in case we're racing with another thread also trying to connect on a
+	 * shareable connection.
+	 */
+	_debug("new conn");
 	candidate = rxrpc_alloc_client_connection(cp, gfp);
-	if (!candidate) {
-		_leave(" = -ENOMEM");
-		return -ENOMEM;
+	if (IS_ERR(candidate)) {
+		ret = PTR_ERR(candidate);
+		goto error_peer;
 	}
 
+	/* Add the call to the new connection's waiting list in case we're
+	 * going to have to wait for the connection to come live.  It's our
+	 * connection, so we want first dibs on the channel slots.  We would
+	 * normally have to take channel_lock but we do this before anyone else
+	 * can see the connection.
+	 */
+	list_add_tail(&call->chan_wait_link, &candidate->waiting_calls);
+
 	if (cp->exclusive) {
-		/* Assign the call on an exclusive connection to channel 0 and
-		 * don't add the connection to the endpoint's shareable conn
-		 * lookup tree.
-		 */
-		_debug("exclusive chan 0");
-		conn = candidate;
-		atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
-		spin_lock(&conn->channel_lock);
-		chan = 0;
-		goto found_channel;
+		call->conn = candidate;
+		_leave(" = 0 [exclusive %d]", candidate->debug_id);
+		return 0;
 	}
 
-	/* We need to redo the search before attempting to add a new connection
-	 * lest we race with someone else adding a conflicting instance.
+	/* Publish the new connection for userspace to find.  We need to redo
+	 * the search before doing this lest we race with someone else adding a
+	 * conflicting instance.
 	 */
 	_debug("search 2");
 	spin_lock(&local->client_conns_lock);
@@ -250,123 +381,672 @@ int rxrpc_connect_call(struct rxrpc_call *call,
 			parent = *pp;
 			conn = rb_entry(parent, struct rxrpc_connection, client_node);
 
+#define cmp(X) ((long)conn->params.X - (long)candidate->params.X)
 		diff = (cmp(peer) ?:
 			cmp(key) ?:
 			cmp(security_level));
-		if (diff < 0)
+#undef cmp
+		if (diff < 0) {
 			pp = &(*pp)->rb_left;
-		else if (diff > 0)
+		} else if (diff > 0) {
 			pp = &(*pp)->rb_right;
-		else
-			goto found_extant_conn;
+		} else {
+			if (rxrpc_may_reuse_conn(conn) &&
+			    rxrpc_get_connection_maybe(conn))
+				goto found_extant_conn;
+			/* The old connection is from an outdated epoch. */
+			_debug("replace conn");
+			clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
+			rb_replace_node(&conn->client_node,
+					&candidate->client_node,
+					&local->client_conns);
+			goto candidate_published;
+		}
 	}
 
-	/* The second search also failed; simply add the new connection with
-	 * the new call in channel 0.  Note that we need to take the channel
-	 * lock before dropping the client conn lock.
-	 */
 	_debug("new conn");
-	set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
 	rb_link_node(&candidate->client_node, parent, pp);
 	rb_insert_color(&candidate->client_node, &local->client_conns);
-attached:
-	conn = candidate;
-	candidate = NULL;
 
-	atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
-	spin_lock(&conn->channel_lock);
+candidate_published:
+	set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+	call->conn = candidate;
 	spin_unlock(&local->client_conns_lock);
-	chan = 0;
+	_leave(" = 0 [new %d]", candidate->debug_id);
+	return 0;
 
-found_channel:
-	_debug("found chan");
-	call->conn = conn;
-	call->peer = rxrpc_get_peer(conn->params.peer);
-	call->cid = conn->proto.cid | chan;
-	call->call_id = ++conn->channels[chan].call_counter;
-	conn->channels[chan].call_id = call->call_id;
-	rcu_assign_pointer(conn->channels[chan].call, call);
+	/* We come here if we found a suitable connection already in existence.
+	 * Discard any candidate we may have allocated, and try to get a
+	 * channel on this one.
+	 */
+found_extant_conn:
+	_debug("found conn");
+	spin_unlock(&local->client_conns_lock);
 
-	_net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id);
+	rxrpc_put_connection(candidate);
+	candidate = NULL;
 
+	spin_lock(&conn->channel_lock);
+	call->conn = conn;
+	list_add(&call->chan_wait_link, &conn->waiting_calls);
 	spin_unlock(&conn->channel_lock);
+	_leave(" = 0 [extant %d]", conn->debug_id);
+	return 0;
+
+error_peer:
 	rxrpc_put_peer(cp->peer);
 	cp->peer = NULL;
-	_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
-	return 0;
+error:
+	_leave(" = %d", ret);
+	return ret;
+}
 
-	/* We found a potentially suitable connection already in existence.  If
-	 * we can reuse it (ie. its usage count hasn't been reduced to 0 by the
-	 * reaper), discard any candidate we may have allocated, and try to get
-	 * a channel on this one, otherwise we have to replace it.
-	 */
-found_extant_conn:
-	_debug("found conn");
-	if (!rxrpc_get_connection_maybe(conn)) {
-		set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
-		rb_replace_node(&conn->client_node,
-				&candidate->client_node,
-				&local->client_conns);
-		clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
-		goto attached;
+/*
+ * Activate a connection.
+ */
+static void rxrpc_activate_conn(struct rxrpc_connection *conn)
+{
+	conn->cache_state = RXRPC_CONN_CLIENT_ACTIVE;
+	rxrpc_nr_active_client_conns++;
+	list_move_tail(&conn->cache_link, &rxrpc_active_client_conns);
+}
+
+/*
+ * Attempt to animate a connection for a new call.
+ *
+ * If it's not exclusive, the connection is in the endpoint tree, and we're in
+ * the conn's list of those waiting to grab a channel.  There is, however, a
+ * limit on the number of live connections allowed at any one time, so we may
+ * have to wait for capacity to become available.
+ *
+ * Note that a connection on the waiting queue might *also* have active
+ * channels if it has been culled to make space and then re-requested by a new
+ * call.
+ */
+static void rxrpc_animate_client_conn(struct rxrpc_connection *conn)
+{
+	unsigned int nr_conns;
+
+	_enter("%d,%d", conn->debug_id, conn->cache_state);
+
+	if (conn->cache_state == RXRPC_CONN_CLIENT_ACTIVE)
+		goto out;
+
+	spin_lock(&rxrpc_client_conn_cache_lock);
+
+	nr_conns = rxrpc_nr_client_conns;
+	if (!test_and_set_bit(RXRPC_CONN_COUNTED, &conn->flags))
+		rxrpc_nr_client_conns = nr_conns + 1;
+
+	switch (conn->cache_state) {
+	case RXRPC_CONN_CLIENT_ACTIVE:
+	case RXRPC_CONN_CLIENT_WAITING:
+		break;
+
+	case RXRPC_CONN_CLIENT_INACTIVE:
+	case RXRPC_CONN_CLIENT_CULLED:
+	case RXRPC_CONN_CLIENT_IDLE:
+		if (nr_conns >= rxrpc_max_client_connections)
+			goto wait_for_capacity;
+		goto activate_conn;
+
+	default:
+		BUG();
 	}
 
-	spin_unlock(&local->client_conns_lock);
+out_unlock:
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+out:
+	_leave(" [%d]", conn->cache_state);
+	return;
 
-	rxrpc_put_connection(candidate);
+activate_conn:
+	_debug("activate");
+	rxrpc_activate_conn(conn);
+	goto out_unlock;
+
+wait_for_capacity:
+	_debug("wait");
+	conn->cache_state = RXRPC_CONN_CLIENT_WAITING;
+	list_move_tail(&conn->cache_link, &rxrpc_waiting_client_conns);
+	goto out_unlock;
+}
+
+/*
+ * Deactivate a channel.
+ */
+static void rxrpc_deactivate_one_channel(struct rxrpc_connection *conn,
+					 unsigned int channel)
+{
+	struct rxrpc_channel *chan = &conn->channels[channel];
+
+	rcu_assign_pointer(chan->call, NULL);
+	conn->active_chans &= ~(1 << channel);
+}
+
+/*
+ * Assign a channel to the call at the front of the queue and wake the call up.
+ * We don't increment the callNumber counter until this number has been exposed
+ * to the world.
+ */
+static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
+				       unsigned int channel)
+{
+	struct rxrpc_channel *chan = &conn->channels[channel];
+	struct rxrpc_call *call = list_entry(conn->waiting_calls.next,
+					     struct rxrpc_call, chan_wait_link);
+	u32 call_id = chan->call_counter + 1;
+
+	list_del_init(&call->chan_wait_link);
+	conn->active_chans |= 1 << channel;
+	call->peer = rxrpc_get_peer(conn->params.peer);
+	call->cid = conn->proto.cid | channel;
+	call->call_id = call_id;
+
+	_net("CONNECT call %08x:%08x as call %d on conn %d",
+	     call->cid, call->call_id, call->debug_id, conn->debug_id);
+
+	/* Paired with the read barrier in rxrpc_wait_for_channel().  This
+	 * orders cid and epoch in the connection wrt to call_id without the
+	 * need to take the channel_lock.
+	 *
+	 * We provisionally assign a callNumber at this point, but we don't
+	 * confirm it until the call is about to be exposed.
+	 *
+	 * TODO: Pair with a barrier in the data_ready handler when that looks
+	 * at the call ID through a connection channel.
+	 */
+	smp_wmb();
+	chan->call_id = call_id;
+	rcu_assign_pointer(chan->call, call);
+	wake_up(&call->waitq);
+}
+
+/*
+ * Assign channels and callNumbers to waiting calls.
+ */
+static void rxrpc_activate_channels(struct rxrpc_connection *conn)
+{
+	unsigned char mask;
+
+	_enter("%d", conn->debug_id);
+
+	if (conn->cache_state != RXRPC_CONN_CLIENT_ACTIVE ||
+	    conn->active_chans == RXRPC_ACTIVE_CHANS_MASK)
+		return;
+
+	spin_lock(&conn->channel_lock);
+
+	while (!list_empty(&conn->waiting_calls) &&
+	       (mask = ~conn->active_chans,
+		mask &= RXRPC_ACTIVE_CHANS_MASK,
+		mask != 0))
+		rxrpc_activate_one_channel(conn, __ffs(mask));
+
+	spin_unlock(&conn->channel_lock);
+	_leave("");
+}
+
+/*
+ * Wait for a callNumber and a channel to be granted to a call.
+ */
+static int rxrpc_wait_for_channel(struct rxrpc_call *call, gfp_t gfp)
+{
+	int ret = 0;
+
+	_enter("%d", call->debug_id);
+
+	if (!call->call_id) {
+		DECLARE_WAITQUEUE(myself, current);
 
-	if (!atomic_add_unless(&conn->avail_chans, -1, 0)) {
 		if (!gfpflags_allow_blocking(gfp)) {
-			rxrpc_put_connection(conn);
-			_leave(" = -EAGAIN");
-			return -EAGAIN;
+			ret = -EAGAIN;
+			goto out;
 		}
 
-		add_wait_queue(&conn->channel_wq, &myself);
+		add_wait_queue_exclusive(&call->waitq, &myself);
 		for (;;) {
 			set_current_state(TASK_INTERRUPTIBLE);
-			if (atomic_add_unless(&conn->avail_chans, -1, 0))
+			if (call->call_id)
+				break;
+			if (signal_pending(current)) {
+				ret = -ERESTARTSYS;
 				break;
-			if (signal_pending(current))
-				goto interrupted;
+			}
 			schedule();
 		}
-		remove_wait_queue(&conn->channel_wq, &myself);
+		remove_wait_queue(&call->waitq, &myself);
 		__set_current_state(TASK_RUNNING);
 	}
 
-	/* The connection allegedly now has a free channel and we can now
-	 * attach the call to it.
-	 */
+	/* Paired with the write barrier in rxrpc_activate_one_channel(). */
+	smp_rmb();
+
+out:
+	_leave(" = %d", ret);
+	return ret;
+}
+
+/*
+ * find a connection for a call
+ * - called in process context with IRQs enabled
+ */
+int rxrpc_connect_call(struct rxrpc_call *call,
+		       struct rxrpc_conn_parameters *cp,
+		       struct sockaddr_rxrpc *srx,
+		       gfp_t gfp)
+{
+	int ret;
+
+	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
+
+	rxrpc_discard_expired_client_conns(NULL);
+	rxrpc_cull_active_client_conns();
+
+	ret = rxrpc_get_client_conn(call, cp, srx, gfp);
+	if (ret < 0)
+		return ret;
+
+	rxrpc_animate_client_conn(call->conn);
+	rxrpc_activate_channels(call->conn);
+
+	ret = rxrpc_wait_for_channel(call, gfp);
+	if (ret < 0)
+		rxrpc_disconnect_client_call(call);
+
+	_leave(" = %d", ret);
+	return ret;
+}
+
+/*
+ * Note that a connection is about to be exposed to the world.  Once it is
+ * exposed, we maintain an extra ref on it that stops it from being summarily
+ * discarded before it's (a) had a chance to deal with retransmission and (b)
+ * had a chance at re-use (the per-connection security negotiation is
+ * expensive).
+ */
+static void rxrpc_expose_client_conn(struct rxrpc_connection *conn)
+{
+	if (!test_and_set_bit(RXRPC_CONN_EXPOSED, &conn->flags))
+		rxrpc_get_connection(conn);
+}
+
+/*
+ * Note that a call, and thus a connection, is about to be exposed to the
+ * world.
+ */
+void rxrpc_expose_client_call(struct rxrpc_call *call)
+{
+	struct rxrpc_connection *conn = call->conn;
+	struct rxrpc_channel *chan =
+		&conn->channels[call->cid & RXRPC_CHANNELMASK];
+
+	if (!test_and_set_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
+		/* Mark the call ID as being used.  If the callNumber counter
+		 * exceeds ~2 billion, we kill the connection after its
+		 * outstanding calls have finished so that the counter doesn't
+		 * wrap.
+		 */
+		chan->call_counter++;
+		if (chan->call_counter >= INT_MAX)
+			set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
+		rxrpc_expose_client_conn(conn);
+	}
+}
+
+/*
+ * Disconnect a client call.
+ */
+void rxrpc_disconnect_client_call(struct rxrpc_call *call)
+{
+	unsigned int channel = call->cid & RXRPC_CHANNELMASK;
+	struct rxrpc_connection *conn = call->conn;
+	struct rxrpc_channel *chan = &conn->channels[channel];
+
+	call->conn = NULL;
+
 	spin_lock(&conn->channel_lock);
 
-	for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
-		if (!conn->channels[chan].call)
-			goto found_channel;
-	BUG();
+	/* Calls that have never actually been assigned a channel can simply be
+	 * discarded.  If the conn didn't get used either, it will follow
+	 * immediately unless someone else grabs it in the meantime.
+	 */
+	if (!list_empty(&call->chan_wait_link)) {
+		_debug("call is waiting");
+		ASSERTCMP(call->call_id, ==, 0);
+		ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
+		list_del_init(&call->chan_wait_link);
+
+		/* We must deactivate or idle the connection if it's now
+		 * waiting for nothing.
+		 */
+		spin_lock(&rxrpc_client_conn_cache_lock);
+		if (conn->cache_state == RXRPC_CONN_CLIENT_WAITING &&
+		    list_empty(&conn->waiting_calls) &&
+		    !conn->active_chans)
+			goto idle_connection;
+		goto out;
+	}
+
+	ASSERTCMP(rcu_access_pointer(chan->call), ==, call);
+	ASSERTCMP(atomic_read(&conn->usage), >=, 2);
+
+	/* If a client call was exposed to the world, we save the result for
+	 * retransmission.
+	 *
+	 * We use a barrier here so that the call number and abort code can be
+	 * read without needing to take a lock.
+	 *
+	 * TODO: Make the incoming packet handler check this and handle
+	 * terminal retransmission without requiring access to the call.
+	 */
+	if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
+		_debug("exposed %u,%u", call->call_id, call->local_abort);
+		__rxrpc_disconnect_call(conn, call);
+	}
+
+	/* See if we can pass the channel directly to another call. */
+	if (conn->cache_state == RXRPC_CONN_CLIENT_ACTIVE &&
+	    !list_empty(&conn->waiting_calls)) {
+		_debug("pass chan");
+		rxrpc_activate_one_channel(conn, channel);
+		goto out_2;
+	}
+
+	/* Things are more complex and we need the cache lock.  We might be
+	 * able to simply idle the conn or it might now be lurking on the wait
+	 * list.  It might even get moved back to the active list whilst we're
+	 * waiting for the lock.
+	 */
+	spin_lock(&rxrpc_client_conn_cache_lock);
+
+	switch (conn->cache_state) {
+	case RXRPC_CONN_CLIENT_ACTIVE:
+		if (list_empty(&conn->waiting_calls)) {
+			rxrpc_deactivate_one_channel(conn, channel);
+			if (!conn->active_chans) {
+				rxrpc_nr_active_client_conns--;
+				goto idle_connection;
+			}
+			goto out;
+		}
+
+		_debug("pass chan 2");
+		rxrpc_activate_one_channel(conn, channel);
+		goto out;
+
+	case RXRPC_CONN_CLIENT_CULLED:
+		rxrpc_deactivate_one_channel(conn, channel);
+		ASSERT(list_empty(&conn->waiting_calls));
+		if (!conn->active_chans)
+			goto idle_connection;
+		goto out;
+
+	case RXRPC_CONN_CLIENT_WAITING:
+		rxrpc_deactivate_one_channel(conn, channel);
+		goto out;
+
+	default:
+		BUG();
+	}
 
-interrupted:
-	remove_wait_queue(&conn->channel_wq, &myself);
-	__set_current_state(TASK_RUNNING);
+out:
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+out_2:
+	spin_unlock(&conn->channel_lock);
 	rxrpc_put_connection(conn);
-	rxrpc_put_peer(cp->peer);
-	cp->peer = NULL;
-	_leave(" = -ERESTARTSYS");
-	return -ERESTARTSYS;
+	_leave("");
+	return;
+
+idle_connection:
+	/* As no channels remain active, the connection gets deactivated
+	 * immediately or moved to the idle list for a short while.
+	 */
+	if (test_bit(RXRPC_CONN_EXPOSED, &conn->flags)) {
+		_debug("make idle");
+		conn->idle_timestamp = jiffies;
+		conn->cache_state = RXRPC_CONN_CLIENT_IDLE;
+		list_move_tail(&conn->cache_link, &rxrpc_idle_client_conns);
+		if (rxrpc_idle_client_conns.next == &conn->cache_link &&
+		    !rxrpc_kill_all_client_conns)
+			queue_delayed_work(rxrpc_workqueue,
+					   &rxrpc_client_conn_reap,
+					   rxrpc_conn_idle_client_expiry);
+	} else {
+		_debug("make inactive");
+		conn->cache_state = RXRPC_CONN_CLIENT_INACTIVE;
+		list_del_init(&conn->cache_link);
+	}
+	goto out;
 }
 
 /*
- * Remove a client connection from the local endpoint's tree, thereby removing
- * it as a target for reuse for new client calls.
+ * Clean up a dead client connection.
  */
-void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
+static struct rxrpc_connection *
+rxrpc_put_one_client_conn(struct rxrpc_connection *conn)
 {
+	struct rxrpc_connection *next;
 	struct rxrpc_local *local = conn->params.local;
+	unsigned int nr_conns;
 
-	spin_lock(&local->client_conns_lock);
-	if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
-		rb_erase(&conn->client_node, &local->client_conns);
-	spin_unlock(&local->client_conns_lock);
+	if (test_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags)) {
+		spin_lock(&local->client_conns_lock);
+		if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS,
+				       &conn->flags))
+			rb_erase(&conn->client_node, &local->client_conns);
+		spin_unlock(&local->client_conns_lock);
+	}
 
 	rxrpc_put_client_connection_id(conn);
+
+	ASSERTCMP(conn->cache_state, ==, RXRPC_CONN_CLIENT_INACTIVE);
+
+	if (!test_bit(RXRPC_CONN_COUNTED, &conn->flags))
+		return NULL;
+
+	spin_lock(&rxrpc_client_conn_cache_lock);
+	nr_conns = --rxrpc_nr_client_conns;
+
+	next = NULL;
+	if (nr_conns < rxrpc_max_client_connections &&
+	    !list_empty(&rxrpc_waiting_client_conns)) {
+		next = list_entry(rxrpc_waiting_client_conns.next,
+				  struct rxrpc_connection, cache_link);
+		rxrpc_get_connection(next);
+		rxrpc_activate_conn(next);
+	}
+
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+	rxrpc_kill_connection(conn);
+
+	if (next)
+		rxrpc_activate_channels(next);
+
+	/* We need to get rid of the temporary ref we took upon next, but we
+	 * can't call rxrpc_put_connection() recursively.
+	 */
+	return next;
+}
+
+/*
+ * Clean up a dead client connections.
+ */
+void rxrpc_put_client_conn(struct rxrpc_connection *conn)
+{
+	struct rxrpc_connection *next;
+
+	do {
+		_enter("%p{u=%d,d=%d}",
+		       conn, atomic_read(&conn->usage), conn->debug_id);
+
+		next = rxrpc_put_one_client_conn(conn);
+
+		if (!next)
+			break;
+		conn = next;
+	} while (atomic_dec_and_test(&conn->usage));
+
+	_leave("");
+}
+
+/*
+ * Kill the longest-active client connections to make room for new ones.
+ */
+static void rxrpc_cull_active_client_conns(void)
+{
+	struct rxrpc_connection *conn;
+	unsigned int nr_conns = rxrpc_nr_client_conns;
+	unsigned int nr_active, limit;
+
+	_enter("");
+
+	ASSERTCMP(nr_conns, >=, 0);
+	if (nr_conns < rxrpc_max_client_connections) {
+		_leave(" [ok]");
+		return;
+	}
+	limit = rxrpc_reap_client_connections;
+
+	spin_lock(&rxrpc_client_conn_cache_lock);
+	nr_active = rxrpc_nr_active_client_conns;
+
+	while (nr_active > limit) {
+		ASSERT(!list_empty(&rxrpc_active_client_conns));
+		conn = list_entry(rxrpc_active_client_conns.next,
+				  struct rxrpc_connection, cache_link);
+		ASSERTCMP(conn->cache_state, ==, RXRPC_CONN_CLIENT_ACTIVE);
+
+		if (list_empty(&conn->waiting_calls)) {
+			conn->cache_state = RXRPC_CONN_CLIENT_CULLED;
+			list_del_init(&conn->cache_link);
+		} else {
+			conn->cache_state = RXRPC_CONN_CLIENT_WAITING;
+			list_move_tail(&conn->cache_link,
+				       &rxrpc_waiting_client_conns);
+		}
+
+		nr_active--;
+	}
+
+	rxrpc_nr_active_client_conns = nr_active;
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+	ASSERTCMP(nr_active, >=, 0);
+	_leave(" [culled]");
+}
+
+/*
+ * Discard expired client connections from the idle list.  Each conn in the
+ * idle list has been exposed and holds an extra ref because of that.
+ *
+ * This may be called from conn setup or from a work item so cannot be
+ * considered non-reentrant.
+ */
+static void rxrpc_discard_expired_client_conns(struct work_struct *work)
+{
+	struct rxrpc_connection *conn;
+	unsigned long expiry, conn_expires_at, now;
+	unsigned int nr_conns;
+	bool did_discard = false;
+
+	_enter("%c", work ? 'w' : 'n');
+
+	if (list_empty(&rxrpc_idle_client_conns)) {
+		_leave(" [empty]");
+		return;
+	}
+
+	/* Don't double up on the discarding */
+	if (!spin_trylock(&rxrpc_client_conn_discard_mutex)) {
+		_leave(" [already]");
+		return;
+	}
+
+	/* We keep an estimate of what the number of conns ought to be after
+	 * we've discarded some so that we don't overdo the discarding.
+	 */
+	nr_conns = rxrpc_nr_client_conns;
+
+next:
+	spin_lock(&rxrpc_client_conn_cache_lock);
+
+	if (list_empty(&rxrpc_idle_client_conns))
+		goto out;
+
+	conn = list_entry(rxrpc_idle_client_conns.next,
+			  struct rxrpc_connection, cache_link);
+	ASSERT(test_bit(RXRPC_CONN_EXPOSED, &conn->flags));
+
+	if (!rxrpc_kill_all_client_conns) {
+		/* If the number of connections is over the reap limit, we
+		 * expedite discard by reducing the expiry timeout.  We must,
+		 * however, have at least a short grace period to be able to do
+		 * final-ACK or ABORT retransmission.
+		 */
+		expiry = rxrpc_conn_idle_client_expiry;
+		if (nr_conns > rxrpc_reap_client_connections)
+			expiry = rxrpc_conn_idle_client_fast_expiry;
+
+		conn_expires_at = conn->idle_timestamp + expiry;
+
+		now = READ_ONCE(jiffies);
+		if (time_after(conn_expires_at, now))
+			goto not_yet_expired;
+	}
+
+	_debug("discard conn %d", conn->debug_id);
+	if (!test_and_clear_bit(RXRPC_CONN_EXPOSED, &conn->flags))
+		BUG();
+	conn->cache_state = RXRPC_CONN_CLIENT_INACTIVE;
+	list_del_init(&conn->cache_link);
+
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+
+	/* When we cleared the EXPOSED flag, we took on responsibility for the
+	 * reference that that had on the usage count.  We deal with that here.
+	 * If someone re-sets the flag and re-gets the ref, that's fine.
+	 */
+	rxrpc_put_connection(conn);
+	did_discard = true;
+	nr_conns--;
+	goto next;
+
+not_yet_expired:
+	/* The connection at the front of the queue hasn't yet expired, so
+	 * schedule the work item for that point if we discarded something.
+	 *
+	 * We don't worry if the work item is already scheduled - it can look
+	 * after rescheduling itself at a later time.  We could cancel it, but
+	 * then things get messier.
+	 */
+	_debug("not yet");
+	if (!rxrpc_kill_all_client_conns)
+		queue_delayed_work(rxrpc_workqueue,
+				   &rxrpc_client_conn_reap,
+				   conn_expires_at - now);
+
+out:
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+	spin_unlock(&rxrpc_client_conn_discard_mutex);
+	_leave("");
+}
+
+/*
+ * Preemptively destroy all the client connection records rather than waiting
+ * for them to time out
+ */
+void __exit rxrpc_destroy_all_client_connections(void)
+{
+	_enter("");
+
+	spin_lock(&rxrpc_client_conn_cache_lock);
+	rxrpc_kill_all_client_conns = true;
+	spin_unlock(&rxrpc_client_conn_cache_lock);
+
+	cancel_delayed_work(&rxrpc_client_conn_reap);
+
+	if (!queue_delayed_work(rxrpc_workqueue, &rxrpc_client_conn_reap, 0))
+		_debug("destroy: queue failed");
+
+	_leave("");
 }
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index afc2d83d5995..5b45b6c367e7 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -1,6 +1,6 @@
1/* RxRPC virtual connection handler 1/* RxRPC virtual connection handler, common bits.
2 * 2 *
3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 3 * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com) 4 * Written by David Howells (dhowells@redhat.com)
5 * 5 *
6 * This program is free software; you can redistribute it and/or 6 * This program is free software; you can redistribute it and/or
@@ -15,8 +15,6 @@
15#include <linux/slab.h> 15#include <linux/slab.h>
16#include <linux/net.h> 16#include <linux/net.h>
17#include <linux/skbuff.h> 17#include <linux/skbuff.h>
18#include <net/sock.h>
19#include <net/af_rxrpc.h>
20#include "ar-internal.h" 18#include "ar-internal.h"
21 19
22/* 20/*
@@ -31,6 +29,8 @@ LIST_HEAD(rxrpc_connection_proc_list);
31DEFINE_RWLOCK(rxrpc_connection_lock); 29DEFINE_RWLOCK(rxrpc_connection_lock);
32static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper); 30static DECLARE_DELAYED_WORK(rxrpc_connection_reap, rxrpc_connection_reaper);
33 31
32static void rxrpc_destroy_connection(struct rcu_head *);
33
34/* 34/*
35 * allocate a new connection 35 * allocate a new connection
36 */ 36 */
@@ -42,20 +42,16 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
42 42
43 conn = kzalloc(sizeof(struct rxrpc_connection), gfp); 43 conn = kzalloc(sizeof(struct rxrpc_connection), gfp);
44 if (conn) { 44 if (conn) {
45 INIT_LIST_HEAD(&conn->cache_link);
45 spin_lock_init(&conn->channel_lock); 46 spin_lock_init(&conn->channel_lock);
46 init_waitqueue_head(&conn->channel_wq); 47 INIT_LIST_HEAD(&conn->waiting_calls);
47 INIT_WORK(&conn->processor, &rxrpc_process_connection); 48 INIT_WORK(&conn->processor, &rxrpc_process_connection);
48 INIT_LIST_HEAD(&conn->proc_link); 49 INIT_LIST_HEAD(&conn->proc_link);
49 INIT_LIST_HEAD(&conn->link); 50 INIT_LIST_HEAD(&conn->link);
50 skb_queue_head_init(&conn->rx_queue); 51 skb_queue_head_init(&conn->rx_queue);
51 conn->security = &rxrpc_no_security; 52 conn->security = &rxrpc_no_security;
52 spin_lock_init(&conn->state_lock); 53 spin_lock_init(&conn->state_lock);
53 /* We maintain an extra ref on the connection whilst it is
54 * on the rxrpc_connections list.
55 */
56 atomic_set(&conn->usage, 2);
57 conn->debug_id = atomic_inc_return(&rxrpc_debug_id); 54 conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
58 atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
59 conn->size_align = 4; 55 conn->size_align = 4;
60 conn->header_size = sizeof(struct rxrpc_wire_header); 56 conn->header_size = sizeof(struct rxrpc_wire_header);
61 conn->idle_timestamp = jiffies; 57 conn->idle_timestamp = jiffies;
@@ -156,9 +152,9 @@ not_found:
156 * terminates. The caller must hold the channel_lock and must release the 152 * terminates. The caller must hold the channel_lock and must release the
157 * call's ref on the connection. 153 * call's ref on the connection.
158 */ 154 */
159void __rxrpc_disconnect_call(struct rxrpc_call *call) 155void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
156 struct rxrpc_call *call)
160{ 157{
161 struct rxrpc_connection *conn = call->conn;
162 struct rxrpc_channel *chan = 158 struct rxrpc_channel *chan =
163 &conn->channels[call->cid & RXRPC_CHANNELMASK]; 159 &conn->channels[call->cid & RXRPC_CHANNELMASK];
164 160
@@ -182,8 +178,6 @@ void __rxrpc_disconnect_call(struct rxrpc_call *call)
182 chan->call_id = chan->call_counter; 178 chan->call_id = chan->call_counter;
183 179
184 rcu_assign_pointer(chan->call, NULL); 180 rcu_assign_pointer(chan->call, NULL);
185 atomic_inc(&conn->avail_chans);
186 wake_up(&conn->channel_wq);
187 } 181 }
188 182
189 _leave(""); 183 _leave("");
@@ -197,8 +191,11 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
197{ 191{
198 struct rxrpc_connection *conn = call->conn; 192 struct rxrpc_connection *conn = call->conn;
199 193
194 if (rxrpc_is_client_call(call))
195 return rxrpc_disconnect_client_call(call);
196
200 spin_lock(&conn->channel_lock); 197 spin_lock(&conn->channel_lock);
201 __rxrpc_disconnect_call(call); 198 __rxrpc_disconnect_call(conn, call);
202 spin_unlock(&conn->channel_lock); 199 spin_unlock(&conn->channel_lock);
203 200
204 call->conn = NULL; 201 call->conn = NULL;
@@ -207,6 +204,34 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
207} 204}
208 205
209/* 206/*
207 * Kill off a connection.
208 */
209void rxrpc_kill_connection(struct rxrpc_connection *conn)
210{
211 ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
212 !rcu_access_pointer(conn->channels[1].call) &&
213 !rcu_access_pointer(conn->channels[2].call) &&
214 !rcu_access_pointer(conn->channels[3].call));
215 ASSERT(list_empty(&conn->cache_link));
216
217 write_lock(&rxrpc_connection_lock);
218 list_del_init(&conn->proc_link);
219 write_unlock(&rxrpc_connection_lock);
220
221 /* Drain the Rx queue. Note that even though we've unpublished, an
 222 * incoming packet could still be in flight into our Rx queue, so we
223 * will need to drain it again in the RCU cleanup handler.
224 */
225 rxrpc_purge_queue(&conn->rx_queue);
226
227 /* Leave final destruction to RCU. The connection processor work item
228 * must carry a ref on the connection to prevent us getting here whilst
229 * it is queued or running.
230 */
231 call_rcu(&conn->rcu, rxrpc_destroy_connection);
232}
233
234/*
210 * release a virtual connection 235 * release a virtual connection
211 */ 236 */
212void __rxrpc_put_connection(struct rxrpc_connection *conn) 237void __rxrpc_put_connection(struct rxrpc_connection *conn)
@@ -241,7 +266,7 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
241} 266}
242 267
243/* 268/*
244 * reap dead connections 269 * reap dead service connections
245 */ 270 */
246static void rxrpc_connection_reaper(struct work_struct *work) 271static void rxrpc_connection_reaper(struct work_struct *work)
247{ 272{
@@ -280,12 +305,11 @@ static void rxrpc_connection_reaper(struct work_struct *work)
280 continue; 305 continue;
281 306
282 if (rxrpc_conn_is_client(conn)) 307 if (rxrpc_conn_is_client(conn))
283 rxrpc_unpublish_client_conn(conn); 308 BUG();
284 else 309 else
285 rxrpc_unpublish_service_conn(conn); 310 rxrpc_unpublish_service_conn(conn);
286 311
287 list_move_tail(&conn->link, &graveyard); 312 list_move_tail(&conn->link, &graveyard);
288 list_del_init(&conn->proc_link);
289 } 313 }
290 write_unlock(&rxrpc_connection_lock); 314 write_unlock(&rxrpc_connection_lock);
291 315
@@ -302,16 +326,15 @@ static void rxrpc_connection_reaper(struct work_struct *work)
302 list_del_init(&conn->link); 326 list_del_init(&conn->link);
303 327
304 ASSERTCMP(atomic_read(&conn->usage), ==, 0); 328 ASSERTCMP(atomic_read(&conn->usage), ==, 0);
305 skb_queue_purge(&conn->rx_queue); 329 rxrpc_kill_connection(conn);
306 call_rcu(&conn->rcu, rxrpc_destroy_connection);
307 } 330 }
308 331
309 _leave(""); 332 _leave("");
310} 333}
311 334
312/* 335/*
313 * preemptively destroy all the connection records rather than waiting for them 336 * preemptively destroy all the service connection records rather than
314 * to time out 337 * waiting for them to time out
315 */ 338 */
316void __exit rxrpc_destroy_all_connections(void) 339void __exit rxrpc_destroy_all_connections(void)
317{ 340{
@@ -320,6 +343,8 @@ void __exit rxrpc_destroy_all_connections(void)
320 343
321 _enter(""); 344 _enter("");
322 345
346 rxrpc_destroy_all_client_connections();
347
323 rxrpc_connection_expiry = 0; 348 rxrpc_connection_expiry = 0;
324 cancel_delayed_work(&rxrpc_connection_reap); 349 cancel_delayed_work(&rxrpc_connection_reap);
325 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0); 350 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
@@ -334,6 +359,8 @@ void __exit rxrpc_destroy_all_connections(void)
334 write_unlock(&rxrpc_connection_lock); 359 write_unlock(&rxrpc_connection_lock);
335 BUG_ON(leak); 360 BUG_ON(leak);
336 361
362 ASSERT(list_empty(&rxrpc_connection_proc_list));
363
337 /* Make sure the local and peer records pinned by any dying connections 364 /* Make sure the local and peer records pinned by any dying connections
338 * are released. 365 * are released.
339 */ 366 */
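rxrpc_kill_connection() separates unpublishing from freeing: removing the connection from the proc list stops new lookups from finding it, while call_rcu() holds the actual free back until every reader already inside an RCU read-side section has finished. A cut-down kernel-style sketch of the pattern (writer-side locking omitted for brevity; not the rxrpc code itself):

#include <linux/rculist.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct obj {
        struct list_head link;  /* published list, walked under RCU */
        struct rcu_head rcu;
};

static void obj_destroy_rcu(struct rcu_head *rcu)
{
        struct obj *o = container_of(rcu, struct obj, rcu);

        kfree(o);               /* safe: no RCU reader can still see it */
}

static void obj_kill(struct obj *o)
{
        list_del_rcu(&o->link);                 /* unpublish first */
        call_rcu(&o->rcu, obj_destroy_rcu);     /* free after grace period */
}

This is also why the connection processor work item must carry a ref whilst queued or running, as the comment in the hunk notes: the kill path must be unreachable for as long as the processor could still touch the connection.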
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 6ad6ae926cc3..316a92107fee 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -185,6 +185,11 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
185 185
186 rxrpc_get_local(local); 186 rxrpc_get_local(local);
187 187
188 /* We maintain an extra ref on the connection whilst it is on
189 * the rxrpc_connections list.
190 */
191 atomic_set(&conn->usage, 2);
192
188 write_lock(&rxrpc_connection_lock); 193 write_lock(&rxrpc_connection_lock);
189 list_add_tail(&conn->link, &rxrpc_connections); 194 list_add_tail(&conn->link, &rxrpc_connections);
190 list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list); 195 list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
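With the hunk above, the initial usage count of 2 becomes a service-side detail: one reference belongs to the caller and one to the rxrpc_connections list, and the reaper drops the list reference when it unlinks the connection. Client connections no longer get the extra ref at allocation time because their lifetime is driven by the cache state machine instead. The counting convention, sketched with invented names:

#include <stdatomic.h>

struct obj {
        atomic_int usage;
        /* plus list linkage */
};

static void obj_publish(struct obj *o)
{
        /* Ref 1 belongs to the caller; ref 2 belongs to the global list
         * and is dropped by whoever unlinks the object (the reaper).
         */
        atomic_store(&o->usage, 2);
        /* list_add_tail(&o->link, &obj_list) would follow here */
}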
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index e3a08d542fb7..8a9917cba6fe 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -390,7 +390,7 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
390 call->acks_winsz), 390 call->acks_winsz),
391 *timeo); 391 *timeo);
392 392
393 add_wait_queue(&call->tx_waitq, &myself); 393 add_wait_queue(&call->waitq, &myself);
394 394
395 for (;;) { 395 for (;;) {
396 set_current_state(TASK_INTERRUPTIBLE); 396 set_current_state(TASK_INTERRUPTIBLE);
@@ -408,7 +408,7 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
408 lock_sock(&rx->sk); 408 lock_sock(&rx->sk);
409 } 409 }
410 410
411 remove_wait_queue(&call->tx_waitq, &myself); 411 remove_wait_queue(&call->waitq, &myself);
412 set_current_state(TASK_RUNNING); 412 set_current_state(TASK_RUNNING);
413 _leave(" = %d", ret); 413 _leave(" = %d", ret);
414 return ret; 414 return ret;
@@ -482,6 +482,8 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
482 if (try_to_del_timer_sync(&call->ack_timer) >= 0) { 482 if (try_to_del_timer_sync(&call->ack_timer) >= 0) {
483 /* the packet may be freed by rxrpc_process_call() before this 483 /* the packet may be freed by rxrpc_process_call() before this
484 * returns */ 484 * returns */
485 if (rxrpc_is_client_call(call))
486 rxrpc_expose_client_call(call);
485 ret = rxrpc_send_data_packet(call->conn, skb); 487 ret = rxrpc_send_data_packet(call->conn, skb);
486 _net("sent skb %p", skb); 488 _net("sent skb %p", skb);
487 } else { 489 } else {
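The rxrpc_queue_packet() hunk enforces an ordering rule: a client call must be exposed before its first DATA packet can reach the wire, because from that moment the peer knows the connection ID and the connection has to stay around long enough to retransmit a final ACK or ABORT. A userspace sketch of the expose half of the flag-owns-a-reference idiom shown earlier (all names invented):

#include <stdatomic.h>
#include <stdbool.h>

struct conn {
        atomic_int usage;
        atomic_bool exposed;    /* while set, holds one ref on ->usage */
};

struct call {
        struct conn *conn;
        bool is_client;
};

static void conn_expose(struct conn *c)
{
        /* The first setter takes the reference the flag holds thereafter;
         * later calls are no-ops.
         */
        if (!atomic_exchange(&c->exposed, true))
                atomic_fetch_add(&c->usage, 1);
}

static void transmit_first_packet(struct call *call /* , the packet */)
{
        if (call->is_client)
                conn_expose(call->conn);  /* pin before the ID leaks out */
        /* ... only now may the packet actually be sent ... */
}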
diff --git a/net/rxrpc/sysctl.c b/net/rxrpc/sysctl.c
index 03ad08774d4e..dc380af8a81e 100644
--- a/net/rxrpc/sysctl.c
+++ b/net/rxrpc/sysctl.c
@@ -62,6 +62,22 @@ static struct ctl_table rxrpc_sysctl_table[] = {
62 .proc_handler = proc_dointvec_ms_jiffies, 62 .proc_handler = proc_dointvec_ms_jiffies,
63 .extra1 = (void *)&one, 63 .extra1 = (void *)&one,
64 }, 64 },
65 {
66 .procname = "idle_conn_expiry",
67 .data = &rxrpc_conn_idle_client_expiry,
68 .maxlen = sizeof(unsigned int),
69 .mode = 0644,
70 .proc_handler = proc_dointvec_ms_jiffies,
71 .extra1 = (void *)&one,
72 },
73 {
74 .procname = "idle_conn_fast_expiry",
75 .data = &rxrpc_conn_idle_client_fast_expiry,
76 .maxlen = sizeof(unsigned int),
77 .mode = 0644,
78 .proc_handler = proc_dointvec_ms_jiffies,
79 .extra1 = (void *)&one,
80 },
65 81
66 /* Values measured in seconds but used in jiffies */ 82 /* Values measured in seconds but used in jiffies */
67 { 83 {
@@ -81,17 +97,24 @@ static struct ctl_table rxrpc_sysctl_table[] = {
81 .extra1 = (void *)&one, 97 .extra1 = (void *)&one,
82 }, 98 },
83 99
84 /* Values measured in seconds */ 100 /* Non-time values */
101 {
102 .procname = "max_client_conns",
103 .data = &rxrpc_max_client_connections,
104 .maxlen = sizeof(unsigned int),
105 .mode = 0644,
106 .proc_handler = proc_dointvec_minmax,
107 .extra1 = (void *)&rxrpc_reap_client_connections,
108 },
85 { 109 {
86 .procname = "connection_expiry", 110 .procname = "reap_client_conns",
87 .data = &rxrpc_connection_expiry, 111 .data = &rxrpc_reap_client_connections,
88 .maxlen = sizeof(unsigned int), 112 .maxlen = sizeof(unsigned int),
89 .mode = 0644, 113 .mode = 0644,
90 .proc_handler = proc_dointvec_minmax, 114 .proc_handler = proc_dointvec_minmax,
91 .extra1 = (void *)&one, 115 .extra1 = (void *)&one,
116 .extra2 = (void *)&rxrpc_max_client_connections,
92 }, 117 },
93
94 /* Non-time values */
95 { 118 {
96 .procname = "max_backlog", 119 .procname = "max_backlog",
97 .data = &rxrpc_max_backlog, 120 .data = &rxrpc_max_backlog,
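The new tunables sit alongside the existing rxrpc sysctls, so - assuming the table remains registered under net/rxrpc as the current entries are - they should surface as /proc/sys/net/rxrpc/idle_conn_expiry, idle_conn_fast_expiry, max_client_conns and reap_client_conns. Note the cross-clamping visible above: max_client_conns cannot be written below reap_client_conns (its extra1), and reap_client_conns is bounded between 1 and max_client_conns (extra1/extra2). A trivial userspace read of one of them:

#include <stdio.h>

int main(void)
{
        char buf[32];
        /* path assumed from the existing net/rxrpc sysctl registration */
        FILE *f = fopen("/proc/sys/net/rxrpc/max_client_conns", "r");

        if (!f)
                return 1;
        if (fgets(buf, sizeof(buf), f))
                printf("max_client_conns = %s", buf);
        fclose(f);
        return 0;
}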