aboutsummaryrefslogtreecommitdiffstats
path: root/net/rxrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc')
-rw-r--r--net/rxrpc/af_rxrpc.c141
-rw-r--r--net/rxrpc/ar-accept.c119
-rw-r--r--net/rxrpc/ar-ack.c10
-rw-r--r--net/rxrpc/ar-call.c75
-rw-r--r--net/rxrpc/ar-connection.c28
-rw-r--r--net/rxrpc/ar-connevent.c20
-rw-r--r--net/rxrpc/ar-error.c6
-rw-r--r--net/rxrpc/ar-input.c60
-rw-r--r--net/rxrpc/ar-internal.h44
-rw-r--r--net/rxrpc/ar-local.c2
-rw-r--r--net/rxrpc/ar-output.c84
-rw-r--r--net/rxrpc/ar-peer.c2
-rw-r--r--net/rxrpc/ar-recvmsg.c75
-rw-r--r--net/rxrpc/ar-skbuff.c16
-rw-r--r--net/rxrpc/ar-transport.c8
15 files changed, 563 insertions, 127 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index bfa8822e2286..2c57df9c131b 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -41,6 +41,8 @@ atomic_t rxrpc_debug_id;
41/* count of skbs currently in use */ 41/* count of skbs currently in use */
42atomic_t rxrpc_n_skbs; 42atomic_t rxrpc_n_skbs;
43 43
44struct workqueue_struct *rxrpc_workqueue;
45
44static void rxrpc_sock_destructor(struct sock *); 46static void rxrpc_sock_destructor(struct sock *);
45 47
46/* 48/*
@@ -214,7 +216,8 @@ static int rxrpc_listen(struct socket *sock, int backlog)
214 */ 216 */
215static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock, 217static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
216 struct sockaddr *addr, 218 struct sockaddr *addr,
217 int addr_len, int flags) 219 int addr_len, int flags,
220 gfp_t gfp)
218{ 221{
219 struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *) addr; 222 struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *) addr;
220 struct rxrpc_transport *trans; 223 struct rxrpc_transport *trans;
@@ -232,17 +235,129 @@ static struct rxrpc_transport *rxrpc_name_to_transport(struct socket *sock,
232 return ERR_PTR(-EAFNOSUPPORT); 235 return ERR_PTR(-EAFNOSUPPORT);
233 236
234 /* find a remote transport endpoint from the local one */ 237 /* find a remote transport endpoint from the local one */
235 peer = rxrpc_get_peer(srx, GFP_KERNEL); 238 peer = rxrpc_get_peer(srx, gfp);
236 if (IS_ERR(peer)) 239 if (IS_ERR(peer))
237 return ERR_PTR(PTR_ERR(peer)); 240 return ERR_PTR(PTR_ERR(peer));
238 241
239 /* find a transport */ 242 /* find a transport */
240 trans = rxrpc_get_transport(rx->local, peer, GFP_KERNEL); 243 trans = rxrpc_get_transport(rx->local, peer, gfp);
241 rxrpc_put_peer(peer); 244 rxrpc_put_peer(peer);
242 _leave(" = %p", trans); 245 _leave(" = %p", trans);
243 return trans; 246 return trans;
244} 247}
245 248
249/**
250 * rxrpc_kernel_begin_call - Allow a kernel service to begin a call
251 * @sock: The socket on which to make the call
252 * @srx: The address of the peer to contact (defaults to socket setting)
253 * @key: The security context to use (defaults to socket setting)
254 * @user_call_ID: The ID to use
255 *
256 * Allow a kernel service to begin a call on the nominated socket. This just
257 * sets up all the internal tracking structures and allocates connection and
258 * call IDs as appropriate. The call to be used is returned.
259 *
260 * The default socket destination address and security may be overridden by
261 * supplying @srx and @key.
262 */
263struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
264 struct sockaddr_rxrpc *srx,
265 struct key *key,
266 unsigned long user_call_ID,
267 gfp_t gfp)
268{
269 struct rxrpc_conn_bundle *bundle;
270 struct rxrpc_transport *trans;
271 struct rxrpc_call *call;
272 struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
273 __be16 service_id;
274
275 _enter(",,%x,%lx", key_serial(key), user_call_ID);
276
277 lock_sock(&rx->sk);
278
279 if (srx) {
280 trans = rxrpc_name_to_transport(sock, (struct sockaddr *) srx,
281 sizeof(*srx), 0, gfp);
282 if (IS_ERR(trans)) {
283 call = ERR_PTR(PTR_ERR(trans));
284 trans = NULL;
285 goto out;
286 }
287 } else {
288 trans = rx->trans;
289 if (!trans) {
290 call = ERR_PTR(-ENOTCONN);
291 goto out;
292 }
293 atomic_inc(&trans->usage);
294 }
295
296 service_id = rx->service_id;
297 if (srx)
298 service_id = htons(srx->srx_service);
299
300 if (!key)
301 key = rx->key;
302 if (key && !key->payload.data)
303 key = NULL; /* a no-security key */
304
305 bundle = rxrpc_get_bundle(rx, trans, key, service_id, gfp);
306 if (IS_ERR(bundle)) {
307 call = ERR_PTR(PTR_ERR(bundle));
308 goto out;
309 }
310
311 call = rxrpc_get_client_call(rx, trans, bundle, user_call_ID, true,
312 gfp);
313 rxrpc_put_bundle(trans, bundle);
314out:
315 rxrpc_put_transport(trans);
316 release_sock(&rx->sk);
317 _leave(" = %p", call);
318 return call;
319}
320
321EXPORT_SYMBOL(rxrpc_kernel_begin_call);
322
323/**
324 * rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
325 * @call: The call to end
326 *
327 * Allow a kernel service to end a call it was using. The call must be
328 * complete before this is called (the call should be aborted if necessary).
329 */
330void rxrpc_kernel_end_call(struct rxrpc_call *call)
331{
332 _enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
333 rxrpc_remove_user_ID(call->socket, call);
334 rxrpc_put_call(call);
335}
336
337EXPORT_SYMBOL(rxrpc_kernel_end_call);
338
339/**
340 * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages
341 * @sock: The socket to intercept received messages on
342 * @interceptor: The function to pass the messages to
343 *
344 * Allow a kernel service to intercept messages heading for the Rx queue on an
345 * RxRPC socket. They get passed to the specified function instead.
346 * @interceptor should free the socket buffers it is given. @interceptor is
347 * called with the socket receive queue spinlock held and softirqs disabled -
348 * this ensures that the messages will be delivered in the right order.
349 */
350void rxrpc_kernel_intercept_rx_messages(struct socket *sock,
351 rxrpc_interceptor_t interceptor)
352{
353 struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
354
355 _enter("");
356 rx->interceptor = interceptor;
357}
358
359EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
360
246/* 361/*
247 * connect an RxRPC socket 362 * connect an RxRPC socket
248 * - this just targets it at a specific destination; no actual connection 363 * - this just targets it at a specific destination; no actual connection
@@ -294,7 +409,8 @@ static int rxrpc_connect(struct socket *sock, struct sockaddr *addr,
294 return -EBUSY; /* server sockets can't connect as well */ 409 return -EBUSY; /* server sockets can't connect as well */
295 } 410 }
296 411
297 trans = rxrpc_name_to_transport(sock, addr, addr_len, flags); 412 trans = rxrpc_name_to_transport(sock, addr, addr_len, flags,
413 GFP_KERNEL);
298 if (IS_ERR(trans)) { 414 if (IS_ERR(trans)) {
299 release_sock(&rx->sk); 415 release_sock(&rx->sk);
300 _leave(" = %ld", PTR_ERR(trans)); 416 _leave(" = %ld", PTR_ERR(trans));
@@ -344,7 +460,7 @@ static int rxrpc_sendmsg(struct kiocb *iocb, struct socket *sock,
344 if (m->msg_name) { 460 if (m->msg_name) {
345 ret = -EISCONN; 461 ret = -EISCONN;
346 trans = rxrpc_name_to_transport(sock, m->msg_name, 462 trans = rxrpc_name_to_transport(sock, m->msg_name,
347 m->msg_namelen, 0); 463 m->msg_namelen, 0, GFP_KERNEL);
348 if (IS_ERR(trans)) { 464 if (IS_ERR(trans)) {
349 ret = PTR_ERR(trans); 465 ret = PTR_ERR(trans);
350 trans = NULL; 466 trans = NULL;
@@ -576,7 +692,7 @@ static int rxrpc_release_sock(struct sock *sk)
576 692
577 /* try to flush out this socket */ 693 /* try to flush out this socket */
578 rxrpc_release_calls_on_socket(rx); 694 rxrpc_release_calls_on_socket(rx);
579 flush_scheduled_work(); 695 flush_workqueue(rxrpc_workqueue);
580 rxrpc_purge_queue(&sk->sk_receive_queue); 696 rxrpc_purge_queue(&sk->sk_receive_queue);
581 697
582 if (rx->conn) { 698 if (rx->conn) {
@@ -673,15 +789,21 @@ static int __init af_rxrpc_init(void)
673 789
674 rxrpc_epoch = htonl(xtime.tv_sec); 790 rxrpc_epoch = htonl(xtime.tv_sec);
675 791
792 ret = -ENOMEM;
676 rxrpc_call_jar = kmem_cache_create( 793 rxrpc_call_jar = kmem_cache_create(
677 "rxrpc_call_jar", sizeof(struct rxrpc_call), 0, 794 "rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
678 SLAB_HWCACHE_ALIGN, NULL, NULL); 795 SLAB_HWCACHE_ALIGN, NULL, NULL);
679 if (!rxrpc_call_jar) { 796 if (!rxrpc_call_jar) {
680 printk(KERN_NOTICE "RxRPC: Failed to allocate call jar\n"); 797 printk(KERN_NOTICE "RxRPC: Failed to allocate call jar\n");
681 ret = -ENOMEM;
682 goto error_call_jar; 798 goto error_call_jar;
683 } 799 }
684 800
801 rxrpc_workqueue = create_workqueue("krxrpcd");
802 if (!rxrpc_workqueue) {
803 printk(KERN_NOTICE "RxRPC: Failed to allocate work queue\n");
804 goto error_work_queue;
805 }
806
685 ret = proto_register(&rxrpc_proto, 1); 807 ret = proto_register(&rxrpc_proto, 1);
686 if (ret < 0) { 808 if (ret < 0) {
687 printk(KERN_CRIT "RxRPC: Cannot register protocol\n"); 809 printk(KERN_CRIT "RxRPC: Cannot register protocol\n");
@@ -719,6 +841,8 @@ error_key_type:
719error_sock: 841error_sock:
720 proto_unregister(&rxrpc_proto); 842 proto_unregister(&rxrpc_proto);
721error_proto: 843error_proto:
844 destroy_workqueue(rxrpc_workqueue);
845error_work_queue:
722 kmem_cache_destroy(rxrpc_call_jar); 846 kmem_cache_destroy(rxrpc_call_jar);
723error_call_jar: 847error_call_jar:
724 return ret; 848 return ret;
@@ -743,9 +867,10 @@ static void __exit af_rxrpc_exit(void)
743 ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0); 867 ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
744 868
745 _debug("flush scheduled work"); 869 _debug("flush scheduled work");
746 flush_scheduled_work(); 870 flush_workqueue(rxrpc_workqueue);
747 proc_net_remove("rxrpc_conns"); 871 proc_net_remove("rxrpc_conns");
748 proc_net_remove("rxrpc_calls"); 872 proc_net_remove("rxrpc_calls");
873 destroy_workqueue(rxrpc_workqueue);
749 kmem_cache_destroy(rxrpc_call_jar); 874 kmem_cache_destroy(rxrpc_call_jar);
750 _leave(""); 875 _leave("");
751} 876}
diff --git a/net/rxrpc/ar-accept.c b/net/rxrpc/ar-accept.c
index e7af780cd6f9..92a87fde8bfe 100644
--- a/net/rxrpc/ar-accept.c
+++ b/net/rxrpc/ar-accept.c
@@ -139,7 +139,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
139 call->conn->state = RXRPC_CONN_SERVER_CHALLENGING; 139 call->conn->state = RXRPC_CONN_SERVER_CHALLENGING;
140 atomic_inc(&call->conn->usage); 140 atomic_inc(&call->conn->usage);
141 set_bit(RXRPC_CONN_CHALLENGE, &call->conn->events); 141 set_bit(RXRPC_CONN_CHALLENGE, &call->conn->events);
142 schedule_work(&call->conn->processor); 142 rxrpc_queue_conn(call->conn);
143 } else { 143 } else {
144 _debug("conn ready"); 144 _debug("conn ready");
145 call->state = RXRPC_CALL_SERVER_ACCEPTING; 145 call->state = RXRPC_CALL_SERVER_ACCEPTING;
@@ -183,7 +183,7 @@ invalid_service:
183 if (!test_bit(RXRPC_CALL_RELEASE, &call->flags) && 183 if (!test_bit(RXRPC_CALL_RELEASE, &call->flags) &&
184 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) { 184 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) {
185 rxrpc_get_call(call); 185 rxrpc_get_call(call);
186 schedule_work(&call->processor); 186 rxrpc_queue_call(call);
187 } 187 }
188 read_unlock_bh(&call->state_lock); 188 read_unlock_bh(&call->state_lock);
189 rxrpc_put_call(call); 189 rxrpc_put_call(call);
@@ -310,7 +310,8 @@ security_mismatch:
310 * handle acceptance of a call by userspace 310 * handle acceptance of a call by userspace
311 * - assign the user call ID to the call at the front of the queue 311 * - assign the user call ID to the call at the front of the queue
312 */ 312 */
313int rxrpc_accept_call(struct rxrpc_sock *rx, unsigned long user_call_ID) 313struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
314 unsigned long user_call_ID)
314{ 315{
315 struct rxrpc_call *call; 316 struct rxrpc_call *call;
316 struct rb_node *parent, **pp; 317 struct rb_node *parent, **pp;
@@ -374,12 +375,76 @@ int rxrpc_accept_call(struct rxrpc_sock *rx, unsigned long user_call_ID)
374 BUG(); 375 BUG();
375 if (test_and_set_bit(RXRPC_CALL_ACCEPTED, &call->events)) 376 if (test_and_set_bit(RXRPC_CALL_ACCEPTED, &call->events))
376 BUG(); 377 BUG();
377 schedule_work(&call->processor); 378 rxrpc_queue_call(call);
378 379
380 rxrpc_get_call(call);
379 write_unlock_bh(&call->state_lock); 381 write_unlock_bh(&call->state_lock);
380 write_unlock(&rx->call_lock); 382 write_unlock(&rx->call_lock);
381 _leave(" = 0"); 383 _leave(" = %p{%d}", call, call->debug_id);
382 return 0; 384 return call;
385
386 /* if the call is already dying or dead, then we leave the socket's ref
387 * on it to be released by rxrpc_dead_call_expired() as induced by
388 * rxrpc_release_call() */
389out_release:
390 _debug("release %p", call);
391 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
392 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
393 rxrpc_queue_call(call);
394out_discard:
395 write_unlock_bh(&call->state_lock);
396 _debug("discard %p", call);
397out:
398 write_unlock(&rx->call_lock);
399 _leave(" = %d", ret);
400 return ERR_PTR(ret);
401}
402
403/*
404 * handle rejectance of a call by userspace
405 * - reject the call at the front of the queue
406 */
407int rxrpc_reject_call(struct rxrpc_sock *rx)
408{
409 struct rxrpc_call *call;
410 int ret;
411
412 _enter("");
413
414 ASSERT(!irqs_disabled());
415
416 write_lock(&rx->call_lock);
417
418 ret = -ENODATA;
419 if (list_empty(&rx->acceptq))
420 goto out;
421
422 /* dequeue the first call and check it's still valid */
423 call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
424 list_del_init(&call->accept_link);
425 sk_acceptq_removed(&rx->sk);
426
427 write_lock_bh(&call->state_lock);
428 switch (call->state) {
429 case RXRPC_CALL_SERVER_ACCEPTING:
430 call->state = RXRPC_CALL_SERVER_BUSY;
431 if (test_and_set_bit(RXRPC_CALL_REJECT_BUSY, &call->events))
432 rxrpc_queue_call(call);
433 ret = 0;
434 goto out_release;
435 case RXRPC_CALL_REMOTELY_ABORTED:
436 case RXRPC_CALL_LOCALLY_ABORTED:
437 ret = -ECONNABORTED;
438 goto out_release;
439 case RXRPC_CALL_NETWORK_ERROR:
440 ret = call->conn->error;
441 goto out_release;
442 case RXRPC_CALL_DEAD:
443 ret = -ETIME;
444 goto out_discard;
445 default:
446 BUG();
447 }
383 448
384 /* if the call is already dying or dead, then we leave the socket's ref 449 /* if the call is already dying or dead, then we leave the socket's ref
385 * on it to be released by rxrpc_dead_call_expired() as induced by 450 * on it to be released by rxrpc_dead_call_expired() as induced by
@@ -388,7 +453,7 @@ out_release:
388 _debug("release %p", call); 453 _debug("release %p", call);
389 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 454 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
390 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 455 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
391 schedule_work(&call->processor); 456 rxrpc_queue_call(call);
392out_discard: 457out_discard:
393 write_unlock_bh(&call->state_lock); 458 write_unlock_bh(&call->state_lock);
394 _debug("discard %p", call); 459 _debug("discard %p", call);
@@ -397,3 +462,43 @@ out:
397 _leave(" = %d", ret); 462 _leave(" = %d", ret);
398 return ret; 463 return ret;
399} 464}
465
466/**
467 * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call
468 * @sock: The socket on which the impending call is waiting
469 * @user_call_ID: The tag to attach to the call
470 *
471 * Allow a kernel service to accept an incoming call, assuming the incoming
472 * call is still valid.
473 */
474struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock,
475 unsigned long user_call_ID)
476{
477 struct rxrpc_call *call;
478
479 _enter(",%lx", user_call_ID);
480 call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID);
481 _leave(" = %p", call);
482 return call;
483}
484
485EXPORT_SYMBOL(rxrpc_kernel_accept_call);
486
487/**
488 * rxrpc_kernel_reject_call - Allow a kernel service to reject an incoming call
489 * @sock: The socket on which the impending call is waiting
490 *
491 * Allow a kernel service to reject an incoming call with a BUSY message,
492 * assuming the incoming call is still valid.
493 */
494int rxrpc_kernel_reject_call(struct socket *sock)
495{
496 int ret;
497
498 _enter("");
499 ret = rxrpc_reject_call(rxrpc_sk(sock->sk));
500 _leave(" = %d", ret);
501 return ret;
502}
503
504EXPORT_SYMBOL(rxrpc_kernel_reject_call);
diff --git a/net/rxrpc/ar-ack.c b/net/rxrpc/ar-ack.c
index 8f7764eca96c..fc07a926df56 100644
--- a/net/rxrpc/ar-ack.c
+++ b/net/rxrpc/ar-ack.c
@@ -113,7 +113,7 @@ cancel_timer:
113 read_lock_bh(&call->state_lock); 113 read_lock_bh(&call->state_lock);
114 if (call->state <= RXRPC_CALL_COMPLETE && 114 if (call->state <= RXRPC_CALL_COMPLETE &&
115 !test_and_set_bit(RXRPC_CALL_ACK, &call->events)) 115 !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
116 schedule_work(&call->processor); 116 rxrpc_queue_call(call);
117 read_unlock_bh(&call->state_lock); 117 read_unlock_bh(&call->state_lock);
118} 118}
119 119
@@ -1166,7 +1166,7 @@ send_message_2:
1166 _debug("sendmsg failed: %d", ret); 1166 _debug("sendmsg failed: %d", ret);
1167 read_lock_bh(&call->state_lock); 1167 read_lock_bh(&call->state_lock);
1168 if (call->state < RXRPC_CALL_DEAD) 1168 if (call->state < RXRPC_CALL_DEAD)
1169 schedule_work(&call->processor); 1169 rxrpc_queue_call(call);
1170 read_unlock_bh(&call->state_lock); 1170 read_unlock_bh(&call->state_lock);
1171 goto error; 1171 goto error;
1172 } 1172 }
@@ -1210,7 +1210,7 @@ maybe_reschedule:
1210 if (call->events || !skb_queue_empty(&call->rx_queue)) { 1210 if (call->events || !skb_queue_empty(&call->rx_queue)) {
1211 read_lock_bh(&call->state_lock); 1211 read_lock_bh(&call->state_lock);
1212 if (call->state < RXRPC_CALL_DEAD) 1212 if (call->state < RXRPC_CALL_DEAD)
1213 schedule_work(&call->processor); 1213 rxrpc_queue_call(call);
1214 read_unlock_bh(&call->state_lock); 1214 read_unlock_bh(&call->state_lock);
1215 } 1215 }
1216 1216
@@ -1224,7 +1224,7 @@ maybe_reschedule:
1224 read_lock_bh(&call->state_lock); 1224 read_lock_bh(&call->state_lock);
1225 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 1225 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1226 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 1226 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1227 schedule_work(&call->processor); 1227 rxrpc_queue_call(call);
1228 read_unlock_bh(&call->state_lock); 1228 read_unlock_bh(&call->state_lock);
1229 } 1229 }
1230 1230
@@ -1238,7 +1238,7 @@ error:
1238 * work pending bit and the work item being processed again */ 1238 * work pending bit and the work item being processed again */
1239 if (call->events && !work_pending(&call->processor)) { 1239 if (call->events && !work_pending(&call->processor)) {
1240 _debug("jumpstart %x", ntohl(call->conn->cid)); 1240 _debug("jumpstart %x", ntohl(call->conn->cid));
1241 schedule_work(&call->processor); 1241 rxrpc_queue_call(call);
1242 } 1242 }
1243 1243
1244 _leave(""); 1244 _leave("");
diff --git a/net/rxrpc/ar-call.c b/net/rxrpc/ar-call.c
index ac31cceda2f1..4d92d88ff1fc 100644
--- a/net/rxrpc/ar-call.c
+++ b/net/rxrpc/ar-call.c
@@ -19,7 +19,7 @@ struct kmem_cache *rxrpc_call_jar;
19LIST_HEAD(rxrpc_calls); 19LIST_HEAD(rxrpc_calls);
20DEFINE_RWLOCK(rxrpc_call_lock); 20DEFINE_RWLOCK(rxrpc_call_lock);
21static unsigned rxrpc_call_max_lifetime = 60; 21static unsigned rxrpc_call_max_lifetime = 60;
22static unsigned rxrpc_dead_call_timeout = 10; 22static unsigned rxrpc_dead_call_timeout = 2;
23 23
24static void rxrpc_destroy_call(struct work_struct *work); 24static void rxrpc_destroy_call(struct work_struct *work);
25static void rxrpc_call_life_expired(unsigned long _call); 25static void rxrpc_call_life_expired(unsigned long _call);
@@ -264,7 +264,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
264 switch (call->state) { 264 switch (call->state) {
265 case RXRPC_CALL_LOCALLY_ABORTED: 265 case RXRPC_CALL_LOCALLY_ABORTED:
266 if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events)) 266 if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
267 schedule_work(&call->processor); 267 rxrpc_queue_call(call);
268 case RXRPC_CALL_REMOTELY_ABORTED: 268 case RXRPC_CALL_REMOTELY_ABORTED:
269 read_unlock(&call->state_lock); 269 read_unlock(&call->state_lock);
270 goto aborted_call; 270 goto aborted_call;
@@ -398,6 +398,7 @@ found_extant_call:
398 */ 398 */
399void rxrpc_release_call(struct rxrpc_call *call) 399void rxrpc_release_call(struct rxrpc_call *call)
400{ 400{
401 struct rxrpc_connection *conn = call->conn;
401 struct rxrpc_sock *rx = call->socket; 402 struct rxrpc_sock *rx = call->socket;
402 403
403 _enter("{%d,%d,%d,%d}", 404 _enter("{%d,%d,%d,%d}",
@@ -413,8 +414,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
413 /* dissociate from the socket 414 /* dissociate from the socket
414 * - the socket's ref on the call is passed to the death timer 415 * - the socket's ref on the call is passed to the death timer
415 */ 416 */
416 _debug("RELEASE CALL %p (%d CONN %p)", 417 _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
417 call, call->debug_id, call->conn);
418 418
419 write_lock_bh(&rx->call_lock); 419 write_lock_bh(&rx->call_lock);
420 if (!list_empty(&call->accept_link)) { 420 if (!list_empty(&call->accept_link)) {
@@ -430,24 +430,42 @@ void rxrpc_release_call(struct rxrpc_call *call)
430 } 430 }
431 write_unlock_bh(&rx->call_lock); 431 write_unlock_bh(&rx->call_lock);
432 432
433 if (call->conn->out_clientflag)
434 spin_lock(&call->conn->trans->client_lock);
435 write_lock_bh(&call->conn->lock);
436
437 /* free up the channel for reuse */ 433 /* free up the channel for reuse */
438 if (call->conn->out_clientflag) { 434 spin_lock(&conn->trans->client_lock);
439 call->conn->avail_calls++; 435 write_lock_bh(&conn->lock);
440 if (call->conn->avail_calls == RXRPC_MAXCALLS) 436 write_lock(&call->state_lock);
441 list_move_tail(&call->conn->bundle_link, 437
442 &call->conn->bundle->unused_conns); 438 if (conn->channels[call->channel] == call)
443 else if (call->conn->avail_calls == 1) 439 conn->channels[call->channel] = NULL;
444 list_move_tail(&call->conn->bundle_link, 440
445 &call->conn->bundle->avail_conns); 441 if (conn->out_clientflag && conn->bundle) {
442 conn->avail_calls++;
443 switch (conn->avail_calls) {
444 case 1:
445 list_move_tail(&conn->bundle_link,
446 &conn->bundle->avail_conns);
447 case 2 ... RXRPC_MAXCALLS - 1:
448 ASSERT(conn->channels[0] == NULL ||
449 conn->channels[1] == NULL ||
450 conn->channels[2] == NULL ||
451 conn->channels[3] == NULL);
452 break;
453 case RXRPC_MAXCALLS:
454 list_move_tail(&conn->bundle_link,
455 &conn->bundle->unused_conns);
456 ASSERT(conn->channels[0] == NULL &&
457 conn->channels[1] == NULL &&
458 conn->channels[2] == NULL &&
459 conn->channels[3] == NULL);
460 break;
461 default:
462 printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
463 conn->avail_calls);
464 BUG();
465 }
446 } 466 }
447 467
448 write_lock(&call->state_lock); 468 spin_unlock(&conn->trans->client_lock);
449 if (call->conn->channels[call->channel] == call)
450 call->conn->channels[call->channel] = NULL;
451 469
452 if (call->state < RXRPC_CALL_COMPLETE && 470 if (call->state < RXRPC_CALL_COMPLETE &&
453 call->state != RXRPC_CALL_CLIENT_FINAL_ACK) { 471 call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
@@ -455,13 +473,12 @@ void rxrpc_release_call(struct rxrpc_call *call)
455 call->state = RXRPC_CALL_LOCALLY_ABORTED; 473 call->state = RXRPC_CALL_LOCALLY_ABORTED;
456 call->abort_code = RX_CALL_DEAD; 474 call->abort_code = RX_CALL_DEAD;
457 set_bit(RXRPC_CALL_ABORT, &call->events); 475 set_bit(RXRPC_CALL_ABORT, &call->events);
458 schedule_work(&call->processor); 476 rxrpc_queue_call(call);
459 } 477 }
460 write_unlock(&call->state_lock); 478 write_unlock(&call->state_lock);
461 write_unlock_bh(&call->conn->lock); 479 write_unlock_bh(&conn->lock);
462 if (call->conn->out_clientflag)
463 spin_unlock(&call->conn->trans->client_lock);
464 480
481 /* clean up the Rx queue */
465 if (!skb_queue_empty(&call->rx_queue) || 482 if (!skb_queue_empty(&call->rx_queue) ||
466 !skb_queue_empty(&call->rx_oos_queue)) { 483 !skb_queue_empty(&call->rx_oos_queue)) {
467 struct rxrpc_skb_priv *sp; 484 struct rxrpc_skb_priv *sp;
@@ -538,7 +555,7 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
538 if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 555 if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
539 sched = true; 556 sched = true;
540 if (sched) 557 if (sched)
541 schedule_work(&call->processor); 558 rxrpc_queue_call(call);
542 } 559 }
543 write_unlock(&call->state_lock); 560 write_unlock(&call->state_lock);
544} 561}
@@ -588,7 +605,7 @@ void __rxrpc_put_call(struct rxrpc_call *call)
588 if (atomic_dec_and_test(&call->usage)) { 605 if (atomic_dec_and_test(&call->usage)) {
589 _debug("call %d dead", call->debug_id); 606 _debug("call %d dead", call->debug_id);
590 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); 607 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
591 schedule_work(&call->destroyer); 608 rxrpc_queue_work(&call->destroyer);
592 } 609 }
593 _leave(""); 610 _leave("");
594} 611}
@@ -613,7 +630,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
613 ASSERTCMP(call->events, ==, 0); 630 ASSERTCMP(call->events, ==, 0);
614 if (work_pending(&call->processor)) { 631 if (work_pending(&call->processor)) {
615 _debug("defer destroy"); 632 _debug("defer destroy");
616 schedule_work(&call->destroyer); 633 rxrpc_queue_work(&call->destroyer);
617 return; 634 return;
618 } 635 }
619 636
@@ -742,7 +759,7 @@ static void rxrpc_call_life_expired(unsigned long _call)
742 read_lock_bh(&call->state_lock); 759 read_lock_bh(&call->state_lock);
743 if (call->state < RXRPC_CALL_COMPLETE) { 760 if (call->state < RXRPC_CALL_COMPLETE) {
744 set_bit(RXRPC_CALL_LIFE_TIMER, &call->events); 761 set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
745 schedule_work(&call->processor); 762 rxrpc_queue_call(call);
746 } 763 }
747 read_unlock_bh(&call->state_lock); 764 read_unlock_bh(&call->state_lock);
748} 765}
@@ -763,7 +780,7 @@ static void rxrpc_resend_time_expired(unsigned long _call)
763 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 780 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
764 if (call->state < RXRPC_CALL_COMPLETE && 781 if (call->state < RXRPC_CALL_COMPLETE &&
765 !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events)) 782 !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
766 schedule_work(&call->processor); 783 rxrpc_queue_call(call);
767 read_unlock_bh(&call->state_lock); 784 read_unlock_bh(&call->state_lock);
768} 785}
769 786
@@ -782,6 +799,6 @@ static void rxrpc_ack_time_expired(unsigned long _call)
782 read_lock_bh(&call->state_lock); 799 read_lock_bh(&call->state_lock);
783 if (call->state < RXRPC_CALL_COMPLETE && 800 if (call->state < RXRPC_CALL_COMPLETE &&
784 !test_and_set_bit(RXRPC_CALL_ACK, &call->events)) 801 !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
785 schedule_work(&call->processor); 802 rxrpc_queue_call(call);
786 read_unlock_bh(&call->state_lock); 803 read_unlock_bh(&call->state_lock);
787} 804}
diff --git a/net/rxrpc/ar-connection.c b/net/rxrpc/ar-connection.c
index 01eb33c30571..43cb3e051ece 100644
--- a/net/rxrpc/ar-connection.c
+++ b/net/rxrpc/ar-connection.c
@@ -356,7 +356,7 @@ static int rxrpc_connect_exclusive(struct rxrpc_sock *rx,
356 conn->out_clientflag = RXRPC_CLIENT_INITIATED; 356 conn->out_clientflag = RXRPC_CLIENT_INITIATED;
357 conn->cid = 0; 357 conn->cid = 0;
358 conn->state = RXRPC_CONN_CLIENT; 358 conn->state = RXRPC_CONN_CLIENT;
359 conn->avail_calls = RXRPC_MAXCALLS; 359 conn->avail_calls = RXRPC_MAXCALLS - 1;
360 conn->security_level = rx->min_sec_level; 360 conn->security_level = rx->min_sec_level;
361 conn->key = key_get(rx->key); 361 conn->key = key_get(rx->key);
362 362
@@ -447,6 +447,11 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
447 if (--conn->avail_calls == 0) 447 if (--conn->avail_calls == 0)
448 list_move(&conn->bundle_link, 448 list_move(&conn->bundle_link,
449 &bundle->busy_conns); 449 &bundle->busy_conns);
450 ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS);
451 ASSERT(conn->channels[0] == NULL ||
452 conn->channels[1] == NULL ||
453 conn->channels[2] == NULL ||
454 conn->channels[3] == NULL);
450 atomic_inc(&conn->usage); 455 atomic_inc(&conn->usage);
451 break; 456 break;
452 } 457 }
@@ -456,6 +461,12 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
456 conn = list_entry(bundle->unused_conns.next, 461 conn = list_entry(bundle->unused_conns.next,
457 struct rxrpc_connection, 462 struct rxrpc_connection,
458 bundle_link); 463 bundle_link);
464 ASSERTCMP(conn->avail_calls, ==, RXRPC_MAXCALLS);
465 conn->avail_calls = RXRPC_MAXCALLS - 1;
466 ASSERT(conn->channels[0] == NULL &&
467 conn->channels[1] == NULL &&
468 conn->channels[2] == NULL &&
469 conn->channels[3] == NULL);
459 atomic_inc(&conn->usage); 470 atomic_inc(&conn->usage);
460 list_move(&conn->bundle_link, &bundle->avail_conns); 471 list_move(&conn->bundle_link, &bundle->avail_conns);
461 break; 472 break;
@@ -512,7 +523,7 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
512 candidate->state = RXRPC_CONN_CLIENT; 523 candidate->state = RXRPC_CONN_CLIENT;
513 candidate->avail_calls = RXRPC_MAXCALLS; 524 candidate->avail_calls = RXRPC_MAXCALLS;
514 candidate->security_level = rx->min_sec_level; 525 candidate->security_level = rx->min_sec_level;
515 candidate->key = key_get(rx->key); 526 candidate->key = key_get(bundle->key);
516 527
517 ret = rxrpc_init_client_conn_security(candidate); 528 ret = rxrpc_init_client_conn_security(candidate);
518 if (ret < 0) { 529 if (ret < 0) {
@@ -555,6 +566,10 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
555 for (chan = 0; chan < RXRPC_MAXCALLS; chan++) 566 for (chan = 0; chan < RXRPC_MAXCALLS; chan++)
556 if (!conn->channels[chan]) 567 if (!conn->channels[chan])
557 goto found_channel; 568 goto found_channel;
569 ASSERT(conn->channels[0] == NULL ||
570 conn->channels[1] == NULL ||
571 conn->channels[2] == NULL ||
572 conn->channels[3] == NULL);
558 BUG(); 573 BUG();
559 574
560found_channel: 575found_channel:
@@ -567,6 +582,7 @@ found_channel:
567 _net("CONNECT client on conn %d chan %d as call %x", 582 _net("CONNECT client on conn %d chan %d as call %x",
568 conn->debug_id, chan, ntohl(call->call_id)); 583 conn->debug_id, chan, ntohl(call->call_id));
569 584
585 ASSERTCMP(conn->avail_calls, <, RXRPC_MAXCALLS);
570 spin_unlock(&trans->client_lock); 586 spin_unlock(&trans->client_lock);
571 587
572 rxrpc_add_call_ID_to_conn(conn, call); 588 rxrpc_add_call_ID_to_conn(conn, call);
@@ -778,7 +794,7 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
778 conn->put_time = xtime.tv_sec; 794 conn->put_time = xtime.tv_sec;
779 if (atomic_dec_and_test(&conn->usage)) { 795 if (atomic_dec_and_test(&conn->usage)) {
780 _debug("zombie"); 796 _debug("zombie");
781 schedule_delayed_work(&rxrpc_connection_reap, 0); 797 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
782 } 798 }
783 799
784 _leave(""); 800 _leave("");
@@ -862,8 +878,8 @@ void rxrpc_connection_reaper(struct work_struct *work)
862 if (earliest != ULONG_MAX) { 878 if (earliest != ULONG_MAX) {
863 _debug("reschedule reaper %ld", (long) earliest - now); 879 _debug("reschedule reaper %ld", (long) earliest - now);
864 ASSERTCMP(earliest, >, now); 880 ASSERTCMP(earliest, >, now);
865 schedule_delayed_work(&rxrpc_connection_reap, 881 rxrpc_queue_delayed_work(&rxrpc_connection_reap,
866 (earliest - now) * HZ); 882 (earliest - now) * HZ);
867 } 883 }
868 884
869 /* then destroy all those pulled out */ 885 /* then destroy all those pulled out */
@@ -889,7 +905,7 @@ void __exit rxrpc_destroy_all_connections(void)
889 905
890 rxrpc_connection_timeout = 0; 906 rxrpc_connection_timeout = 0;
891 cancel_delayed_work(&rxrpc_connection_reap); 907 cancel_delayed_work(&rxrpc_connection_reap);
892 schedule_delayed_work(&rxrpc_connection_reap, 0); 908 rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
893 909
894 _leave(""); 910 _leave("");
895} 911}
diff --git a/net/rxrpc/ar-connevent.c b/net/rxrpc/ar-connevent.c
index 4b02815c1ded..1ada43d51165 100644
--- a/net/rxrpc/ar-connevent.c
+++ b/net/rxrpc/ar-connevent.c
@@ -45,7 +45,7 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
45 set_bit(RXRPC_CALL_CONN_ABORT, &call->events); 45 set_bit(RXRPC_CALL_CONN_ABORT, &call->events);
46 else 46 else
47 set_bit(RXRPC_CALL_RCVD_ABORT, &call->events); 47 set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
48 schedule_work(&call->processor); 48 rxrpc_queue_call(call);
49 } 49 }
50 write_unlock(&call->state_lock); 50 write_unlock(&call->state_lock);
51 } 51 }
@@ -133,7 +133,7 @@ void rxrpc_call_is_secure(struct rxrpc_call *call)
133 read_lock(&call->state_lock); 133 read_lock(&call->state_lock);
134 if (call->state < RXRPC_CALL_COMPLETE && 134 if (call->state < RXRPC_CALL_COMPLETE &&
135 !test_and_set_bit(RXRPC_CALL_SECURED, &call->events)) 135 !test_and_set_bit(RXRPC_CALL_SECURED, &call->events))
136 schedule_work(&call->processor); 136 rxrpc_queue_call(call);
137 read_unlock(&call->state_lock); 137 read_unlock(&call->state_lock);
138 } 138 }
139} 139}
@@ -308,6 +308,22 @@ protocol_error:
308} 308}
309 309
310/* 310/*
311 * put a packet up for transport-level abort
312 */
313void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
314{
315 CHECK_SLAB_OKAY(&local->usage);
316
317 if (!atomic_inc_not_zero(&local->usage)) {
318 printk("resurrected on reject\n");
319 BUG();
320 }
321
322 skb_queue_tail(&local->reject_queue, skb);
323 rxrpc_queue_work(&local->rejecter);
324}
325
326/*
311 * reject packets through the local endpoint 327 * reject packets through the local endpoint
312 */ 328 */
313void rxrpc_reject_packets(struct work_struct *work) 329void rxrpc_reject_packets(struct work_struct *work)
diff --git a/net/rxrpc/ar-error.c b/net/rxrpc/ar-error.c
index f5539e2f7b58..2c27df1ffa17 100644
--- a/net/rxrpc/ar-error.c
+++ b/net/rxrpc/ar-error.c
@@ -111,7 +111,7 @@ void rxrpc_UDP_error_report(struct sock *sk)
111 111
112 /* pass the transport ref to error_handler to release */ 112 /* pass the transport ref to error_handler to release */
113 skb_queue_tail(&trans->error_queue, skb); 113 skb_queue_tail(&trans->error_queue, skb);
114 schedule_work(&trans->error_handler); 114 rxrpc_queue_work(&trans->error_handler);
115 115
116 /* reset and regenerate socket error */ 116 /* reset and regenerate socket error */
117 spin_lock_bh(&sk->sk_error_queue.lock); 117 spin_lock_bh(&sk->sk_error_queue.lock);
@@ -235,7 +235,7 @@ void rxrpc_UDP_error_handler(struct work_struct *work)
235 call->state < RXRPC_CALL_NETWORK_ERROR) { 235 call->state < RXRPC_CALL_NETWORK_ERROR) {
236 call->state = RXRPC_CALL_NETWORK_ERROR; 236 call->state = RXRPC_CALL_NETWORK_ERROR;
237 set_bit(RXRPC_CALL_RCVD_ERROR, &call->events); 237 set_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
238 schedule_work(&call->processor); 238 rxrpc_queue_call(call);
239 } 239 }
240 write_unlock(&call->state_lock); 240 write_unlock(&call->state_lock);
241 list_del_init(&call->error_link); 241 list_del_init(&call->error_link);
@@ -245,7 +245,7 @@ void rxrpc_UDP_error_handler(struct work_struct *work)
245 } 245 }
246 246
247 if (!skb_queue_empty(&trans->error_queue)) 247 if (!skb_queue_empty(&trans->error_queue))
248 schedule_work(&trans->error_handler); 248 rxrpc_queue_work(&trans->error_handler);
249 249
250 rxrpc_free_skb(skb); 250 rxrpc_free_skb(skb);
251 rxrpc_put_transport(trans); 251 rxrpc_put_transport(trans);
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 323c3454561c..ceb5d619a1d4 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -42,6 +42,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
42 bool force, bool terminal) 42 bool force, bool terminal)
43{ 43{
44 struct rxrpc_skb_priv *sp; 44 struct rxrpc_skb_priv *sp;
45 struct rxrpc_sock *rx = call->socket;
45 struct sock *sk; 46 struct sock *sk;
46 int skb_len, ret; 47 int skb_len, ret;
47 48
@@ -64,7 +65,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
64 return 0; 65 return 0;
65 } 66 }
66 67
67 sk = &call->socket->sk; 68 sk = &rx->sk;
68 69
69 if (!force) { 70 if (!force) {
70 /* cast skb->rcvbuf to unsigned... It's pointless, but 71 /* cast skb->rcvbuf to unsigned... It's pointless, but
@@ -89,25 +90,30 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
89 skb->sk = sk; 90 skb->sk = sk;
90 atomic_add(skb->truesize, &sk->sk_rmem_alloc); 91 atomic_add(skb->truesize, &sk->sk_rmem_alloc);
91 92
92 /* Cache the SKB length before we tack it onto the receive
93 * queue. Once it is added it no longer belongs to us and
94 * may be freed by other threads of control pulling packets
95 * from the queue.
96 */
97 skb_len = skb->len;
98
99 _net("post skb %p", skb);
100 __skb_queue_tail(&sk->sk_receive_queue, skb);
101 spin_unlock_bh(&sk->sk_receive_queue.lock);
102
103 if (!sock_flag(sk, SOCK_DEAD))
104 sk->sk_data_ready(sk, skb_len);
105
106 if (terminal) { 93 if (terminal) {
107 _debug("<<<< TERMINAL MESSAGE >>>>"); 94 _debug("<<<< TERMINAL MESSAGE >>>>");
108 set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags); 95 set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags);
109 } 96 }
110 97
98 /* allow interception by a kernel service */
99 if (rx->interceptor) {
100 rx->interceptor(sk, call->user_call_ID, skb);
101 spin_unlock_bh(&sk->sk_receive_queue.lock);
102 } else {
103
104 /* Cache the SKB length before we tack it onto the
105 * receive queue. Once it is added it no longer
106 * belongs to us and may be freed by other threads of
107 * control pulling packets from the queue */
108 skb_len = skb->len;
109
110 _net("post skb %p", skb);
111 __skb_queue_tail(&sk->sk_receive_queue, skb);
112 spin_unlock_bh(&sk->sk_receive_queue.lock);
113
114 if (!sock_flag(sk, SOCK_DEAD))
115 sk->sk_data_ready(sk, skb_len);
116 }
111 skb = NULL; 117 skb = NULL;
112 } else { 118 } else {
113 spin_unlock_bh(&sk->sk_receive_queue.lock); 119 spin_unlock_bh(&sk->sk_receive_queue.lock);
@@ -232,7 +238,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
232 read_lock(&call->state_lock); 238 read_lock(&call->state_lock);
233 if (call->state < RXRPC_CALL_COMPLETE && 239 if (call->state < RXRPC_CALL_COMPLETE &&
234 !test_and_set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) 240 !test_and_set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events))
235 schedule_work(&call->processor); 241 rxrpc_queue_call(call);
236 read_unlock(&call->state_lock); 242 read_unlock(&call->state_lock);
237 } 243 }
238 244
@@ -267,7 +273,7 @@ enqueue_packet:
267 atomic_inc(&call->ackr_not_idle); 273 atomic_inc(&call->ackr_not_idle);
268 read_lock(&call->state_lock); 274 read_lock(&call->state_lock);
269 if (call->state < RXRPC_CALL_DEAD) 275 if (call->state < RXRPC_CALL_DEAD)
270 schedule_work(&call->processor); 276 rxrpc_queue_call(call);
271 read_unlock(&call->state_lock); 277 read_unlock(&call->state_lock);
272 _leave(" = 0 [queued]"); 278 _leave(" = 0 [queued]");
273 return 0; 279 return 0;
@@ -360,7 +366,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
360 call->state = RXRPC_CALL_REMOTELY_ABORTED; 366 call->state = RXRPC_CALL_REMOTELY_ABORTED;
361 call->abort_code = abort_code; 367 call->abort_code = abort_code;
362 set_bit(RXRPC_CALL_RCVD_ABORT, &call->events); 368 set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
363 schedule_work(&call->processor); 369 rxrpc_queue_call(call);
364 } 370 }
365 goto free_packet_unlock; 371 goto free_packet_unlock;
366 372
@@ -375,7 +381,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
375 case RXRPC_CALL_CLIENT_SEND_REQUEST: 381 case RXRPC_CALL_CLIENT_SEND_REQUEST:
376 call->state = RXRPC_CALL_SERVER_BUSY; 382 call->state = RXRPC_CALL_SERVER_BUSY;
377 set_bit(RXRPC_CALL_RCVD_BUSY, &call->events); 383 set_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
378 schedule_work(&call->processor); 384 rxrpc_queue_call(call);
379 case RXRPC_CALL_SERVER_BUSY: 385 case RXRPC_CALL_SERVER_BUSY:
380 goto free_packet_unlock; 386 goto free_packet_unlock;
381 default: 387 default:
@@ -419,7 +425,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
419 read_lock_bh(&call->state_lock); 425 read_lock_bh(&call->state_lock);
420 if (call->state < RXRPC_CALL_DEAD) { 426 if (call->state < RXRPC_CALL_DEAD) {
421 skb_queue_tail(&call->rx_queue, skb); 427 skb_queue_tail(&call->rx_queue, skb);
422 schedule_work(&call->processor); 428 rxrpc_queue_call(call);
423 skb = NULL; 429 skb = NULL;
424 } 430 }
425 read_unlock_bh(&call->state_lock); 431 read_unlock_bh(&call->state_lock);
@@ -434,7 +440,7 @@ protocol_error_locked:
434 call->state = RXRPC_CALL_LOCALLY_ABORTED; 440 call->state = RXRPC_CALL_LOCALLY_ABORTED;
435 call->abort_code = RX_PROTOCOL_ERROR; 441 call->abort_code = RX_PROTOCOL_ERROR;
436 set_bit(RXRPC_CALL_ABORT, &call->events); 442 set_bit(RXRPC_CALL_ABORT, &call->events);
437 schedule_work(&call->processor); 443 rxrpc_queue_call(call);
438 } 444 }
439free_packet_unlock: 445free_packet_unlock:
440 write_unlock_bh(&call->state_lock); 446 write_unlock_bh(&call->state_lock);
@@ -506,7 +512,7 @@ protocol_error:
506 call->state = RXRPC_CALL_LOCALLY_ABORTED; 512 call->state = RXRPC_CALL_LOCALLY_ABORTED;
507 call->abort_code = RX_PROTOCOL_ERROR; 513 call->abort_code = RX_PROTOCOL_ERROR;
508 set_bit(RXRPC_CALL_ABORT, &call->events); 514 set_bit(RXRPC_CALL_ABORT, &call->events);
509 schedule_work(&call->processor); 515 rxrpc_queue_call(call);
510 } 516 }
511 write_unlock_bh(&call->state_lock); 517 write_unlock_bh(&call->state_lock);
512 _leave(""); 518 _leave("");
@@ -542,7 +548,7 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
542 switch (call->state) { 548 switch (call->state) {
543 case RXRPC_CALL_LOCALLY_ABORTED: 549 case RXRPC_CALL_LOCALLY_ABORTED:
544 if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events)) 550 if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
545 schedule_work(&call->processor); 551 rxrpc_queue_call(call);
546 case RXRPC_CALL_REMOTELY_ABORTED: 552 case RXRPC_CALL_REMOTELY_ABORTED:
547 case RXRPC_CALL_NETWORK_ERROR: 553 case RXRPC_CALL_NETWORK_ERROR:
548 case RXRPC_CALL_DEAD: 554 case RXRPC_CALL_DEAD:
@@ -591,7 +597,7 @@ dead_call:
591 sp->hdr.seq == __constant_cpu_to_be32(1)) { 597 sp->hdr.seq == __constant_cpu_to_be32(1)) {
592 _debug("incoming call"); 598 _debug("incoming call");
593 skb_queue_tail(&conn->trans->local->accept_queue, skb); 599 skb_queue_tail(&conn->trans->local->accept_queue, skb);
594 schedule_work(&conn->trans->local->acceptor); 600 rxrpc_queue_work(&conn->trans->local->acceptor);
595 goto done; 601 goto done;
596 } 602 }
597 603
@@ -630,7 +636,7 @@ found_completed_call:
630 _debug("final ack again"); 636 _debug("final ack again");
631 rxrpc_get_call(call); 637 rxrpc_get_call(call);
632 set_bit(RXRPC_CALL_ACK_FINAL, &call->events); 638 set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
633 schedule_work(&call->processor); 639 rxrpc_queue_call(call);
634 640
635free_unlock: 641free_unlock:
636 read_unlock(&call->state_lock); 642 read_unlock(&call->state_lock);
@@ -651,7 +657,7 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
651 657
652 atomic_inc(&conn->usage); 658 atomic_inc(&conn->usage);
653 skb_queue_tail(&conn->rx_queue, skb); 659 skb_queue_tail(&conn->rx_queue, skb);
654 schedule_work(&conn->processor); 660 rxrpc_queue_conn(conn);
655} 661}
656 662
657/* 663/*
@@ -767,7 +773,7 @@ cant_route_call:
767 if (sp->hdr.seq == __constant_cpu_to_be32(1)) { 773 if (sp->hdr.seq == __constant_cpu_to_be32(1)) {
768 _debug("first packet"); 774 _debug("first packet");
769 skb_queue_tail(&local->accept_queue, skb); 775 skb_queue_tail(&local->accept_queue, skb);
770 schedule_work(&local->acceptor); 776 rxrpc_queue_work(&local->acceptor);
771 rxrpc_put_local(local); 777 rxrpc_put_local(local);
772 _leave(" [incoming]"); 778 _leave(" [incoming]");
773 return; 779 return;
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 7bfbf471c81e..cb1eb492ee48 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -19,8 +19,6 @@
19#define CHECK_SLAB_OKAY(X) do {} while(0) 19#define CHECK_SLAB_OKAY(X) do {} while(0)
20#endif 20#endif
21 21
22extern atomic_t rxrpc_n_skbs;
23
24#define FCRYPT_BSIZE 8 22#define FCRYPT_BSIZE 8
25struct rxrpc_crypt { 23struct rxrpc_crypt {
26 union { 24 union {
@@ -29,8 +27,12 @@ struct rxrpc_crypt {
29 }; 27 };
30} __attribute__((aligned(8))); 28} __attribute__((aligned(8)));
31 29
32extern __be32 rxrpc_epoch; /* local epoch for detecting local-end reset */ 30#define rxrpc_queue_work(WS) queue_work(rxrpc_workqueue, (WS))
33extern atomic_t rxrpc_debug_id; /* current debugging ID */ 31#define rxrpc_queue_delayed_work(WS,D) \
32 queue_delayed_work(rxrpc_workqueue, (WS), (D))
33
34#define rxrpc_queue_call(CALL) rxrpc_queue_work(&(CALL)->processor)
35#define rxrpc_queue_conn(CONN) rxrpc_queue_work(&(CONN)->processor)
34 36
35/* 37/*
36 * sk_state for RxRPC sockets 38 * sk_state for RxRPC sockets
@@ -50,6 +52,7 @@ enum {
50struct rxrpc_sock { 52struct rxrpc_sock {
51 /* WARNING: sk has to be the first member */ 53 /* WARNING: sk has to be the first member */
52 struct sock sk; 54 struct sock sk;
55 rxrpc_interceptor_t interceptor; /* kernel service Rx interceptor function */
53 struct rxrpc_local *local; /* local endpoint */ 56 struct rxrpc_local *local; /* local endpoint */
54 struct rxrpc_transport *trans; /* transport handler */ 57 struct rxrpc_transport *trans; /* transport handler */
55 struct rxrpc_conn_bundle *bundle; /* virtual connection bundle */ 58 struct rxrpc_conn_bundle *bundle; /* virtual connection bundle */
@@ -91,16 +94,6 @@ struct rxrpc_skb_priv {
91 94
92#define rxrpc_skb(__skb) ((struct rxrpc_skb_priv *) &(__skb)->cb) 95#define rxrpc_skb(__skb) ((struct rxrpc_skb_priv *) &(__skb)->cb)
93 96
94enum {
95 RXRPC_SKB_MARK_DATA, /* data message */
96 RXRPC_SKB_MARK_FINAL_ACK, /* final ACK received message */
97 RXRPC_SKB_MARK_BUSY, /* server busy message */
98 RXRPC_SKB_MARK_REMOTE_ABORT, /* remote abort message */
99 RXRPC_SKB_MARK_NET_ERROR, /* network error message */
100 RXRPC_SKB_MARK_LOCAL_ERROR, /* local error message */
101 RXRPC_SKB_MARK_NEW_CALL, /* local error message */
102};
103
104enum rxrpc_command { 97enum rxrpc_command {
105 RXRPC_CMD_SEND_DATA, /* send data message */ 98 RXRPC_CMD_SEND_DATA, /* send data message */
106 RXRPC_CMD_SEND_ABORT, /* request abort generation */ 99 RXRPC_CMD_SEND_ABORT, /* request abort generation */
@@ -439,25 +432,20 @@ static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
439} 432}
440 433
441/* 434/*
442 * put a packet up for transport-level abort 435 * af_rxrpc.c
443 */ 436 */
444static inline 437extern atomic_t rxrpc_n_skbs;
445void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb) 438extern __be32 rxrpc_epoch;
446{ 439extern atomic_t rxrpc_debug_id;
447 CHECK_SLAB_OKAY(&local->usage); 440extern struct workqueue_struct *rxrpc_workqueue;
448 if (!atomic_inc_not_zero(&local->usage)) {
449 printk("resurrected on reject\n");
450 BUG();
451 }
452 skb_queue_tail(&local->reject_queue, skb);
453 schedule_work(&local->rejecter);
454}
455 441
456/* 442/*
457 * ar-accept.c 443 * ar-accept.c
458 */ 444 */
459extern void rxrpc_accept_incoming_calls(struct work_struct *); 445extern void rxrpc_accept_incoming_calls(struct work_struct *);
460extern int rxrpc_accept_call(struct rxrpc_sock *, unsigned long); 446extern struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *,
447 unsigned long);
448extern int rxrpc_reject_call(struct rxrpc_sock *);
461 449
462/* 450/*
463 * ar-ack.c 451 * ar-ack.c
@@ -514,6 +502,7 @@ rxrpc_incoming_connection(struct rxrpc_transport *, struct rxrpc_header *,
514 * ar-connevent.c 502 * ar-connevent.c
515 */ 503 */
516extern void rxrpc_process_connection(struct work_struct *); 504extern void rxrpc_process_connection(struct work_struct *);
505extern void rxrpc_reject_packet(struct rxrpc_local *, struct sk_buff *);
517extern void rxrpc_reject_packets(struct work_struct *); 506extern void rxrpc_reject_packets(struct work_struct *);
518 507
519/* 508/*
@@ -583,6 +572,7 @@ extern struct file_operations rxrpc_connection_seq_fops;
583/* 572/*
584 * ar-recvmsg.c 573 * ar-recvmsg.c
585 */ 574 */
575extern void rxrpc_remove_user_ID(struct rxrpc_sock *, struct rxrpc_call *);
586extern int rxrpc_recvmsg(struct kiocb *, struct socket *, struct msghdr *, 576extern int rxrpc_recvmsg(struct kiocb *, struct socket *, struct msghdr *,
587 size_t, int); 577 size_t, int);
588 578
diff --git a/net/rxrpc/ar-local.c b/net/rxrpc/ar-local.c
index a20a2c0fe105..fe03f71f17da 100644
--- a/net/rxrpc/ar-local.c
+++ b/net/rxrpc/ar-local.c
@@ -228,7 +228,7 @@ void rxrpc_put_local(struct rxrpc_local *local)
228 write_lock_bh(&rxrpc_local_lock); 228 write_lock_bh(&rxrpc_local_lock);
229 if (unlikely(atomic_dec_and_test(&local->usage))) { 229 if (unlikely(atomic_dec_and_test(&local->usage))) {
230 _debug("destroy local"); 230 _debug("destroy local");
231 schedule_work(&local->destroyer); 231 rxrpc_queue_work(&local->destroyer);
232 } 232 }
233 write_unlock_bh(&rxrpc_local_lock); 233 write_unlock_bh(&rxrpc_local_lock);
234 _leave(""); 234 _leave("");
diff --git a/net/rxrpc/ar-output.c b/net/rxrpc/ar-output.c
index 67aa9510f09b..5cdde4a48ed1 100644
--- a/net/rxrpc/ar-output.c
+++ b/net/rxrpc/ar-output.c
@@ -113,7 +113,7 @@ static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
113 clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events); 113 clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
114 clear_bit(RXRPC_CALL_ACK, &call->events); 114 clear_bit(RXRPC_CALL_ACK, &call->events);
115 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 115 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
116 schedule_work(&call->processor); 116 rxrpc_queue_call(call);
117 } 117 }
118 118
119 write_unlock_bh(&call->state_lock); 119 write_unlock_bh(&call->state_lock);
@@ -194,6 +194,77 @@ int rxrpc_client_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
194 return ret; 194 return ret;
195} 195}
196 196
197/**
198 * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
199 * @call: The call to send data through
200 * @msg: The data to send
201 * @len: The amount of data to send
202 *
203 * Allow a kernel service to send data on a call. The call must be in an state
204 * appropriate to sending data. No control data should be supplied in @msg,
205 * nor should an address be supplied. MSG_MORE should be flagged if there's
206 * more data to come, otherwise this data will end the transmission phase.
207 */
208int rxrpc_kernel_send_data(struct rxrpc_call *call, struct msghdr *msg,
209 size_t len)
210{
211 int ret;
212
213 _enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
214
215 ASSERTCMP(msg->msg_name, ==, NULL);
216 ASSERTCMP(msg->msg_control, ==, NULL);
217
218 lock_sock(&call->socket->sk);
219
220 _debug("CALL %d USR %lx ST %d on CONN %p",
221 call->debug_id, call->user_call_ID, call->state, call->conn);
222
223 if (call->state >= RXRPC_CALL_COMPLETE) {
224 ret = -ESHUTDOWN; /* it's too late for this call */
225 } else if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
226 call->state != RXRPC_CALL_SERVER_ACK_REQUEST &&
227 call->state != RXRPC_CALL_SERVER_SEND_REPLY) {
228 ret = -EPROTO; /* request phase complete for this client call */
229 } else {
230 mm_segment_t oldfs = get_fs();
231 set_fs(KERNEL_DS);
232 ret = rxrpc_send_data(NULL, call->socket, call, msg, len);
233 set_fs(oldfs);
234 }
235
236 release_sock(&call->socket->sk);
237 _leave(" = %d", ret);
238 return ret;
239}
240
241EXPORT_SYMBOL(rxrpc_kernel_send_data);
242
243/*
244 * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
245 * @call: The call to be aborted
246 * @abort_code: The abort code to stick into the ABORT packet
247 *
248 * Allow a kernel service to abort a call, if it's still in an abortable state.
249 */
250void rxrpc_kernel_abort_call(struct rxrpc_call *call, u32 abort_code)
251{
252 _enter("{%d},%d", call->debug_id, abort_code);
253
254 lock_sock(&call->socket->sk);
255
256 _debug("CALL %d USR %lx ST %d on CONN %p",
257 call->debug_id, call->user_call_ID, call->state, call->conn);
258
259 if (call->state < RXRPC_CALL_COMPLETE)
260 rxrpc_send_abort(call, abort_code);
261
262 release_sock(&call->socket->sk);
263 _leave("");
264}
265
266EXPORT_SYMBOL(rxrpc_kernel_abort_call);
267
197/* 268/*
198 * send a message through a server socket 269 * send a message through a server socket
199 * - caller holds the socket locked 270 * - caller holds the socket locked
@@ -214,8 +285,13 @@ int rxrpc_server_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
214 if (ret < 0) 285 if (ret < 0)
215 return ret; 286 return ret;
216 287
217 if (cmd == RXRPC_CMD_ACCEPT) 288 if (cmd == RXRPC_CMD_ACCEPT) {
218 return rxrpc_accept_call(rx, user_call_ID); 289 call = rxrpc_accept_call(rx, user_call_ID);
290 if (IS_ERR(call))
291 return PTR_ERR(call);
292 rxrpc_put_call(call);
293 return 0;
294 }
219 295
220 call = rxrpc_find_server_call(rx, user_call_ID); 296 call = rxrpc_find_server_call(rx, user_call_ID);
221 if (!call) 297 if (!call)
@@ -363,7 +439,7 @@ static inline void rxrpc_instant_resend(struct rxrpc_call *call)
363 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 439 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
364 if (call->state < RXRPC_CALL_COMPLETE && 440 if (call->state < RXRPC_CALL_COMPLETE &&
365 !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events)) 441 !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
366 schedule_work(&call->processor); 442 rxrpc_queue_call(call);
367 } 443 }
368 read_unlock_bh(&call->state_lock); 444 read_unlock_bh(&call->state_lock);
369} 445}
diff --git a/net/rxrpc/ar-peer.c b/net/rxrpc/ar-peer.c
index 69ac355546ae..d399de4a7fe2 100644
--- a/net/rxrpc/ar-peer.c
+++ b/net/rxrpc/ar-peer.c
@@ -219,7 +219,7 @@ void rxrpc_put_peer(struct rxrpc_peer *peer)
219 return; 219 return;
220 } 220 }
221 221
222 schedule_work(&peer->destroyer); 222 rxrpc_queue_work(&peer->destroyer);
223 _leave(""); 223 _leave("");
224} 224}
225 225
diff --git a/net/rxrpc/ar-recvmsg.c b/net/rxrpc/ar-recvmsg.c
index e947d5c15900..f19121d4795b 100644
--- a/net/rxrpc/ar-recvmsg.c
+++ b/net/rxrpc/ar-recvmsg.c
@@ -19,7 +19,7 @@
19 * removal a call's user ID from the socket tree to make the user ID available 19 * removal a call's user ID from the socket tree to make the user ID available
20 * again and so that it won't be seen again in association with that call 20 * again and so that it won't be seen again in association with that call
21 */ 21 */
22static void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 22void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
23{ 23{
24 _debug("RELEASE CALL %d", call->debug_id); 24 _debug("RELEASE CALL %d", call->debug_id);
25 25
@@ -33,7 +33,7 @@ static void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
33 read_lock_bh(&call->state_lock); 33 read_lock_bh(&call->state_lock);
34 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 34 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
35 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 35 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
36 schedule_work(&call->processor); 36 rxrpc_queue_call(call);
37 read_unlock_bh(&call->state_lock); 37 read_unlock_bh(&call->state_lock);
38} 38}
39 39
@@ -364,3 +364,74 @@ wait_error:
364 return copied; 364 return copied;
365 365
366} 366}
367
368/**
369 * rxrpc_kernel_data_delivered - Record delivery of data message
370 * @skb: Message holding data
371 *
372 * Record the delivery of a data message. This permits RxRPC to keep its
373 * tracking correct. The socket buffer will be deleted.
374 */
375void rxrpc_kernel_data_delivered(struct sk_buff *skb)
376{
377 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
378 struct rxrpc_call *call = sp->call;
379
380 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
381 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
382 call->rx_data_recv = ntohl(sp->hdr.seq);
383
384 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
385 rxrpc_free_skb(skb);
386}
387
388EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
389
390/**
391 * rxrpc_kernel_is_data_last - Determine if data message is last one
392 * @skb: Message holding data
393 *
394 * Determine if data message is last one for the parent call.
395 */
396bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
397{
398 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
399
400 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
401
402 return sp->hdr.flags & RXRPC_LAST_PACKET;
403}
404
405EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
406
407/**
408 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
409 * @skb: Message indicating an abort
410 *
411 * Get the abort code from an RxRPC abort message.
412 */
413u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
414{
415 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
416
417 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
418
419 return sp->call->abort_code;
420}
421
422EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
423
424/**
425 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
426 * @skb: Message indicating an error
427 *
428 * Get the error number from an RxRPC error message.
429 */
430int rxrpc_kernel_get_error_number(struct sk_buff *skb)
431{
432 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
433
434 return sp->error;
435}
436
437EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
diff --git a/net/rxrpc/ar-skbuff.c b/net/rxrpc/ar-skbuff.c
index d73f6fc76011..de755e04d29c 100644
--- a/net/rxrpc/ar-skbuff.c
+++ b/net/rxrpc/ar-skbuff.c
@@ -36,7 +36,7 @@ static void rxrpc_request_final_ACK(struct rxrpc_call *call)
36 rxrpc_get_call(call); 36 rxrpc_get_call(call);
37 set_bit(RXRPC_CALL_ACK_FINAL, &call->events); 37 set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
38 if (try_to_del_timer_sync(&call->ack_timer) >= 0) 38 if (try_to_del_timer_sync(&call->ack_timer) >= 0)
39 schedule_work(&call->processor); 39 rxrpc_queue_call(call);
40 break; 40 break;
41 41
42 case RXRPC_CALL_SERVER_RECV_REQUEST: 42 case RXRPC_CALL_SERVER_RECV_REQUEST:
@@ -116,3 +116,17 @@ void rxrpc_packet_destructor(struct sk_buff *skb)
116 sock_rfree(skb); 116 sock_rfree(skb);
117 _leave(""); 117 _leave("");
118} 118}
119
120/**
121 * rxrpc_kernel_free_skb - Free an RxRPC socket buffer
122 * @skb: The socket buffer to be freed
123 *
124 * Let RxRPC free its own socket buffer, permitting it to maintain debug
125 * accounting.
126 */
127void rxrpc_kernel_free_skb(struct sk_buff *skb)
128{
129 rxrpc_free_skb(skb);
130}
131
132EXPORT_SYMBOL(rxrpc_kernel_free_skb);
diff --git a/net/rxrpc/ar-transport.c b/net/rxrpc/ar-transport.c
index 9b4e5cb545d2..d43d78f19302 100644
--- a/net/rxrpc/ar-transport.c
+++ b/net/rxrpc/ar-transport.c
@@ -189,7 +189,7 @@ void rxrpc_put_transport(struct rxrpc_transport *trans)
189 /* let the reaper determine the timeout to avoid a race with 189 /* let the reaper determine the timeout to avoid a race with
190 * overextending the timeout if the reaper is running at the 190 * overextending the timeout if the reaper is running at the
191 * same time */ 191 * same time */
192 schedule_delayed_work(&rxrpc_transport_reap, 0); 192 rxrpc_queue_delayed_work(&rxrpc_transport_reap, 0);
193 _leave(""); 193 _leave("");
194} 194}
195 195
@@ -243,8 +243,8 @@ static void rxrpc_transport_reaper(struct work_struct *work)
243 if (earliest != ULONG_MAX) { 243 if (earliest != ULONG_MAX) {
244 _debug("reschedule reaper %ld", (long) earliest - now); 244 _debug("reschedule reaper %ld", (long) earliest - now);
245 ASSERTCMP(earliest, >, now); 245 ASSERTCMP(earliest, >, now);
246 schedule_delayed_work(&rxrpc_transport_reap, 246 rxrpc_queue_delayed_work(&rxrpc_transport_reap,
247 (earliest - now) * HZ); 247 (earliest - now) * HZ);
248 } 248 }
249 249
250 /* then destroy all those pulled out */ 250 /* then destroy all those pulled out */
@@ -270,7 +270,7 @@ void __exit rxrpc_destroy_all_transports(void)
270 270
271 rxrpc_transport_timeout = 0; 271 rxrpc_transport_timeout = 0;
272 cancel_delayed_work(&rxrpc_transport_reap); 272 cancel_delayed_work(&rxrpc_transport_reap);
273 schedule_delayed_work(&rxrpc_transport_reap, 0); 273 rxrpc_queue_delayed_work(&rxrpc_transport_reap, 0);
274 274
275 _leave(""); 275 _leave("");
276} 276}