author	David Howells <dhowells@redhat.com>	2016-04-04 09:00:35 -0400
committer	David Howells <dhowells@redhat.com>	2016-06-15 10:38:17 -0400
commit	4f95dd78a77edc42454de55bb32332be293fb461 (patch)
tree	45e369bc5cb6fdbdd96cf8169d069ee9a75d1994
parent	875636163b4e694c092625ed98b17e10d582b3ca (diff)
rxrpc: Rework local endpoint management
Rework the local RxRPC endpoint management.

Local endpoint objects are maintained in a flat list as before.  This
should be okay as there shouldn't be more than one per open AF_RXRPC
socket (there can be fewer as local endpoints can be shared if their
local service ID is 0 and they share the same local transport
parameters).

Changes:

 (1) Local endpoints may now only be shared if they have local service
     ID 0 (ie. they're not being used for listening).

     This prevents a scenario where process A is listening on the Cache
     Manager port and process B contacts a fileserver - which may then
     attempt to send CM requests back to B.  But if A and B are sharing
     a local endpoint, A will get the CM requests meant for B.

 (2) We use a mutex to handle lookups and don't provide RCU-only
     lookups since we only expect to access the list when opening a
     socket or destroying an endpoint.

     The local endpoint object is pointed to by the transport socket's
     sk_user_data for the life of the transport socket - allowing us to
     refer to it directly from the sk_data_ready and sk_error_report
     callbacks.

 (3) atomic_inc_not_zero() now exists and can be used to only share a
     local endpoint if the last reference hasn't yet gone.

 (4) We can remove rxrpc_local_lock - a spinlock that had to be taken
     with BH processing disabled given that we assume sk_user_data
     won't change under us.

 (5) The transport socket is shut down before we clear the sk_user_data
     pointer so that we can be sure that the transport socket's
     callbacks won't be invoked once the RCU destruction is scheduled.

 (6) Local endpoints have a work item that handles both destruction and
     event processing.  This means that destruction doesn't then need
     to wait for event processing.  The event queues can then be
     cleared after the transport socket is shut down.

 (7) Local endpoints are no longer available for resurrection beyond
     the life of the sockets that had them open.  As soon as their last
     ref goes, they are scheduled for destruction and may not have
     their usage count moved from 0.

Signed-off-by: David Howells <dhowells@redhat.com>
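The core of points (2), (3) and (7) is a lookup that may only share a cached object while its reference count is still non-zero; once the count reaches zero the object is committed to destruction and must be replaced, never resurrected.  The userspace sketch below illustrates that pattern with C11 atomics and a list mutex standing in for rxrpc_local_mutex.  It is not kernel code: struct endpoint, get_maybe() and lookup() are invented stand-ins for rxrpc_local, rxrpc_get_local_maybe() and rxrpc_lookup_local().

/* Minimal userspace sketch of the "share only while alive" rule.
 * Names are illustrative, not taken from the patch.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct endpoint {
	atomic_int usage;	/* 0 means dying: may not be resurrected */
	int key;		/* stands in for the local transport address */
	struct endpoint *next;
};

static pthread_mutex_t ep_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct endpoint *ep_list;

/* The moral equivalent of atomic_inc_not_zero(): take a reference only
 * if the last one hasn't already gone.
 */
static bool get_maybe(struct endpoint *ep)
{
	int n = atomic_load(&ep->usage);

	while (n != 0)
		if (atomic_compare_exchange_weak(&ep->usage, &n, n + 1))
			return true;
	return false;
}

/* Look up an endpoint under the mutex; reuse a live match, otherwise
 * allocate a replacement.  (The real patch also unlinks the dying entry
 * and inserts the new one at the same position in the sorted list.)
 */
static struct endpoint *lookup(int key)
{
	struct endpoint *ep;

	pthread_mutex_lock(&ep_mutex);
	for (ep = ep_list; ep; ep = ep->next) {
		if (ep->key != key)
			continue;
		if (get_maybe(ep))
			goto out;	/* shared an existing endpoint */
		break;			/* dying: fall through and replace */
	}

	ep = calloc(1, sizeof(*ep));
	if (!ep)
		goto out;
	atomic_store(&ep->usage, 1);
	ep->key = key;
	ep->next = ep_list;
	ep_list = ep;
out:
	pthread_mutex_unlock(&ep_mutex);
	return ep;
}

int main(void)
{
	struct endpoint *a = lookup(4711);
	struct endpoint *b = lookup(4711);	/* shares a; usage becomes 2 */

	printf("shared=%d usage=%d\n", a == b, atomic_load(&a->usage));
	return 0;
}

Built with a C11 compiler and -pthread, this prints "shared=1 usage=2": the second lookup shares the endpoint rather than creating a new one, exactly because the count was still non-zero when get_maybe() ran.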
-rw-r--r--	net/rxrpc/af_rxrpc.c	 19
-rw-r--r--	net/rxrpc/ar-internal.h	 55
-rw-r--r--	net/rxrpc/call_accept.c	 25
-rw-r--r--	net/rxrpc/conn_event.c	 15
-rw-r--r--	net/rxrpc/input.c	 29
-rw-r--r--	net/rxrpc/local_event.c	 10
-rw-r--r--	net/rxrpc/local_object.c	353
7 files changed, 276 insertions(+), 230 deletions(-)
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index ba373caddbeb..c83c3c75d665 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -102,6 +102,8 @@ static int rxrpc_validate_address(struct rxrpc_sock *rx,
 
 	switch (srx->transport.family) {
 	case AF_INET:
+		if (srx->transport_len < sizeof(struct sockaddr_in))
+			return -EINVAL;
 		_debug("INET: %x @ %pI4",
 		       ntohs(srx->transport.sin.sin_port),
 		       &srx->transport.sin.sin_addr);
@@ -835,12 +837,27 @@ static void __exit af_rxrpc_exit(void)
 	rxrpc_destroy_all_calls();
 	rxrpc_destroy_all_connections();
 	rxrpc_destroy_all_transports();
-	rxrpc_destroy_all_locals();
 
 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
 
+	/* We need to flush the scheduled work twice because the local endpoint
+	 * records involve a work item in their destruction as they can only be
+	 * destroyed from process context.  However, a connection may have a
+	 * work item outstanding - and this will pin the local endpoint record
+	 * until the connection goes away.
+	 *
+	 * Peers don't pin locals and calls pin sockets - which prevents the
+	 * module from being unloaded - so we should only need two flushes.
+	 */
 	_debug("flush scheduled work");
 	flush_workqueue(rxrpc_workqueue);
+	_debug("flush scheduled work 2");
+	flush_workqueue(rxrpc_workqueue);
+	_debug("synchronise RCU");
+	rcu_barrier();
+	_debug("destroy locals");
+	rxrpc_destroy_all_locals();
+
 	remove_proc_entry("rxrpc_conns", init_net.proc_net);
 	remove_proc_entry("rxrpc_calls", init_net.proc_net);
 	destroy_workqueue(rxrpc_workqueue);
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index fa50b09eaa63..c168268467cd 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -170,25 +170,26 @@ struct rxrpc_security {
 };
 
 /*
- * RxRPC local transport endpoint definition
- * - matched by local port, address and protocol type
+ * RxRPC local transport endpoint description
+ * - owned by a single AF_RXRPC socket
+ * - pointed to by transport socket struct sk_user_data
  */
 struct rxrpc_local {
+	struct rcu_head		rcu;
+	atomic_t		usage;
+	struct list_head	link;
 	struct socket		*socket;	/* my UDP socket */
-	struct work_struct	destroyer;	/* endpoint destroyer */
-	struct work_struct	acceptor;	/* incoming call processor */
-	struct work_struct	rejecter;	/* packet reject writer */
-	struct work_struct	event_processor; /* endpoint event processor */
+	struct work_struct	processor;
 	struct list_head	services;	/* services listening on this endpoint */
-	struct list_head	link;		/* link in endpoint list */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 	struct sk_buff_head	accept_queue;	/* incoming calls awaiting acceptance */
 	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
+	struct mutex		conn_lock;	/* Client connection creation lock */
 	spinlock_t		lock;		/* access lock */
 	rwlock_t		services_lock;	/* lock for services list */
-	atomic_t		usage;
 	int			debug_id;	/* debug ID for printks */
+	bool			dead;
 	struct sockaddr_rxrpc	srx;		/* local address */
 };
 
@@ -487,7 +488,7 @@ extern struct rxrpc_transport *rxrpc_name_to_transport(struct rxrpc_sock *,
 /*
  * call_accept.c
  */
-void rxrpc_accept_incoming_calls(struct work_struct *);
+void rxrpc_accept_incoming_calls(struct rxrpc_local *);
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long);
 int rxrpc_reject_call(struct rxrpc_sock *);
 
@@ -527,7 +528,7 @@ void __exit rxrpc_destroy_all_calls(void);
  */
 void rxrpc_process_connection(struct work_struct *);
 void rxrpc_reject_packet(struct rxrpc_local *, struct sk_buff *);
-void rxrpc_reject_packets(struct work_struct *);
+void rxrpc_reject_packets(struct rxrpc_local *);
 
 /*
  * conn_object.c
@@ -575,17 +576,32 @@ int rxrpc_get_server_data_key(struct rxrpc_connection *, const void *, time_t,
 /*
  * local_event.c
  */
-extern void rxrpc_process_local_events(struct work_struct *);
+extern void rxrpc_process_local_events(struct rxrpc_local *);
 
 /*
  * local_object.c
  */
-extern rwlock_t rxrpc_local_lock;
-
-struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *);
-void rxrpc_put_local(struct rxrpc_local *);
+struct rxrpc_local *rxrpc_lookup_local(const struct sockaddr_rxrpc *);
+void __rxrpc_put_local(struct rxrpc_local *);
 void __exit rxrpc_destroy_all_locals(void);
 
+static inline void rxrpc_get_local(struct rxrpc_local *local)
+{
+	atomic_inc(&local->usage);
+}
+
+static inline
+struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
+{
+	return atomic_inc_not_zero(&local->usage) ? local : NULL;
+}
+
+static inline void rxrpc_put_local(struct rxrpc_local *local)
+{
+	if (atomic_dec_and_test(&local->usage))
+		__rxrpc_put_local(local);
+}
+
 /*
  * misc.c
  */
@@ -874,15 +890,6 @@ static inline void rxrpc_purge_queue(struct sk_buff_head *list)
 	rxrpc_free_skb(skb);
 }
 
-static inline void __rxrpc_get_local(struct rxrpc_local *local, const char *f)
-{
-	CHECK_SLAB_OKAY(&local->usage);
-	if (atomic_inc_return(&local->usage) == 1)
-		printk("resurrected (%s)\n", f);
-}
-
-#define rxrpc_get_local(LOCAL) __rxrpc_get_local((LOCAL), __func__)
-
 #define rxrpc_get_call(CALL)				\
 do {							\
 	CHECK_SLAB_OKAY(&(CALL)->usage);		\
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index e5723f4dce89..50136c76ebd1 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -202,10 +202,8 @@ error_nofree:
  * accept incoming calls that need peer, transport and/or connection setting up
  * - the packets we get are all incoming client DATA packets that have seq == 1
  */
-void rxrpc_accept_incoming_calls(struct work_struct *work)
+void rxrpc_accept_incoming_calls(struct rxrpc_local *local)
 {
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, acceptor);
 	struct rxrpc_skb_priv *sp;
 	struct sockaddr_rxrpc srx;
 	struct rxrpc_sock *rx;
@@ -215,21 +213,8 @@ void rxrpc_accept_incoming_calls(struct work_struct *work)
 
 	_enter("%d", local->debug_id);
 
-	read_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
-	if (!local) {
-		_leave(" [local dead]");
-		return;
-	}
-
-process_next_packet:
 	skb = skb_dequeue(&local->accept_queue);
 	if (!skb) {
-		rxrpc_put_local(local);
 		_leave("\n");
 		return;
 	}
@@ -292,7 +277,7 @@ found_service:
 	case -ECONNRESET: /* old calls are ignored */
 	case -ECONNABORTED: /* aborted calls are reaborted or ignored */
 	case 0:
-		goto process_next_packet;
+		return;
 	case -ECONNREFUSED:
 		goto invalid_service;
 	case -EBUSY:
@@ -308,18 +293,18 @@ backlog_full:
 busy:
 	rxrpc_busy(local, &srx, &whdr);
 	rxrpc_free_skb(skb);
-	goto process_next_packet;
+	return;
 
 invalid_service:
 	skb->priority = RX_INVALID_OPERATION;
 	rxrpc_reject_packet(local, skb);
-	goto process_next_packet;
+	return;
 
 	/* can't change connection security type mid-flow */
 security_mismatch:
 	skb->priority = RX_PROTOCOL_ERROR;
 	rxrpc_reject_packet(local, skb);
-	goto process_next_packet;
+	return;
 }
 
 /*
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 8bdd692d4862..00c92b614485 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -314,19 +314,14 @@ void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
 {
 	CHECK_SLAB_OKAY(&local->usage);
 
-	if (!atomic_inc_not_zero(&local->usage)) {
-		printk("resurrected on reject\n");
-		BUG();
-	}
-
 	skb_queue_tail(&local->reject_queue, skb);
-	rxrpc_queue_work(&local->rejecter);
+	rxrpc_queue_work(&local->processor);
 }
 
 /*
  * reject packets through the local endpoint
  */
-void rxrpc_reject_packets(struct work_struct *work)
+void rxrpc_reject_packets(struct rxrpc_local *local)
 {
 	union {
 		struct sockaddr sa;
@@ -334,16 +329,12 @@ void rxrpc_reject_packets(struct work_struct *work)
 	} sa;
 	struct rxrpc_skb_priv *sp;
 	struct rxrpc_wire_header whdr;
-	struct rxrpc_local *local;
 	struct sk_buff *skb;
 	struct msghdr msg;
 	struct kvec iov[2];
 	size_t size;
 	__be32 code;
 
-	local = container_of(work, struct rxrpc_local, rejecter);
-	rxrpc_get_local(local);
-
 	_enter("%d", local->debug_id);
 
 	iov[0].iov_base = &whdr;
@@ -395,9 +386,7 @@ void rxrpc_reject_packets(struct work_struct *work)
 		}
 
 		rxrpc_free_skb(skb);
-		rxrpc_put_local(local);
 	}
 
-	rxrpc_put_local(local);
 	_leave("");
 }
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 3b405dbf3a05..47fb167af3e4 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -594,9 +594,8 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
 {
 	_enter("%p,%p", local, skb);
 
-	atomic_inc(&local->usage);
 	skb_queue_tail(&local->event_queue, skb);
-	rxrpc_queue_work(&local->event_processor);
+	rxrpc_queue_work(&local->processor);
 }
 
 /*
@@ -664,11 +663,15 @@ cant_find_conn:
 /*
  * handle data received on the local endpoint
  * - may be called in interrupt context
+ *
+ * The socket is locked by the caller and this prevents the socket from being
+ * shut down and the local endpoint from going away, thus sk_user_data will not
+ * be cleared until this function returns.
  */
 void rxrpc_data_ready(struct sock *sk)
 {
 	struct rxrpc_skb_priv *sp;
-	struct rxrpc_local *local;
+	struct rxrpc_local *local = sk->sk_user_data;
 	struct sk_buff *skb;
 	int ret;
 
@@ -676,21 +679,8 @@ void rxrpc_data_ready(struct sock *sk)
 
 	ASSERT(!irqs_disabled());
 
-	read_lock_bh(&rxrpc_local_lock);
-	local = sk->sk_user_data;
-	if (local && atomic_read(&local->usage) > 0)
-		rxrpc_get_local(local);
-	else
-		local = NULL;
-	read_unlock_bh(&rxrpc_local_lock);
-	if (!local) {
-		_leave(" [local dead]");
-		return;
-	}
-
 	skb = skb_recv_datagram(sk, 0, 1, &ret);
 	if (!skb) {
-		rxrpc_put_local(local);
 		if (ret == -EAGAIN)
 			return;
 		_debug("UDP socket error %d", ret);
@@ -704,7 +694,6 @@ void rxrpc_data_ready(struct sock *sk)
 	/* we'll probably need to checksum it (didn't call sock_recvmsg) */
 	if (skb_checksum_complete(skb)) {
 		rxrpc_free_skb(skb);
-		rxrpc_put_local(local);
 		__UDP_INC_STATS(&init_net, UDP_MIB_INERRORS, 0);
 		_leave(" [CSUM failed]");
 		return;
@@ -769,7 +758,6 @@ void rxrpc_data_ready(struct sock *sk)
 	}
 
 out:
-	rxrpc_put_local(local);
 	return;
 
 cant_route_call:
@@ -779,8 +767,7 @@ cant_route_call:
 	if (sp->hdr.seq == 1) {
 		_debug("first packet");
 		skb_queue_tail(&local->accept_queue, skb);
-		rxrpc_queue_work(&local->acceptor);
-		rxrpc_put_local(local);
+		rxrpc_queue_work(&local->processor);
 		_leave(" [incoming]");
 		return;
 	}
@@ -793,13 +780,11 @@ cant_route_call:
793 _debug("reject type %d",sp->hdr.type); 780 _debug("reject type %d",sp->hdr.type);
794 rxrpc_reject_packet(local, skb); 781 rxrpc_reject_packet(local, skb);
795 } 782 }
796 rxrpc_put_local(local);
797 _leave(" [no call]"); 783 _leave(" [no call]");
798 return; 784 return;
799 785
800bad_message: 786bad_message:
801 skb->priority = RX_PROTOCOL_ERROR; 787 skb->priority = RX_PROTOCOL_ERROR;
802 rxrpc_reject_packet(local, skb); 788 rxrpc_reject_packet(local, skb);
803 rxrpc_put_local(local);
804 _leave(" [badmsg]"); 789 _leave(" [badmsg]");
805} 790}
diff --git a/net/rxrpc/local_event.c b/net/rxrpc/local_event.c
index 194db2e6d548..31a3f86ef2f6 100644
--- a/net/rxrpc/local_event.c
+++ b/net/rxrpc/local_event.c
@@ -82,17 +82,15 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
 /*
  * Process event packets targetted at a local endpoint.
  */
-void rxrpc_process_local_events(struct work_struct *work)
+void rxrpc_process_local_events(struct rxrpc_local *local)
 {
-	struct rxrpc_local *local = container_of(work, struct rxrpc_local, event_processor);
 	struct sk_buff *skb;
 	char v;
 
 	_enter("");
 
-	atomic_inc(&local->usage);
-
-	while ((skb = skb_dequeue(&local->event_queue))) {
+	skb = skb_dequeue(&local->event_queue);
+	if (skb) {
 		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 
 		_debug("{%d},{%u}", local->debug_id, sp->hdr.type);
@@ -111,10 +109,8 @@ void rxrpc_process_local_events(struct work_struct *work)
 			break;
 		}
 
-		rxrpc_put_local(local);
 		rxrpc_free_skb(skb);
 	}
 
-	rxrpc_put_local(local);
 	_leave("");
 }
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index c1b8d745bf5e..009b321712bc 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -1,6 +1,6 @@
 /* Local endpoint object management
  *
- * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2016 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  *
  * This program is free software; you can redistribute it and/or
@@ -17,40 +17,72 @@
 #include <linux/slab.h>
 #include <linux/udp.h>
 #include <linux/ip.h>
+#include <linux/hashtable.h>
 #include <net/sock.h>
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
-static LIST_HEAD(rxrpc_locals);
-DEFINE_RWLOCK(rxrpc_local_lock);
-static DECLARE_RWSEM(rxrpc_local_sem);
-static DECLARE_WAIT_QUEUE_HEAD(rxrpc_local_wq);
+static void rxrpc_local_processor(struct work_struct *);
+static void rxrpc_local_rcu(struct rcu_head *);
 
-static void rxrpc_destroy_local(struct work_struct *work);
+static DEFINE_MUTEX(rxrpc_local_mutex);
+static LIST_HEAD(rxrpc_local_endpoints);
 
 /*
- * allocate a new local
+ * Compare a local to an address.  Return -ve, 0 or +ve to indicate less than,
+ * same or greater than.
+ *
+ * We explicitly don't compare the RxRPC service ID as we want to reject
+ * conflicting uses by differing services.  Further, we don't want to share
+ * addresses with different options (IPv6), so we don't compare those bits
+ * either.
  */
-static
-struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
+static long rxrpc_local_cmp_key(const struct rxrpc_local *local,
+				const struct sockaddr_rxrpc *srx)
+{
+	long diff;
+
+	diff = ((local->srx.transport_type - srx->transport_type) ?:
+		(local->srx.transport_len - srx->transport_len) ?:
+		(local->srx.transport.family - srx->transport.family));
+	if (diff != 0)
+		return diff;
+
+	switch (srx->transport.family) {
+	case AF_INET:
+		/* If the choice of UDP port is left up to the transport, then
+		 * the endpoint record doesn't match.
+		 */
+		return ((u16 __force)local->srx.transport.sin.sin_port -
+			(u16 __force)srx->transport.sin.sin_port) ?:
+			memcmp(&local->srx.transport.sin.sin_addr,
+			       &srx->transport.sin.sin_addr,
+			       sizeof(struct in_addr));
+	default:
+		BUG();
+	}
+}
+
+/*
+ * Allocate a new local endpoint.
+ */
+static struct rxrpc_local *rxrpc_alloc_local(const struct sockaddr_rxrpc *srx)
 {
 	struct rxrpc_local *local;
 
 	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
 	if (local) {
-		INIT_WORK(&local->destroyer, &rxrpc_destroy_local);
-		INIT_WORK(&local->acceptor, &rxrpc_accept_incoming_calls);
-		INIT_WORK(&local->rejecter, &rxrpc_reject_packets);
-		INIT_WORK(&local->event_processor, &rxrpc_process_local_events);
-		INIT_LIST_HEAD(&local->services);
+		atomic_set(&local->usage, 1);
 		INIT_LIST_HEAD(&local->link);
+		INIT_WORK(&local->processor, rxrpc_local_processor);
+		INIT_LIST_HEAD(&local->services);
 		init_rwsem(&local->defrag_sem);
 		skb_queue_head_init(&local->accept_queue);
 		skb_queue_head_init(&local->reject_queue);
 		skb_queue_head_init(&local->event_queue);
+		mutex_init(&local->conn_lock);
 		spin_lock_init(&local->lock);
 		rwlock_init(&local->services_lock);
-		atomic_set(&local->usage, 1);
 		local->debug_id = atomic_inc_return(&rxrpc_debug_id);
 		memcpy(&local->srx, srx, sizeof(*srx));
 	}
@@ -61,9 +93,9 @@ struct rxrpc_local *rxrpc_alloc_local(struct sockaddr_rxrpc *srx)
 
 /*
  * create the local socket
- * - must be called with rxrpc_local_sem writelocked
+ * - must be called with rxrpc_local_mutex locked
  */
-static int rxrpc_create_local(struct rxrpc_local *local)
+static int rxrpc_open_socket(struct rxrpc_local *local)
 {
 	struct sock *sock;
 	int ret, opt;
@@ -82,10 +114,10 @@ static int rxrpc_create_local(struct rxrpc_local *local)
 	if (local->srx.transport_len > sizeof(sa_family_t)) {
 		_debug("bind");
 		ret = kernel_bind(local->socket,
-				  (struct sockaddr *) &local->srx.transport,
+				  (struct sockaddr *)&local->srx.transport,
 				  local->srx.transport_len);
 		if (ret < 0) {
-			_debug("bind failed");
+			_debug("bind failed %d", ret);
 			goto error;
 		}
 	}
@@ -108,10 +140,6 @@ static int rxrpc_create_local(struct rxrpc_local *local)
 		goto error;
 	}
 
-	write_lock_bh(&rxrpc_local_lock);
-	list_add(&local->link, &rxrpc_locals);
-	write_unlock_bh(&rxrpc_local_lock);
-
 	/* set the socket up */
 	sock = local->socket->sk;
 	sock->sk_user_data = local;
@@ -131,188 +159,227 @@ error:
 }
 
 /*
- * create a new local endpoint using the specified UDP address
+ * Look up or create a new local endpoint using the specified local address.
  */
-struct rxrpc_local *rxrpc_lookup_local(struct sockaddr_rxrpc *srx)
+struct rxrpc_local *rxrpc_lookup_local(const struct sockaddr_rxrpc *srx)
 {
 	struct rxrpc_local *local;
+	struct list_head *cursor;
+	const char *age;
+	long diff;
 	int ret;
 
-	_enter("{%d,%u,%pI4+%hu}",
-	       srx->transport_type,
-	       srx->transport.family,
-	       &srx->transport.sin.sin_addr,
-	       ntohs(srx->transport.sin.sin_port));
-
-	down_write(&rxrpc_local_sem);
+	if (srx->transport.family == AF_INET) {
+		_enter("{%d,%u,%pI4+%hu}",
+		       srx->transport_type,
+		       srx->transport.family,
+		       &srx->transport.sin.sin_addr,
+		       ntohs(srx->transport.sin.sin_port));
+	} else {
+		_enter("{%d,%u}",
+		       srx->transport_type,
+		       srx->transport.family);
+		return ERR_PTR(-EAFNOSUPPORT);
+	}
 
-	/* see if we have a suitable local local endpoint already */
-	read_lock_bh(&rxrpc_local_lock);
+	mutex_lock(&rxrpc_local_mutex);
 
-	list_for_each_entry(local, &rxrpc_locals, link) {
-		_debug("CMP {%d,%u,%pI4+%hu}",
-		       local->srx.transport_type,
-		       local->srx.transport.family,
-		       &local->srx.transport.sin.sin_addr,
-		       ntohs(local->srx.transport.sin.sin_port));
+	for (cursor = rxrpc_local_endpoints.next;
+	     cursor != &rxrpc_local_endpoints;
+	     cursor = cursor->next) {
+		local = list_entry(cursor, struct rxrpc_local, link);
 
-		if (local->srx.transport_type != srx->transport_type ||
-		    local->srx.transport.family != srx->transport.family)
+		diff = rxrpc_local_cmp_key(local, srx);
+		if (diff < 0)
 			continue;
+		if (diff > 0)
+			break;
+
+		/* Services aren't allowed to share transport sockets, so
+		 * reject that here.  It is possible that the object is dying -
+		 * but it may also still have the local transport address that
+		 * we want bound.
+		 */
+		if (srx->srx_service) {
+			local = NULL;
+			goto addr_in_use;
+		}
 
-		switch (srx->transport.family) {
-		case AF_INET:
-			if (local->srx.transport.sin.sin_port !=
-			    srx->transport.sin.sin_port)
-				continue;
-			if (memcmp(&local->srx.transport.sin.sin_addr,
-				   &srx->transport.sin.sin_addr,
-				   sizeof(struct in_addr)) != 0)
-				continue;
-			goto found_local;
-
-		default:
-			BUG();
+		/* Found a match.  We replace a dying object.  Attempting to
+		 * bind the transport socket may still fail if we're attempting
+		 * to use a local address that the dying object is still using.
+		 */
+		if (!atomic_inc_not_zero(&local->usage)) {
+			cursor = cursor->next;
+			list_del_init(&local->link);
+			break;
 		}
-	}
 
-	read_unlock_bh(&rxrpc_local_lock);
+		age = "old";
+		goto found;
+	}
 
-	/* we didn't find one, so we need to create one */
 	local = rxrpc_alloc_local(srx);
-	if (!local) {
-		up_write(&rxrpc_local_sem);
-		return ERR_PTR(-ENOMEM);
-	}
+	if (!local)
+		goto nomem;
 
-	ret = rxrpc_create_local(local);
-	if (ret < 0) {
-		up_write(&rxrpc_local_sem);
-		kfree(local);
-		_leave(" = %d", ret);
-		return ERR_PTR(ret);
-	}
+	ret = rxrpc_open_socket(local);
+	if (ret < 0)
+		goto sock_error;
+
+	list_add_tail(&local->link, cursor);
+	age = "new";
 
-	up_write(&rxrpc_local_sem);
+found:
+	mutex_unlock(&rxrpc_local_mutex);
 
-	_net("LOCAL new %d {%d,%u,%pI4+%hu}",
+	_net("LOCAL %s %d {%d,%u,%pI4+%hu}",
+	     age,
 	     local->debug_id,
 	     local->srx.transport_type,
 	     local->srx.transport.family,
 	     &local->srx.transport.sin.sin_addr,
 	     ntohs(local->srx.transport.sin.sin_port));
 
-	_leave(" = %p [new]", local);
+	_leave(" = %p", local);
 	return local;
 
-found_local:
-	rxrpc_get_local(local);
-	read_unlock_bh(&rxrpc_local_lock);
-	up_write(&rxrpc_local_sem);
+nomem:
+	ret = -ENOMEM;
+sock_error:
+	mutex_unlock(&rxrpc_local_mutex);
+	kfree(local);
+	_leave(" = %d", ret);
+	return ERR_PTR(ret);
 
-	_net("LOCAL old %d {%d,%u,%pI4+%hu}",
-	     local->debug_id,
-	     local->srx.transport_type,
-	     local->srx.transport.family,
-	     &local->srx.transport.sin.sin_addr,
-	     ntohs(local->srx.transport.sin.sin_port));
+addr_in_use:
+	mutex_unlock(&rxrpc_local_mutex);
+	_leave(" = -EADDRINUSE");
+	return ERR_PTR(-EADDRINUSE);
+}
 
-	_leave(" = %p [reuse]", local);
-	return local;
+/*
+ * A local endpoint reached its end of life.
+ */
+void __rxrpc_put_local(struct rxrpc_local *local)
+{
+	_enter("%d", local->debug_id);
+	rxrpc_queue_work(&local->processor);
 }
 
 /*
- * release a local endpoint
+ * Destroy a local endpoint's socket and then hand the record to RCU to dispose
+ * of.
+ *
+ * Closing the socket cannot be done from bottom half context or RCU callback
+ * context because it might sleep.
  */
-void rxrpc_put_local(struct rxrpc_local *local)
+static void rxrpc_local_destroyer(struct rxrpc_local *local)
 {
-	_enter("%p{u=%d}", local, atomic_read(&local->usage));
+	struct socket *socket = local->socket;
 
-	ASSERTCMP(atomic_read(&local->usage), >, 0);
+	_enter("%d", local->debug_id);
 
-	/* to prevent a race, the decrement and the dequeue must be effectively
-	 * atomic */
-	write_lock_bh(&rxrpc_local_lock);
-	if (unlikely(atomic_dec_and_test(&local->usage))) {
-		_debug("destroy local");
-		rxrpc_queue_work(&local->destroyer);
+	/* We can get a race between an incoming call packet queueing the
+	 * processor again and the work processor starting the destruction
+	 * process which will shut down the UDP socket.
+	 */
+	if (local->dead) {
+		_leave(" [already dead]");
+		return;
 	}
-	write_unlock_bh(&rxrpc_local_lock);
-	_leave("");
+	local->dead = true;
+
+	mutex_lock(&rxrpc_local_mutex);
+	list_del_init(&local->link);
+	mutex_unlock(&rxrpc_local_mutex);
+
+	ASSERT(list_empty(&local->services));
+
+	if (socket) {
+		local->socket = NULL;
+		kernel_sock_shutdown(socket, SHUT_RDWR);
+		socket->sk->sk_user_data = NULL;
+		sock_release(socket);
+	}
+
+	/* At this point, there should be no more packets coming in to the
+	 * local endpoint.
+	 */
+	rxrpc_purge_queue(&local->accept_queue);
+	rxrpc_purge_queue(&local->reject_queue);
+	rxrpc_purge_queue(&local->event_queue);
+
+	_debug("rcu local %d", local->debug_id);
+	call_rcu(&local->rcu, rxrpc_local_rcu);
 }
 
 /*
- * destroy a local endpoint
+ * Process events on an endpoint
  */
-static void rxrpc_destroy_local(struct work_struct *work)
+static void rxrpc_local_processor(struct work_struct *work)
 {
 	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, destroyer);
+		container_of(work, struct rxrpc_local, processor);
+	bool again;
 
-	_enter("%p{%d}", local, atomic_read(&local->usage));
+	_enter("%d", local->debug_id);
 
-	down_write(&rxrpc_local_sem);
+	do {
+		again = false;
+		if (atomic_read(&local->usage) == 0)
+			return rxrpc_local_destroyer(local);
 
-	write_lock_bh(&rxrpc_local_lock);
-	if (atomic_read(&local->usage) > 0) {
-		write_unlock_bh(&rxrpc_local_lock);
-		up_read(&rxrpc_local_sem);
-		_leave(" [resurrected]");
-		return;
-	}
+		if (!skb_queue_empty(&local->accept_queue)) {
+			rxrpc_accept_incoming_calls(local);
+			again = true;
+		}
 
-	list_del(&local->link);
-	local->socket->sk->sk_user_data = NULL;
-	write_unlock_bh(&rxrpc_local_lock);
+		if (!skb_queue_empty(&local->reject_queue)) {
+			rxrpc_reject_packets(local);
+			again = true;
+		}
 
-	downgrade_write(&rxrpc_local_sem);
+		if (!skb_queue_empty(&local->event_queue)) {
+			rxrpc_process_local_events(local);
+			again = true;
+		}
+	} while (again);
+}
 
-	ASSERT(list_empty(&local->services));
-	ASSERT(!work_pending(&local->acceptor));
-	ASSERT(!work_pending(&local->rejecter));
-	ASSERT(!work_pending(&local->event_processor));
+/*
+ * Destroy a local endpoint after the RCU grace period expires.
+ */
+static void rxrpc_local_rcu(struct rcu_head *rcu)
+{
+	struct rxrpc_local *local = container_of(rcu, struct rxrpc_local, rcu);
 
-	/* finish cleaning up the local descriptor */
-	rxrpc_purge_queue(&local->accept_queue);
-	rxrpc_purge_queue(&local->reject_queue);
-	rxrpc_purge_queue(&local->event_queue);
-	kernel_sock_shutdown(local->socket, SHUT_RDWR);
-	sock_release(local->socket);
+	_enter("%d", local->debug_id);
 
-	up_read(&rxrpc_local_sem);
+	ASSERT(!work_pending(&local->processor));
 
 	_net("DESTROY LOCAL %d", local->debug_id);
 	kfree(local);
-
-	if (list_empty(&rxrpc_locals))
-		wake_up_all(&rxrpc_local_wq);
-
 	_leave("");
 }
 
 /*
- * preemptively destroy all local local endpoint rather than waiting for
- * them to be destroyed
+ * Verify the local endpoint list is empty by this point.
  */
 void __exit rxrpc_destroy_all_locals(void)
 {
-	DECLARE_WAITQUEUE(myself,current);
+	struct rxrpc_local *local;
 
 	_enter("");
 
-	/* we simply have to wait for them to go away */
-	if (!list_empty(&rxrpc_locals)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		add_wait_queue(&rxrpc_local_wq, &myself);
-
-		while (!list_empty(&rxrpc_locals)) {
-			schedule();
-			set_current_state(TASK_UNINTERRUPTIBLE);
-		}
+	if (list_empty(&rxrpc_local_endpoints))
+		return;
 
-		remove_wait_queue(&rxrpc_local_wq, &myself);
-		set_current_state(TASK_RUNNING);
+	mutex_lock(&rxrpc_local_mutex);
+	list_for_each_entry(local, &rxrpc_local_endpoints, link) {
+		pr_err("AF_RXRPC: Leaked local %p {%d}\n",
+		       local, atomic_read(&local->usage));
 	}
-
-	_leave("");
+	mutex_unlock(&rxrpc_local_mutex);
+	BUG();
 }