aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2016-09-08 06:10:12 -0400
committerDavid Howells <dhowells@redhat.com>2016-09-08 06:10:12 -0400
commit00e907127e6f86d0f9b122d9b4347a8aa09a8b61 (patch)
tree54b18c4600a8053c089cac5bd60da92dc51d351b
parent49e19ec7d3499f79d2b3a45bb28418e89512fd7a (diff)
rxrpc: Preallocate peers, conns and calls for incoming service requests
Make it possible for the data_ready handler called from the UDP transport socket to completely instantiate an rxrpc_call structure and make it immediately live by preallocating all the memory it might need. The idea is to cut out the background thread usage as much as possible. [Note that the preallocated structs are not actually used in this patch - that will be done in a future patch.] If insufficient resources are available in the preallocation buffers, it will be possible to discard the DATA packet in the data_ready handler or schedule a BUSY packet without the need to schedule an attempt at allocation in a background thread. To this end: (1) Preallocate rxrpc_peer, rxrpc_connection and rxrpc_call structs to a maximum number each of the listen backlog size. The backlog size is limited to a maximum of 32. Only this many of each can be in the preallocation buffer. (2) For userspace sockets, the preallocation is charged initially by listen() and will be recharged by accepting or rejecting pending new incoming calls. (3) For kernel services {,re,dis}charging of the preallocation buffers is handled manually. Two notifier callbacks have to be provided before kernel_listen() is invoked: (a) An indication that a new call has been instantiated. This can be used to trigger background recharging. (b) An indication that a call is being discarded. This is used when the socket is being released. A function, rxrpc_kernel_charge_accept() is called by the kernel service to preallocate a single call. It should be passed the user ID to be used for that call and a callback to associate the rxrpc call with the kernel service's side of the ID. (4) Discard the preallocation when the socket is closed. (5) Temporarily bump the refcount on the call allocated in rxrpc_incoming_call() so that rxrpc_release_call() can ditch the preallocation ref on service calls unconditionally. This will no longer be necessary once the preallocation is used. 
Note that this does not yet control the number of active service calls on a client - that will come in a later patch. A future development would be to provide a setsockopt() call that allows a userspace server to manually charge the preallocation buffer. This would allow user call IDs to be provided in advance and the awkward manual accept stage to be bypassed. Signed-off-by: David Howells <dhowells@redhat.com>
-rw-r--r--fs/afs/rxrpc.c71
-rw-r--r--include/net/af_rxrpc.h10
-rw-r--r--net/rxrpc/af_rxrpc.c16
-rw-r--r--net/rxrpc/ar-internal.h32
-rw-r--r--net/rxrpc/call_accept.c229
-rw-r--r--net/rxrpc/call_object.c12
-rw-r--r--net/rxrpc/conn_object.c2
-rw-r--r--net/rxrpc/conn_service.c24
-rw-r--r--net/rxrpc/input.c2
-rw-r--r--net/rxrpc/proc.c8
10 files changed, 391 insertions, 15 deletions
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index 53750dece80e..720ef05a24fe 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -18,6 +18,7 @@
18 18
19struct socket *afs_socket; /* my RxRPC socket */ 19struct socket *afs_socket; /* my RxRPC socket */
20static struct workqueue_struct *afs_async_calls; 20static struct workqueue_struct *afs_async_calls;
21static struct afs_call *afs_spare_incoming_call;
21static atomic_t afs_outstanding_calls; 22static atomic_t afs_outstanding_calls;
22 23
23static void afs_free_call(struct afs_call *); 24static void afs_free_call(struct afs_call *);
@@ -26,7 +27,8 @@ static int afs_wait_for_call_to_complete(struct afs_call *);
26static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long); 27static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
27static int afs_dont_wait_for_call_to_complete(struct afs_call *); 28static int afs_dont_wait_for_call_to_complete(struct afs_call *);
28static void afs_process_async_call(struct work_struct *); 29static void afs_process_async_call(struct work_struct *);
29static void afs_rx_new_call(struct sock *); 30static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
31static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
30static int afs_deliver_cm_op_id(struct afs_call *); 32static int afs_deliver_cm_op_id(struct afs_call *);
31 33
32/* synchronous call management */ 34/* synchronous call management */
@@ -54,8 +56,10 @@ static const struct afs_call_type afs_RXCMxxxx = {
54}; 56};
55 57
56static void afs_collect_incoming_call(struct work_struct *); 58static void afs_collect_incoming_call(struct work_struct *);
59static void afs_charge_preallocation(struct work_struct *);
57 60
58static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call); 61static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
62static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);
59 63
60static int afs_wait_atomic_t(atomic_t *p) 64static int afs_wait_atomic_t(atomic_t *p)
61{ 65{
@@ -100,13 +104,15 @@ int afs_open_socket(void)
100 if (ret < 0) 104 if (ret < 0)
101 goto error_2; 105 goto error_2;
102 106
103 rxrpc_kernel_new_call_notification(socket, afs_rx_new_call); 107 rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
108 afs_rx_discard_new_call);
104 109
105 ret = kernel_listen(socket, INT_MAX); 110 ret = kernel_listen(socket, INT_MAX);
106 if (ret < 0) 111 if (ret < 0)
107 goto error_2; 112 goto error_2;
108 113
109 afs_socket = socket; 114 afs_socket = socket;
115 afs_charge_preallocation(NULL);
110 _leave(" = 0"); 116 _leave(" = 0");
111 return 0; 117 return 0;
112 118
@@ -126,6 +132,12 @@ void afs_close_socket(void)
126{ 132{
127 _enter(""); 133 _enter("");
128 134
135 if (afs_spare_incoming_call) {
136 atomic_inc(&afs_outstanding_calls);
137 afs_free_call(afs_spare_incoming_call);
138 afs_spare_incoming_call = NULL;
139 }
140
129 _debug("outstanding %u", atomic_read(&afs_outstanding_calls)); 141 _debug("outstanding %u", atomic_read(&afs_outstanding_calls));
130 wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t, 142 wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
131 TASK_UNINTERRUPTIBLE); 143 TASK_UNINTERRUPTIBLE);
@@ -635,12 +647,65 @@ static void afs_collect_incoming_call(struct work_struct *work)
635 afs_free_call(call); 647 afs_free_call(call);
636} 648}
637 649
650static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
651{
652 struct afs_call *call = (struct afs_call *)user_call_ID;
653
654 call->rxcall = rxcall;
655}
656
657/*
658 * Charge the incoming call preallocation.
659 */
660static void afs_charge_preallocation(struct work_struct *work)
661{
662 struct afs_call *call = afs_spare_incoming_call;
663
664 for (;;) {
665 if (!call) {
666 call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
667 if (!call)
668 break;
669
670 INIT_WORK(&call->async_work, afs_process_async_call);
671 call->wait_mode = &afs_async_incoming_call;
672 call->type = &afs_RXCMxxxx;
673 init_waitqueue_head(&call->waitq);
674 call->state = AFS_CALL_AWAIT_OP_ID;
675 }
676
677 if (rxrpc_kernel_charge_accept(afs_socket,
678 afs_wake_up_async_call,
679 afs_rx_attach,
680 (unsigned long)call,
681 GFP_KERNEL) < 0)
682 break;
683 call = NULL;
684 }
685 afs_spare_incoming_call = call;
686}
687
688/*
689 * Discard a preallocated call when a socket is shut down.
690 */
691static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
692 unsigned long user_call_ID)
693{
694 struct afs_call *call = (struct afs_call *)user_call_ID;
695
696 atomic_inc(&afs_outstanding_calls);
697 call->rxcall = NULL;
698 afs_free_call(call);
699}
700
638/* 701/*
639 * Notification of an incoming call. 702 * Notification of an incoming call.
640 */ 703 */
641static void afs_rx_new_call(struct sock *sk) 704static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
705 unsigned long user_call_ID)
642{ 706{
643 queue_work(afs_wq, &afs_collect_incoming_call_work); 707 queue_work(afs_wq, &afs_collect_incoming_call_work);
708 queue_work(afs_wq, &afs_charge_preallocation_work);
644} 709}
645 710
646/* 711/*
diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index 08ed8729126c..9cf551be916b 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -21,10 +21,14 @@ struct rxrpc_call;
21 21
22typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *, 22typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
23 unsigned long); 23 unsigned long);
24typedef void (*rxrpc_notify_new_call_t)(struct sock *); 24typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
25 unsigned long);
26typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
27typedef void (*rxrpc_user_attach_call_t)(struct rxrpc_call *, unsigned long);
25 28
26void rxrpc_kernel_new_call_notification(struct socket *, 29void rxrpc_kernel_new_call_notification(struct socket *,
27 rxrpc_notify_new_call_t); 30 rxrpc_notify_new_call_t,
31 rxrpc_discard_new_call_t);
28struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *, 32struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
29 struct sockaddr_rxrpc *, 33 struct sockaddr_rxrpc *,
30 struct key *, 34 struct key *,
@@ -43,5 +47,7 @@ struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long,
43int rxrpc_kernel_reject_call(struct socket *); 47int rxrpc_kernel_reject_call(struct socket *);
44void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *, 48void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
45 struct sockaddr_rxrpc *); 49 struct sockaddr_rxrpc *);
50int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
51 rxrpc_user_attach_call_t, unsigned long, gfp_t);
46 52
47#endif /* _NET_RXRPC_H */ 53#endif /* _NET_RXRPC_H */
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index f13cca1e973e..1e8cf3ded81f 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -193,7 +193,7 @@ static int rxrpc_listen(struct socket *sock, int backlog)
193{ 193{
194 struct sock *sk = sock->sk; 194 struct sock *sk = sock->sk;
195 struct rxrpc_sock *rx = rxrpc_sk(sk); 195 struct rxrpc_sock *rx = rxrpc_sk(sk);
196 unsigned int max; 196 unsigned int max, old;
197 int ret; 197 int ret;
198 198
199 _enter("%p,%d", rx, backlog); 199 _enter("%p,%d", rx, backlog);
@@ -212,9 +212,13 @@ static int rxrpc_listen(struct socket *sock, int backlog)
212 backlog = max; 212 backlog = max;
213 else if (backlog < 0 || backlog > max) 213 else if (backlog < 0 || backlog > max)
214 break; 214 break;
215 old = sk->sk_max_ack_backlog;
215 sk->sk_max_ack_backlog = backlog; 216 sk->sk_max_ack_backlog = backlog;
216 rx->sk.sk_state = RXRPC_SERVER_LISTENING; 217 ret = rxrpc_service_prealloc(rx, GFP_KERNEL);
217 ret = 0; 218 if (ret == 0)
219 rx->sk.sk_state = RXRPC_SERVER_LISTENING;
220 else
221 sk->sk_max_ack_backlog = old;
218 break; 222 break;
219 default: 223 default:
220 ret = -EBUSY; 224 ret = -EBUSY;
@@ -303,16 +307,19 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
303 * rxrpc_kernel_new_call_notification - Get notifications of new calls 307 * rxrpc_kernel_new_call_notification - Get notifications of new calls
304 * @sock: The socket to intercept received messages on 308 * @sock: The socket to intercept received messages on
305 * @notify_new_call: Function to be called when new calls appear 309 * @notify_new_call: Function to be called when new calls appear
310 * @discard_new_call: Function to discard preallocated calls
306 * 311 *
307 * Allow a kernel service to be given notifications about new calls. 312 * Allow a kernel service to be given notifications about new calls.
308 */ 313 */
309void rxrpc_kernel_new_call_notification( 314void rxrpc_kernel_new_call_notification(
310 struct socket *sock, 315 struct socket *sock,
311 rxrpc_notify_new_call_t notify_new_call) 316 rxrpc_notify_new_call_t notify_new_call,
317 rxrpc_discard_new_call_t discard_new_call)
312{ 318{
313 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 319 struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
314 320
315 rx->notify_new_call = notify_new_call; 321 rx->notify_new_call = notify_new_call;
322 rx->discard_new_call = discard_new_call;
316} 323}
317EXPORT_SYMBOL(rxrpc_kernel_new_call_notification); 324EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);
318 325
@@ -622,6 +629,7 @@ static int rxrpc_release_sock(struct sock *sk)
622 } 629 }
623 630
624 /* try to flush out this socket */ 631 /* try to flush out this socket */
632 rxrpc_discard_prealloc(rx);
625 rxrpc_release_calls_on_socket(rx); 633 rxrpc_release_calls_on_socket(rx);
626 flush_workqueue(rxrpc_workqueue); 634 flush_workqueue(rxrpc_workqueue);
627 rxrpc_purge_queue(&sk->sk_receive_queue); 635 rxrpc_purge_queue(&sk->sk_receive_queue);
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 027791261768..45e1c269f90e 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -64,19 +64,42 @@ enum {
64}; 64};
65 65
66/* 66/*
67 * Service backlog preallocation.
68 *
69 * This contains circular buffers of preallocated peers, connections and calls
70 * for incoming service calls and their head and tail pointers. This allows
71 * calls to be set up in the data_ready handler, thereby avoiding the need to
72 * shuffle packets around so much.
73 */
74struct rxrpc_backlog {
75 unsigned short peer_backlog_head;
76 unsigned short peer_backlog_tail;
77 unsigned short conn_backlog_head;
78 unsigned short conn_backlog_tail;
79 unsigned short call_backlog_head;
80 unsigned short call_backlog_tail;
81#define RXRPC_BACKLOG_MAX 32
82 struct rxrpc_peer *peer_backlog[RXRPC_BACKLOG_MAX];
83 struct rxrpc_connection *conn_backlog[RXRPC_BACKLOG_MAX];
84 struct rxrpc_call *call_backlog[RXRPC_BACKLOG_MAX];
85};
86
87/*
67 * RxRPC socket definition 88 * RxRPC socket definition
68 */ 89 */
69struct rxrpc_sock { 90struct rxrpc_sock {
70 /* WARNING: sk has to be the first member */ 91 /* WARNING: sk has to be the first member */
71 struct sock sk; 92 struct sock sk;
72 rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */ 93 rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */
94 rxrpc_discard_new_call_t discard_new_call; /* Func to discard a new call */
73 struct rxrpc_local *local; /* local endpoint */ 95 struct rxrpc_local *local; /* local endpoint */
74 struct hlist_node listen_link; /* link in the local endpoint's listen list */ 96 struct hlist_node listen_link; /* link in the local endpoint's listen list */
75 struct list_head secureq; /* calls awaiting connection security clearance */ 97 struct list_head secureq; /* calls awaiting connection security clearance */
76 struct list_head acceptq; /* calls awaiting acceptance */ 98 struct list_head acceptq; /* calls awaiting acceptance */
99 struct rxrpc_backlog *backlog; /* Preallocation for services */
77 struct key *key; /* security for this socket */ 100 struct key *key; /* security for this socket */
78 struct key *securities; /* list of server security descriptors */ 101 struct key *securities; /* list of server security descriptors */
79 struct rb_root calls; /* outstanding calls on this socket */ 102 struct rb_root calls; /* User ID -> call mapping */
80 unsigned long flags; 103 unsigned long flags;
81#define RXRPC_SOCK_CONNECTED 0 /* connect_srx is set */ 104#define RXRPC_SOCK_CONNECTED 0 /* connect_srx is set */
82 rwlock_t call_lock; /* lock for calls */ 105 rwlock_t call_lock; /* lock for calls */
@@ -290,6 +313,7 @@ enum rxrpc_conn_cache_state {
290enum rxrpc_conn_proto_state { 313enum rxrpc_conn_proto_state {
291 RXRPC_CONN_UNUSED, /* Connection not yet attempted */ 314 RXRPC_CONN_UNUSED, /* Connection not yet attempted */
292 RXRPC_CONN_CLIENT, /* Client connection */ 315 RXRPC_CONN_CLIENT, /* Client connection */
316 RXRPC_CONN_SERVICE_PREALLOC, /* Service connection preallocation */
293 RXRPC_CONN_SERVICE_UNSECURED, /* Service unsecured connection */ 317 RXRPC_CONN_SERVICE_UNSECURED, /* Service unsecured connection */
294 RXRPC_CONN_SERVICE_CHALLENGING, /* Service challenging for security */ 318 RXRPC_CONN_SERVICE_CHALLENGING, /* Service challenging for security */
295 RXRPC_CONN_SERVICE, /* Service secured connection */ 319 RXRPC_CONN_SERVICE, /* Service secured connection */
@@ -408,6 +432,7 @@ enum rxrpc_call_state {
408 RXRPC_CALL_CLIENT_AWAIT_REPLY, /* - client awaiting reply */ 432 RXRPC_CALL_CLIENT_AWAIT_REPLY, /* - client awaiting reply */
409 RXRPC_CALL_CLIENT_RECV_REPLY, /* - client receiving reply phase */ 433 RXRPC_CALL_CLIENT_RECV_REPLY, /* - client receiving reply phase */
410 RXRPC_CALL_CLIENT_FINAL_ACK, /* - client sending final ACK phase */ 434 RXRPC_CALL_CLIENT_FINAL_ACK, /* - client sending final ACK phase */
435 RXRPC_CALL_SERVER_PREALLOC, /* - service preallocation */
411 RXRPC_CALL_SERVER_SECURING, /* - server securing request connection */ 436 RXRPC_CALL_SERVER_SECURING, /* - server securing request connection */
412 RXRPC_CALL_SERVER_ACCEPTING, /* - server accepting request */ 437 RXRPC_CALL_SERVER_ACCEPTING, /* - server accepting request */
413 RXRPC_CALL_SERVER_RECV_REQUEST, /* - server receiving request */ 438 RXRPC_CALL_SERVER_RECV_REQUEST, /* - server receiving request */
@@ -534,6 +559,8 @@ extern struct workqueue_struct *rxrpc_workqueue;
534/* 559/*
535 * call_accept.c 560 * call_accept.c
536 */ 561 */
562int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
563void rxrpc_discard_prealloc(struct rxrpc_sock *);
537void rxrpc_accept_incoming_calls(struct rxrpc_local *); 564void rxrpc_accept_incoming_calls(struct rxrpc_local *);
538struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long, 565struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
539 rxrpc_notify_rx_t); 566 rxrpc_notify_rx_t);
@@ -557,6 +584,7 @@ extern struct list_head rxrpc_calls;
557extern rwlock_t rxrpc_call_lock; 584extern rwlock_t rxrpc_call_lock;
558 585
559struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long); 586struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
587struct rxrpc_call *rxrpc_alloc_call(gfp_t);
560struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *, 588struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
561 struct rxrpc_conn_parameters *, 589 struct rxrpc_conn_parameters *,
562 struct sockaddr_rxrpc *, 590 struct sockaddr_rxrpc *,
@@ -573,6 +601,7 @@ void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
573void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace); 601void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace);
574void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *); 602void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *);
575void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *); 603void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *);
604void rxrpc_cleanup_call(struct rxrpc_call *);
576void __exit rxrpc_destroy_all_calls(void); 605void __exit rxrpc_destroy_all_calls(void);
577 606
578static inline bool rxrpc_is_service_call(const struct rxrpc_call *call) 607static inline bool rxrpc_is_service_call(const struct rxrpc_call *call)
@@ -757,6 +786,7 @@ struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *,
757struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *, 786struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
758 struct sockaddr_rxrpc *, 787 struct sockaddr_rxrpc *,
759 struct sk_buff *); 788 struct sk_buff *);
789struct rxrpc_connection *rxrpc_prealloc_service_connection(gfp_t);
760void rxrpc_unpublish_service_conn(struct rxrpc_connection *); 790void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
761 791
762/* 792/*
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 4c71efcf82ed..cc7194e05a15 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -20,12 +20,210 @@
20#include <linux/in6.h> 20#include <linux/in6.h>
21#include <linux/icmp.h> 21#include <linux/icmp.h>
22#include <linux/gfp.h> 22#include <linux/gfp.h>
23#include <linux/circ_buf.h>
23#include <net/sock.h> 24#include <net/sock.h>
24#include <net/af_rxrpc.h> 25#include <net/af_rxrpc.h>
25#include <net/ip.h> 26#include <net/ip.h>
26#include "ar-internal.h" 27#include "ar-internal.h"
27 28
28/* 29/*
30 * Preallocate a single service call, connection and peer and, if possible,
31 * give them a user ID and attach the user's side of the ID to them.
32 */
33static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
34 struct rxrpc_backlog *b,
35 rxrpc_notify_rx_t notify_rx,
36 rxrpc_user_attach_call_t user_attach_call,
37 unsigned long user_call_ID, gfp_t gfp)
38{
39 const void *here = __builtin_return_address(0);
40 struct rxrpc_call *call;
41 int max, tmp;
42 unsigned int size = RXRPC_BACKLOG_MAX;
43 unsigned int head, tail, call_head, call_tail;
44
45 max = rx->sk.sk_max_ack_backlog;
46 tmp = rx->sk.sk_ack_backlog;
47 if (tmp >= max) {
48 _leave(" = -ENOBUFS [full %u]", max);
49 return -ENOBUFS;
50 }
51 max -= tmp;
52
53 /* We don't need more conns and peers than we have calls, but on the
54 * other hand, we shouldn't ever use more peers than conns or conns
55 * than calls.
56 */
57 call_head = b->call_backlog_head;
58 call_tail = READ_ONCE(b->call_backlog_tail);
59 tmp = CIRC_CNT(call_head, call_tail, size);
60 if (tmp >= max) {
61 _leave(" = -ENOBUFS [enough %u]", tmp);
62 return -ENOBUFS;
63 }
64 max = tmp + 1;
65
66 head = b->peer_backlog_head;
67 tail = READ_ONCE(b->peer_backlog_tail);
68 if (CIRC_CNT(head, tail, size) < max) {
69 struct rxrpc_peer *peer = rxrpc_alloc_peer(rx->local, gfp);
70 if (!peer)
71 return -ENOMEM;
72 b->peer_backlog[head] = peer;
73 smp_store_release(&b->peer_backlog_head,
74 (head + 1) & (size - 1));
75 }
76
77 head = b->conn_backlog_head;
78 tail = READ_ONCE(b->conn_backlog_tail);
79 if (CIRC_CNT(head, tail, size) < max) {
80 struct rxrpc_connection *conn;
81
82 conn = rxrpc_prealloc_service_connection(gfp);
83 if (!conn)
84 return -ENOMEM;
85 b->conn_backlog[head] = conn;
86 smp_store_release(&b->conn_backlog_head,
87 (head + 1) & (size - 1));
88 }
89
90 /* Now it gets complicated, because calls get registered with the
91 * socket here, particularly if a user ID is preassigned by the user.
92 */
93 call = rxrpc_alloc_call(gfp);
94 if (!call)
95 return -ENOMEM;
96 call->flags |= (1 << RXRPC_CALL_IS_SERVICE);
97 call->state = RXRPC_CALL_SERVER_PREALLOC;
98
99 trace_rxrpc_call(call, rxrpc_call_new_service,
100 atomic_read(&call->usage),
101 here, (const void *)user_call_ID);
102
103 write_lock(&rx->call_lock);
104 if (user_attach_call) {
105 struct rxrpc_call *xcall;
106 struct rb_node *parent, **pp;
107
108 /* Check the user ID isn't already in use */
109 pp = &rx->calls.rb_node;
110 parent = NULL;
111 while (*pp) {
112 parent = *pp;
113 xcall = rb_entry(parent, struct rxrpc_call, sock_node);
114 if (user_call_ID < call->user_call_ID)
115 pp = &(*pp)->rb_left;
116 else if (user_call_ID > call->user_call_ID)
117 pp = &(*pp)->rb_right;
118 else
119 goto id_in_use;
120 }
121
122 call->user_call_ID = user_call_ID;
123 call->notify_rx = notify_rx;
124 rxrpc_get_call(call, rxrpc_call_got);
125 user_attach_call(call, user_call_ID);
126 rxrpc_get_call(call, rxrpc_call_got_userid);
127 rb_link_node(&call->sock_node, parent, pp);
128 rb_insert_color(&call->sock_node, &rx->calls);
129 set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
130 }
131
132 write_unlock(&rx->call_lock);
133
134 write_lock(&rxrpc_call_lock);
135 list_add_tail(&call->link, &rxrpc_calls);
136 write_unlock(&rxrpc_call_lock);
137
138 b->call_backlog[call_head] = call;
139 smp_store_release(&b->call_backlog_head, (call_head + 1) & (size - 1));
140 _leave(" = 0 [%d -> %lx]", call->debug_id, user_call_ID);
141 return 0;
142
143id_in_use:
144 write_unlock(&rx->call_lock);
145 rxrpc_cleanup_call(call);
146 _leave(" = -EBADSLT");
147 return -EBADSLT;
148}
149
150/*
151 * Preallocate sufficient service connections, calls and peers to cover the
152 * entire backlog of a socket. When a new call comes in, if we don't have
153 * sufficient of each available, the call gets rejected as busy or ignored.
154 *
155 * The backlog is replenished when a connection is accepted or rejected.
156 */
157int rxrpc_service_prealloc(struct rxrpc_sock *rx, gfp_t gfp)
158{
159 struct rxrpc_backlog *b = rx->backlog;
160
161 if (!b) {
162 b = kzalloc(sizeof(struct rxrpc_backlog), gfp);
163 if (!b)
164 return -ENOMEM;
165 rx->backlog = b;
166 }
167
168 if (rx->discard_new_call)
169 return 0;
170
171 while (rxrpc_service_prealloc_one(rx, b, NULL, NULL, 0, gfp) == 0)
172 ;
173
174 return 0;
175}
176
177/*
178 * Discard the preallocation on a service.
179 */
180void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
181{
182 struct rxrpc_backlog *b = rx->backlog;
183 unsigned int size = RXRPC_BACKLOG_MAX, head, tail;
184
185 if (!b)
186 return;
187 rx->backlog = NULL;
188
189 head = b->peer_backlog_head;
190 tail = b->peer_backlog_tail;
191 while (CIRC_CNT(head, tail, size) > 0) {
192 struct rxrpc_peer *peer = b->peer_backlog[tail];
193 kfree(peer);
194 tail = (tail + 1) & (size - 1);
195 }
196
197 head = b->conn_backlog_head;
198 tail = b->conn_backlog_tail;
199 while (CIRC_CNT(head, tail, size) > 0) {
200 struct rxrpc_connection *conn = b->conn_backlog[tail];
201 write_lock(&rxrpc_connection_lock);
202 list_del(&conn->link);
203 list_del(&conn->proc_link);
204 write_unlock(&rxrpc_connection_lock);
205 kfree(conn);
206 tail = (tail + 1) & (size - 1);
207 }
208
209 head = b->call_backlog_head;
210 tail = b->call_backlog_tail;
211 while (CIRC_CNT(head, tail, size) > 0) {
212 struct rxrpc_call *call = b->call_backlog[tail];
213 if (rx->discard_new_call) {
214 _debug("discard %lx", call->user_call_ID);
215 rx->discard_new_call(call, call->user_call_ID);
216 }
217 rxrpc_call_completed(call);
218 rxrpc_release_call(rx, call);
219 rxrpc_put_call(call, rxrpc_call_put);
220 tail = (tail + 1) & (size - 1);
221 }
222
223 kfree(b);
224}
225
226/*
29 * generate a connection-level abort 227 * generate a connection-level abort
30 */ 228 */
31static int rxrpc_busy(struct rxrpc_local *local, struct sockaddr_rxrpc *srx, 229static int rxrpc_busy(struct rxrpc_local *local, struct sockaddr_rxrpc *srx,
@@ -450,3 +648,34 @@ int rxrpc_kernel_reject_call(struct socket *sock)
450 return ret; 648 return ret;
451} 649}
452EXPORT_SYMBOL(rxrpc_kernel_reject_call); 650EXPORT_SYMBOL(rxrpc_kernel_reject_call);
651
652/*
653 * rxrpc_kernel_charge_accept - Charge up socket with preallocated calls
654 * @sock: The socket on which to preallocate
655 * @notify_rx: Event notification function for the call
656 * @user_attach_call: Func to attach call to user_call_ID
657 * @user_call_ID: The tag to attach to the preallocated call
658 * @gfp: The allocation conditions.
659 *
660 * Charge up the socket with preallocated calls, each with a user ID. A
661 * function should be provided to effect the attachment from the user's side.
662 * The user is given a ref to hold on the call.
663 *
664 * Note that the call may be come connected before this function returns.
665 */
666int rxrpc_kernel_charge_accept(struct socket *sock,
667 rxrpc_notify_rx_t notify_rx,
668 rxrpc_user_attach_call_t user_attach_call,
669 unsigned long user_call_ID, gfp_t gfp)
670{
671 struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
672 struct rxrpc_backlog *b = rx->backlog;
673
674 if (sock->sk->sk_state == RXRPC_CLOSE)
675 return -ESHUTDOWN;
676
677 return rxrpc_service_prealloc_one(rx, b, notify_rx,
678 user_attach_call, user_call_ID,
679 gfp);
680}
681EXPORT_SYMBOL(rxrpc_kernel_charge_accept);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index f843397e03b6..d233adc9b5e5 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -31,6 +31,7 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
31 [RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl", 31 [RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl",
32 [RXRPC_CALL_CLIENT_RECV_REPLY] = "ClRcvRpl", 32 [RXRPC_CALL_CLIENT_RECV_REPLY] = "ClRcvRpl",
33 [RXRPC_CALL_CLIENT_FINAL_ACK] = "ClFnlACK", 33 [RXRPC_CALL_CLIENT_FINAL_ACK] = "ClFnlACK",
34 [RXRPC_CALL_SERVER_PREALLOC] = "SvPrealc",
34 [RXRPC_CALL_SERVER_SECURING] = "SvSecure", 35 [RXRPC_CALL_SERVER_SECURING] = "SvSecure",
35 [RXRPC_CALL_SERVER_ACCEPTING] = "SvAccept", 36 [RXRPC_CALL_SERVER_ACCEPTING] = "SvAccept",
36 [RXRPC_CALL_SERVER_RECV_REQUEST] = "SvRcvReq", 37 [RXRPC_CALL_SERVER_RECV_REQUEST] = "SvRcvReq",
@@ -71,7 +72,6 @@ DEFINE_RWLOCK(rxrpc_call_lock);
71static void rxrpc_call_life_expired(unsigned long _call); 72static void rxrpc_call_life_expired(unsigned long _call);
72static void rxrpc_ack_time_expired(unsigned long _call); 73static void rxrpc_ack_time_expired(unsigned long _call);
73static void rxrpc_resend_time_expired(unsigned long _call); 74static void rxrpc_resend_time_expired(unsigned long _call);
74static void rxrpc_cleanup_call(struct rxrpc_call *call);
75 75
76/* 76/*
77 * find an extant server call 77 * find an extant server call
@@ -113,7 +113,7 @@ found_extant_call:
113/* 113/*
114 * allocate a new call 114 * allocate a new call
115 */ 115 */
116static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp) 116struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
117{ 117{
118 struct rxrpc_call *call; 118 struct rxrpc_call *call;
119 119
@@ -392,6 +392,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
392 if (call_id <= conn->channels[chan].call_counter) 392 if (call_id <= conn->channels[chan].call_counter)
393 goto old_call; /* TODO: Just drop packet */ 393 goto old_call; /* TODO: Just drop packet */
394 394
395 /* Temporary: Mirror the backlog prealloc ref (TODO: use prealloc) */
396 rxrpc_get_call(candidate, rxrpc_call_got);
397
395 /* make the call available */ 398 /* make the call available */
396 _debug("new call"); 399 _debug("new call");
397 call = candidate; 400 call = candidate;
@@ -596,6 +599,9 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
596 del_timer_sync(&call->ack_timer); 599 del_timer_sync(&call->ack_timer);
597 del_timer_sync(&call->lifetimer); 600 del_timer_sync(&call->lifetimer);
598 601
602 /* We have to release the prealloc backlog ref */
603 if (rxrpc_is_service_call(call))
604 rxrpc_put_call(call, rxrpc_call_put);
599 _leave(""); 605 _leave("");
600} 606}
601 607
@@ -682,7 +688,7 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
682/* 688/*
683 * clean up a call 689 * clean up a call
684 */ 690 */
685static void rxrpc_cleanup_call(struct rxrpc_call *call) 691void rxrpc_cleanup_call(struct rxrpc_call *call)
686{ 692{
687 _net("DESTROY CALL %d", call->debug_id); 693 _net("DESTROY CALL %d", call->debug_id);
688 694
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 9c6685b97e70..8da82e3aa00e 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -286,6 +286,8 @@ static void rxrpc_connection_reaper(struct work_struct *work)
286 ASSERTCMP(atomic_read(&conn->usage), >, 0); 286 ASSERTCMP(atomic_read(&conn->usage), >, 0);
287 if (likely(atomic_read(&conn->usage) > 1)) 287 if (likely(atomic_read(&conn->usage) > 1))
288 continue; 288 continue;
289 if (conn->state == RXRPC_CONN_SERVICE_PREALLOC)
290 continue;
289 291
290 idle_timestamp = READ_ONCE(conn->idle_timestamp); 292 idle_timestamp = READ_ONCE(conn->idle_timestamp);
291 _debug("reap CONN %d { u=%d,t=%ld }", 293 _debug("reap CONN %d { u=%d,t=%ld }",
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 316a92107fee..189338a60457 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -119,6 +119,30 @@ replace_old_connection:
119} 119}
120 120
121/* 121/*
122 * Preallocate a service connection. The connection is placed on the proc and
123 * reap lists so that we don't have to get the lock from BH context.
124 */
125struct rxrpc_connection *rxrpc_prealloc_service_connection(gfp_t gfp)
126{
127 struct rxrpc_connection *conn = rxrpc_alloc_connection(gfp);
128
129 if (conn) {
130 /* We maintain an extra ref on the connection whilst it is on
131 * the rxrpc_connections list.
132 */
133 conn->state = RXRPC_CONN_SERVICE_PREALLOC;
134 atomic_set(&conn->usage, 2);
135
136 write_lock(&rxrpc_connection_lock);
137 list_add_tail(&conn->link, &rxrpc_connections);
138 list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
139 write_unlock(&rxrpc_connection_lock);
140 }
141
142 return conn;
143}
144
145/*
122 * get a record of an incoming connection 146 * get a record of an incoming connection
123 */ 147 */
124struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local, 148struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 6c4b7df05e95..5906579060cd 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -102,7 +102,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
102 rx->notify_new_call) { 102 rx->notify_new_call) {
103 spin_unlock_bh(&sk->sk_receive_queue.lock); 103 spin_unlock_bh(&sk->sk_receive_queue.lock);
104 skb_queue_tail(&call->knlrecv_queue, skb); 104 skb_queue_tail(&call->knlrecv_queue, skb);
105 rx->notify_new_call(&rx->sk); 105 rx->notify_new_call(&rx->sk, NULL, 0);
106 } else if (call->notify_rx) { 106 } else if (call->notify_rx) {
107 spin_unlock_bh(&sk->sk_receive_queue.lock); 107 spin_unlock_bh(&sk->sk_receive_queue.lock);
108 skb_queue_tail(&call->knlrecv_queue, skb); 108 skb_queue_tail(&call->knlrecv_queue, skb);
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index dfad23821a62..d529d1b4021c 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -17,6 +17,7 @@
17static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = { 17static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = {
18 [RXRPC_CONN_UNUSED] = "Unused ", 18 [RXRPC_CONN_UNUSED] = "Unused ",
19 [RXRPC_CONN_CLIENT] = "Client ", 19 [RXRPC_CONN_CLIENT] = "Client ",
20 [RXRPC_CONN_SERVICE_PREALLOC] = "SvPrealc",
20 [RXRPC_CONN_SERVICE_UNSECURED] = "SvUnsec ", 21 [RXRPC_CONN_SERVICE_UNSECURED] = "SvUnsec ",
21 [RXRPC_CONN_SERVICE_CHALLENGING] = "SvChall ", 22 [RXRPC_CONN_SERVICE_CHALLENGING] = "SvChall ",
22 [RXRPC_CONN_SERVICE] = "SvSecure", 23 [RXRPC_CONN_SERVICE] = "SvSecure",
@@ -156,6 +157,11 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
156 } 157 }
157 158
158 conn = list_entry(v, struct rxrpc_connection, proc_link); 159 conn = list_entry(v, struct rxrpc_connection, proc_link);
160 if (conn->state == RXRPC_CONN_SERVICE_PREALLOC) {
161 strcpy(lbuff, "no_local");
162 strcpy(rbuff, "no_connection");
163 goto print;
164 }
159 165
160 sprintf(lbuff, "%pI4:%u", 166 sprintf(lbuff, "%pI4:%u",
161 &conn->params.local->srx.transport.sin.sin_addr, 167 &conn->params.local->srx.transport.sin.sin_addr,
@@ -164,7 +170,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
164 sprintf(rbuff, "%pI4:%u", 170 sprintf(rbuff, "%pI4:%u",
165 &conn->params.peer->srx.transport.sin.sin_addr, 171 &conn->params.peer->srx.transport.sin.sin_addr,
166 ntohs(conn->params.peer->srx.transport.sin.sin_port)); 172 ntohs(conn->params.peer->srx.transport.sin.sin_port));
167 173print:
168 seq_printf(seq, 174 seq_printf(seq,
169 "UDP %-22.22s %-22.22s %4x %08x %s %3u" 175 "UDP %-22.22s %-22.22s %4x %08x %s %3u"
170 " %s %08x %08x %08x\n", 176 " %s %08x %08x %08x\n",