author		David Howells <dhowells@redhat.com>	2018-10-08 10:46:25 -0400
committer	David Howells <dhowells@redhat.com>	2018-10-08 17:42:04 -0400
commit		c1e15b4944c9fa7fbbb74f7a5920a1e31b4b965a
tree		edb403ae1626e2149adbfac364530b4ced20cc2e /net
parent		4e2abd3c051830a0d4189340fe79f2549bdf36de
rxrpc: Fix the packet reception routine
The rxrpc_input_packet() function and its call tree were built around the
assumption that the data_ready() handler called from UDP to inform a kernel
service that there is data to be had was non-reentrant.  This meant that
certain locking could be dispensed with.

This, however, turns out not to be the case with a multi-queue network card
that can deliver packets to multiple cpus simultaneously.  Each of those cpus
can be in the rxrpc_input_packet() function at the same time.

Fix by adding or changing some structure members:

 (1) Add peer->rtt_input_lock to serialise access to the RTT buffer.

 (2) Make conn->service_id into a 32-bit variable so that it can be
     cmpxchg'd on all arches.

 (3) Add call->input_lock to serialise access to the Rx/Tx state.  Note that
     although the Rx and Tx states are (almost) entirely separate, there's no
     point completing the separation and having separate locks since it's a
     bi-phasal RPC protocol rather than a bi-directional streaming protocol.
     Data transmission and data reception do not take place simultaneously on
     any particular call.

and making the following functional changes:

 (1) In rxrpc_input_data(), hold call->input_lock around the core to prevent
     simultaneous production of packets into the Rx ring and updating of
     tracking state for a particular call.

 (2) In rxrpc_input_ping_response(), only read call->ping_serial once, and
     check it before checking RXRPC_CALL_PINGING as that's a cheaper test.
     The bit test and bit clear can then be combined.  No further locking is
     needed here.

 (3) In rxrpc_input_ack(), take call->input_lock after we've parsed much of
     the ACK packet.  The superseded ACK check is then done both before and
     after the lock is taken.

     The handling of ackinfo data is split, parsing before the lock is taken
     and processing with it held.  This is keyed on rxMTU being non-zero.

     Congestion management is also done within the locked section.

 (4) In rxrpc_input_ackall(), take call->input_lock around the Tx window
     rotation.  The ACKALL packet carries no information and is only really
     useful after all packets have been transmitted since it's imprecise.

 (5) In rxrpc_input_implicit_end_call(), we use rx->incoming_lock to prevent
     calls being simultaneously implicitly ended on two cpus and also to
     prevent any races with incoming call setup.

 (6) In rxrpc_input_packet(), use cmpxchg() to effect the service upgrade on
     a connection.  It is only permitted to happen once for a connection.

 (7) In rxrpc_new_incoming_call(), we have to recheck the routing inside
     rx->incoming_lock to see if someone else set up the call, connection or
     peer whilst we were getting there.  We can't trust the values from the
     earlier routing check unless we pin refs on them - which we want to
     avoid.

     Further, we need to allow for an incoming call to have its state changed
     on another CPU between us making it live and us adjusting it because the
     conn is now in the RXRPC_CONN_SERVICE state.

 (8) In rxrpc_peer_add_rtt(), take peer->rtt_input_lock around the access to
     the RTT buffer.  Don't need to lock around setting peer->rtt.

For reference, the inventory of state-accessing or state-altering functions
used by the packet input procedure is:

> rxrpc_input_packet()
  * PACKET CHECKING
  * ROUTING
    > rxrpc_post_packet_to_local()
    > rxrpc_find_connection_rcu() - uses RCU
      > rxrpc_lookup_peer_rcu() - uses RCU
      > rxrpc_find_service_conn_rcu() - uses RCU
        > idr_find() - uses RCU
  * CONNECTION-LEVEL PROCESSING
    - Service upgrade
      - Can only happen once per conn
      ! Changed to use cmpxchg
    > rxrpc_post_packet_to_conn()
      - Setting conn->hi_serial
        - Probably safe not using locks
        - Maybe use cmpxchg
  * CALL-LEVEL PROCESSING
    > Old-call checking
      > rxrpc_input_implicit_end_call()
        > rxrpc_call_completed()
        > rxrpc_queue_call()
        ! Need to take rx->incoming_lock
        > __rxrpc_disconnect_call()
        > rxrpc_notify_socket()
    > rxrpc_new_incoming_call()
      - Uses rx->incoming_lock for the entire process
        - Might be able to drop this earlier in favour of the call lock
      > rxrpc_incoming_call()
        ! Conflicts with rxrpc_input_implicit_end_call()
    > rxrpc_send_ping()
      - Don't need locks to check rtt state
      > rxrpc_propose_ACK
  * PACKET DISTRIBUTION
    > rxrpc_input_call_packet()
      > rxrpc_input_data()
        * QUEUE DATA PACKET ON CALL
        > rxrpc_reduce_call_timer()
          - Uses timer_reduce()
        ! Needs call->input_lock()
        > rxrpc_receiving_reply()
          ! Needs locking around ack state
          > rxrpc_rotate_tx_window()
          > rxrpc_end_tx_phase()
        > rxrpc_proto_abort()
        > rxrpc_input_dup_data()
        - Fills the Rx buffer
        - rxrpc_propose_ACK()
        - rxrpc_notify_socket()
      > rxrpc_input_ack()
        * APPLY ACK PACKET TO CALL AND DISCARD PACKET
        > rxrpc_input_ping_response()
          - Probably doesn't need any extra locking
          ! Need READ_ONCE() on call->ping_serial
          > rxrpc_input_check_for_lost_ack()
            - Takes call->lock to consult Tx buffer
          > rxrpc_peer_add_rtt()
            ! Needs to take a lock (peer->rtt_input_lock)
            ! Could perhaps manage with cmpxchg() and xadd() instead
        > rxrpc_input_requested_ack
          - Consults Tx buffer
            ! Probably needs a lock
          > rxrpc_peer_add_rtt()
        > rxrpc_propose_ack()
        > rxrpc_input_ackinfo()
          - Changes call->tx_winsize
            ! Use cmpxchg to handle change
            ! Should perhaps track serial number
          - Uses peer->lock to record MTU specification changes
        > rxrpc_proto_abort()
        ! Need to take call->input_lock
        > rxrpc_rotate_tx_window()
        > rxrpc_end_tx_phase()
        > rxrpc_input_soft_acks()
        - Consults the Tx buffer
        > rxrpc_congestion_management()
          - Modifies the Tx annotations
          ! Needs call->input_lock()
          > rxrpc_queue_call()
      > rxrpc_input_abort()
        * APPLY ABORT PACKET TO CALL AND DISCARD PACKET
        > rxrpc_set_call_completion()
        > rxrpc_notify_socket()
      > rxrpc_input_ackall()
        * APPLY ACKALL PACKET TO CALL AND DISCARD PACKET
        ! Need to take call->input_lock
        > rxrpc_rotate_tx_window()
        > rxrpc_end_tx_phase()
> rxrpc_reject_packet()

There are some functions used by the above that queue the packet, after which
the procedure is terminated:

 - rxrpc_post_packet_to_local()
   - local->event_queue is an sk_buff_head
   - local->processor is a work_struct
 - rxrpc_post_packet_to_conn()
   - conn->rx_queue is an sk_buff_head
   - conn->processor is a work_struct
 - rxrpc_reject_packet()
   - local->reject_queue is an sk_buff_head
   - local->processor is a work_struct

And some that offload processing to process context:

 - rxrpc_notify_socket()
   - Uses RCU lock
   - Uses call->notify_lock to call call->notify_rx
   - Uses call->recvmsg_lock to queue recvmsg side
 - rxrpc_queue_call()
   - call->processor is a work_struct
 - rxrpc_propose_ACK()
   - Uses call->lock to wrap __rxrpc_propose_ACK()

And a bunch that complete a call, all of which use call->state_lock to
protect the call state:

 - rxrpc_call_completed()
 - rxrpc_set_call_completion()
 - rxrpc_abort_call()
 - rxrpc_proto_abort()
   - Also uses rxrpc_queue_call()

Fixes: 17926a79320a ("[AF_RXRPC]: Provide secure RxRPC sockets for use by userspace and kernel both")
Signed-off-by: David Howells <dhowells@redhat.com>
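[Editorial illustration, not part of the commit message.] To make change (6)
above concrete: the service upgrade is a one-shot transition of
conn->service_id from the original ID to the upgraded ID, and cmpxchg() lets
exactly one CPU win that transition even when duplicate packets are handled in
parallel.  The following is a minimal, standalone C11 sketch of the same
pattern, not the kernel code: it substitutes atomic_compare_exchange_strong()
for the kernel's cmpxchg(), and the names conn_service_id, try_upgrade,
ORIG_SERVICE and UPGRADE_SERVICE are illustrative only.

    /* Standalone sketch of the one-shot service upgrade pattern: the first
     * packet requesting the upgrade wins; later attempts see that the value
     * has already changed and either accept it (same ID) or report an error.
     */
    #include <stdatomic.h>
    #include <stdio.h>

    #define ORIG_SERVICE    1u  /* stands in for conn->params.service_id */
    #define UPGRADE_SERVICE 2u  /* stands in for sp->hdr.serviceId */

    static _Atomic unsigned int conn_service_id = ORIG_SERVICE;

    /* Returns 0 if the packet's service ID is acceptable, -1 on conflict. */
    static int try_upgrade(unsigned int wire_service_id)
    {
            unsigned int expected = ORIG_SERVICE;

            if (atomic_load(&conn_service_id) == wire_service_id)
                    return 0;       /* already matches, nothing to do */

            /* Only the transition ORIG_SERVICE -> wire_service_id is allowed,
             * and only once; on failure 'expected' is updated to the value
             * that beat us to it.
             */
            if (atomic_compare_exchange_strong(&conn_service_id, &expected,
                                               wire_service_id))
                    return 0;

            /* Someone else upgraded first: fine if they chose the same ID,
             * a protocol violation ("reupgrade") otherwise.
             */
            return expected == wire_service_id ? 0 : -1;
    }

    int main(void)
    {
            printf("first upgrade:  %d\n", try_upgrade(UPGRADE_SERVICE)); /* 0 */
            printf("duplicate:      %d\n", try_upgrade(UPGRADE_SERVICE)); /* 0 */
            printf("conflicting ID: %d\n", try_upgrade(3));               /* -1 */
            return 0;
    }

A failed exchange reports the value that won the race, which is what the patch
compares against old_id to distinguish a harmless duplicate upgrade from a
genuine "reupgrade" protocol error.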
Diffstat (limited to 'net')
-rw-r--r--	net/rxrpc/ar-internal.h	  7
-rw-r--r--	net/rxrpc/call_accept.c	 21
-rw-r--r--	net/rxrpc/call_object.c	  1
-rw-r--r--	net/rxrpc/input.c	120
-rw-r--r--	net/rxrpc/peer_event.c	  5
-rw-r--r--	net/rxrpc/peer_object.c	  1
6 files changed, 105 insertions, 50 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 45307463b7dd..a6e6cae82c30 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -302,6 +302,7 @@ struct rxrpc_peer {
 
 	/* calculated RTT cache */
 #define RXRPC_RTT_CACHE_SIZE 32
+	spinlock_t		rtt_input_lock;	/* RTT lock for input routine */
 	ktime_t			rtt_last_req;	/* Time of last RTT request */
 	u64			rtt;		/* Current RTT estimate (in nS) */
 	u64			rtt_sum;	/* Sum of cache contents */
@@ -447,7 +448,7 @@ struct rxrpc_connection {
 	atomic_t		serial;		/* packet serial number counter */
 	unsigned int		hi_serial;	/* highest serial number received */
 	u32			security_nonce;	/* response re-use preventer */
-	u16			service_id;	/* Service ID, possibly upgraded */
+	u32			service_id;	/* Service ID, possibly upgraded */
 	u8			size_align;	/* data size alignment (for security) */
 	u8			security_size;	/* security header size */
 	u8			security_ix;	/* security type */
@@ -635,6 +636,8 @@ struct rxrpc_call {
 	bool			tx_phase;	/* T if transmission phase, F if receive phase */
 	u8			nr_jumbo_bad;	/* Number of jumbo dups/exceeds-windows */
 
+	spinlock_t		input_lock;	/* Lock for packet input to this call */
+
 	/* receive-phase ACK management */
 	u8			ackr_reason;	/* reason to ACK */
 	u16			ackr_skew;	/* skew on packet being ACK'd */
@@ -720,8 +723,6 @@ int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
 void rxrpc_discard_prealloc(struct rxrpc_sock *);
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *,
 					   struct rxrpc_sock *,
-					   struct rxrpc_peer *,
-					   struct rxrpc_connection *,
 					   struct sk_buff *);
 void rxrpc_accept_incoming_calls(struct rxrpc_local *);
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 1c4ebc0cb25b..652e314de38e 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -333,11 +333,11 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 					   struct rxrpc_sock *rx,
-					   struct rxrpc_peer *peer,
-					   struct rxrpc_connection *conn,
 					   struct sk_buff *skb)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_connection *conn;
+	struct rxrpc_peer *peer;
 	struct rxrpc_call *call;
 
 	_enter("");
@@ -354,6 +354,13 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 		goto out;
 	}
 
+	/* The peer, connection and call may all have sprung into existence due
+	 * to a duplicate packet being handled on another CPU in parallel, so
+	 * we have to recheck the routing.  However, we're now holding
+	 * rx->incoming_lock, so the values should remain stable.
+	 */
+	conn = rxrpc_find_connection_rcu(local, skb, &peer);
+
 	call = rxrpc_alloc_incoming_call(rx, local, peer, conn, skb);
 	if (!call) {
 		skb->mark = RXRPC_SKB_MARK_REJECT_BUSY;
@@ -396,10 +403,12 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 
 	case RXRPC_CONN_SERVICE:
 		write_lock(&call->state_lock);
-		if (rx->discard_new_call)
-			call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
-		else
-			call->state = RXRPC_CALL_SERVER_ACCEPTING;
+		if (call->state < RXRPC_CALL_COMPLETE) {
+			if (rx->discard_new_call)
+				call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
+			else
+				call->state = RXRPC_CALL_SERVER_ACCEPTING;
+		}
 		write_unlock(&call->state_lock);
 		break;
 
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 0ca2c2dfd196..8f1a8f85b1f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -138,6 +138,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	init_waitqueue_head(&call->waitq);
 	spin_lock_init(&call->lock);
 	spin_lock_init(&call->notify_lock);
+	spin_lock_init(&call->input_lock);
 	rwlock_init(&call->state_lock);
 	atomic_set(&call->usage, 1);
 	call->debug_id = debug_id;
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 04213a65c1ac..570b49d2da42 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -459,13 +459,15 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
 		}
 	}
 
+	spin_lock(&call->input_lock);
+
 	/* Received data implicitly ACKs all of the request packets we sent
 	 * when we're acting as a client.
 	 */
 	if ((state == RXRPC_CALL_CLIENT_SEND_REQUEST ||
 	     state == RXRPC_CALL_CLIENT_AWAIT_REPLY) &&
 	    !rxrpc_receiving_reply(call))
-		return;
+		goto unlock;
 
 	call->ackr_prev_seq = seq;
 
@@ -495,12 +497,16 @@ next_subpacket:
 
 	if (flags & RXRPC_LAST_PACKET) {
 		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-		    seq != call->rx_top)
-			return rxrpc_proto_abort("LSN", call, seq);
+		    seq != call->rx_top) {
+			rxrpc_proto_abort("LSN", call, seq);
+			goto unlock;
+		}
 	} else {
 		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-		    after_eq(seq, call->rx_top))
-			return rxrpc_proto_abort("LSA", call, seq);
+		    after_eq(seq, call->rx_top)) {
+			rxrpc_proto_abort("LSA", call, seq);
+			goto unlock;
+		}
 	}
 
 	trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
@@ -567,8 +573,10 @@ next_subpacket:
 skip:
 	offset += len;
 	if (flags & RXRPC_JUMBO_PACKET) {
-		if (skb_copy_bits(skb, offset, &flags, 1) < 0)
-			return rxrpc_proto_abort("XJF", call, seq);
+		if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
+			rxrpc_proto_abort("XJF", call, seq);
+			goto unlock;
+		}
 		offset += sizeof(struct rxrpc_jumbo_header);
 		seq++;
 		serial++;
@@ -608,6 +616,9 @@ ack:
 		trace_rxrpc_notify_socket(call->debug_id, serial);
 		rxrpc_notify_socket(call);
 	}
+
+unlock:
+	spin_unlock(&call->input_lock);
 	_leave(" [queued]");
 }
 
@@ -694,15 +705,14 @@ static void rxrpc_input_ping_response(struct rxrpc_call *call,
 
 	ping_time = call->ping_time;
 	smp_rmb();
-	ping_serial = call->ping_serial;
+	ping_serial = READ_ONCE(call->ping_serial);
 
 	if (orig_serial == call->acks_lost_ping)
 		rxrpc_input_check_for_lost_ack(call);
 
-	if (!test_bit(RXRPC_CALL_PINGING, &call->flags) ||
-	    before(orig_serial, ping_serial))
+	if (before(orig_serial, ping_serial) ||
+	    !test_and_clear_bit(RXRPC_CALL_PINGING, &call->flags))
 		return;
-	clear_bit(RXRPC_CALL_PINGING, &call->flags);
 	if (after(orig_serial, ping_serial))
 		return;
 
@@ -869,24 +879,31 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 	}
 
 	/* Discard any out-of-order or duplicate ACKs. */
-	if (before_eq(sp->hdr.serial, call->acks_latest)) {
-		_debug("discard ACK %d <= %d",
-		       sp->hdr.serial, call->acks_latest);
+	if (before_eq(sp->hdr.serial, call->acks_latest))
 		return;
-	}
+
+	buf.info.rxMTU = 0;
+	ioffset = offset + nr_acks + 3;
+	if (skb->len >= ioffset + sizeof(buf.info) &&
+	    skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0)
+		return rxrpc_proto_abort("XAI", call, 0);
+
+	spin_lock(&call->input_lock);
+
+	/* Discard any out-of-order or duplicate ACKs. */
+	if (before_eq(sp->hdr.serial, call->acks_latest))
+		goto out;
 	call->acks_latest_ts = skb->tstamp;
 	call->acks_latest = sp->hdr.serial;
 
 	/* Parse rwind and mtu sizes if provided. */
-	ioffset = offset + nr_acks + 3;
-	if (skb->len >= ioffset + sizeof(buf.info)) {
-		if (skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0)
-			return rxrpc_proto_abort("XAI", call, 0);
+	if (buf.info.rxMTU)
 		rxrpc_input_ackinfo(call, skb, &buf.info);
-	}
 
-	if (first_soft_ack == 0)
-		return rxrpc_proto_abort("AK0", call, 0);
+	if (first_soft_ack == 0) {
+		rxrpc_proto_abort("AK0", call, 0);
+		goto out;
+	}
 
 	/* Ignore ACKs unless we are or have just been transmitting. */
 	switch (READ_ONCE(call->state)) {
@@ -896,25 +913,31 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 	case RXRPC_CALL_SERVER_AWAIT_ACK:
 		break;
 	default:
-		return;
+		goto out;
 	}
 
 	if (before(hard_ack, call->tx_hard_ack) ||
-	    after(hard_ack, call->tx_top))
-		return rxrpc_proto_abort("AKW", call, 0);
-	if (nr_acks > call->tx_top - hard_ack)
-		return rxrpc_proto_abort("AKN", call, 0);
+	    after(hard_ack, call->tx_top)) {
+		rxrpc_proto_abort("AKW", call, 0);
+		goto out;
+	}
+	if (nr_acks > call->tx_top - hard_ack) {
+		rxrpc_proto_abort("AKN", call, 0);
+		goto out;
+	}
 
 	if (after(hard_ack, call->tx_hard_ack)) {
 		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
 			rxrpc_end_tx_phase(call, false, "ETA");
-			return;
+			goto out;
 		}
 	}
 
 	if (nr_acks > 0) {
-		if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0)
-			return rxrpc_proto_abort("XSA", call, 0);
+		if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0) {
+			rxrpc_proto_abort("XSA", call, 0);
+			goto out;
+		}
 		rxrpc_input_soft_acks(call, buf.acks, first_soft_ack, nr_acks,
 				      &summary);
 	}
@@ -927,7 +950,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 				  false, true,
 				  rxrpc_propose_ack_ping_for_lost_reply);
 
-	return rxrpc_congestion_management(call, skb, &summary, acked_serial);
+	rxrpc_congestion_management(call, skb, &summary, acked_serial);
+out:
+	spin_unlock(&call->input_lock);
 }
 
 /*
@@ -940,8 +965,12 @@ static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
 
 	_proto("Rx ACKALL %%%u", sp->hdr.serial);
 
+	spin_lock(&call->input_lock);
+
 	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
 		rxrpc_end_tx_phase(call, false, "ETL");
+
+	spin_unlock(&call->input_lock);
 }
 
 /*
@@ -1024,18 +1053,19 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
 }
 
 /*
- * Handle a new call on a channel implicitly completing the preceding call on
- * that channel.
+ * Handle a new service call on a channel implicitly completing the preceding
+ * call on that channel.  This does not apply to client conns.
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
+static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
+					  struct rxrpc_connection *conn,
 					  struct rxrpc_call *call)
 {
 	switch (READ_ONCE(call->state)) {
 	case RXRPC_CALL_SERVER_AWAIT_ACK:
 		rxrpc_call_completed(call);
-		break;
+		/* Fall through */
 	case RXRPC_CALL_COMPLETE:
 		break;
 	default:
@@ -1043,11 +1073,13 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
 		set_bit(RXRPC_CALL_EV_ABORT, &call->events);
 		rxrpc_queue_call(call);
 	}
+		trace_rxrpc_improper_term(call);
 		break;
 	}
 
-	trace_rxrpc_improper_term(call);
+	spin_lock(&rx->incoming_lock);
 	__rxrpc_disconnect_call(conn, call);
+	spin_unlock(&rx->incoming_lock);
 	rxrpc_notify_socket(call);
 }
 
@@ -1244,10 +1276,16 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 			goto wrong_security;
 
 		if (sp->hdr.serviceId != conn->service_id) {
-			if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) ||
-			    conn->service_id != conn->params.service_id)
+			int old_id;
+
+			if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+				goto reupgrade;
+			old_id = cmpxchg(&conn->service_id, conn->params.service_id,
+					 sp->hdr.serviceId);
+
+			if (old_id != conn->params.service_id &&
+			    old_id != sp->hdr.serviceId)
 				goto reupgrade;
-			conn->service_id = sp->hdr.serviceId;
 		}
 
 		if (sp->hdr.callNumber == 0) {
@@ -1305,7 +1343,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 				if (rxrpc_to_client(sp))
 					goto reject_packet;
 				if (call)
-					rxrpc_input_implicit_end_call(conn, call);
+					rxrpc_input_implicit_end_call(rx, conn, call);
 				call = NULL;
 			}
 
@@ -1325,7 +1363,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 			goto bad_message;
 		if (sp->hdr.seq != 1)
 			goto discard;
-		call = rxrpc_new_incoming_call(local, rx, peer, conn, skb);
+		call = rxrpc_new_incoming_call(local, rx, skb);
 		if (!call)
 			goto reject_packet;
 		rxrpc_send_ping(call, skb, skew);
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index f3e6fc670da2..05b51bdbdd41 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -301,6 +301,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
 	if (rtt < 0)
 		return;
 
+	spin_lock(&peer->rtt_input_lock);
+
 	/* Replace the oldest datum in the RTT buffer */
 	sum -= peer->rtt_cache[cursor];
 	sum += rtt;
@@ -312,6 +314,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
 		peer->rtt_usage = usage;
 	}
 
+	spin_unlock(&peer->rtt_input_lock);
+
 	/* Now recalculate the average */
 	if (usage == RXRPC_RTT_CACHE_SIZE) {
 		avg = sum / RXRPC_RTT_CACHE_SIZE;
@@ -320,6 +324,7 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
 		do_div(avg, usage);
 	}
 
+	/* Don't need to update this under lock */
 	peer->rtt = avg;
 	trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial, rtt,
 			   usage, avg);
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 2d39eaf19620..5691b7d266ca 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -225,6 +225,7 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
 	peer->service_conns = RB_ROOT;
 	seqlock_init(&peer->service_conn_lock);
 	spin_lock_init(&peer->lock);
+	spin_lock_init(&peer->rtt_input_lock);
 	peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
 
 	if (RXRPC_TX_SMSS > 2190)