author		David Howells <dhowells@redhat.com>	2016-09-07 04:19:31 -0400
committer	David Howells <dhowells@redhat.com>	2016-09-07 10:33:20 -0400
commit		8d94aa381dab19f3c0f524f5d255248b0ae50125 (patch)
tree		3e892a6e923f93c3af363e179fddd832330c2168
parent		6543ac523558b2392271f3f8088e6455b3f00bb1 (diff)
rxrpc: Calls shouldn't hold socket refs
rxrpc calls shouldn't hold refs on the sock struct. This was done so that
the socket wouldn't go away whilst the call was in progress, such that the
call could reach the socket's queues.

However, we can mark the socket as requiring an RCU release and rely on
the RCU read lock.

To make this work, we do:

 (1) rxrpc_release_call() removes the call's call user ID. This is now
     only called from socket operations and not from the call processor:

	rxrpc_accept_call() / rxrpc_kernel_accept_call()
	rxrpc_reject_call() / rxrpc_kernel_reject_call()
	rxrpc_kernel_end_call()
	rxrpc_release_calls_on_socket()
	rxrpc_recvmsg()

     Though it is also called in the cleanup path of
     rxrpc_accept_incoming_call() before we assign a user ID.

 (2) Pass the socket pointer into rxrpc_release_call() rather than getting
     it from the call so that we can get rid of uninitialised calls.

 (3) Fix call processor queueing to pass a ref to the work queue and to
     release that ref at the end of the processor function (or to pass it
     back to the work queue if we have to requeue).

 (4) Skip out of the call processor function asap if the call is complete
     and don't requeue it if the call is complete.

 (5) Clean up the call immediately that the refcount reaches 0 rather than
     trying to defer it. Actual deallocation is deferred to RCU, however.

 (6) Don't hold socket refs for allocated calls.

 (7) Use the RCU read lock when queueing a message on a socket and treat
     the call's socket pointer according to RCU rules and check it for
     NULL.

     We also need to use the RCU read lock when viewing a call through
     procfs.

 (8) Transmit the final ACK/ABORT to a client call in rxrpc_release_call()
     if this hasn't been done yet so that we can then disconnect the call.
     Once the call is disconnected, it won't have any access to the
     connection struct and the UDP socket for the call work processor to
     be able to send the ACK. Terminal retransmission will be handled by
     the connection processor.

 (9) Release all calls immediately on the closing of a socket rather than
     trying to defer this. Incomplete calls will be aborted.

The call refcount model is much simplified. Refs are held on the call by:

 (1) A socket's user ID tree.

 (2) A socket's incoming call secureq and acceptq.

 (3) A kernel service that has a call in progress.

 (4) A queued call work processor. We have to take care to put any call
     that we failed to queue.

 (5) sk_buffs on a socket's receive queue. A future patch will get rid of
     this.

Whilst we're at it, we can do:

 (1) Get rid of the RXRPC_CALL_EV_RELEASE event. Release is now done
     entirely from the socket routines and never from the call's
     processor.

 (2) Get rid of the RXRPC_CALL_DEAD state. Calls now end in the
     RXRPC_CALL_COMPLETE state.

 (3) Get rid of the rxrpc_call::destroyer work item. Calls are now torn
     down when their refcount reaches 0 and then handed over to RCU for
     final cleanup.

 (4) Get rid of the rxrpc_call::deadspan timer. Calls are cleaned up
     immediately they're finished with and don't hang around.
     Post-completion retransmission is handled by the connection processor
     once the call is disconnected.

 (5) Get rid of the dead call expiry setting as there's no longer a timer
     to set.

 (6) rxrpc_destroy_all_calls() can just check that the call list is empty.

Signed-off-by: David Howells <dhowells@redhat.com>
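[Editorial note: to make point (3) of the list above concrete, here is a
minimal sketch of the ref-per-work-item pattern the patch adopts. It is
illustrative only: the struct, workqueue and function names below are
simplified stand-ins, not the rxrpc code itself.]

    /* Illustrative sketch only -- simplified names, not the rxrpc code. */
    static bool example_queue_call(struct example_call *call)
    {
    	/* Take a ref for the work item, unless the count already hit 0. */
    	if (!atomic_add_unless(&call->usage, 1, 0))
    		return false;
    	if (!queue_work(example_wq, &call->processor))
    		example_put_call(call);	/* already queued: drop our ref */
    	return true;
    }

    static void example_process_call(struct work_struct *work)
    {
    	struct example_call *call =
    		container_of(work, struct example_call, processor);

    	if (call->state >= EXAMPLE_CALL_COMPLETE)
    		goto put;	/* complete calls are never requeued */

    	/* ... process pending events ... */

    	if (call->events && !work_pending(&call->processor) &&
    	    queue_work(example_wq, &call->processor))
    		return;	/* requeued: our ref is donated back to the queue */
    put:
    	example_put_call(call);	/* release the work item's ref */
    }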
-rw-r--r--	net/rxrpc/af_rxrpc.c	  4
-rw-r--r--	net/rxrpc/ar-internal.h	 15
-rw-r--r--	net/rxrpc/call_accept.c	 55
-rw-r--r--	net/rxrpc/call_event.c	 74
-rw-r--r--	net/rxrpc/call_object.c	224
-rw-r--r--	net/rxrpc/input.c	 26
-rw-r--r--	net/rxrpc/output.c	145
-rw-r--r--	net/rxrpc/proc.c	  4
-rw-r--r--	net/rxrpc/recvmsg.c	 24
-rw-r--r--	net/rxrpc/skbuff.c	  3
-rw-r--r--	net/rxrpc/sysctl.c	  8
11 files changed, 303 insertions, 279 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 8356cd003d51..77a132abf140 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -294,8 +294,7 @@ EXPORT_SYMBOL(rxrpc_kernel_begin_call);
 void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
 {
 	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
-	rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
-	rxrpc_purge_queue(&call->knlrecv_queue);
+	rxrpc_release_call(rxrpc_sk(sock->sk), call);
 	rxrpc_put_call(call, rxrpc_call_put);
 }
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
@@ -558,6 +557,7 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
 		return -ENOMEM;
 
 	sock_init_data(sock, sk);
+	sock_set_flag(sk, SOCK_RCU_FREE);
 	sk->sk_state = RXRPC_UNBOUND;
 	sk->sk_write_space = rxrpc_write_space;
 	sk->sk_max_ack_backlog = 0;
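[Editorial note: the sock_set_flag(sk, SOCK_RCU_FREE) line in the hunk above
is what makes the RCU scheme in this patch safe: with that flag set, the
kernel defers freeing of the sock structure over an RCU grace period, so a
reader inside rcu_read_lock() can still safely touch a socket that is
concurrently being released. A minimal sketch of the reader side follows;
the call->socket field is the real one from this patch, but the function
name is illustrative.]

    /* Illustrative reader; compare the input.c change later in this patch. */
    static void example_notify_socket(struct rxrpc_call *call)
    {
    	struct rxrpc_sock *rx;

    	rcu_read_lock();
    	rx = rcu_dereference(call->socket);	/* may be NULL once released */
    	if (rx && !sock_flag(&rx->sk, SOCK_DEAD))
    		rx->sk.sk_data_ready(&rx->sk);	/* sock free is RCU-deferred */
    	rcu_read_unlock();
    }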
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index e3dfc9da05fe..3addda4bfa6b 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -35,8 +35,6 @@ struct rxrpc_crypt {
 #define rxrpc_queue_delayed_work(WS,D)	\
 	queue_delayed_work(rxrpc_workqueue, (WS), (D))
 
-#define rxrpc_queue_call(CALL) rxrpc_queue_work(&(CALL)->processor)
-
 struct rxrpc_connection;
 
 /*
@@ -397,7 +395,6 @@ enum rxrpc_call_event {
 	RXRPC_CALL_EV_ACCEPTED,		/* incoming call accepted by userspace app */
 	RXRPC_CALL_EV_SECURED,		/* incoming call's connection is now secure */
 	RXRPC_CALL_EV_POST_ACCEPT,	/* need to post an "accept?" message to the app */
-	RXRPC_CALL_EV_RELEASE,		/* need to release the call's resources */
 };
 
 /*
@@ -417,7 +414,6 @@ enum rxrpc_call_state {
 	RXRPC_CALL_SERVER_SEND_REPLY,	/* - server sending reply */
 	RXRPC_CALL_SERVER_AWAIT_ACK,	/* - server awaiting final ACK */
 	RXRPC_CALL_COMPLETE,		/* - call complete */
-	RXRPC_CALL_DEAD,		/* - call is dead */
 	NR__RXRPC_CALL_STATES
 };
 
@@ -442,12 +438,10 @@ struct rxrpc_call {
 	struct rcu_head		rcu;
 	struct rxrpc_connection	*conn;		/* connection carrying call */
 	struct rxrpc_peer	*peer;		/* Peer record for remote address */
-	struct rxrpc_sock	*socket;	/* socket responsible */
+	struct rxrpc_sock __rcu	*socket;	/* socket responsible */
 	struct timer_list	lifetimer;	/* lifetime remaining on call */
-	struct timer_list	deadspan;	/* reap timer for re-ACK'ing, etc */
 	struct timer_list	ack_timer;	/* ACK generation timer */
 	struct timer_list	resend_timer;	/* Tx resend timer */
-	struct work_struct	destroyer;	/* call destroyer */
 	struct work_struct	processor;	/* packet processor and ACK generator */
 	rxrpc_notify_rx_t	notify_rx;	/* kernel service Rx notification function */
 	struct list_head	link;		/* link in master call list */
@@ -558,7 +552,6 @@ void rxrpc_process_call(struct work_struct *);
 extern const char *const rxrpc_call_states[];
 extern const char *const rxrpc_call_completions[];
 extern unsigned int rxrpc_max_call_lifetime;
-extern unsigned int rxrpc_dead_call_expiry;
 extern struct kmem_cache *rxrpc_call_jar;
 extern struct list_head rxrpc_calls;
 extern rwlock_t rxrpc_call_lock;
@@ -571,8 +564,10 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *,
 				       struct rxrpc_connection *,
 				       struct sk_buff *);
-void rxrpc_release_call(struct rxrpc_call *);
+void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
+bool __rxrpc_queue_call(struct rxrpc_call *);
+bool rxrpc_queue_call(struct rxrpc_call *);
 void rxrpc_see_call(struct rxrpc_call *);
 void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
 void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace);
@@ -835,6 +830,7 @@ extern const char *rxrpc_acks(u8 reason);
 /*
  * output.c
  */
+int rxrpc_send_call_packet(struct rxrpc_call *, u8);
 int rxrpc_send_data_packet(struct rxrpc_connection *, struct sk_buff *);
 
 /*
@@ -880,7 +876,6 @@ extern const struct file_operations rxrpc_connection_seq_fops;
 /*
  * recvmsg.c
  */
-void rxrpc_remove_user_ID(struct rxrpc_sock *, struct rxrpc_call *);
 int rxrpc_recvmsg(struct socket *, struct msghdr *, size_t, int);
 
 /*
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 487ae7aa86db..879a964de80c 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -163,13 +163,7 @@ invalid_service:
163 _debug("invalid"); 163 _debug("invalid");
164 read_unlock_bh(&local->services_lock); 164 read_unlock_bh(&local->services_lock);
165 165
166 read_lock_bh(&call->state_lock); 166 rxrpc_release_call(rx, call);
167 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
168 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
169 rxrpc_get_call(call, rxrpc_call_got);
170 rxrpc_queue_call(call);
171 }
172 read_unlock_bh(&call->state_lock);
173 rxrpc_put_call(call, rxrpc_call_put); 167 rxrpc_put_call(call, rxrpc_call_put);
174 ret = -ECONNREFUSED; 168 ret = -ECONNREFUSED;
175error: 169error:
@@ -236,13 +230,11 @@ found_service:
236 if (sk_acceptq_is_full(&rx->sk)) 230 if (sk_acceptq_is_full(&rx->sk))
237 goto backlog_full; 231 goto backlog_full;
238 sk_acceptq_added(&rx->sk); 232 sk_acceptq_added(&rx->sk);
239 sock_hold(&rx->sk);
240 read_unlock_bh(&local->services_lock); 233 read_unlock_bh(&local->services_lock);
241 234
242 ret = rxrpc_accept_incoming_call(local, rx, skb, &srx); 235 ret = rxrpc_accept_incoming_call(local, rx, skb, &srx);
243 if (ret < 0) 236 if (ret < 0)
244 sk_acceptq_removed(&rx->sk); 237 sk_acceptq_removed(&rx->sk);
245 sock_put(&rx->sk);
246 switch (ret) { 238 switch (ret) {
247 case -ECONNRESET: /* old calls are ignored */ 239 case -ECONNRESET: /* old calls are ignored */
248 case -ECONNABORTED: /* aborted calls are reaborted or ignored */ 240 case -ECONNABORTED: /* aborted calls are reaborted or ignored */
@@ -333,9 +325,6 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
333 case RXRPC_CALL_COMPLETE: 325 case RXRPC_CALL_COMPLETE:
334 ret = call->error; 326 ret = call->error;
335 goto out_release; 327 goto out_release;
336 case RXRPC_CALL_DEAD:
337 ret = -ETIME;
338 goto out_discard;
339 default: 328 default:
340 BUG(); 329 BUG();
341 } 330 }
@@ -350,24 +339,20 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
350 BUG(); 339 BUG();
351 if (test_and_set_bit(RXRPC_CALL_EV_ACCEPTED, &call->events)) 340 if (test_and_set_bit(RXRPC_CALL_EV_ACCEPTED, &call->events))
352 BUG(); 341 BUG();
353 rxrpc_queue_call(call);
354 342
355 write_unlock_bh(&call->state_lock); 343 write_unlock_bh(&call->state_lock);
356 write_unlock(&rx->call_lock); 344 write_unlock(&rx->call_lock);
345 rxrpc_queue_call(call);
357 _leave(" = %p{%d}", call, call->debug_id); 346 _leave(" = %p{%d}", call, call->debug_id);
358 return call; 347 return call;
359 348
360 /* if the call is already dying or dead, then we leave the socket's ref
361 * on it to be released by rxrpc_dead_call_expired() as induced by
362 * rxrpc_release_call() */
363out_release: 349out_release:
364 _debug("release %p", call);
365 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
366 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
367 rxrpc_queue_call(call);
368out_discard:
369 write_unlock_bh(&call->state_lock); 350 write_unlock_bh(&call->state_lock);
370 _debug("discard %p", call); 351 write_unlock(&rx->call_lock);
352 _debug("release %p", call);
353 rxrpc_release_call(rx, call);
354 _leave(" = %d", ret);
355 return ERR_PTR(ret);
371out: 356out:
372 write_unlock(&rx->call_lock); 357 write_unlock(&rx->call_lock);
373 _leave(" = %d", ret); 358 _leave(" = %d", ret);
@@ -390,8 +375,11 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
390 write_lock(&rx->call_lock); 375 write_lock(&rx->call_lock);
391 376
392 ret = -ENODATA; 377 ret = -ENODATA;
393 if (list_empty(&rx->acceptq)) 378 if (list_empty(&rx->acceptq)) {
394 goto out; 379 write_unlock(&rx->call_lock);
380 _leave(" = -ENODATA");
381 return -ENODATA;
382 }
395 383
396 /* dequeue the first call and check it's still valid */ 384 /* dequeue the first call and check it's still valid */
397 call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); 385 call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
@@ -407,30 +395,17 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
407 if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events)) 395 if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
408 rxrpc_queue_call(call); 396 rxrpc_queue_call(call);
409 ret = 0; 397 ret = 0;
410 goto out_release; 398 break;
411 case RXRPC_CALL_COMPLETE: 399 case RXRPC_CALL_COMPLETE:
412 ret = call->error; 400 ret = call->error;
413 goto out_release; 401 break;
414 case RXRPC_CALL_DEAD:
415 ret = -ETIME;
416 goto out_discard;
417 default: 402 default:
418 BUG(); 403 BUG();
419 } 404 }
420 405
421 /* if the call is already dying or dead, then we leave the socket's ref
422 * on it to be released by rxrpc_dead_call_expired() as induced by
423 * rxrpc_release_call() */
424out_release:
425 _debug("release %p", call);
426 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
427 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
428 rxrpc_queue_call(call);
429out_discard:
430 write_unlock_bh(&call->state_lock); 406 write_unlock_bh(&call->state_lock);
431 _debug("discard %p", call);
432out:
433 write_unlock(&rx->call_lock); 407 write_unlock(&rx->call_lock);
408 rxrpc_release_call(rx, call);
434 _leave(" = %d", ret); 409 _leave(" = %d", ret);
435 return ret; 410 return ret;
436} 411}
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index fee8b6ddb334..8365d3366114 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -811,8 +811,9 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 }
 
 /*
- * handle background processing of incoming call packets and ACK / abort
- * generation
+ * Handle background processing of incoming call packets and ACK / abort
+ * generation.  A ref on the call is donated to us by whoever queued the work
+ * item.
  */
 void rxrpc_process_call(struct work_struct *work)
 {
@@ -827,6 +828,7 @@ void rxrpc_process_call(struct work_struct *work)
 	unsigned long bits;
 	__be32 data, pad;
 	size_t len;
+	bool requeue = false;
 	int loop, nbit, ioc, ret, mtu;
 	u32 serial, abort_code = RX_PROTOCOL_ERROR;
 	u8 *acks = NULL;
@@ -838,6 +840,11 @@ void rxrpc_process_call(struct work_struct *work)
 	       call->debug_id, rxrpc_call_states[call->state], call->events,
 	       (jiffies - call->creation_jif) / (HZ / 10));
 
+	if (call->state >= RXRPC_CALL_COMPLETE) {
+		rxrpc_put_call(call, rxrpc_call_put);
+		return;
+	}
+
 	if (!call->conn)
 		goto skip_msg_init;
 
@@ -1088,16 +1095,21 @@ skip_msg_init:
 	spin_lock_bh(&call->lock);
 
 	if (call->state == RXRPC_CALL_SERVER_SECURING) {
+		struct rxrpc_sock *rx;
 		_debug("securing");
-		write_lock(&call->socket->call_lock);
-		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-		    !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-			_debug("not released");
-			call->state = RXRPC_CALL_SERVER_ACCEPTING;
-			list_move_tail(&call->accept_link,
-				       &call->socket->acceptq);
+		rcu_read_lock();
+		rx = rcu_dereference(call->socket);
+		if (rx) {
+			write_lock(&rx->call_lock);
+			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
+				_debug("not released");
+				call->state = RXRPC_CALL_SERVER_ACCEPTING;
+				list_move_tail(&call->accept_link,
+					       &rx->acceptq);
+			}
+			write_unlock(&rx->call_lock);
 		}
-		write_unlock(&call->socket->call_lock);
+		rcu_read_unlock();
 		read_lock(&call->state_lock);
 		if (call->state < RXRPC_CALL_COMPLETE)
 			set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);
@@ -1139,11 +1151,6 @@ skip_msg_init:
 		goto maybe_reschedule;
 	}
 
-	if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-		rxrpc_release_call(call);
-		clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
-	}
-
 	/* other events may have been raised since we started checking */
 	goto maybe_reschedule;
 
@@ -1209,10 +1216,8 @@ send_message_2:
 			     &msg, iov, ioc, len);
 	if (ret < 0) {
 		_debug("sendmsg failed: %d", ret);
-		read_lock_bh(&call->state_lock);
-		if (call->state < RXRPC_CALL_DEAD)
-			rxrpc_queue_call(call);
-		read_unlock_bh(&call->state_lock);
+		if (call->state < RXRPC_CALL_COMPLETE)
+			requeue = true;
 		goto error;
 	}
 
@@ -1245,41 +1250,22 @@ send_message_2:
 
 kill_ACKs:
 	del_timer_sync(&call->ack_timer);
-	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events))
-		rxrpc_put_call(call, rxrpc_call_put);
 	clear_bit(RXRPC_CALL_EV_ACK, &call->events);
 
 maybe_reschedule:
 	if (call->events || !skb_queue_empty(&call->rx_queue)) {
-		read_lock_bh(&call->state_lock);
-		if (call->state < RXRPC_CALL_DEAD)
-			rxrpc_queue_call(call);
-		read_unlock_bh(&call->state_lock);
-	}
-
-	/* don't leave aborted connections on the accept queue */
-	if (call->state >= RXRPC_CALL_COMPLETE &&
-	    !list_empty(&call->accept_link)) {
-		_debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
-		       call, call->events, call->flags, call->conn->proto.cid);
-
-		read_lock_bh(&call->state_lock);
-		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-		    !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-			rxrpc_queue_call(call);
-		read_unlock_bh(&call->state_lock);
+		if (call->state < RXRPC_CALL_COMPLETE)
+			requeue = true;
 	}
 
 error:
 	kfree(acks);
 
-	/* because we don't want two CPUs both processing the work item for one
-	 * call at the same time, we use a flag to note when it's busy; however
-	 * this means there's a race between clearing the flag and setting the
-	 * work pending bit and the work item being processed again */
-	if (call->events && !work_pending(&call->processor)) {
+	if ((requeue || call->events) && !work_pending(&call->processor)) {
 		_debug("jumpstart %x", call->conn->proto.cid);
-		rxrpc_queue_call(call);
+		__rxrpc_queue_call(call);
+	} else {
+		rxrpc_put_call(call, rxrpc_call_put);
 	}
 
 	_leave("");
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 83019e489555..be5733d55794 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -24,11 +24,6 @@
  */
 unsigned int rxrpc_max_call_lifetime = 60 * HZ;
 
-/*
- * Time till dead call expires after last use (in jiffies).
- */
-unsigned int rxrpc_dead_call_expiry = 2 * HZ;
-
 const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
 	[RXRPC_CALL_UNINITIALISED]		= "Uninit ",
 	[RXRPC_CALL_CLIENT_AWAIT_CONN]		= "ClWtConn",
@@ -43,7 +38,6 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
 	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
 	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
 	[RXRPC_CALL_COMPLETE]			= "Complete",
-	[RXRPC_CALL_DEAD]			= "Dead    ",
 };
 
 const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
@@ -74,11 +68,10 @@ struct kmem_cache *rxrpc_call_jar;
 LIST_HEAD(rxrpc_calls);
 DEFINE_RWLOCK(rxrpc_call_lock);
 
-static void rxrpc_destroy_call(struct work_struct *work);
 static void rxrpc_call_life_expired(unsigned long _call);
-static void rxrpc_dead_call_expired(unsigned long _call);
 static void rxrpc_ack_time_expired(unsigned long _call);
 static void rxrpc_resend_time_expired(unsigned long _call);
+static void rxrpc_cleanup_call(struct rxrpc_call *call);
 
 /*
  * find an extant server call
@@ -138,13 +131,10 @@ static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
 
 	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
 		    (unsigned long) call);
-	setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
-		    (unsigned long) call);
 	setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
 		    (unsigned long) call);
 	setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
 		    (unsigned long) call);
-	INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
 	INIT_WORK(&call->processor, &rxrpc_process_call);
 	INIT_LIST_HEAD(&call->link);
 	INIT_LIST_HEAD(&call->chan_wait_link);
@@ -185,11 +175,9 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
 	if (!call)
 		return ERR_PTR(-ENOMEM);
 	call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
-
-	sock_hold(&rx->sk);
-	call->socket = rx;
 	call->rx_data_post = 1;
 	call->service_id = srx->srx_service;
+	rcu_assign_pointer(call->socket, rx);
 
 	_leave(" = %p", call);
 	return call;
@@ -244,8 +232,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 		return call;
 	}
 
-	trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
-			 (const void *)user_call_ID);
+	trace_rxrpc_call(call, rxrpc_call_new_client,
+			 atomic_read(&call->usage), 0,
+			 here, (const void *)user_call_ID);
 
 	/* Publish the call, even though it is incompletely set up as yet */
 	call->user_call_ID = user_call_ID;
@@ -295,8 +284,10 @@ error:
 	list_del_init(&call->link);
 	write_unlock_bh(&rxrpc_call_lock);
 
+error_out:
+	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+				    RX_CALL_DEAD, ret);
 	set_bit(RXRPC_CALL_RELEASED, &call->flags);
-	call->state = RXRPC_CALL_DEAD;
 	rxrpc_put_call(call, rxrpc_call_put);
 	_leave(" = %d", ret);
 	return ERR_PTR(ret);
@@ -308,11 +299,8 @@ error:
  */
 found_user_ID_now_present:
 	write_unlock(&rx->call_lock);
-	set_bit(RXRPC_CALL_RELEASED, &call->flags);
-	call->state = RXRPC_CALL_DEAD;
-	rxrpc_put_call(call, rxrpc_call_put);
-	_leave(" = -EEXIST [%p]", call);
-	return ERR_PTR(-EEXIST);
+	ret = -EEXIST;
+	goto error_out;
 }
 
 /*
@@ -340,7 +328,6 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 			 atomic_read(&candidate->usage), 0, here, NULL);
 
 	chan = sp->hdr.cid & RXRPC_CHANNELMASK;
-	candidate->socket = rx;
 	candidate->conn = conn;
 	candidate->peer = conn->params.peer;
 	candidate->cid = sp->hdr.cid;
@@ -351,6 +338,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	candidate->flags |= (1 << RXRPC_CALL_IS_SERVICE);
 	if (conn->security_ix > 0)
 		candidate->state = RXRPC_CALL_SERVER_SECURING;
+	rcu_assign_pointer(candidate->socket, rx);
 
 	spin_lock(&conn->channel_lock);
 
@@ -411,7 +399,6 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	candidate = NULL;
 	conn->channels[chan].call_counter = call_id;
 	rcu_assign_pointer(conn->channels[chan].call, call);
-	sock_hold(&rx->sk);
 	rxrpc_get_connection(conn);
 	rxrpc_get_peer(call->peer);
 	spin_unlock(&conn->channel_lock);
@@ -453,6 +440,39 @@ old_call:
 }
 
 /*
+ * Queue a call's work processor, getting a ref to pass to the work queue.
+ */
+bool rxrpc_queue_call(struct rxrpc_call *call)
+{
+	const void *here = __builtin_return_address(0);
+	int n = __atomic_add_unless(&call->usage, 1, 0);
+	int m = atomic_read(&call->skb_count);
+	if (n == 0)
+		return false;
+	if (rxrpc_queue_work(&call->processor))
+		trace_rxrpc_call(call, rxrpc_call_queued, n + 1, m, here, NULL);
+	else
+		rxrpc_put_call(call, rxrpc_call_put_noqueue);
+	return true;
+}
+
+/*
+ * Queue a call's work processor, passing the callers ref to the work queue.
+ */
+bool __rxrpc_queue_call(struct rxrpc_call *call)
+{
+	const void *here = __builtin_return_address(0);
+	int n = atomic_read(&call->usage);
+	int m = atomic_read(&call->skb_count);
+	ASSERTCMP(n, >=, 1);
+	if (rxrpc_queue_work(&call->processor))
+		trace_rxrpc_call(call, rxrpc_call_queued_ref, n, m, here, NULL);
+	else
+		rxrpc_put_call(call, rxrpc_call_put_noqueue);
+	return true;
+}
+
+/*
  * Note the re-emergence of a call.
  */
 void rxrpc_see_call(struct rxrpc_call *call)
@@ -493,11 +513,8 @@ void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
 /*
  * detach a call from a socket and set up for release
  */
-void rxrpc_release_call(struct rxrpc_call *call)
+void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
-	struct rxrpc_connection *conn = call->conn;
-	struct rxrpc_sock *rx = call->socket;
-
 	_enter("{%d,%d,%d,%d}",
 	       call->debug_id, atomic_read(&call->usage),
 	       atomic_read(&call->ackr_not_idle),
@@ -513,7 +530,7 @@
 	/* dissociate from the socket
 	 * - the socket's ref on the call is passed to the death timer
 	 */
-	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
+	_debug("RELEASE CALL %p (%d)", call, call->debug_id);
 
 	if (call->peer) {
 		spin_lock(&call->peer->lock);
@@ -532,20 +549,30 @@
 		rb_erase(&call->sock_node, &rx->calls);
 		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
 		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
+		rxrpc_put_call(call, rxrpc_call_put_userid);
 	}
 	write_unlock_bh(&rx->call_lock);
 
-	/* free up the channel for reuse */
-	write_lock_bh(&call->state_lock);
+	if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) {
+		clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+		rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
+		rxrpc_call_completed(call);
+	} else {
+		write_lock_bh(&call->state_lock);
+
+		if (call->state < RXRPC_CALL_COMPLETE) {
+			_debug("+++ ABORTING STATE %d +++\n", call->state);
+			__rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
+			clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+			rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
+		}
 
-	if (call->state < RXRPC_CALL_COMPLETE &&
-	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
-		_debug("+++ ABORTING STATE %d +++\n", call->state);
-		__rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
+		write_unlock_bh(&call->state_lock);
 	}
-	write_unlock_bh(&call->state_lock);
 
-	rxrpc_disconnect_call(call);
+	if (call->conn)
+		rxrpc_disconnect_call(call);
 
 	/* clean up the Rx queue */
 	if (!skb_queue_empty(&call->rx_queue) ||
@@ -569,53 +596,16 @@
 		}
 		spin_unlock_bh(&call->lock);
 	}
+	rxrpc_purge_queue(&call->knlrecv_queue);
 
 	del_timer_sync(&call->resend_timer);
 	del_timer_sync(&call->ack_timer);
 	del_timer_sync(&call->lifetimer);
-	call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
-	add_timer(&call->deadspan);
 
 	_leave("");
 }
 
 /*
- * handle a dead call being ready for reaping
- */
-static void rxrpc_dead_call_expired(unsigned long _call)
-{
-	struct rxrpc_call *call = (struct rxrpc_call *) _call;
-
-	_enter("{%d}", call->debug_id);
-
-	rxrpc_see_call(call);
-	write_lock_bh(&call->state_lock);
-	call->state = RXRPC_CALL_DEAD;
-	write_unlock_bh(&call->state_lock);
-	rxrpc_put_call(call, rxrpc_call_put);
-}
-
-/*
- * mark a call as to be released, aborting it if it's still in progress
- * - called with softirqs disabled
- */
-static void rxrpc_mark_call_released(struct rxrpc_call *call)
-{
-	bool sched = false;
-
-	rxrpc_see_call(call);
-	write_lock(&call->state_lock);
-	if (call->state < RXRPC_CALL_DEAD) {
-		sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
-		if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-			sched = true;
-	}
-	write_unlock(&call->state_lock);
-	if (sched)
-		rxrpc_queue_call(call);
-}
-
-/*
  * release all the calls associated with a socket
  */
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
@@ -629,17 +619,17 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 
 	/* kill the not-yet-accepted incoming calls */
 	list_for_each_entry(call, &rx->secureq, accept_link) {
-		rxrpc_mark_call_released(call);
+		rxrpc_release_call(rx, call);
 	}
 
 	list_for_each_entry(call, &rx->acceptq, accept_link) {
-		rxrpc_mark_call_released(call);
+		rxrpc_release_call(rx, call);
 	}
 
 	/* mark all the calls as no longer wanting incoming packets */
 	for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
 		call = rb_entry(p, struct rxrpc_call, sock_node);
-		rxrpc_mark_call_released(call);
+		rxrpc_release_call(rx, call);
 	}
 
 	read_unlock_bh(&rx->call_lock);
@@ -663,8 +653,7 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
 	if (n == 0) {
 		_debug("call %d dead", call->debug_id);
 		WARN_ON(m != 0);
-		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-		rxrpc_queue_work(&call->destroyer);
+		rxrpc_cleanup_call(call);
 	}
 }
 
@@ -683,8 +672,7 @@ void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
 	if (n == 0) {
 		_debug("call %d dead", call->debug_id);
 		WARN_ON(m != 0);
-		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-		rxrpc_queue_work(&call->destroyer);
+		rxrpc_cleanup_call(call);
 	}
 }
 
@@ -708,23 +696,19 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
 {
 	_net("DESTROY CALL %d", call->debug_id);
 
-	ASSERT(call->socket);
+	write_lock_bh(&rxrpc_call_lock);
+	list_del_init(&call->link);
+	write_unlock_bh(&rxrpc_call_lock);
 
 	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
 	del_timer_sync(&call->lifetimer);
-	del_timer_sync(&call->deadspan);
 	del_timer_sync(&call->ack_timer);
 	del_timer_sync(&call->resend_timer);
 
+	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
-	ASSERTCMP(call->events, ==, 0);
-	if (work_pending(&call->processor)) {
-		_debug("defer destroy");
-		rxrpc_queue_work(&call->destroyer);
-		return;
-	}
-
+	ASSERT(!work_pending(&call->processor));
 	ASSERTCMP(call->conn, ==, NULL);
 
 	if (call->acks_window) {
@@ -753,40 +737,21 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
 	rxrpc_purge_queue(&call->rx_queue);
 	ASSERT(skb_queue_empty(&call->rx_oos_queue));
 	rxrpc_purge_queue(&call->knlrecv_queue);
-	sock_put(&call->socket->sk);
 	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }
 
 /*
- * destroy a call
- */
-static void rxrpc_destroy_call(struct work_struct *work)
-{
-	struct rxrpc_call *call =
-		container_of(work, struct rxrpc_call, destroyer);
-
-	_enter("%p{%d,%x,%p}",
-	       call, atomic_read(&call->usage), call->cid, call->conn);
-
-	ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-
-	write_lock_bh(&rxrpc_call_lock);
-	list_del_init(&call->link);
-	write_unlock_bh(&rxrpc_call_lock);
-
-	rxrpc_cleanup_call(call);
-	_leave("");
-}
-
-/*
- * preemptively destroy all the call records from a transport endpoint rather
- * than waiting for them to time out
+ * Make sure that all calls are gone.
  */
 void __exit rxrpc_destroy_all_calls(void)
 {
 	struct rxrpc_call *call;
 
 	_enter("");
+
+	if (list_empty(&rxrpc_calls))
+		return;
+
 	write_lock_bh(&rxrpc_call_lock);
 
 	while (!list_empty(&rxrpc_calls)) {
@@ -796,28 +761,15 @@ void __exit rxrpc_destroy_all_calls(void)
 		rxrpc_see_call(call);
 		list_del_init(&call->link);
 
-		switch (atomic_read(&call->usage)) {
-		case 0:
-			ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-			break;
-		case 1:
-			if (del_timer_sync(&call->deadspan) != 0 &&
-			    call->state != RXRPC_CALL_DEAD)
-				rxrpc_dead_call_expired((unsigned long) call);
-			if (call->state != RXRPC_CALL_DEAD)
-				break;
-		default:
-			pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
-			       call, atomic_read(&call->usage),
-			       atomic_read(&call->ackr_not_idle),
-			       rxrpc_call_states[call->state],
-			       call->flags, call->events);
-			if (!skb_queue_empty(&call->rx_queue))
-				pr_err("Rx queue occupied\n");
-			if (!skb_queue_empty(&call->rx_oos_queue))
-				pr_err("OOS queue occupied\n");
-			break;
-		}
+		pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
+		       call, atomic_read(&call->usage),
+		       atomic_read(&call->ackr_not_idle),
+		       rxrpc_call_states[call->state],
+		       call->flags, call->events);
+		if (!skb_queue_empty(&call->rx_queue))
+			pr_err("Rx queue occupied\n");
+		if (!skb_queue_empty(&call->rx_oos_queue))
+			pr_err("OOS queue occupied\n");
 
 		write_unlock_bh(&rxrpc_call_lock);
 		cond_resched();
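[Editorial note: taken together, the call_object.c changes collapse the old
DEAD-state/deadspan/destroyer machinery into "last put cleans up, RCU frees".
A sketch of that shape under simplified, hypothetical names follows; the real
code above also tears down timers and queues and carries assertions first.]

    /* Illustrative sketch only -- simplified names, not the rxrpc code. */
    static void example_rcu_destroy(struct rcu_head *rcu)
    {
    	struct example_call *call =
    		container_of(rcu, struct example_call, rcu);

    	kmem_cache_free(example_call_jar, call);
    }

    static void example_put_call(struct example_call *call)
    {
    	if (atomic_dec_and_test(&call->usage)) {
    		/* Synchronous teardown here (timers, queues), then let any
    		 * remaining RCU readers drain before the memory is freed.
    		 */
    		call_rcu(&call->rcu, example_rcu_destroy);
    	}
    }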
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 8267f42a7753..79f3f585cdc3 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -39,7 +39,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 			bool force, bool terminal)
 {
 	struct rxrpc_skb_priv *sp;
-	struct rxrpc_sock *rx = call->socket;
+	struct rxrpc_sock *rx;
 	struct sock *sk;
 	int ret;
 
@@ -59,7 +59,15 @@
 		return 0;
 	}
 
+	/* The socket may go away under us */
+	ret = 0;
+	rcu_read_lock();
+	rx = rcu_dereference(call->socket);
+	if (!rx)
+		goto out;
 	sk = &rx->sk;
+	if (sock_flag(sk, SOCK_DEAD))
+		goto out;
 
 	if (!force) {
 		/* cast skb->rcvbuf to unsigned...  It's pointless, but
@@ -78,7 +86,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 		spin_lock_bh(&sk->sk_receive_queue.lock);
 		if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags) &&
 		    !test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-		    call->socket->sk.sk_state != RXRPC_CLOSE) {
+		    sk->sk_state != RXRPC_CLOSE) {
 			skb->destructor = rxrpc_packet_destructor;
 			skb->dev = NULL;
 			skb->sk = sk;
@@ -104,8 +112,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 			__skb_queue_tail(&sk->sk_receive_queue, skb);
 			spin_unlock_bh(&sk->sk_receive_queue.lock);
 
-			if (!sock_flag(sk, SOCK_DEAD))
-				sk->sk_data_ready(sk);
+			sk->sk_data_ready(sk);
 		}
 		skb = NULL;
 	} else {
@@ -115,6 +122,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 
 out:
 	rxrpc_free_skb(skb);
+	rcu_read_unlock();
 
 	_leave(" = %d", ret);
 	return ret;
@@ -266,7 +274,7 @@ enqueue_packet:
 	skb_queue_tail(&call->rx_queue, skb);
 	atomic_inc(&call->ackr_not_idle);
 	read_lock(&call->state_lock);
-	if (call->state < RXRPC_CALL_DEAD)
+	if (call->state < RXRPC_CALL_COMPLETE)
 		rxrpc_queue_call(call);
 	read_unlock(&call->state_lock);
 	_leave(" = 0 [queued]");
@@ -408,7 +416,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
 	case RXRPC_PACKET_TYPE_ACK:
 		/* ACK processing is done in process context */
 		read_lock_bh(&call->state_lock);
-		if (call->state < RXRPC_CALL_DEAD) {
+		if (call->state < RXRPC_CALL_COMPLETE) {
 			skb_queue_tail(&call->rx_queue, skb);
 			rxrpc_queue_call(call);
 			skb = NULL;
@@ -511,9 +519,6 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
 
 	read_lock(&call->state_lock);
 	switch (call->state) {
-	case RXRPC_CALL_DEAD:
-		goto dead_call;
-
 	case RXRPC_CALL_COMPLETE:
 		switch (call->completion) {
 		case RXRPC_CALL_LOCALLY_ABORTED:
@@ -538,7 +543,6 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
 	}
 
 	read_unlock(&call->state_lock);
-	rxrpc_get_call(call, rxrpc_call_got);
 
 	if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
 	    sp->hdr.flags & RXRPC_JUMBO_PACKET)
@@ -546,12 +550,10 @@
 	else
 		rxrpc_fast_process_packet(call, skb);
 
-	rxrpc_put_call(call, rxrpc_call_put);
 	goto done;
 
 resend_final_ack:
 	_debug("final ack again");
-	rxrpc_get_call(call, rxrpc_call_got);
 	set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
 	rxrpc_queue_call(call);
 	goto free_unlock;
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 5b5508f6fc2a..8756d74fd74b 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -19,6 +19,151 @@
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
+struct rxrpc_pkt_buffer {
+	struct rxrpc_wire_header whdr;
+	union {
+		struct {
+			struct rxrpc_ackpacket ack;
+			u8 acks[255];
+			u8 pad[3];
+		};
+		__be32 abort_code;
+	};
+	struct rxrpc_ackinfo ackinfo;
+};
+
+/*
+ * Fill out an ACK packet.
+ */
+static size_t rxrpc_fill_out_ack(struct rxrpc_call *call,
+				 struct rxrpc_pkt_buffer *pkt)
+{
+	u32 mtu, jmax;
+	u8 *ackp = pkt->acks;
+
+	pkt->ack.bufferSpace = htons(8);
+	pkt->ack.maxSkew = htons(0);
+	pkt->ack.firstPacket = htonl(call->rx_data_eaten + 1);
+	pkt->ack.previousPacket = htonl(call->ackr_prev_seq);
+	pkt->ack.serial = htonl(call->ackr_serial);
+	pkt->ack.reason = RXRPC_ACK_IDLE;
+	pkt->ack.nAcks = 0;
+
+	mtu = call->peer->if_mtu;
+	mtu -= call->peer->hdrsize;
+	jmax = rxrpc_rx_jumbo_max;
+	pkt->ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
+	pkt->ackinfo.maxMTU = htonl(mtu);
+	pkt->ackinfo.rwind = htonl(rxrpc_rx_window_size);
+	pkt->ackinfo.jumbo_max = htonl(jmax);
+
+	*ackp++ = 0;
+	*ackp++ = 0;
+	*ackp++ = 0;
+	return 3;
+}
+
+/*
+ * Send a final ACK or ABORT call packet.
+ */
+int rxrpc_send_call_packet(struct rxrpc_call *call, u8 type)
+{
+	struct rxrpc_connection *conn = NULL;
+	struct rxrpc_pkt_buffer *pkt;
+	struct msghdr msg;
+	struct kvec iov[2];
+	rxrpc_serial_t serial;
+	size_t len, n;
+	int ioc, ret;
+	u32 abort_code;
+
+	_enter("%u,%s", call->debug_id, rxrpc_pkts[type]);
+
+	spin_lock_bh(&call->lock);
+	if (call->conn)
+		conn = rxrpc_get_connection_maybe(call->conn);
+	spin_unlock_bh(&call->lock);
+	if (!conn)
+		return -ECONNRESET;
+
+	pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
+	if (!pkt) {
+		rxrpc_put_connection(conn);
+		return -ENOMEM;
+	}
+
+	serial = atomic_inc_return(&conn->serial);
+
+	msg.msg_name = &call->peer->srx.transport;
+	msg.msg_namelen = call->peer->srx.transport_len;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = 0;
+
+	pkt->whdr.epoch = htonl(conn->proto.epoch);
+	pkt->whdr.cid = htonl(call->cid);
+	pkt->whdr.callNumber = htonl(call->call_id);
+	pkt->whdr.seq = 0;
+	pkt->whdr.serial = htonl(serial);
+	pkt->whdr.type = type;
+	pkt->whdr.flags = conn->out_clientflag;
+	pkt->whdr.userStatus = 0;
+	pkt->whdr.securityIndex = call->security_ix;
+	pkt->whdr._rsvd = 0;
+	pkt->whdr.serviceId = htons(call->service_id);
+
+	iov[0].iov_base = pkt;
+	iov[0].iov_len = sizeof(pkt->whdr);
+	len = sizeof(pkt->whdr);
+
+	switch (type) {
+	case RXRPC_PACKET_TYPE_ACK:
+		spin_lock_bh(&call->lock);
+		n = rxrpc_fill_out_ack(call, pkt);
+		call->ackr_reason = 0;
+
+		spin_unlock_bh(&call->lock);
+
+		_proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
+		       serial,
+		       ntohs(pkt->ack.maxSkew),
+		       ntohl(pkt->ack.firstPacket),
+		       ntohl(pkt->ack.previousPacket),
+		       ntohl(pkt->ack.serial),
+		       rxrpc_acks(pkt->ack.reason),
+		       pkt->ack.nAcks);
+
+		iov[0].iov_len += sizeof(pkt->ack) + n;
+		iov[1].iov_base = &pkt->ackinfo;
+		iov[1].iov_len = sizeof(pkt->ackinfo);
+		len += sizeof(pkt->ack) + n + sizeof(pkt->ackinfo);
+		ioc = 2;
+		break;
+
+	case RXRPC_PACKET_TYPE_ABORT:
+		abort_code = call->abort_code;
+		pkt->abort_code = htonl(abort_code);
+		_proto("Tx ABORT %%%u { %d }", serial, abort_code);
+		iov[0].iov_len += sizeof(pkt->abort_code);
+		len += sizeof(pkt->abort_code);
+		ioc = 1;
+		break;
+
+	default:
+		BUG();
+		ret = -ENOANO;
+		goto out;
+	}
+
+	ret = kernel_sendmsg(conn->params.local->socket,
+			     &msg, iov, ioc, len);
+
+out:
+	rxrpc_put_connection(conn);
+	kfree(pkt);
+	return ret;
+}
+
 /*
  * send a packet through the transport endpoint
  */
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 82c64055449d..dfad23821a62 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -29,6 +29,7 @@ static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
  */
 static void *rxrpc_call_seq_start(struct seq_file *seq, loff_t *_pos)
 {
+	rcu_read_lock();
 	read_lock(&rxrpc_call_lock);
 	return seq_list_start_head(&rxrpc_calls, *_pos);
 }
@@ -41,6 +42,7 @@ static void *rxrpc_call_seq_next(struct seq_file *seq, void *v, loff_t *pos)
 static void rxrpc_call_seq_stop(struct seq_file *seq, void *v)
 {
 	read_unlock(&rxrpc_call_lock);
+	rcu_read_unlock();
 }
 
 static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
@@ -61,7 +63,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
 
 	call = list_entry(v, struct rxrpc_call, link);
 
-	rx = READ_ONCE(call->socket);
+	rx = rcu_dereference(call->socket);
 	if (rx) {
 		local = READ_ONCE(rx->local);
 		if (local)
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 97f8ee76c67c..6876ffb3b410 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -19,28 +19,6 @@
19#include "ar-internal.h" 19#include "ar-internal.h"
20 20
21/* 21/*
22 * removal a call's user ID from the socket tree to make the user ID available
23 * again and so that it won't be seen again in association with that call
24 */
25void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
26{
27 _debug("RELEASE CALL %d", call->debug_id);
28
29 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
30 write_lock_bh(&rx->call_lock);
31 rb_erase(&call->sock_node, &call->socket->calls);
32 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
33 write_unlock_bh(&rx->call_lock);
34 }
35
36 read_lock_bh(&call->state_lock);
37 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
38 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
39 rxrpc_queue_call(call);
40 read_unlock_bh(&call->state_lock);
41}
42
43/*
44 * receive a message from an RxRPC socket 22 * receive a message from an RxRPC socket
45 * - we need to be careful about two or more threads calling recvmsg 23 * - we need to be careful about two or more threads calling recvmsg
46 * simultaneously 24 * simultaneously
@@ -338,7 +316,7 @@ terminal_message:
338 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 316 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
339 BUG(); 317 BUG();
340 rxrpc_free_skb(skb); 318 rxrpc_free_skb(skb);
341 rxrpc_remove_user_ID(rx, call); 319 rxrpc_release_call(rx, call);
342 } 320 }
343 321
344 release_sock(&rx->sk); 322 release_sock(&rx->sk);
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index c0613ab6d2d5..9b8f8456d3bf 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -33,9 +33,6 @@ static void rxrpc_request_final_ACK(struct rxrpc_call *call)
 	call->state = RXRPC_CALL_CLIENT_FINAL_ACK;
 	_debug("request final ACK");
 
-	/* get an extra ref on the call for the final-ACK generator to
-	 * release */
-	rxrpc_get_call(call, rxrpc_call_got);
 	set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
 	if (try_to_del_timer_sync(&call->ack_timer) >= 0)
 		rxrpc_queue_call(call);
diff --git a/net/rxrpc/sysctl.c b/net/rxrpc/sysctl.c
index dc380af8a81e..b7ca8cf13c84 100644
--- a/net/rxrpc/sysctl.c
+++ b/net/rxrpc/sysctl.c
@@ -88,14 +88,6 @@ static struct ctl_table rxrpc_sysctl_table[] = {
 		.proc_handler	= proc_dointvec_jiffies,
 		.extra1		= (void *)&one,
 	},
-	{
-		.procname	= "dead_call_expiry",
-		.data		= &rxrpc_dead_call_expiry,
-		.maxlen		= sizeof(unsigned int),
-		.mode		= 0644,
-		.proc_handler	= proc_dointvec_jiffies,
-		.extra1		= (void *)&one,
-	},
 
 	/* Non-time values */
 	{