author	David Howells <dhowells@redhat.com>	2007-04-26 18:50:17 -0400
committer	David S. Miller <davem@davemloft.net>	2007-04-26 18:50:17 -0400
commit	651350d10f93bed7003c9a66e24cf25e0f8eed3d (patch)
tree	4748c1dd0b1a905b0e34b100c3c6ced6565a06de /net/rxrpc
parent	ec26815ad847dbf74a1e27aa5515fb7d5dc6ee6f (diff)
[AF_RXRPC]: Add an interface to the AF_RXRPC module for the AFS filesystem to use
Add an interface to the AF_RXRPC module so that the AFS filesystem module can
more easily make use of the services available.  AFS still opens a socket but
then uses the action functions in lieu of sendmsg() and registers an intercept
function to grab messages before they're queued on the socket Rx queue.

This permits AFS (or whatever) to:

 (1) Avoid the overhead of using the recvmsg() call.

 (2) Use different keys directly on individual client calls on one socket
     rather than having to open a whole slew of sockets, one for each key it
     might want to use.

 (3) Avoid calling request_key() at the point of issue of a call or opening
     of a socket.  This is done instead by AFS at the point of open(),
     unlink() or other VFS operation and the key handed through.

 (4) Request the use of something other than GFP_KERNEL to allocate memory.

Furthermore:

 (*) The socket buffer markings used by RxRPC are made available for AFS so
     that it can interpret the cooked RxRPC messages itself.

 (*) rxgen (un)marshalling abort codes are made available.

The following documentation for the kernel interface is added to
Documentation/networking/rxrpc.txt:

=========================
AF_RXRPC KERNEL INTERFACE
=========================

The AF_RXRPC module also provides an interface for use by in-kernel utilities
such as the AFS filesystem.  This permits such a utility to:

 (1) Use different keys directly on individual client calls on one socket
     rather than having to open a whole slew of sockets, one for each key it
     might want to use.

 (2) Avoid having RxRPC call request_key() at the point of issue of a call or
     opening of a socket.  Instead the utility is responsible for requesting
     a key at the appropriate point.  AFS, for instance, would do this during
     VFS operations such as open() or unlink().  The key is then handed
     through when the call is initiated.

 (3) Request the use of something other than GFP_KERNEL to allocate memory.

 (4) Avoid the overhead of using the recvmsg() call.  RxRPC messages can be
     intercepted before they get put into the socket Rx queue and the socket
     buffers manipulated directly.

To use the RxRPC facility, a kernel utility must still open an AF_RXRPC
socket, bind an address as appropriate and listen if it's to be a server
socket, but then it passes this to the kernel interface functions.

The kernel interface functions are as follows:

 (*) Begin a new client call.

	struct rxrpc_call *
	rxrpc_kernel_begin_call(struct socket *sock,
				struct sockaddr_rxrpc *srx,
				struct key *key,
				unsigned long user_call_ID,
				gfp_t gfp);

     This allocates the infrastructure to make a new RxRPC call and assigns
     call and connection numbers.  The call will be made on the UDP port that
     the socket is bound to.  The call will go to the destination address of
     a connected client socket unless an alternative is supplied (srx is
     non-NULL).

     If a key is supplied then this will be used to secure the call instead
     of the key bound to the socket with the RXRPC_SECURITY_KEY sockopt.
     Calls secured in this way will still share connections if at all
     possible.

     The user_call_ID is equivalent to that supplied to sendmsg() in the
     control data buffer.  It is entirely feasible to use this to point to a
     kernel data structure.

     If this function is successful, an opaque reference to the RxRPC call is
     returned.  The caller now holds a reference on this and it must be
     properly ended.

 (*) End a client call.

	void rxrpc_kernel_end_call(struct rxrpc_call *call);

     This is used to end a previously begun call.  The user_call_ID is
     expunged from AF_RXRPC's knowledge and will not be seen again in
     association with the specified call.
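[Editorial note: to give a feel for the flow, here is a minimal sketch of how
a kernel service might drive the client side of this interface.  It is not
part of the patch: the afs_example_* names are hypothetical, error handling
is pared down, and rxrpc_kernel_send_data() is described further below.]

/*
 * Illustrative only: run a client call through the kernel interface.
 */
static int afs_example_call_to_server(struct socket *sock,
				      struct sockaddr_rxrpc *srx,
				      struct key *key,
				      void *request, size_t size)
{
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	int ret;

	/* the user_call_ID tag can point at our own tracking structure */
	rxcall = rxrpc_kernel_begin_call(sock, srx, key,
					 (unsigned long) request, GFP_KERNEL);
	if (IS_ERR(rxcall))
		return PTR_ERR(rxcall);

	iov[0].iov_base = request;
	iov[0].iov_len = size;

	msg.msg_name = NULL;	/* no address: the call is already targeted */
	msg.msg_namelen = 0;
	msg.msg_iov = (struct iovec *) iov;
	msg.msg_iovlen = 1;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;	/* no MSG_MORE: this send ends the request */

	/* send the request; the reply arrives via the intercept function */
	ret = rxrpc_kernel_send_data(rxcall, &msg, size);

	/* ... wait for the interceptor to signal reply completion ... */

	rxrpc_kernel_end_call(rxcall);
	return ret;
}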
 (*) Send data through a call.

	int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
				   size_t len);

     This is used to supply either the request part of a client call or the
     reply part of a server call.  msg.msg_iovlen and msg.msg_iov specify the
     data buffers to be used.  msg_iov may not be NULL and must point
     exclusively to in-kernel virtual addresses.  msg.msg_flags may be given
     MSG_MORE if there will be subsequent data sends for this call.

     The msg must not specify a destination address, control data or any
     flags other than MSG_MORE.  len is the total amount of data to transmit.

 (*) Abort a call.

	void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code);

     This is used to abort a call if it's still in an abortable state.  The
     abort code specified will be placed in the ABORT message sent.

 (*) Intercept received RxRPC messages.

	typedef void (*rxrpc_interceptor_t)(struct sock *sk,
					    unsigned long user_call_ID,
					    struct sk_buff *skb);

	void
	rxrpc_kernel_intercept_rx_messages(struct socket *sock,
					   rxrpc_interceptor_t interceptor);

     This installs an interceptor function on the specified AF_RXRPC socket.
     All messages that would otherwise wind up in the socket's Rx queue are
     then diverted to this function.  Note that care must be taken to process
     the messages in the right order to maintain DATA message sequentiality.

     The interceptor function itself is provided with the address of the
     socket that's handling the incoming message, the ID assigned by the
     kernel utility to the call and the socket buffer containing the message.

     The skb->mark field indicates the type of message:

	MARK				MEANING
	===============================	=======================================
	RXRPC_SKB_MARK_DATA		Data message
	RXRPC_SKB_MARK_FINAL_ACK	Final ACK received for an incoming call
	RXRPC_SKB_MARK_BUSY		Client call rejected as server busy
	RXRPC_SKB_MARK_REMOTE_ABORT	Call aborted by peer
	RXRPC_SKB_MARK_NET_ERROR	Network error detected
	RXRPC_SKB_MARK_LOCAL_ERROR	Local error encountered
	RXRPC_SKB_MARK_NEW_CALL		New incoming call awaiting acceptance

     The remote abort message can be probed with
     rxrpc_kernel_get_abort_code().  The two error messages can be probed
     with rxrpc_kernel_get_error_number().  A new call can be accepted with
     rxrpc_kernel_accept_call().

     Data messages can have their contents extracted with the usual bunch of
     socket buffer manipulation functions.  A data message can be determined
     to be the last one in a sequence with rxrpc_kernel_is_data_last().  When
     a data message has been used up, rxrpc_kernel_data_delivered() should be
     called on it.

     Non-data messages should be handed to rxrpc_kernel_free_skb() to dispose
     of.  It is possible to get extra refs on all types of message for later
     freeing, but this may pin the state of a call until the message is
     finally freed.

 (*) Accept an incoming call.

	struct rxrpc_call *
	rxrpc_kernel_accept_call(struct socket *sock,
				 unsigned long user_call_ID);

     This is used to accept an incoming call and to assign it a call ID.
     This function is similar to rxrpc_kernel_begin_call() and calls accepted
     must be ended in the same way.

     If this function is successful, an opaque reference to the RxRPC call is
     returned.  The caller now holds a reference on this and it must be
     properly ended.

 (*) Reject an incoming call.

	int rxrpc_kernel_reject_call(struct socket *sock);

     This is used to reject the first incoming call on the socket's queue
     with a BUSY message.  -ENODATA is returned if there were no incoming
     calls.  Other errors may be returned if the call had been aborted
     (-ECONNABORTED) or had timed out (-ETIME).
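[Editorial note: for illustration only, an interceptor in the mould described
above might look like the following.  It runs with the receive queue spinlock
held and softirqs disabled, so it must not sleep; it just classifies by
skb->mark and defers the real processing.  The afs_example_* names and
structures are hypothetical, not part of the patch.]

/*
 * Illustrative only: a possible shape for an interceptor function.
 */
static void afs_example_rx_interceptor(struct sock *sk,
				       unsigned long user_call_ID,
				       struct sk_buff *skb)
{
	struct afs_example_call *call =
		(struct afs_example_call *) user_call_ID;

	if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
		/* no user_call_ID has been assigned yet; park the skb for
		 * later acceptance or rejection */
		skb_queue_tail(&afs_example_incoming_queue, skb);
		schedule_work(&afs_example_accept_work);
		return;
	}

	/* queue in arrival order to preserve DATA message sequentiality */
	skb_queue_tail(&call->rx_queue, skb);
	schedule_work(&call->work);
}

The service would install this once, after creating and binding its socket,
with rxrpc_kernel_intercept_rx_messages(sock, afs_example_rx_interceptor).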
 (*) Record the delivery of a data message and free it.

	void rxrpc_kernel_data_delivered(struct sk_buff *skb);

     This is used to record a data message as having been delivered and to
     update the ACK state for the call.  The socket buffer will be freed.

 (*) Free a message.

	void rxrpc_kernel_free_skb(struct sk_buff *skb);

     This is used to free a non-DATA socket buffer intercepted from an
     AF_RXRPC socket.

 (*) Determine if a data message is the last one on a call.

	bool rxrpc_kernel_is_data_last(struct sk_buff *skb);

     This is used to determine if a socket buffer holds the last data message
     to be received for a call (true will be returned if it does, false if
     not).  The data message will be part of the reply on a client call and
     the request on an incoming call.  In the latter case there will be more
     messages, but in the former case there will not.

 (*) Get the abort code from an abort message.

	u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb);

     This is used to extract the abort code from a remote abort message.

 (*) Get the error number from a local or network error message.

	int rxrpc_kernel_get_error_number(struct sk_buff *skb);

     This is used to extract the error number from a message indicating
     either a local error occurred or a network error occurred.

Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
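[Editorial note: rounding off the sketches, the deferred-work side might
consume an intercepted message with the probe functions above.  Again this is
illustrative only: afs_example_* is hypothetical and the copy assumes
call->buffer has room for the accumulated payload.]

/*
 * Illustrative only: process one intercepted message in process context.
 */
static void afs_example_process_skb(struct afs_example_call *call,
				    struct sk_buff *skb)
{
	switch (skb->mark) {
	case RXRPC_SKB_MARK_DATA:
		if (rxrpc_kernel_is_data_last(skb))
			call->reply_complete = true;
		/* extract the payload with the usual skb accessors */
		skb_copy_bits(skb, 0, call->buffer + call->offset, skb->len);
		call->offset += skb->len;
		/* record delivery and update the ACK state; frees the skb */
		rxrpc_kernel_data_delivered(skb);
		break;
	case RXRPC_SKB_MARK_REMOTE_ABORT:
		call->abort_code = rxrpc_kernel_get_abort_code(skb);
		rxrpc_kernel_free_skb(skb);
		break;
	case RXRPC_SKB_MARK_NET_ERROR:
	case RXRPC_SKB_MARK_LOCAL_ERROR:
		call->error = rxrpc_kernel_get_error_number(skb);
		rxrpc_kernel_free_skb(skb);
		break;
	default:
		/* FINAL_ACK, BUSY and anything else just get disposed of */
		rxrpc_kernel_free_skb(skb);
		break;
	}
}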
Diffstat (limited to 'net/rxrpc')
-rw-r--r--	net/rxrpc/af_rxrpc.c	141
-rw-r--r--	net/rxrpc/ar-accept.c	119
-rw-r--r--	net/rxrpc/ar-ack.c	10
-rw-r--r--	net/rxrpc/ar-call.c	75
-rw-r--r--	net/rxrpc/ar-connection.c	28
-rw-r--r--	net/rxrpc/ar-connevent.c	20
-rw-r--r--	net/rxrpc/ar-error.c	6
-rw-r--r--	net/rxrpc/ar-input.c	60
-rw-r--r--	net/rxrpc/ar-internal.h	44
-rw-r--r--	net/rxrpc/ar-local.c	2
-rw-r--r--	net/rxrpc/ar-output.c	84
-rw-r--r--	net/rxrpc/ar-peer.c	2
-rw-r--r--	net/rxrpc/ar-recvmsg.c	75
-rw-r--r--	net/rxrpc/ar-skbuff.c	16
-rw-r--r--	net/rxrpc/ar-transport.c	8
15 files changed, 563 insertions, 127 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index bfa8822e2286..2c57df9c131b 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -41,6 +41,8 @@ atomic_t rxrpc_debug_id;
 /* count of skbs currently in use */
 atomic_t rxrpc_n_skbs;
 
+struct workqueue_struct *rxrpc_workqueue;
+
 static void rxrpc_sock_destructor(struct sock *);
 
 /*
@@ -214,7 +216,8 @@ static int rxrpc_listen(struct socket *sock, int backlog)
  */
 static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
 						       struct sockaddr *addr,
-						       int addr_len, int flags)
+						       int addr_len, int flags,
+						       gfp_t gfp)
 {
 	struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *) addr;
 	struct rxrpc_transport *trans;
@@ -232,17 +235,129 @@ static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
 		return ERR_PTR(-EAFNOSUPPORT);
 
 	/* find a remote transport endpoint from the local one */
-	peer = rxrpc_get_peer(srx, GFP_KERNEL);
+	peer = rxrpc_get_peer(srx, gfp);
 	if (IS_ERR(peer))
 		return ERR_PTR(PTR_ERR(peer));
 
 	/* find a transport */
-	trans = rxrpc_get_transport(rx->local, peer, GFP_KERNEL);
+	trans = rxrpc_get_transport(rx->local, peer, gfp);
 	rxrpc_put_peer(peer);
 	_leave(" = %p", trans);
 	return trans;
 }
 
+/**
+ * rxrpc_kernel_begin_call - Allow a kernel service to begin a call
+ * @sock: The socket on which to make the call
+ * @srx: The address of the peer to contact (defaults to socket setting)
+ * @key: The security context to use (defaults to socket setting)
+ * @user_call_ID: The ID to use
+ *
+ * Allow a kernel service to begin a call on the nominated socket.  This just
+ * sets up all the internal tracking structures and allocates connection and
+ * call IDs as appropriate.  The call to be used is returned.
+ *
+ * The default socket destination address and security may be overridden by
+ * supplying @srx and @key.
+ */
+struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
+					   struct sockaddr_rxrpc *srx,
+					   struct key *key,
+					   unsigned long user_call_ID,
+					   gfp_t gfp)
+{
+	struct rxrpc_conn_bundle *bundle;
+	struct rxrpc_transport *trans;
+	struct rxrpc_call *call;
+	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+	__be16 service_id;
+
+	_enter(",,%x,%lx", key_serial(key), user_call_ID);
+
+	lock_sock(&rx->sk);
+
+	if (srx) {
+		trans = rxrpc_name_to_transport(sock, (struct sockaddr *) srx,
+						sizeof(*srx), 0, gfp);
+		if (IS_ERR(trans)) {
+			call = ERR_PTR(PTR_ERR(trans));
+			trans = NULL;
+			goto out;
+		}
+	} else {
+		trans = rx->trans;
+		if (!trans) {
+			call = ERR_PTR(-ENOTCONN);
+			goto out;
+		}
+		atomic_inc(&trans->usage);
+	}
+
+	service_id = rx->service_id;
+	if (srx)
+		service_id = htons(srx->srx_service);
+
+	if (!key)
+		key = rx->key;
+	if (key && !key->payload.data)
+		key = NULL; /* a no-security key */
+
+	bundle = rxrpc_get_bundle(rx, trans, key, service_id, gfp);
+	if (IS_ERR(bundle)) {
+		call = ERR_PTR(PTR_ERR(bundle));
+		goto out;
+	}
+
+	call = rxrpc_get_client_call(rx, trans, bundle, user_call_ID, true,
+				     gfp);
+	rxrpc_put_bundle(trans, bundle);
+out:
+	rxrpc_put_transport(trans);
+	release_sock(&rx->sk);
+	_leave(" = %p", call);
+	return call;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_begin_call);
+
+/**
+ * rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
+ * @call: The call to end
+ *
+ * Allow a kernel service to end a call it was using.  The call must be
+ * complete before this is called (the call should be aborted if necessary).
+ */
+void rxrpc_kernel_end_call(struct rxrpc_call *call)
+{
+	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
+	rxrpc_remove_user_ID(call->socket, call);
+	rxrpc_put_call(call);
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_end_call);
+
+/**
+ * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages
+ * @sock: The socket to intercept received messages on
+ * @interceptor: The function to pass the messages to
+ *
+ * Allow a kernel service to intercept messages heading for the Rx queue on
+ * an RxRPC socket.  They get passed to the specified function instead.
+ * @interceptor should free the socket buffers it is given.  @interceptor is
+ * called with the socket receive queue spinlock held and softirqs disabled -
+ * this ensures that the messages will be delivered in the right order.
+ */
+void rxrpc_kernel_intercept_rx_messages(struct socket *sock,
+					rxrpc_interceptor_t interceptor)
+{
+	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+
+	_enter("");
+	rx->interceptor = interceptor;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
+
 /*
  * connect an RxRPC socket
  * - this just targets it at a specific destination; no actual connection
@@ -294,7 +409,8 @@ static int rxrpc_connect(struct socket *sock, struct sockaddr *addr,
 		return -EBUSY; /* server sockets can't connect as well */
 	}
 
-	trans = rxrpc_name_to_transport(sock, addr, addr_len, flags);
+	trans = rxrpc_name_to_transport(sock, addr, addr_len, flags,
+					GFP_KERNEL);
 	if (IS_ERR(trans)) {
 		release_sock(&rx->sk);
 		_leave(" = %ld", PTR_ERR(trans));
@@ -344,7 +460,7 @@ static int rxrpc_sendmsg(struct kiocb *iocb, struct socket *sock,
 	if (m->msg_name) {
 		ret = -EISCONN;
 		trans = rxrpc_name_to_transport(sock, m->msg_name,
-						m->msg_namelen, 0);
+						m->msg_namelen, 0, GFP_KERNEL);
 		if (IS_ERR(trans)) {
 			ret = PTR_ERR(trans);
 			trans = NULL;
@@ -576,7 +692,7 @@ static int rxrpc_release_sock(struct sock *sk)
 
 	/* try to flush out this socket */
 	rxrpc_release_calls_on_socket(rx);
-	flush_scheduled_work();
+	flush_workqueue(rxrpc_workqueue);
 	rxrpc_purge_queue(&sk->sk_receive_queue);
 
 	if (rx->conn) {
@@ -673,15 +789,21 @@ static int __init af_rxrpc_init(void)
 
 	rxrpc_epoch = htonl(xtime.tv_sec);
 
+	ret = -ENOMEM;
 	rxrpc_call_jar = kmem_cache_create(
 		"rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
 		SLAB_HWCACHE_ALIGN, NULL, NULL);
 	if (!rxrpc_call_jar) {
 		printk(KERN_NOTICE "RxRPC: Failed to allocate call jar\n");
-		ret = -ENOMEM;
 		goto error_call_jar;
 	}
 
+	rxrpc_workqueue = create_workqueue("krxrpcd");
+	if (!rxrpc_workqueue) {
+		printk(KERN_NOTICE "RxRPC: Failed to allocate work queue\n");
+		goto error_work_queue;
+	}
+
 	ret = proto_register(&rxrpc_proto, 1);
 	if (ret < 0) {
 		printk(KERN_CRIT "RxRPC: Cannot register protocol\n");
@@ -719,6 +841,8 @@ error_key_type:
 error_sock:
 	proto_unregister(&rxrpc_proto);
 error_proto:
+	destroy_workqueue(rxrpc_workqueue);
+error_work_queue:
 	kmem_cache_destroy(rxrpc_call_jar);
 error_call_jar:
 	return ret;
@@ -743,9 +867,10 @@ static void __exit af_rxrpc_exit(void)
 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
 
 	_debug("flush scheduled work");
-	flush_scheduled_work();
+	flush_workqueue(rxrpc_workqueue);
 	proc_net_remove("rxrpc_conns");
 	proc_net_remove("rxrpc_calls");
+	destroy_workqueue(rxrpc_workqueue);
 	kmem_cache_destroy(rxrpc_call_jar);
 	_leave("");
 }
diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index e7af780cd6f9..92a87fde8bfe 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -139,7 +139,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
 		call->conn->state = RXRPC_CONN_SERVER_CHALLENGING;
 		atomic_inc(&call->conn->usage);
 		set_bit(RXRPC_CONN_CHALLENGE, &call->conn->events);
-		schedule_work(&call->conn->processor);
+		rxrpc_queue_conn(call->conn);
 	} else {
 		_debug("conn ready");
 		call->state = RXRPC_CALL_SERVER_ACCEPTING;
@@ -183,7 +183,7 @@ invalid_service:
 	if (!test_bit(RXRPC_CALL_RELEASE, &call->flags) &&
 	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) {
 		rxrpc_get_call(call);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 	read_unlock_bh(&call->state_lock);
 	rxrpc_put_call(call);
@@ -310,7 +310,8 @@ security_mismatch:
  * handle acceptance of a call by userspace
  * - assign the user call ID to the call at the front of the queue
  */
-int rxrpc_accept_call(struct rxrpc_sock *rx, unsigned long user_call_ID)
+struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
+				     unsigned long user_call_ID)
 {
 	struct rxrpc_call *call;
 	struct rb_node *parent, **pp;
@@ -374,12 +375,76 @@ int rxrpc_accept_call(struct rxrpc_sock *rx, unsigned long user_call_ID)
 		BUG();
 	if (test_and_set_bit(RXRPC_CALL_ACCEPTED, &call->events))
 		BUG();
-	schedule_work(&call->processor);
+	rxrpc_queue_call(call);
 
+	rxrpc_get_call(call);
 	write_unlock_bh(&call->state_lock);
 	write_unlock(&rx->call_lock);
-	_leave(" = 0");
-	return 0;
+	_leave(" = %p{%d}", call, call->debug_id);
+	return call;
+
+	/* if the call is already dying or dead, then we leave the socket's ref
+	 * on it to be released by rxrpc_dead_call_expired() as induced by
+	 * rxrpc_release_call() */
+out_release:
+	_debug("release %p", call);
+	if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
+	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
+		rxrpc_queue_call(call);
+out_discard:
+	write_unlock_bh(&call->state_lock);
+	_debug("discard %p", call);
+out:
+	write_unlock(&rx->call_lock);
+	_leave(" = %d", ret);
+	return ERR_PTR(ret);
+}
+
+/*
+ * handle rejectance of a call by userspace
+ * - reject the call at the front of the queue
+ */
+int rxrpc_reject_call(struct rxrpc_sock *rx)
+{
+	struct rxrpc_call *call;
+	int ret;
+
+	_enter("");
+
+	ASSERT(!irqs_disabled());
+
+	write_lock(&rx->call_lock);
+
+	ret = -ENODATA;
+	if (list_empty(&rx->acceptq))
+		goto out;
+
+	/* dequeue the first call and check it's still valid */
+	call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
+	list_del_init(&call->accept_link);
+	sk_acceptq_removed(&rx->sk);
+
+	write_lock_bh(&call->state_lock);
+	switch (call->state) {
+	case RXRPC_CALL_SERVER_ACCEPTING:
+		call->state = RXRPC_CALL_SERVER_BUSY;
+		if (test_and_set_bit(RXRPC_CALL_REJECT_BUSY, &call->events))
+			rxrpc_queue_call(call);
+		ret = 0;
+		goto out_release;
+	case RXRPC_CALL_REMOTELY_ABORTED:
+	case RXRPC_CALL_LOCALLY_ABORTED:
+		ret = -ECONNABORTED;
+		goto out_release;
+	case RXRPC_CALL_NETWORK_ERROR:
+		ret = call->conn->error;
+		goto out_release;
+	case RXRPC_CALL_DEAD:
+		ret = -ETIME;
+		goto out_discard;
+	default:
+		BUG();
+	}
 
 	/* if the call is already dying or dead, then we leave the socket's ref
 	 * on it to be released by rxrpc_dead_call_expired() as induced by
@@ -388,7 +453,7 @@ out_release:
 	_debug("release %p", call);
 	if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
 	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 out_discard:
 	write_unlock_bh(&call->state_lock);
 	_debug("discard %p", call);
@@ -397,3 +462,43 @@ out:
 	_leave(" = %d", ret);
 	return ret;
 }
+
+/**
+ * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call
+ * @sock: The socket on which the impending call is waiting
+ * @user_call_ID: The tag to attach to the call
+ *
+ * Allow a kernel service to accept an incoming call, assuming the incoming
+ * call is still valid.
+ */
+struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock,
+					    unsigned long user_call_ID)
+{
+	struct rxrpc_call *call;
+
+	_enter(",%lx", user_call_ID);
+	call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID);
+	_leave(" = %p", call);
+	return call;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_accept_call);
+
+/**
+ * rxrpc_kernel_reject_call - Allow a kernel service to reject an incoming call
+ * @sock: The socket on which the impending call is waiting
+ *
+ * Allow a kernel service to reject an incoming call with a BUSY message,
+ * assuming the incoming call is still valid.
+ */
+int rxrpc_kernel_reject_call(struct socket *sock)
+{
+	int ret;
+
+	_enter("");
+	ret = rxrpc_reject_call(rxrpc_sk(sock->sk));
+	_leave(" = %d", ret);
+	return ret;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_reject_call);
diff --git a/net/rxrpc/ar-ack.c b/net/rxrpc/ar-ack.c
index 8f7764eca96c..fc07a926df56 100644
--- a/net/rxrpc/ar-ack.c
+++ b/net/rxrpc/ar-ack.c
@@ -113,7 +113,7 @@ cancel_timer:
 	read_lock_bh(&call->state_lock);
 	if (call->state <= RXRPC_CALL_COMPLETE &&
 	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	read_unlock_bh(&call->state_lock);
 }
 
@@ -1166,7 +1166,7 @@ send_message_2:
 		_debug("sendmsg failed: %d", ret);
 		read_lock_bh(&call->state_lock);
 		if (call->state < RXRPC_CALL_DEAD)
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		read_unlock_bh(&call->state_lock);
 		goto error;
 	}
@@ -1210,7 +1210,7 @@ maybe_reschedule:
 	if (call->events || !skb_queue_empty(&call->rx_queue)) {
 		read_lock_bh(&call->state_lock);
 		if (call->state < RXRPC_CALL_DEAD)
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		read_unlock_bh(&call->state_lock);
 	}
 
@@ -1224,7 +1224,7 @@ maybe_reschedule:
 		read_lock_bh(&call->state_lock);
 		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
 		    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		read_unlock_bh(&call->state_lock);
 	}
 
@@ -1238,7 +1238,7 @@ error:
 	 * work pending bit and the work item being processed again */
 	if (call->events && !work_pending(&call->processor)) {
 		_debug("jumpstart %x", ntohl(call->conn->cid));
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 
 	_leave("");
diff --git a/net/rxrpc/ar-call.c b/net/rxrpc/ar-call.c
index ac31cceda2f1..4d92d88ff1fc 100644
--- a/net/rxrpc/ar-call.c
+++ b/net/rxrpc/ar-call.c
@@ -19,7 +19,7 @@ struct kmem_cache *rxrpc_call_jar;
 LIST_HEAD(rxrpc_calls);
 DEFINE_RWLOCK(rxrpc_call_lock);
 static unsigned rxrpc_call_max_lifetime = 60;
-static unsigned rxrpc_dead_call_timeout = 10;
+static unsigned rxrpc_dead_call_timeout = 2;
 
 static void rxrpc_destroy_call(struct work_struct *work);
 static void rxrpc_call_life_expired(unsigned long _call);
@@ -264,7 +264,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 	switch (call->state) {
 	case RXRPC_CALL_LOCALLY_ABORTED:
 		if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 	case RXRPC_CALL_REMOTELY_ABORTED:
 		read_unlock(&call->state_lock);
 		goto aborted_call;
@@ -398,6 +398,7 @@ found_extant_call:
  */
 void rxrpc_release_call(struct rxrpc_call *call)
 {
+	struct rxrpc_connection *conn = call->conn;
 	struct rxrpc_sock *rx = call->socket;
 
 	_enter("{%d,%d,%d,%d}",
@@ -413,8 +414,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
 	/* dissociate from the socket
 	 * - the socket's ref on the call is passed to the death timer
 	 */
-	_debug("RELEASE CALL %p (%d CONN %p)",
-	       call, call->debug_id, call->conn);
+	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 
 	write_lock_bh(&rx->call_lock);
 	if (!list_empty(&call->accept_link)) {
@@ -430,24 +430,42 @@ void rxrpc_release_call(struct rxrpc_call *call)
 	}
 	write_unlock_bh(&rx->call_lock);
 
-	if (call->conn->out_clientflag)
-		spin_lock(&call->conn->trans->client_lock);
-	write_lock_bh(&call->conn->lock);
-
 	/* free up the channel for reuse */
-	if (call->conn->out_clientflag) {
-		call->conn->avail_calls++;
-		if (call->conn->avail_calls == RXRPC_MAXCALLS)
-			list_move_tail(&call->conn->bundle_link,
-				       &call->conn->bundle->unused_conns);
-		else if (call->conn->avail_calls == 1)
-			list_move_tail(&call->conn->bundle_link,
-				       &call->conn->bundle->avail_conns);
+	spin_lock(&conn->trans->client_lock);
+	write_lock_bh(&conn->lock);
+	write_lock(&call->state_lock);
+
+	if (conn->channels[call->channel] == call)
+		conn->channels[call->channel] = NULL;
+
+	if (conn->out_clientflag && conn->bundle) {
+		conn->avail_calls++;
+		switch (conn->avail_calls) {
+		case 1:
+			list_move_tail(&conn->bundle_link,
+				       &conn->bundle->avail_conns);
+		case 2 ... RXRPC_MAXCALLS - 1:
+			ASSERT(conn->channels[0] == NULL ||
+			       conn->channels[1] == NULL ||
+			       conn->channels[2] == NULL ||
+			       conn->channels[3] == NULL);
+			break;
+		case RXRPC_MAXCALLS:
+			list_move_tail(&conn->bundle_link,
+				       &conn->bundle->unused_conns);
+			ASSERT(conn->channels[0] == NULL &&
+			       conn->channels[1] == NULL &&
+			       conn->channels[2] == NULL &&
+			       conn->channels[3] == NULL);
+			break;
+		default:
+			printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
+			       conn->avail_calls);
+			BUG();
+		}
 	}
 
-	write_lock(&call->state_lock);
-	if (call->conn->channels[call->channel] == call)
-		call->conn->channels[call->channel] = NULL;
+	spin_unlock(&conn->trans->client_lock);
 
 	if (call->state < RXRPC_CALL_COMPLETE &&
 	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
@@ -455,13 +473,12 @@ void rxrpc_release_call(struct rxrpc_call *call)
 		call->state = RXRPC_CALL_LOCALLY_ABORTED;
 		call->abort_code = RX_CALL_DEAD;
 		set_bit(RXRPC_CALL_ABORT, &call->events);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 	write_unlock(&call->state_lock);
-	write_unlock_bh(&call->conn->lock);
-	if (call->conn->out_clientflag)
-		spin_unlock(&call->conn->trans->client_lock);
+	write_unlock_bh(&conn->lock);
 
+	/* clean up the Rx queue */
 	if (!skb_queue_empty(&call->rx_queue) ||
 	    !skb_queue_empty(&call->rx_oos_queue)) {
 		struct rxrpc_skb_priv *sp;
@@ -538,7 +555,7 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
 		if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
 			sched = true;
 		if (sched)
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 	}
 	write_unlock(&call->state_lock);
 }
@@ -588,7 +605,7 @@ void __rxrpc_put_call(struct rxrpc_call *call)
 	if (atomic_dec_and_test(&call->usage)) {
 		_debug("call %d dead", call->debug_id);
 		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-		schedule_work(&call->destroyer);
+		rxrpc_queue_work(&call->destroyer);
 	}
 	_leave("");
 }
@@ -613,7 +630,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
 	ASSERTCMP(call->events, ==, 0);
 	if (work_pending(&call->processor)) {
 		_debug("defer destroy");
-		schedule_work(&call->destroyer);
+		rxrpc_queue_work(&call->destroyer);
 		return;
 	}
 
@@ -742,7 +759,7 @@ static void rxrpc_call_life_expired(unsigned long _call)
 	read_lock_bh(&call->state_lock);
 	if (call->state < RXRPC_CALL_COMPLETE) {
 		set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 	read_unlock_bh(&call->state_lock);
 }
@@ -763,7 +780,7 @@ static void rxrpc_resend_time_expired(unsigned long _call)
 	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 	if (call->state < RXRPC_CALL_COMPLETE &&
 	    !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	read_unlock_bh(&call->state_lock);
 }
 
@@ -782,6 +799,6 @@ static void rxrpc_ack_time_expired(unsigned long _call)
 	read_lock_bh(&call->state_lock);
 	if (call->state < RXRPC_CALL_COMPLETE &&
 	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	read_unlock_bh(&call->state_lock);
 }
diff --git a/net/rxrpc/ar-connection.c b/net/rxrpc/ar-connection.c
index 01eb33c30571..43cb3e051ece 100644
--- a/net/rxrpc/ar-connection.c
+++ b/net/rxrpc/ar-connection.c
@@ -356,7 +356,7 @@ static int rxrpc_connect_exclusive(struct rxrpc_sock *rx,
 	conn->out_clientflag = RXRPC_CLIENT_INITIATED;
 	conn->cid = 0;
 	conn->state = RXRPC_CONN_CLIENT;
-	conn->avail_calls = RXRPC_MAXCALLS;
+	conn->avail_calls = RXRPC_MAXCALLS - 1;
 	conn->security_level = rx->min_sec_level;
 	conn->key = key_get(rx->key);
 
@@ -447,6 +447,11 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
 			if (--conn->avail_calls == 0)
 				list_move(&conn->bundle_link,
 					  &bundle->busy_conns);
+			ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS);
+			ASSERT(conn->channels[0] == NULL ||
+			       conn->channels[1] == NULL ||
+			       conn->channels[2] == NULL ||
+			       conn->channels[3] == NULL);
 			atomic_inc(&conn->usage);
 			break;
 		}
@@ -456,6 +461,12 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
 			conn = list_entry(bundle->unused_conns.next,
 					  struct rxrpc_connection,
 					  bundle_link);
+			ASSERTCMP(conn->avail_calls, ==, RXRPC_MAXCALLS);
+			conn->avail_calls = RXRPC_MAXCALLS - 1;
+			ASSERT(conn->channels[0] == NULL &&
+			       conn->channels[1] == NULL &&
+			       conn->channels[2] == NULL &&
+			       conn->channels[3] == NULL);
 			atomic_inc(&conn->usage);
 			list_move(&conn->bundle_link, &bundle->avail_conns);
 			break;
@@ -512,7 +523,7 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
 	candidate->state = RXRPC_CONN_CLIENT;
 	candidate->avail_calls = RXRPC_MAXCALLS;
 	candidate->security_level = rx->min_sec_level;
-	candidate->key = key_get(rx->key);
+	candidate->key = key_get(bundle->key);
 
 	ret = rxrpc_init_client_conn_security(candidate);
 	if (ret < 0) {
@@ -555,6 +566,10 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
 	for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
 		if (!conn->channels[chan])
 			goto found_channel;
+	ASSERT(conn->channels[0] == NULL ||
+	       conn->channels[1] == NULL ||
+	       conn->channels[2] == NULL ||
+	       conn->channels[3] == NULL);
 	BUG();
 
 found_channel:
@@ -567,6 +582,7 @@ found_channel:
 	_net("CONNECT client on conn %d chan %d as call %x",
 	     conn->debug_id, chan, ntohl(call->call_id));
 
+	ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS);
 	spin_unlock(&trans->client_lock);
 
 	rxrpc_add_call_ID_to_conn(conn, call);
@@ -778,7 +794,7 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
 	conn->put_time = xtime.tv_sec;
 	if (atomic_dec_and_test(&conn->usage)) {
 		_debug("zombie");
-		schedule_delayed_work(&rxrpc_connection_reap, 0);
+		rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
 	}
 
 	_leave("");
@@ -862,8 +878,8 @@ void rxrpc_connection_reaper(struct work_struct *work)
 	if (earliest != ULONG_MAX) {
 		_debug("reschedule reaper %ld", (long) earliest - now);
 		ASSERTCMP(earliest, >, now);
-		schedule_delayed_work(&rxrpc_connection_reap,
-				      (earliest - now) * HZ);
+		rxrpc_queue_delayed_work(&rxrpc_connection_reap,
+					 (earliest - now) * HZ);
 	}
 
 	/* then destroy all those pulled out */
@@ -889,7 +905,7 @@ void __exit rxrpc_destroy_all_connections(void)
 
 	rxrpc_connection_timeout = 0;
 	cancel_delayed_work(&rxrpc_connection_reap);
-	schedule_delayed_work(&rxrpc_connection_reap, 0);
+	rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
 
 	_leave("");
 }
diff --git a/net/rxrpc/ar-connevent.c b/net/rxrpc/ar-connevent.c
index 4b02815c1ded..1ada43d51165 100644
--- a/net/rxrpc/ar-connevent.c
+++ b/net/rxrpc/ar-connevent.c
@@ -45,7 +45,7 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
 				set_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 			else
 				set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		}
 		write_unlock(&call->state_lock);
 	}
@@ -133,7 +133,7 @@ void rxrpc_call_is_secure(struct rxrpc_call *call)
 		read_lock(&call->state_lock);
 		if (call->state < RXRPC_CALL_COMPLETE &&
 		    !test_and_set_bit(RXRPC_CALL_SECURED, &call->events))
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		read_unlock(&call->state_lock);
 	}
 }
@@ -308,6 +308,22 @@ protocol_error:
 }
 
 /*
+ * put a packet up for transport-level abort
+ */
+void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
+{
+	CHECK_SLAB_OKAY(&local->usage);
+
+	if (!atomic_inc_not_zero(&local->usage)) {
+		printk("resurrected on reject\n");
+		BUG();
+	}
+
+	skb_queue_tail(&local->reject_queue, skb);
+	rxrpc_queue_work(&local->rejecter);
+}
+
+/*
  * reject packets through the local endpoint
  */
 void rxrpc_reject_packets(struct work_struct *work)
diff --git a/net/rxrpc/ar-error.c b/net/rxrpc/ar-error.c
index f5539e2f7b58..2c27df1ffa17 100644
--- a/net/rxrpc/ar-error.c
+++ b/net/rxrpc/ar-error.c
@@ -111,7 +111,7 @@ void rxrpc_UDP_error_report(struct sock *sk)
 
 	/* pass the transport ref to error_handler to release */
 	skb_queue_tail(&trans->error_queue, skb);
-	schedule_work(&trans->error_handler);
+	rxrpc_queue_work(&trans->error_handler);
 
 	/* reset and regenerate socket error */
 	spin_lock_bh(&sk->sk_error_queue.lock);
@@ -235,7 +235,7 @@ void rxrpc_UDP_error_handler(struct work_struct *work)
 		    call->state < RXRPC_CALL_NETWORK_ERROR) {
 			call->state = RXRPC_CALL_NETWORK_ERROR;
 			set_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		}
 		write_unlock(&call->state_lock);
 		list_del_init(&call->error_link);
@@ -245,7 +245,7 @@ void rxrpc_UDP_error_handler(struct work_struct *work)
 	}
 
 	if (!skb_queue_empty(&trans->error_queue))
-		schedule_work(&trans->error_handler);
+		rxrpc_queue_work(&trans->error_handler);
 
 	rxrpc_free_skb(skb);
 	rxrpc_put_transport(trans);
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 323c3454561c..ceb5d619a1d4 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -42,6 +42,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 		       bool force, bool terminal)
 {
 	struct rxrpc_skb_priv *sp;
+	struct rxrpc_sock *rx = call->socket;
 	struct sock *sk;
 	int skb_len, ret;
 
@@ -64,7 +65,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 		return 0;
 	}
 
-	sk = &call->socket->sk;
+	sk = &rx->sk;
 
 	if (!force) {
 		/* cast skb->rcvbuf to unsigned...  It's pointless, but
@@ -89,25 +90,30 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
 		skb->sk = sk;
 		atomic_add(skb->truesize, &sk->sk_rmem_alloc);
 
-		/* Cache the SKB length before we tack it onto the receive
-		 * queue. Once it is added it no longer belongs to us and
-		 * may be freed by other threads of control pulling packets
-		 * from the queue.
-		 */
-		skb_len = skb->len;
-
-		_net("post skb %p", skb);
-		__skb_queue_tail(&sk->sk_receive_queue, skb);
-		spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-		if (!sock_flag(sk, SOCK_DEAD))
-			sk->sk_data_ready(sk, skb_len);
-
 		if (terminal) {
 			_debug("<<<< TERMINAL MESSAGE >>>>");
 			set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags);
 		}
 
+		/* allow interception by a kernel service */
+		if (rx->interceptor) {
+			rx->interceptor(sk, call->user_call_ID, skb);
+			spin_unlock_bh(&sk->sk_receive_queue.lock);
+		} else {
+
+			/* Cache the SKB length before we tack it onto the
+			 * receive queue. Once it is added it no longer
+			 * belongs to us and may be freed by other threads of
+			 * control pulling packets from the queue */
+			skb_len = skb->len;
+
+			_net("post skb %p", skb);
+			__skb_queue_tail(&sk->sk_receive_queue, skb);
+			spin_unlock_bh(&sk->sk_receive_queue.lock);
+
+			if (!sock_flag(sk, SOCK_DEAD))
+				sk->sk_data_ready(sk, skb_len);
+		}
 		skb = NULL;
 	} else {
 		spin_unlock_bh(&sk->sk_receive_queue.lock);
@@ -232,7 +238,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
 		read_lock(&call->state_lock);
 		if (call->state < RXRPC_CALL_COMPLETE &&
 		    !test_and_set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events))
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		read_unlock(&call->state_lock);
 	}
 
@@ -267,7 +273,7 @@ enqueue_packet:
 	atomic_inc(&call->ackr_not_idle);
 	read_lock(&call->state_lock);
 	if (call->state < RXRPC_CALL_DEAD)
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	read_unlock(&call->state_lock);
 	_leave(" = 0 [queued]");
 	return 0;
@@ -360,7 +366,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
 			call->state = RXRPC_CALL_REMOTELY_ABORTED;
 			call->abort_code = abort_code;
 			set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		}
 		goto free_packet_unlock;
 
@@ -375,7 +381,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
 		case RXRPC_CALL_CLIENT_SEND_REQUEST:
 			call->state = RXRPC_CALL_SERVER_BUSY;
 			set_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 		case RXRPC_CALL_SERVER_BUSY:
 			goto free_packet_unlock;
 		default:
@@ -419,7 +425,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
 	read_lock_bh(&call->state_lock);
 	if (call->state < RXRPC_CALL_DEAD) {
 		skb_queue_tail(&call->rx_queue, skb);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 		skb = NULL;
 	}
 	read_unlock_bh(&call->state_lock);
@@ -434,7 +440,7 @@ protocol_error_locked:
 		call->state = RXRPC_CALL_LOCALLY_ABORTED;
 		call->abort_code = RX_PROTOCOL_ERROR;
 		set_bit(RXRPC_CALL_ABORT, &call->events);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 free_packet_unlock:
 	write_unlock_bh(&call->state_lock);
@@ -506,7 +512,7 @@ protocol_error:
 		call->state = RXRPC_CALL_LOCALLY_ABORTED;
 		call->abort_code = RX_PROTOCOL_ERROR;
 		set_bit(RXRPC_CALL_ABORT, &call->events);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 	write_unlock_bh(&call->state_lock);
 	_leave("");
@@ -542,7 +548,7 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
 	switch (call->state) {
 	case RXRPC_CALL_LOCALLY_ABORTED:
 		if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 	case RXRPC_CALL_REMOTELY_ABORTED:
 	case RXRPC_CALL_NETWORK_ERROR:
 	case RXRPC_CALL_DEAD:
@@ -591,7 +597,7 @@ dead_call:
 	    sp->hdr.seq == __constant_cpu_to_be32(1)) {
 		_debug("incoming call");
 		skb_queue_tail(&conn->trans->local->accept_queue, skb);
-		schedule_work(&conn->trans->local->acceptor);
+		rxrpc_queue_work(&conn->trans->local->acceptor);
 		goto done;
 	}
 
@@ -630,7 +636,7 @@ found_completed_call:
 	_debug("final ack again");
 	rxrpc_get_call(call);
 	set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
-	schedule_work(&call->processor);
+	rxrpc_queue_call(call);
 
 free_unlock:
 	read_unlock(&call->state_lock);
@@ -651,7 +657,7 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
 
 	atomic_inc(&conn->usage);
 	skb_queue_tail(&conn->rx_queue, skb);
-	schedule_work(&conn->processor);
+	rxrpc_queue_conn(conn);
 }
 
 /*
@@ -767,7 +773,7 @@ cant_route_call:
 	if (sp->hdr.seq == __constant_cpu_to_be32(1)) {
 		_debug("first packet");
 		skb_queue_tail(&local->accept_queue, skb);
-		schedule_work(&local->acceptor);
+		rxrpc_queue_work(&local->acceptor);
 		rxrpc_put_local(local);
 		_leave(" [incoming]");
 		return;
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 7bfbf471c81e..cb1eb492ee48 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -19,8 +19,6 @@
 #define CHECK_SLAB_OKAY(X) do {} while(0)
 #endif
 
-extern atomic_t rxrpc_n_skbs;
-
 #define FCRYPT_BSIZE 8
 struct rxrpc_crypt {
 	union {
@@ -29,8 +27,12 @@ struct rxrpc_crypt {
 	};
 } __attribute__((aligned(8)));
 
-extern __be32 rxrpc_epoch;		/* local epoch for detecting local-end reset */
-extern atomic_t rxrpc_debug_id;		/* current debugging ID */
+#define rxrpc_queue_work(WS)	queue_work(rxrpc_workqueue, (WS))
+#define rxrpc_queue_delayed_work(WS,D)	\
+	queue_delayed_work(rxrpc_workqueue, (WS), (D))
+
+#define rxrpc_queue_call(CALL)	rxrpc_queue_work(&(CALL)->processor)
+#define rxrpc_queue_conn(CONN)	rxrpc_queue_work(&(CONN)->processor)
 
 /*
  * sk_state for RxRPC sockets
@@ -50,6 +52,7 @@ enum {
 struct rxrpc_sock {
 	/* WARNING: sk has to be the first member */
 	struct sock		sk;
+	rxrpc_interceptor_t	interceptor;	/* kernel service Rx interceptor function */
 	struct rxrpc_local	*local;		/* local endpoint */
 	struct rxrpc_transport	*trans;		/* transport handler */
 	struct rxrpc_conn_bundle *bundle;	/* virtual connection bundle */
@@ -91,16 +94,6 @@ struct rxrpc_skb_priv {
 
 #define rxrpc_skb(__skb) ((struct rxrpc_skb_priv *) &(__skb)->cb)
 
-enum {
-	RXRPC_SKB_MARK_DATA,		/* data message */
-	RXRPC_SKB_MARK_FINAL_ACK,	/* final ACK received message */
-	RXRPC_SKB_MARK_BUSY,		/* server busy message */
-	RXRPC_SKB_MARK_REMOTE_ABORT,	/* remote abort message */
-	RXRPC_SKB_MARK_NET_ERROR,	/* network error message */
-	RXRPC_SKB_MARK_LOCAL_ERROR,	/* local error message */
-	RXRPC_SKB_MARK_NEW_CALL,	/* local error message */
-};
-
 enum rxrpc_command {
 	RXRPC_CMD_SEND_DATA,		/* send data message */
 	RXRPC_CMD_SEND_ABORT,		/* request abort generation */
@@ -439,25 +432,20 @@ static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
 }
 
 /*
- * put a packet up for transport-level abort
+ * af_rxrpc.c
  */
-static inline
-void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
-	CHECK_SLAB_OKAY(&local->usage);
-	if (!atomic_inc_not_zero(&local->usage)) {
-		printk("resurrected on reject\n");
-		BUG();
-	}
-	skb_queue_tail(&local->reject_queue, skb);
-	schedule_work(&local->rejecter);
-}
+extern atomic_t rxrpc_n_skbs;
+extern __be32 rxrpc_epoch;
+extern atomic_t rxrpc_debug_id;
+extern struct workqueue_struct *rxrpc_workqueue;
 
 /*
  * ar-accept.c
  */
 extern void rxrpc_accept_incoming_calls(struct work_struct *);
-extern int rxrpc_accept_call(struct rxrpc_sock *, unsigned long);
+extern struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *,
+					    unsigned long);
+extern int rxrpc_reject_call(struct rxrpc_sock *);
 
 /*
  * ar-ack.c
@@ -514,6 +502,7 @@ rxrpc_incoming_connection(struct rxrpc_transport *, struct rxrpc_header *,
  * ar-connevent.c
  */
 extern void rxrpc_process_connection(struct work_struct *);
+extern void rxrpc_reject_packet(struct rxrpc_local *, struct sk_buff *);
 extern void rxrpc_reject_packets(struct work_struct *);
 
 /*
@@ -583,6 +572,7 @@ extern struct file_operations rxrpc_connection_seq_fops;
 /*
  * ar-recvmsg.c
  */
+extern void rxrpc_remove_user_ID(struct rxrpc_sock *, struct rxrpc_call *);
 extern int rxrpc_recvmsg(struct kiocb *, struct socket *, struct msghdr *,
 			 size_t, int);
 
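
The rxrpc_queue_work() family added above moves all of RxRPC's deferred
processing off the shared keventd queue and onto a private workqueue, so a
slow call or connection processor can no longer stall unrelated kernel work
(and vice versa).  As a minimal sketch of the module-init side these macros
presuppose; the "krxrpcd" thread name and the error handling are assumptions
here, not taken from the patch:

	/* sketch only: create the dedicated queue that the
	 * rxrpc_queue_work() wrappers dispatch to */
	struct workqueue_struct *rxrpc_workqueue;

	static int __init af_rxrpc_init(void)
	{
		rxrpc_workqueue = create_workqueue("krxrpcd");
		if (!rxrpc_workqueue)
			return -ENOMEM;
		return 0;
	}

	static void __exit af_rxrpc_exit(void)
	{
		destroy_workqueue(rxrpc_workqueue);
	}
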
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
index a20a2c0fe105..fe03f71f17da 100644
--- a/net/rxrpc/ar-local.c
+++ b/net/rxrpc/ar-local.c
@@ -228,7 +228,7 @@ void rxrpc_put_local(struct rxrpc_local *local)
 	write_lock_bh(&rxrpc_local_lock);
 	if (unlikely(atomic_dec_and_test(&local->usage))) {
 		_debug("destroy local");
-		schedule_work(&local->destroyer);
+		rxrpc_queue_work(&local->destroyer);
 	}
 	write_unlock_bh(&rxrpc_local_lock);
 	_leave("");
diff --git a/net/rxrpc/ar-output.c b/net/rxrpc/ar-output.c
index 67aa9510f09b..5cdde4a48ed1 100644
--- a/net/rxrpc/ar-output.c
+++ b/net/rxrpc/ar-output.c
@@ -113,7 +113,7 @@ static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
 		clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 		clear_bit(RXRPC_CALL_ACK, &call->events);
 		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	}
 
 	write_unlock_bh(&call->state_lock);
@@ -194,6 +194,77 @@ int rxrpc_client_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
 	return ret;
 }
 
+/**
+ * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
+ * @call: The call to send data through
+ * @msg: The data to send
+ * @len: The amount of data to send
+ *
+ * Allow a kernel service to send data on a call.  The call must be in a state
+ * appropriate to sending data.  No control data should be supplied in @msg,
+ * nor should an address be supplied.  MSG_MORE should be flagged if there's
+ * more data to come, otherwise this data will end the transmission phase.
+ */
+int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
+			   size_t len)
+{
+	int ret;
+
+	_enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
+
+	ASSERTCMP(msg->msg_name, ==, NULL);
+	ASSERTCMP(msg->msg_control, ==, NULL);
+
+	lock_sock(&call->socket->sk);
+
+	_debug("CALL %d USR %lx ST %d on CONN %p",
+	       call->debug_id, call->user_call_ID, call->state, call->conn);
+
+	if (call->state >= RXRPC_CALL_COMPLETE) {
+		ret = -ESHUTDOWN; /* it's too late for this call */
+	} else if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
+		   call->state != RXRPC_CALL_SERVER_ACK_REQUEST &&
+		   call->state != RXRPC_CALL_SERVER_SEND_REPLY) {
+		ret = -EPROTO; /* request phase complete for this client call */
+	} else {
+		mm_segment_t oldfs = get_fs();
+		set_fs(KERNEL_DS);
+		ret = rxrpc_send_data(NULL, call->socket, call, msg, len);
+		set_fs(oldfs);
+	}
+
+	release_sock(&call->socket->sk);
+	_leave(" = %d", ret);
+	return ret;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_send_data);
+
+/**
+ * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
+ * @call: The call to be aborted
+ * @abort_code: The abort code to stick into the ABORT packet
+ *
+ * Allow a kernel service to abort a call, if it's still in an abortable state.
+ */
+void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code)
+{
+	_enter("{%d},%d", call->debug_id, abort_code);
+
+	lock_sock(&call->socket->sk);
+
+	_debug("CALL %d USR %lx ST %d on CONN %p",
+	       call->debug_id, call->user_call_ID, call->state, call->conn);
+
+	if (call->state < RXRPC_CALL_COMPLETE)
+		rxrpc_send_abort(call, abort_code);
+
+	release_sock(&call->socket->sk);
+	_leave("");
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_abort_call);
+
 /*
  * send a message through a server socket
  * - caller holds the socket locked
@@ -214,8 +285,13 @@ int rxrpc_server_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
 	if (ret < 0)
 		return ret;
 
-	if (cmd == RXRPC_CMD_ACCEPT)
-		return rxrpc_accept_call(rx, user_call_ID);
+	if (cmd == RXRPC_CMD_ACCEPT) {
+		call = rxrpc_accept_call(rx, user_call_ID);
+		if (IS_ERR(call))
+			return PTR_ERR(call);
+		rxrpc_put_call(call);
+		return 0;
+	}
 
 	call = rxrpc_find_server_call(rx, user_call_ID);
 	if (!call)
@@ -363,7 +439,7 @@ static inline void rxrpc_instant_resend(struct rxrpc_call *call)
 		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 		if (call->state < RXRPC_CALL_COMPLETE &&
 		    !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
-			schedule_work(&call->processor);
+			rxrpc_queue_call(call);
 	}
 	read_unlock_bh(&call->state_lock);
 }
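
rxrpc_kernel_send_data() and rxrpc_kernel_abort_call() give a kernel service
the transmit side of a call without ever going through sendmsg().  A hedged
sketch of a caller, assuming a call already obtained with
rxrpc_kernel_begin_call(); the function name and the abort code value are
illustrative only, and the msghdr layout is the msg_iov/msg_iovlen form in
use at the time of this patch:

	static int example_send_request(struct rxrpc_call *call,
					void *request, size_t len)
	{
		struct msghdr msg;
		struct iovec iov[1];
		int ret;

		iov[0].iov_base = request;
		iov[0].iov_len = len;

		msg.msg_name = NULL;	/* no address may be supplied */
		msg.msg_namelen = 0;
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;
		msg.msg_control = NULL;	/* nor any control data */
		msg.msg_controllen = 0;
		msg.msg_flags = 0;	/* no MSG_MORE: this ends the Tx phase */

		ret = rxrpc_kernel_send_data(call, &msg, len);
		if (ret < 0)
			rxrpc_kernel_abort_call(call, 1); /* illustrative abort code */
		return ret;
	}

Note the constraints the ASSERTCMPs above enforce: msg_name and msg_control
must be left NULL, and leaving MSG_MORE unset marks this buffer as the last
of the transmission phase.
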
diff --git a/net/rxrpc/ar-peer.c b/net/rxrpc/ar-peer.c
index 69ac355546ae..d399de4a7fe2 100644
--- a/net/rxrpc/ar-peer.c
+++ b/net/rxrpc/ar-peer.c
@@ -219,7 +219,7 @@ void rxrpc_put_peer(struct rxrpc_peer *peer)
 		return;
 	}
 
-	schedule_work(&peer->destroyer);
+	rxrpc_queue_work(&peer->destroyer);
 	_leave("");
 }
 
diff --git a/net/rxrpc/ar-recvmsg.c b/net/rxrpc/ar-recvmsg.c
index e947d5c15900..f19121d4795b 100644
--- a/net/rxrpc/ar-recvmsg.c
+++ b/net/rxrpc/ar-recvmsg.c
@@ -19,7 +19,7 @@
  * removal a call's user ID from the socket tree to make the user ID available
  * again and so that it won't be seen again in association with that call
  */
-static void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
+void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
 	_debug("RELEASE CALL %d", call->debug_id);
 
@@ -33,7 +33,7 @@ static void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
 	read_lock_bh(&call->state_lock);
 	if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
 	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
-		schedule_work(&call->processor);
+		rxrpc_queue_call(call);
 	read_unlock_bh(&call->state_lock);
 }
 
@@ -364,3 +364,74 @@ wait_error:
 	return copied;
 
 }
+
+/**
+ * rxrpc_kernel_data_delivered - Record delivery of data message
+ * @skb: Message holding data
+ *
+ * Record the delivery of a data message.  This permits RxRPC to keep its
+ * tracking correct.  The socket buffer will be deleted.
+ */
+void rxrpc_kernel_data_delivered(struct sk_buff *skb)
+{
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_call *call = sp->call;
+
+	ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
+	ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
+	call->rx_data_recv = ntohl(sp->hdr.seq);
+
+	ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
+	rxrpc_free_skb(skb);
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
+
+/**
+ * rxrpc_kernel_is_data_last - Determine if data message is last one
+ * @skb: Message holding data
+ *
+ * Determine if data message is last one for the parent call.
+ */
+bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
+{
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+
+	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
+
+	return sp->hdr.flags & RXRPC_LAST_PACKET;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
+
+/**
+ * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
+ * @skb: Message indicating an abort
+ *
+ * Get the abort code from an RxRPC abort message.
+ */
+u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
+{
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+
+	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
+
+	return sp->call->abort_code;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
+
+/**
+ * rxrpc_kernel_get_error_number - Get the error number from an RxRPC error message
+ * @skb: Message indicating an error
+ *
+ * Get the error number from an RxRPC error message.
+ */
+int rxrpc_kernel_get_error_number(struct sk_buff *skb)
+{
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+
+	return sp->error;
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
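
Together these accessors let an intercepting kernel service dispatch purely
on skb->mark without reaching into struct rxrpc_skb_priv.  A sketch of the
receive side under that assumption; example_process_rx() is hypothetical and
presumes the buffers arrive via the Rx interceptor rather than recvmsg():

	static void example_process_rx(struct sk_buff *skb)
	{
		switch (skb->mark) {
		case RXRPC_SKB_MARK_DATA:
			/* consume the payload; the last message of the
			 * incoming phase carries RXRPC_LAST_PACKET */
			if (rxrpc_kernel_is_data_last(skb))
				printk("reply complete\n");
			/* let RxRPC advance rx_data_recv and free the skb */
			rxrpc_kernel_data_delivered(skb);
			break;
		case RXRPC_SKB_MARK_REMOTE_ABORT:
			printk("call aborted: %u\n",
			       rxrpc_kernel_get_abort_code(skb));
			rxrpc_kernel_free_skb(skb);
			break;
		case RXRPC_SKB_MARK_NET_ERROR:
		case RXRPC_SKB_MARK_LOCAL_ERROR:
			printk("call error: %d\n",
			       rxrpc_kernel_get_error_number(skb));
			rxrpc_kernel_free_skb(skb);
			break;
		default:
			rxrpc_kernel_free_skb(skb);
			break;
		}
	}

Only data messages go back through rxrpc_kernel_data_delivered(), since that
is what advances the call's rx_data_recv tracking; everything else is simply
returned with rxrpc_kernel_free_skb(), added in the ar-skbuff.c hunk below.
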
diff --git a/net/rxrpc/ar-skbuff.c b/net/rxrpc/ar-skbuff.c
index d73f6fc76011..de755e04d29c 100644
--- a/net/rxrpc/ar-skbuff.c
+++ b/net/rxrpc/ar-skbuff.c
@@ -36,7 +36,7 @@ static void rxrpc_request_final_ACK(struct rxrpc_call *call)
 		rxrpc_get_call(call);
 		set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
 		if (try_to_del_timer_sync(&call->ack_timer) >= 0)
-			rxrpc_queue_call(call);
 		break;
 
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
@@ -116,3 +116,17 @@ void rxrpc_packet_destructor(struct sk_buff *skb)
 	sock_rfree(skb);
 	_leave("");
 }
+
+/**
+ * rxrpc_kernel_free_skb - Free an RxRPC socket buffer
+ * @skb: The socket buffer to be freed
+ *
+ * Let RxRPC free its own socket buffer, permitting it to maintain debug
+ * accounting.
+ */
+void rxrpc_kernel_free_skb(struct sk_buff *skb)
+{
+	rxrpc_free_skb(skb);
+}
+
+EXPORT_SYMBOL(rxrpc_kernel_free_skb);
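
rxrpc_kernel_free_skb() closes the loop opened by the interceptor field added
to struct rxrpc_sock earlier: intercepted buffers bypass the normal socket
receive path, so the service must return them through RxRPC to keep the
rxrpc_n_skbs accounting balanced.  A sketch of hooking the Rx path follows;
the interceptor signature and the rxrpc_kernel_intercept_rx() registration
call are assumptions based on the commit description, and the example_ names
are hypothetical:

	/* assumed shape of the interceptor registered on the socket */
	static void example_rx_interceptor(struct sock *sk,
					   unsigned long user_call_ID,
					   struct sk_buff *skb)
	{
		/* invoked in lieu of queueing skb on the socket Rx queue;
		 * dispatch on skb->mark as sketched earlier */
	}

	static void example_open_socket(struct socket *sock)
	{
		/* registration call assumed from the commit description */
		rxrpc_kernel_intercept_rx(sock, example_rx_interceptor);
	}
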
diff --git a/net/rxrpc/ar-transport.c b/net/rxrpc/ar-transport.c
index 9b4e5cb545d2..d43d78f19302 100644
--- a/net/rxrpc/ar-transport.c
+++ b/net/rxrpc/ar-transport.c
@@ -189,7 +189,7 @@ void rxrpc_put_transport(struct rxrpc_transport *trans)
 	/* let the reaper determine the timeout to avoid a race with
 	 * overextending the timeout if the reaper is running at the
 	 * same time */
-	schedule_delayed_work(&rxrpc_transport_reap, 0);
+	rxrpc_queue_delayed_work(&rxrpc_transport_reap, 0);
 	_leave("");
 }
 
@@ -243,8 +243,8 @@ static void rxrpc_transport_reaper(struct work_struct *work)
 	if (earliest != ULONG_MAX) {
 		_debug("reschedule reaper %ld", (long) earliest - now);
 		ASSERTCMP(earliest, >, now);
-		schedule_delayed_work(&rxrpc_transport_reap,
-				      (earliest - now) * HZ);
+		rxrpc_queue_delayed_work(&rxrpc_transport_reap,
+					 (earliest - now) * HZ);
 	}
 
 	/* then destroy all those pulled out */
@@ -270,7 +270,7 @@ void __exit rxrpc_destroy_all_transports(void)
 
 	rxrpc_transport_timeout = 0;
 	cancel_delayed_work(&rxrpc_transport_reap);
-	schedule_delayed_work(&rxrpc_transport_reap, 0);
+	rxrpc_queue_delayed_work(&rxrpc_transport_reap, 0);
 
 	_leave("");
 }