aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2016-06-30 05:45:22 -0400
committerDavid Howells <dhowells@redhat.com>2016-07-06 05:50:04 -0400
commit001c11224910b25e59a65ce1b49cfecdb4c631c0 (patch)
treec51c38af1e4d79b14debf5ee780c0c277c4c7c45
parentd991b4a32f65076efaf78739c4a46406ca8c7e79 (diff)
rxrpc: Maintain an extra ref on a conn for the cache list
Overhaul the usage count accounting for the rxrpc_connection struct to make it easier to implement RCU access from the data_ready handler. The problem is that currently we're using a lock to prevent the garbage collector from trying to clean up a connection that we're contemplating unidling. We could just stick incoming packets on the connection we find, but we've then got a problem that we may race when dispatching a work item to process it as we need to give that a ref to prevent the rxrpc_connection struct from disappearing in the meantime. Further, incoming packets may get discarded if attached to an rxrpc_connection struct that is going away. Whilst this is not a total disaster - the client will presumably resend - it would delay processing of the call. This would affect the AFS client filesystem's service manager operation. To this end: (1) We now maintain an extra count on the connection usage count whilst it is on the connection list. This means it is not in use when its refcount is 1. (2) When trying to reuse an old connection, we only increment the refcount if it is greater than 0. If it is 0, we replace it in the tree with a new candidate connection. (3) Two connection flags are added to indicate whether or not a connection is in the local's client connection tree (used by sendmsg) or the peer's service connection tree (used by data_ready). This makes sure that we don't try and remove a connection if it got replaced. The flags are tested under lock with the removal operation to prevent the reaper from killing the rxrpc_connection struct whilst someone else is trying to effect a replacement. This could probably be alleviated by using memory barriers between the flag set/test and the rb_tree ops. The rb_tree op would still need to be under the lock, however. (4) When trying to reap an old connection, we try to flip the usage count from 1 to 0. If it's not 1 at that point, then it must've come back to life temporarily and we ignore it. 
Signed-off-by: David Howells <dhowells@redhat.com>
-rw-r--r--net/rxrpc/ar-internal.h5
-rw-r--r--net/rxrpc/conn_client.c42
-rw-r--r--net/rxrpc/conn_object.c74
-rw-r--r--net/rxrpc/conn_service.c34
4 files changed, 97 insertions, 58 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index ad48f851b40c..d8e4d6e6a030 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -258,6 +258,8 @@ struct rxrpc_conn_parameters {
258 */ 258 */
259enum rxrpc_conn_flag { 259enum rxrpc_conn_flag {
260 RXRPC_CONN_HAS_IDR, /* Has a client conn ID assigned */ 260 RXRPC_CONN_HAS_IDR, /* Has a client conn ID assigned */
261 RXRPC_CONN_IN_SERVICE_CONNS, /* Conn is in peer->service_conns */
262 RXRPC_CONN_IN_CLIENT_CONNS, /* Conn is in local->client_conns */
261}; 263};
262 264
263/* 265/*
@@ -544,10 +546,10 @@ void __exit rxrpc_destroy_all_calls(void);
544 */ 546 */
545extern struct idr rxrpc_client_conn_ids; 547extern struct idr rxrpc_client_conn_ids;
546 548
547void rxrpc_put_client_connection_id(struct rxrpc_connection *);
548void rxrpc_destroy_client_conn_ids(void); 549void rxrpc_destroy_client_conn_ids(void);
549int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *, 550int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
550 struct sockaddr_rxrpc *, gfp_t); 551 struct sockaddr_rxrpc *, gfp_t);
552void rxrpc_unpublish_client_conn(struct rxrpc_connection *);
551 553
552/* 554/*
553 * conn_event.c 555 * conn_event.c
@@ -609,6 +611,7 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
609struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *, 611struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
610 struct sockaddr_rxrpc *, 612 struct sockaddr_rxrpc *,
611 struct sk_buff *); 613 struct sk_buff *);
614void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
612 615
613/* 616/*
614 * input.c 617 * input.c
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 9180164a51aa..aa21462f3236 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -84,7 +84,7 @@ error:
84/* 84/*
85 * Release a connection ID for a client connection from the global pool. 85 * Release a connection ID for a client connection from the global pool.
86 */ 86 */
87void rxrpc_put_client_connection_id(struct rxrpc_connection *conn) 87static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
88{ 88{
89 if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) { 89 if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
90 spin_lock(&rxrpc_conn_id_lock); 90 spin_lock(&rxrpc_conn_id_lock);
@@ -278,12 +278,13 @@ int rxrpc_connect_call(struct rxrpc_call *call,
278 * lock before dropping the client conn lock. 278 * lock before dropping the client conn lock.
279 */ 279 */
280 _debug("new conn"); 280 _debug("new conn");
281 set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
282 rb_link_node(&candidate->client_node, parent, pp);
283 rb_insert_color(&candidate->client_node, &local->client_conns);
284attached:
281 conn = candidate; 285 conn = candidate;
282 candidate = NULL; 286 candidate = NULL;
283 287
284 rb_link_node(&conn->client_node, parent, pp);
285 rb_insert_color(&conn->client_node, &local->client_conns);
286
287 atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1); 288 atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
288 spin_lock(&conn->channel_lock); 289 spin_lock(&conn->channel_lock);
289 spin_unlock(&local->client_conns_lock); 290 spin_unlock(&local->client_conns_lock);
@@ -307,13 +308,22 @@ found_channel:
307 _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage)); 308 _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
308 return 0; 309 return 0;
309 310
310 /* We found a suitable connection already in existence. Discard any 311 /* We found a potentially suitable connection already in existence. If
311 * candidate we may have allocated, and try to get a channel on this 312 * we can reuse it (ie. its usage count hasn't been reduced to 0 by the
312 * one. 313 * reaper), discard any candidate we may have allocated, and try to get
314 * a channel on this one, otherwise we have to replace it.
313 */ 315 */
314found_extant_conn: 316found_extant_conn:
315 _debug("found conn"); 317 _debug("found conn");
316 rxrpc_get_connection(conn); 318 if (!rxrpc_get_connection_maybe(conn)) {
319 set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
320 rb_replace_node(&conn->client_node,
321 &candidate->client_node,
322 &local->client_conns);
323 clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
324 goto attached;
325 }
326
317 spin_unlock(&local->client_conns_lock); 327 spin_unlock(&local->client_conns_lock);
318 328
319 rxrpc_put_connection(candidate); 329 rxrpc_put_connection(candidate);
@@ -357,3 +367,19 @@ interrupted:
357 _leave(" = -ERESTARTSYS"); 367 _leave(" = -ERESTARTSYS");
358 return -ERESTARTSYS; 368 return -ERESTARTSYS;
359} 369}
370
371/*
372 * Remove a client connection from the local endpoint's tree, thereby removing
373 * it as a target for reuse for new client calls.
374 */
375void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
376{
377 struct rxrpc_local *local = conn->params.local;
378
379 spin_lock(&local->client_conns_lock);
380 if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
381 rb_erase(&conn->client_node, &local->client_conns);
382 spin_unlock(&local->client_conns_lock);
383
384 rxrpc_put_client_connection_id(conn);
385}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 8379e3748d13..89bc6480b4e2 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -49,7 +49,10 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
49 skb_queue_head_init(&conn->rx_queue); 49 skb_queue_head_init(&conn->rx_queue);
50 conn->security = &rxrpc_no_security; 50 conn->security = &rxrpc_no_security;
51 spin_lock_init(&conn->state_lock); 51 spin_lock_init(&conn->state_lock);
52 atomic_set(&conn->usage, 1); 52 /* We maintain an extra ref on the connection whilst it is
53 * on the rxrpc_connections list.
54 */
55 atomic_set(&conn->usage, 2);
53 conn->debug_id = atomic_inc_return(&rxrpc_debug_id); 56 conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
54 atomic_set(&conn->avail_chans, RXRPC_MAXCALLS); 57 atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
55 conn->size_align = 4; 58 conn->size_align = 4;
@@ -111,7 +114,7 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
111 return NULL; 114 return NULL;
112 115
113found: 116found:
114 rxrpc_get_connection(conn); 117 conn = rxrpc_get_connection_maybe(conn);
115 read_unlock_bh(&peer->conn_lock); 118 read_unlock_bh(&peer->conn_lock);
116 _leave(" = %p", conn); 119 _leave(" = %p", conn);
117 return conn; 120 return conn;
@@ -173,10 +176,10 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
173 _enter("%p{u=%d,d=%d}", 176 _enter("%p{u=%d,d=%d}",
174 conn, atomic_read(&conn->usage), conn->debug_id); 177 conn, atomic_read(&conn->usage), conn->debug_id);
175 178
176 ASSERTCMP(atomic_read(&conn->usage), >, 0); 179 ASSERTCMP(atomic_read(&conn->usage), >, 1);
177 180
178 conn->put_time = ktime_get_seconds(); 181 conn->put_time = ktime_get_seconds();
179 if (atomic_dec_and_test(&conn->usage)) { 182 if (atomic_dec_return(&conn->usage) == 1) {
180 _debug("zombie"); 183 _debug("zombie");
181 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0); 184 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
182 } 185 }
@@ -216,59 +219,41 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
216static void rxrpc_connection_reaper(struct work_struct *work) 219static void rxrpc_connection_reaper(struct work_struct *work)
217{ 220{
218 struct rxrpc_connection *conn, *_p; 221 struct rxrpc_connection *conn, *_p;
219 struct rxrpc_peer *peer; 222 unsigned long reap_older_than, earliest, put_time, now;
220 unsigned long now, earliest, reap_time;
221 223
222 LIST_HEAD(graveyard); 224 LIST_HEAD(graveyard);
223 225
224 _enter(""); 226 _enter("");
225 227
226 now = ktime_get_seconds(); 228 now = ktime_get_seconds();
229 reap_older_than = now - rxrpc_connection_expiry;
227 earliest = ULONG_MAX; 230 earliest = ULONG_MAX;
228 231
229 write_lock(&rxrpc_connection_lock); 232 write_lock(&rxrpc_connection_lock);
230 list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) { 233 list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
231 _debug("reap CONN %d { u=%d,t=%ld }", 234 ASSERTCMP(atomic_read(&conn->usage), >, 0);
232 conn->debug_id, atomic_read(&conn->usage), 235 if (likely(atomic_read(&conn->usage) > 1))
233 (long) now - (long) conn->put_time);
234
235 if (likely(atomic_read(&conn->usage) > 0))
236 continue; 236 continue;
237 237
238 if (rxrpc_conn_is_client(conn)) { 238 put_time = READ_ONCE(conn->put_time);
239 struct rxrpc_local *local = conn->params.local; 239 if (time_after(put_time, reap_older_than)) {
240 spin_lock(&local->client_conns_lock); 240 if (time_before(put_time, earliest))
241 reap_time = conn->put_time + rxrpc_connection_expiry; 241 earliest = put_time;
242 242 continue;
243 if (atomic_read(&conn->usage) > 0) {
244 ;
245 } else if (reap_time <= now) {
246 list_move_tail(&conn->link, &graveyard);
247 rxrpc_put_client_connection_id(conn);
248 rb_erase(&conn->client_node,
249 &local->client_conns);
250 } else if (reap_time < earliest) {
251 earliest = reap_time;
252 }
253
254 spin_unlock(&local->client_conns_lock);
255 } else {
256 peer = conn->params.peer;
257 write_lock_bh(&peer->conn_lock);
258 reap_time = conn->put_time + rxrpc_connection_expiry;
259
260 if (atomic_read(&conn->usage) > 0) {
261 ;
262 } else if (reap_time <= now) {
263 list_move_tail(&conn->link, &graveyard);
264 rb_erase(&conn->service_node,
265 &peer->service_conns);
266 } else if (reap_time < earliest) {
267 earliest = reap_time;
268 }
269
270 write_unlock_bh(&peer->conn_lock);
271 } 243 }
244
245 /* The usage count sits at 1 whilst the object is unused on the
246 * list; we reduce that to 0 to make the object unavailable.
247 */
248 if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
249 continue;
250
251 if (rxrpc_conn_is_client(conn))
252 rxrpc_unpublish_client_conn(conn);
253 else
254 rxrpc_unpublish_service_conn(conn);
255
256 list_move_tail(&conn->link, &graveyard);
272 } 257 }
273 write_unlock(&rxrpc_connection_lock); 258 write_unlock(&rxrpc_connection_lock);
274 259
@@ -279,7 +264,6 @@ static void rxrpc_connection_reaper(struct work_struct *work)
279 (earliest - now) * HZ); 264 (earliest - now) * HZ);
280 } 265 }
281 266
282 /* then destroy all those pulled out */
283 while (!list_empty(&graveyard)) { 267 while (!list_empty(&graveyard)) {
284 conn = list_entry(graveyard.next, struct rxrpc_connection, 268 conn = list_entry(graveyard.next, struct rxrpc_connection,
285 link); 269 link);
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index a42b210c40a5..77a509e6003a 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -104,10 +104,12 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
104 } 104 }
105 105
106 /* we can now add the new candidate to the list */ 106 /* we can now add the new candidate to the list */
107 set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
108 rb_link_node(&candidate->service_node, p, pp);
109 rb_insert_color(&candidate->service_node, &peer->service_conns);
110attached:
107 conn = candidate; 111 conn = candidate;
108 candidate = NULL; 112 candidate = NULL;
109 rb_link_node(&conn->service_node, p, pp);
110 rb_insert_color(&conn->service_node, &peer->service_conns);
111 rxrpc_get_peer(peer); 113 rxrpc_get_peer(peer);
112 rxrpc_get_local(local); 114 rxrpc_get_local(local);
113 115
@@ -128,11 +130,19 @@ success:
128 130
129 /* we found the connection in the list immediately */ 131 /* we found the connection in the list immediately */
130found_extant_connection: 132found_extant_connection:
133 if (!rxrpc_get_connection_maybe(conn)) {
134 set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
135 rb_replace_node(&conn->service_node,
136 &candidate->service_node,
137 &peer->service_conns);
138 clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
139 goto attached;
140 }
141
131 if (sp->hdr.securityIndex != conn->security_ix) { 142 if (sp->hdr.securityIndex != conn->security_ix) {
132 read_unlock_bh(&peer->conn_lock); 143 read_unlock_bh(&peer->conn_lock);
133 goto security_mismatch; 144 goto security_mismatch_put;
134 } 145 }
135 rxrpc_get_connection(conn);
136 read_unlock_bh(&peer->conn_lock); 146 read_unlock_bh(&peer->conn_lock);
137 goto success; 147 goto success;
138 148
@@ -147,8 +157,24 @@ found_extant_second:
147 kfree(candidate); 157 kfree(candidate);
148 goto success; 158 goto success;
149 159
160security_mismatch_put:
161 rxrpc_put_connection(conn);
150security_mismatch: 162security_mismatch:
151 kfree(candidate); 163 kfree(candidate);
152 _leave(" = -EKEYREJECTED"); 164 _leave(" = -EKEYREJECTED");
153 return ERR_PTR(-EKEYREJECTED); 165 return ERR_PTR(-EKEYREJECTED);
154} 166}
167
168/*
169 * Remove the service connection from the peer's tree, thereby removing it as a
170 * target for incoming packets.
171 */
172void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
173{
174 struct rxrpc_peer *peer = conn->params.peer;
175
176 write_lock_bh(&peer->conn_lock);
177 if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
178 rb_erase(&conn->service_node, &peer->service_conns);
179 write_unlock_bh(&peer->conn_lock);
180}