aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2016-08-30 15:42:14 -0400
committerDavid S. Miller <davem@davemloft.net>2016-09-01 19:43:27 -0400
commitd001648ec7cf8b21ae9eec8b9ba4a18295adfb14 (patch)
tree830a6ec7dbc683675ba088750caeb5eafb4c8012
parent95ac3994514015823634ef1f7116dce24f26aa97 (diff)
rxrpc: Don't expose skbs to in-kernel users [ver #2]
Don't expose skbs to in-kernel users, such as the AFS filesystem, but instead provide a notification hook the indicates that a call needs attention and another that indicates that there's a new call to be collected. This makes the following possibilities more achievable: (1) Call refcounting can be made simpler if skbs don't hold refs to calls. (2) skbs referring to non-data events will be able to be freed much sooner rather than being queued for AFS to pick up as rxrpc_kernel_recv_data will be able to consult the call state. (3) We can shortcut the receive phase when a call is remotely aborted because we don't have to go through all the packets to get to the one cancelling the operation. (4) It makes it easier to do encryption/decryption directly between AFS's buffers and sk_buffs. (5) Encryption/decryption can more easily be done in the AFS's thread contexts - usually that of the userspace process that issued a syscall - rather than in one of rxrpc's background threads on a workqueue. (6) AFS will be able to wait synchronously on a call inside AF_RXRPC. To make this work, the following interface function has been added: int rxrpc_kernel_recv_data( struct socket *sock, struct rxrpc_call *call, void *buffer, size_t bufsize, size_t *_offset, bool want_more, u32 *_abort_code); This is the recvmsg equivalent. It allows the caller to find out about the state of a specific call and to transfer received data into a buffer piecemeal. afs_extract_data() and rxrpc_kernel_recv_data() now do all the extraction logic between them. They don't wait synchronously yet because the socket lock needs to be dealt with. Five interface functions have been removed: rxrpc_kernel_is_data_last() rxrpc_kernel_get_abort_code() rxrpc_kernel_get_error_number() rxrpc_kernel_free_skb() rxrpc_kernel_data_consumed() As a temporary hack, sk_buffs going to an in-kernel call are queued on the rxrpc_call struct (->knlrecv_queue) rather than being handed over to the in-kernel user. To process the queue internally, a temporary function, temp_deliver_data() has been added. This will be replaced with common code between the rxrpc_recvmsg() path and the kernel_rxrpc_recv_data() path in a future patch. Signed-off-by: David Howells <dhowells@redhat.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--Documentation/networking/rxrpc.txt72
-rw-r--r--fs/afs/cmservice.c142
-rw-r--r--fs/afs/fsclient.c148
-rw-r--r--fs/afs/internal.h33
-rw-r--r--fs/afs/rxrpc.c439
-rw-r--r--fs/afs/vlclient.c7
-rw-r--r--include/net/af_rxrpc.h35
-rw-r--r--net/rxrpc/af_rxrpc.c29
-rw-r--r--net/rxrpc/ar-internal.h23
-rw-r--r--net/rxrpc/call_accept.c13
-rw-r--r--net/rxrpc/call_object.c5
-rw-r--r--net/rxrpc/conn_event.c1
-rw-r--r--net/rxrpc/input.c10
-rw-r--r--net/rxrpc/output.c2
-rw-r--r--net/rxrpc/recvmsg.c191
-rw-r--r--net/rxrpc/skbuff.c1
16 files changed, 565 insertions, 586 deletions
diff --git a/Documentation/networking/rxrpc.txt b/Documentation/networking/rxrpc.txt
index cfc8cb91452f..1b63bbc6b94f 100644
--- a/Documentation/networking/rxrpc.txt
+++ b/Documentation/networking/rxrpc.txt
@@ -748,6 +748,37 @@ The kernel interface functions are as follows:
748 The msg must not specify a destination address, control data or any flags 748 The msg must not specify a destination address, control data or any flags
749 other than MSG_MORE. len is the total amount of data to transmit. 749 other than MSG_MORE. len is the total amount of data to transmit.
750 750
751 (*) Receive data from a call.
752
753 int rxrpc_kernel_recv_data(struct socket *sock,
754 struct rxrpc_call *call,
755 void *buf,
756 size_t size,
757 size_t *_offset,
758 bool want_more,
759 u32 *_abort)
760
761 This is used to receive data from either the reply part of a client call
762 or the request part of a service call. buf and size specify how much
763 data is desired and where to store it. *_offset is added on to buf and
764 subtracted from size internally; the amount copied into the buffer is
765 added to *_offset before returning.
766
767 want_more should be true if further data will be required after this is
768 satisfied and false if this is the last item of the receive phase.
769
770 There are three normal returns: 0 if the buffer was filled and want_more
771 was true; 1 if the buffer was filled, the last DATA packet has been
772 emptied and want_more was false; and -EAGAIN if the function needs to be
773 called again.
774
775 If the last DATA packet is processed but the buffer contains less than
776 the amount requested, EBADMSG is returned. If want_more wasn't set, but
777 more data was available, EMSGSIZE is returned.
778
779 If a remote ABORT is detected, the abort code received will be stored in
780 *_abort and ECONNABORTED will be returned.
781
751 (*) Abort a call. 782 (*) Abort a call.
752 783
753 void rxrpc_kernel_abort_call(struct socket *sock, 784 void rxrpc_kernel_abort_call(struct socket *sock,
@@ -825,47 +856,6 @@ The kernel interface functions are as follows:
825 Other errors may be returned if the call had been aborted (-ECONNABORTED) 856 Other errors may be returned if the call had been aborted (-ECONNABORTED)
826 or had timed out (-ETIME). 857 or had timed out (-ETIME).
827 858
828 (*) Record the delivery of a data message.
829
830 void rxrpc_kernel_data_consumed(struct rxrpc_call *call,
831 struct sk_buff *skb);
832
833 This is used to record a data message as having been consumed and to
834 update the ACK state for the call. The message must still be passed to
835 rxrpc_kernel_free_skb() for disposal by the caller.
836
837 (*) Free a message.
838
839 void rxrpc_kernel_free_skb(struct sk_buff *skb);
840
841 This is used to free a non-DATA socket buffer intercepted from an AF_RXRPC
842 socket.
843
844 (*) Determine if a data message is the last one on a call.
845
846 bool rxrpc_kernel_is_data_last(struct sk_buff *skb);
847
848 This is used to determine if a socket buffer holds the last data message
849 to be received for a call (true will be returned if it does, false
850 if not).
851
852 The data message will be part of the reply on a client call and the
853 request on an incoming call. In the latter case there will be more
854 messages, but in the former case there will not.
855
856 (*) Get the abort code from an abort message.
857
858 u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb);
859
860 This is used to extract the abort code from a remote abort message.
861
862 (*) Get the error number from a local or network error message.
863
864 int rxrpc_kernel_get_error_number(struct sk_buff *skb);
865
866 This is used to extract the error number from a message indicating either
867 a local error occurred or a network error occurred.
868
869 (*) Allocate a null key for doing anonymous security. 859 (*) Allocate a null key for doing anonymous security.
870 860
871 struct key *rxrpc_get_null_key(const char *keyname); 861 struct key *rxrpc_get_null_key(const char *keyname);
diff --git a/fs/afs/cmservice.c b/fs/afs/cmservice.c
index 77ee481059ac..2037e7a77a37 100644
--- a/fs/afs/cmservice.c
+++ b/fs/afs/cmservice.c
@@ -17,15 +17,12 @@
17#include "internal.h" 17#include "internal.h"
18#include "afs_cm.h" 18#include "afs_cm.h"
19 19
20static int afs_deliver_cb_init_call_back_state(struct afs_call *, 20static int afs_deliver_cb_init_call_back_state(struct afs_call *);
21 struct sk_buff *, bool); 21static int afs_deliver_cb_init_call_back_state3(struct afs_call *);
22static int afs_deliver_cb_init_call_back_state3(struct afs_call *, 22static int afs_deliver_cb_probe(struct afs_call *);
23 struct sk_buff *, bool); 23static int afs_deliver_cb_callback(struct afs_call *);
24static int afs_deliver_cb_probe(struct afs_call *, struct sk_buff *, bool); 24static int afs_deliver_cb_probe_uuid(struct afs_call *);
25static int afs_deliver_cb_callback(struct afs_call *, struct sk_buff *, bool); 25static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *);
26static int afs_deliver_cb_probe_uuid(struct afs_call *, struct sk_buff *, bool);
27static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *,
28 struct sk_buff *, bool);
29static void afs_cm_destructor(struct afs_call *); 26static void afs_cm_destructor(struct afs_call *);
30 27
31/* 28/*
@@ -130,7 +127,7 @@ static void afs_cm_destructor(struct afs_call *call)
130 * received. The step number here must match the final number in 127 * received. The step number here must match the final number in
131 * afs_deliver_cb_callback(). 128 * afs_deliver_cb_callback().
132 */ 129 */
133 if (call->unmarshall == 6) { 130 if (call->unmarshall == 5) {
134 ASSERT(call->server && call->count && call->request); 131 ASSERT(call->server && call->count && call->request);
135 afs_break_callbacks(call->server, call->count, call->request); 132 afs_break_callbacks(call->server, call->count, call->request);
136 } 133 }
@@ -164,8 +161,7 @@ static void SRXAFSCB_CallBack(struct work_struct *work)
164/* 161/*
165 * deliver request data to a CB.CallBack call 162 * deliver request data to a CB.CallBack call
166 */ 163 */
167static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb, 164static int afs_deliver_cb_callback(struct afs_call *call)
168 bool last)
169{ 165{
170 struct sockaddr_rxrpc srx; 166 struct sockaddr_rxrpc srx;
171 struct afs_callback *cb; 167 struct afs_callback *cb;
@@ -174,7 +170,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
174 u32 tmp; 170 u32 tmp;
175 int ret, loop; 171 int ret, loop;
176 172
177 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 173 _enter("{%u}", call->unmarshall);
178 174
179 switch (call->unmarshall) { 175 switch (call->unmarshall) {
180 case 0: 176 case 0:
@@ -185,7 +181,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
185 /* extract the FID array and its count in two steps */ 181 /* extract the FID array and its count in two steps */
186 case 1: 182 case 1:
187 _debug("extract FID count"); 183 _debug("extract FID count");
188 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 184 ret = afs_extract_data(call, &call->tmp, 4, true);
189 if (ret < 0) 185 if (ret < 0)
190 return ret; 186 return ret;
191 187
@@ -202,8 +198,8 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
202 198
203 case 2: 199 case 2:
204 _debug("extract FID array"); 200 _debug("extract FID array");
205 ret = afs_extract_data(call, skb, last, call->buffer, 201 ret = afs_extract_data(call, call->buffer,
206 call->count * 3 * 4); 202 call->count * 3 * 4, true);
207 if (ret < 0) 203 if (ret < 0)
208 return ret; 204 return ret;
209 205
@@ -229,7 +225,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
229 /* extract the callback array and its count in two steps */ 225 /* extract the callback array and its count in two steps */
230 case 3: 226 case 3:
231 _debug("extract CB count"); 227 _debug("extract CB count");
232 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 228 ret = afs_extract_data(call, &call->tmp, 4, true);
233 if (ret < 0) 229 if (ret < 0)
234 return ret; 230 return ret;
235 231
@@ -239,13 +235,11 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
239 return -EBADMSG; 235 return -EBADMSG;
240 call->offset = 0; 236 call->offset = 0;
241 call->unmarshall++; 237 call->unmarshall++;
242 if (tmp == 0)
243 goto empty_cb_array;
244 238
245 case 4: 239 case 4:
246 _debug("extract CB array"); 240 _debug("extract CB array");
247 ret = afs_extract_data(call, skb, last, call->request, 241 ret = afs_extract_data(call, call->buffer,
248 call->count * 3 * 4); 242 call->count * 3 * 4, false);
249 if (ret < 0) 243 if (ret < 0)
250 return ret; 244 return ret;
251 245
@@ -258,15 +252,9 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
258 cb->type = ntohl(*bp++); 252 cb->type = ntohl(*bp++);
259 } 253 }
260 254
261 empty_cb_array:
262 call->offset = 0; 255 call->offset = 0;
263 call->unmarshall++; 256 call->unmarshall++;
264 257
265 case 5:
266 ret = afs_data_complete(call, skb, last);
267 if (ret < 0)
268 return ret;
269
270 /* Record that the message was unmarshalled successfully so 258 /* Record that the message was unmarshalled successfully so
271 * that the call destructor can know do the callback breaking 259 * that the call destructor can know do the callback breaking
272 * work, even if the final ACK isn't received. 260 * work, even if the final ACK isn't received.
@@ -275,7 +263,7 @@ static int afs_deliver_cb_callback(struct afs_call *call, struct sk_buff *skb,
275 * updated also. 263 * updated also.
276 */ 264 */
277 call->unmarshall++; 265 call->unmarshall++;
278 case 6: 266 case 5:
279 break; 267 break;
280 } 268 }
281 269
@@ -310,19 +298,17 @@ static void SRXAFSCB_InitCallBackState(struct work_struct *work)
310/* 298/*
311 * deliver request data to a CB.InitCallBackState call 299 * deliver request data to a CB.InitCallBackState call
312 */ 300 */
313static int afs_deliver_cb_init_call_back_state(struct afs_call *call, 301static int afs_deliver_cb_init_call_back_state(struct afs_call *call)
314 struct sk_buff *skb,
315 bool last)
316{ 302{
317 struct sockaddr_rxrpc srx; 303 struct sockaddr_rxrpc srx;
318 struct afs_server *server; 304 struct afs_server *server;
319 int ret; 305 int ret;
320 306
321 _enter(",{%u},%d", skb->len, last); 307 _enter("");
322 308
323 rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx); 309 rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
324 310
325 ret = afs_data_complete(call, skb, last); 311 ret = afs_extract_data(call, NULL, 0, false);
326 if (ret < 0) 312 if (ret < 0)
327 return ret; 313 return ret;
328 314
@@ -344,21 +330,61 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call,
344/* 330/*
345 * deliver request data to a CB.InitCallBackState3 call 331 * deliver request data to a CB.InitCallBackState3 call
346 */ 332 */
347static int afs_deliver_cb_init_call_back_state3(struct afs_call *call, 333static int afs_deliver_cb_init_call_back_state3(struct afs_call *call)
348 struct sk_buff *skb,
349 bool last)
350{ 334{
351 struct sockaddr_rxrpc srx; 335 struct sockaddr_rxrpc srx;
352 struct afs_server *server; 336 struct afs_server *server;
337 struct afs_uuid *r;
338 unsigned loop;
339 __be32 *b;
340 int ret;
353 341
354 _enter(",{%u},%d", skb->len, last); 342 _enter("");
355 343
356 rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx); 344 rxrpc_kernel_get_peer(afs_socket, call->rxcall, &srx);
357 345
358 /* There are some arguments that we ignore */ 346 _enter("{%u}", call->unmarshall);
359 afs_data_consumed(call, skb); 347
360 if (!last) 348 switch (call->unmarshall) {
361 return -EAGAIN; 349 case 0:
350 call->offset = 0;
351 call->buffer = kmalloc(11 * sizeof(__be32), GFP_KERNEL);
352 if (!call->buffer)
353 return -ENOMEM;
354 call->unmarshall++;
355
356 case 1:
357 _debug("extract UUID");
358 ret = afs_extract_data(call, call->buffer,
359 11 * sizeof(__be32), false);
360 switch (ret) {
361 case 0: break;
362 case -EAGAIN: return 0;
363 default: return ret;
364 }
365
366 _debug("unmarshall UUID");
367 call->request = kmalloc(sizeof(struct afs_uuid), GFP_KERNEL);
368 if (!call->request)
369 return -ENOMEM;
370
371 b = call->buffer;
372 r = call->request;
373 r->time_low = ntohl(b[0]);
374 r->time_mid = ntohl(b[1]);
375 r->time_hi_and_version = ntohl(b[2]);
376 r->clock_seq_hi_and_reserved = ntohl(b[3]);
377 r->clock_seq_low = ntohl(b[4]);
378
379 for (loop = 0; loop < 6; loop++)
380 r->node[loop] = ntohl(b[loop + 5]);
381
382 call->offset = 0;
383 call->unmarshall++;
384
385 case 2:
386 break;
387 }
362 388
363 /* no unmarshalling required */ 389 /* no unmarshalling required */
364 call->state = AFS_CALL_REPLYING; 390 call->state = AFS_CALL_REPLYING;
@@ -390,14 +416,13 @@ static void SRXAFSCB_Probe(struct work_struct *work)
390/* 416/*
391 * deliver request data to a CB.Probe call 417 * deliver request data to a CB.Probe call
392 */ 418 */
393static int afs_deliver_cb_probe(struct afs_call *call, struct sk_buff *skb, 419static int afs_deliver_cb_probe(struct afs_call *call)
394 bool last)
395{ 420{
396 int ret; 421 int ret;
397 422
398 _enter(",{%u},%d", skb->len, last); 423 _enter("");
399 424
400 ret = afs_data_complete(call, skb, last); 425 ret = afs_extract_data(call, NULL, 0, false);
401 if (ret < 0) 426 if (ret < 0)
402 return ret; 427 return ret;
403 428
@@ -435,19 +460,14 @@ static void SRXAFSCB_ProbeUuid(struct work_struct *work)
435/* 460/*
436 * deliver request data to a CB.ProbeUuid call 461 * deliver request data to a CB.ProbeUuid call
437 */ 462 */
438static int afs_deliver_cb_probe_uuid(struct afs_call *call, struct sk_buff *skb, 463static int afs_deliver_cb_probe_uuid(struct afs_call *call)
439 bool last)
440{ 464{
441 struct afs_uuid *r; 465 struct afs_uuid *r;
442 unsigned loop; 466 unsigned loop;
443 __be32 *b; 467 __be32 *b;
444 int ret; 468 int ret;
445 469
446 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 470 _enter("{%u}", call->unmarshall);
447
448 ret = afs_data_complete(call, skb, last);
449 if (ret < 0)
450 return ret;
451 471
452 switch (call->unmarshall) { 472 switch (call->unmarshall) {
453 case 0: 473 case 0:
@@ -459,8 +479,8 @@ static int afs_deliver_cb_probe_uuid(struct afs_call *call, struct sk_buff *skb,
459 479
460 case 1: 480 case 1:
461 _debug("extract UUID"); 481 _debug("extract UUID");
462 ret = afs_extract_data(call, skb, last, call->buffer, 482 ret = afs_extract_data(call, call->buffer,
463 11 * sizeof(__be32)); 483 11 * sizeof(__be32), false);
464 switch (ret) { 484 switch (ret) {
465 case 0: break; 485 case 0: break;
466 case -EAGAIN: return 0; 486 case -EAGAIN: return 0;
@@ -487,16 +507,9 @@ static int afs_deliver_cb_probe_uuid(struct afs_call *call, struct sk_buff *skb,
487 call->unmarshall++; 507 call->unmarshall++;
488 508
489 case 2: 509 case 2:
490 _debug("trailer");
491 if (skb->len != 0)
492 return -EBADMSG;
493 break; 510 break;
494 } 511 }
495 512
496 ret = afs_data_complete(call, skb, last);
497 if (ret < 0)
498 return ret;
499
500 call->state = AFS_CALL_REPLYING; 513 call->state = AFS_CALL_REPLYING;
501 514
502 INIT_WORK(&call->work, SRXAFSCB_ProbeUuid); 515 INIT_WORK(&call->work, SRXAFSCB_ProbeUuid);
@@ -570,14 +583,13 @@ static void SRXAFSCB_TellMeAboutYourself(struct work_struct *work)
570/* 583/*
571 * deliver request data to a CB.TellMeAboutYourself call 584 * deliver request data to a CB.TellMeAboutYourself call
572 */ 585 */
573static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *call, 586static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *call)
574 struct sk_buff *skb, bool last)
575{ 587{
576 int ret; 588 int ret;
577 589
578 _enter(",{%u},%d", skb->len, last); 590 _enter("");
579 591
580 ret = afs_data_complete(call, skb, last); 592 ret = afs_extract_data(call, NULL, 0, false);
581 if (ret < 0) 593 if (ret < 0)
582 return ret; 594 return ret;
583 595
diff --git a/fs/afs/fsclient.c b/fs/afs/fsclient.c
index 9312b92e54be..96f4d764d1a6 100644
--- a/fs/afs/fsclient.c
+++ b/fs/afs/fsclient.c
@@ -235,16 +235,15 @@ static void xdr_decode_AFSFetchVolumeStatus(const __be32 **_bp,
235/* 235/*
236 * deliver reply data to an FS.FetchStatus 236 * deliver reply data to an FS.FetchStatus
237 */ 237 */
238static int afs_deliver_fs_fetch_status(struct afs_call *call, 238static int afs_deliver_fs_fetch_status(struct afs_call *call)
239 struct sk_buff *skb, bool last)
240{ 239{
241 struct afs_vnode *vnode = call->reply; 240 struct afs_vnode *vnode = call->reply;
242 const __be32 *bp; 241 const __be32 *bp;
243 int ret; 242 int ret;
244 243
245 _enter(",,%u", last); 244 _enter("");
246 245
247 ret = afs_transfer_reply(call, skb, last); 246 ret = afs_transfer_reply(call);
248 if (ret < 0) 247 if (ret < 0)
249 return ret; 248 return ret;
250 249
@@ -307,8 +306,7 @@ int afs_fs_fetch_file_status(struct afs_server *server,
307/* 306/*
308 * deliver reply data to an FS.FetchData 307 * deliver reply data to an FS.FetchData
309 */ 308 */
310static int afs_deliver_fs_fetch_data(struct afs_call *call, 309static int afs_deliver_fs_fetch_data(struct afs_call *call)
311 struct sk_buff *skb, bool last)
312{ 310{
313 struct afs_vnode *vnode = call->reply; 311 struct afs_vnode *vnode = call->reply;
314 const __be32 *bp; 312 const __be32 *bp;
@@ -316,7 +314,7 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call,
316 void *buffer; 314 void *buffer;
317 int ret; 315 int ret;
318 316
319 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 317 _enter("{%u}", call->unmarshall);
320 318
321 switch (call->unmarshall) { 319 switch (call->unmarshall) {
322 case 0: 320 case 0:
@@ -332,7 +330,7 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call,
332 * client) */ 330 * client) */
333 case 1: 331 case 1:
334 _debug("extract data length (MSW)"); 332 _debug("extract data length (MSW)");
335 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 333 ret = afs_extract_data(call, &call->tmp, 4, true);
336 if (ret < 0) 334 if (ret < 0)
337 return ret; 335 return ret;
338 336
@@ -347,7 +345,7 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call,
347 /* extract the returned data length */ 345 /* extract the returned data length */
348 case 2: 346 case 2:
349 _debug("extract data length"); 347 _debug("extract data length");
350 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 348 ret = afs_extract_data(call, &call->tmp, 4, true);
351 if (ret < 0) 349 if (ret < 0)
352 return ret; 350 return ret;
353 351
@@ -363,10 +361,10 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call,
363 _debug("extract data"); 361 _debug("extract data");
364 if (call->count > 0) { 362 if (call->count > 0) {
365 page = call->reply3; 363 page = call->reply3;
366 buffer = kmap_atomic(page); 364 buffer = kmap(page);
367 ret = afs_extract_data(call, skb, last, buffer, 365 ret = afs_extract_data(call, buffer,
368 call->count); 366 call->count, true);
369 kunmap_atomic(buffer); 367 kunmap(buffer);
370 if (ret < 0) 368 if (ret < 0)
371 return ret; 369 return ret;
372 } 370 }
@@ -376,8 +374,8 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call,
376 374
377 /* extract the metadata */ 375 /* extract the metadata */
378 case 4: 376 case 4:
379 ret = afs_extract_data(call, skb, last, call->buffer, 377 ret = afs_extract_data(call, call->buffer,
380 (21 + 3 + 6) * 4); 378 (21 + 3 + 6) * 4, false);
381 if (ret < 0) 379 if (ret < 0)
382 return ret; 380 return ret;
383 381
@@ -391,18 +389,15 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call,
391 call->unmarshall++; 389 call->unmarshall++;
392 390
393 case 5: 391 case 5:
394 ret = afs_data_complete(call, skb, last);
395 if (ret < 0)
396 return ret;
397 break; 392 break;
398 } 393 }
399 394
400 if (call->count < PAGE_SIZE) { 395 if (call->count < PAGE_SIZE) {
401 _debug("clear"); 396 _debug("clear");
402 page = call->reply3; 397 page = call->reply3;
403 buffer = kmap_atomic(page); 398 buffer = kmap(page);
404 memset(buffer + call->count, 0, PAGE_SIZE - call->count); 399 memset(buffer + call->count, 0, PAGE_SIZE - call->count);
405 kunmap_atomic(buffer); 400 kunmap(buffer);
406 } 401 }
407 402
408 _leave(" = 0 [done]"); 403 _leave(" = 0 [done]");
@@ -515,13 +510,12 @@ int afs_fs_fetch_data(struct afs_server *server,
515/* 510/*
516 * deliver reply data to an FS.GiveUpCallBacks 511 * deliver reply data to an FS.GiveUpCallBacks
517 */ 512 */
518static int afs_deliver_fs_give_up_callbacks(struct afs_call *call, 513static int afs_deliver_fs_give_up_callbacks(struct afs_call *call)
519 struct sk_buff *skb, bool last)
520{ 514{
521 _enter(",{%u},%d", skb->len, last); 515 _enter("");
522 516
523 /* shouldn't be any reply data */ 517 /* shouldn't be any reply data */
524 return afs_data_complete(call, skb, last); 518 return afs_extract_data(call, NULL, 0, false);
525} 519}
526 520
527/* 521/*
@@ -599,16 +593,15 @@ int afs_fs_give_up_callbacks(struct afs_server *server,
599/* 593/*
600 * deliver reply data to an FS.CreateFile or an FS.MakeDir 594 * deliver reply data to an FS.CreateFile or an FS.MakeDir
601 */ 595 */
602static int afs_deliver_fs_create_vnode(struct afs_call *call, 596static int afs_deliver_fs_create_vnode(struct afs_call *call)
603 struct sk_buff *skb, bool last)
604{ 597{
605 struct afs_vnode *vnode = call->reply; 598 struct afs_vnode *vnode = call->reply;
606 const __be32 *bp; 599 const __be32 *bp;
607 int ret; 600 int ret;
608 601
609 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 602 _enter("{%u}", call->unmarshall);
610 603
611 ret = afs_transfer_reply(call, skb, last); 604 ret = afs_transfer_reply(call);
612 if (ret < 0) 605 if (ret < 0)
613 return ret; 606 return ret;
614 607
@@ -696,16 +689,15 @@ int afs_fs_create(struct afs_server *server,
696/* 689/*
697 * deliver reply data to an FS.RemoveFile or FS.RemoveDir 690 * deliver reply data to an FS.RemoveFile or FS.RemoveDir
698 */ 691 */
699static int afs_deliver_fs_remove(struct afs_call *call, 692static int afs_deliver_fs_remove(struct afs_call *call)
700 struct sk_buff *skb, bool last)
701{ 693{
702 struct afs_vnode *vnode = call->reply; 694 struct afs_vnode *vnode = call->reply;
703 const __be32 *bp; 695 const __be32 *bp;
704 int ret; 696 int ret;
705 697
706 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 698 _enter("{%u}", call->unmarshall);
707 699
708 ret = afs_transfer_reply(call, skb, last); 700 ret = afs_transfer_reply(call);
709 if (ret < 0) 701 if (ret < 0)
710 return ret; 702 return ret;
711 703
@@ -777,16 +769,15 @@ int afs_fs_remove(struct afs_server *server,
777/* 769/*
778 * deliver reply data to an FS.Link 770 * deliver reply data to an FS.Link
779 */ 771 */
780static int afs_deliver_fs_link(struct afs_call *call, 772static int afs_deliver_fs_link(struct afs_call *call)
781 struct sk_buff *skb, bool last)
782{ 773{
783 struct afs_vnode *dvnode = call->reply, *vnode = call->reply2; 774 struct afs_vnode *dvnode = call->reply, *vnode = call->reply2;
784 const __be32 *bp; 775 const __be32 *bp;
785 int ret; 776 int ret;
786 777
787 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 778 _enter("{%u}", call->unmarshall);
788 779
789 ret = afs_transfer_reply(call, skb, last); 780 ret = afs_transfer_reply(call);
790 if (ret < 0) 781 if (ret < 0)
791 return ret; 782 return ret;
792 783
@@ -863,16 +854,15 @@ int afs_fs_link(struct afs_server *server,
863/* 854/*
864 * deliver reply data to an FS.Symlink 855 * deliver reply data to an FS.Symlink
865 */ 856 */
866static int afs_deliver_fs_symlink(struct afs_call *call, 857static int afs_deliver_fs_symlink(struct afs_call *call)
867 struct sk_buff *skb, bool last)
868{ 858{
869 struct afs_vnode *vnode = call->reply; 859 struct afs_vnode *vnode = call->reply;
870 const __be32 *bp; 860 const __be32 *bp;
871 int ret; 861 int ret;
872 862
873 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 863 _enter("{%u}", call->unmarshall);
874 864
875 ret = afs_transfer_reply(call, skb, last); 865 ret = afs_transfer_reply(call);
876 if (ret < 0) 866 if (ret < 0)
877 return ret; 867 return ret;
878 868
@@ -968,16 +958,15 @@ int afs_fs_symlink(struct afs_server *server,
968/* 958/*
969 * deliver reply data to an FS.Rename 959 * deliver reply data to an FS.Rename
970 */ 960 */
971static int afs_deliver_fs_rename(struct afs_call *call, 961static int afs_deliver_fs_rename(struct afs_call *call)
972 struct sk_buff *skb, bool last)
973{ 962{
974 struct afs_vnode *orig_dvnode = call->reply, *new_dvnode = call->reply2; 963 struct afs_vnode *orig_dvnode = call->reply, *new_dvnode = call->reply2;
975 const __be32 *bp; 964 const __be32 *bp;
976 int ret; 965 int ret;
977 966
978 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 967 _enter("{%u}", call->unmarshall);
979 968
980 ret = afs_transfer_reply(call, skb, last); 969 ret = afs_transfer_reply(call);
981 if (ret < 0) 970 if (ret < 0)
982 return ret; 971 return ret;
983 972
@@ -1072,16 +1061,15 @@ int afs_fs_rename(struct afs_server *server,
1072/* 1061/*
1073 * deliver reply data to an FS.StoreData 1062 * deliver reply data to an FS.StoreData
1074 */ 1063 */
1075static int afs_deliver_fs_store_data(struct afs_call *call, 1064static int afs_deliver_fs_store_data(struct afs_call *call)
1076 struct sk_buff *skb, bool last)
1077{ 1065{
1078 struct afs_vnode *vnode = call->reply; 1066 struct afs_vnode *vnode = call->reply;
1079 const __be32 *bp; 1067 const __be32 *bp;
1080 int ret; 1068 int ret;
1081 1069
1082 _enter(",,%u", last); 1070 _enter("");
1083 1071
1084 ret = afs_transfer_reply(call, skb, last); 1072 ret = afs_transfer_reply(call);
1085 if (ret < 0) 1073 if (ret < 0)
1086 return ret; 1074 return ret;
1087 1075
@@ -1251,17 +1239,16 @@ int afs_fs_store_data(struct afs_server *server, struct afs_writeback *wb,
1251/* 1239/*
1252 * deliver reply data to an FS.StoreStatus 1240 * deliver reply data to an FS.StoreStatus
1253 */ 1241 */
1254static int afs_deliver_fs_store_status(struct afs_call *call, 1242static int afs_deliver_fs_store_status(struct afs_call *call)
1255 struct sk_buff *skb, bool last)
1256{ 1243{
1257 afs_dataversion_t *store_version; 1244 afs_dataversion_t *store_version;
1258 struct afs_vnode *vnode = call->reply; 1245 struct afs_vnode *vnode = call->reply;
1259 const __be32 *bp; 1246 const __be32 *bp;
1260 int ret; 1247 int ret;
1261 1248
1262 _enter(",,%u", last); 1249 _enter("");
1263 1250
1264 ret = afs_transfer_reply(call, skb, last); 1251 ret = afs_transfer_reply(call);
1265 if (ret < 0) 1252 if (ret < 0)
1266 return ret; 1253 return ret;
1267 1254
@@ -1443,14 +1430,13 @@ int afs_fs_setattr(struct afs_server *server, struct key *key,
1443/* 1430/*
1444 * deliver reply data to an FS.GetVolumeStatus 1431 * deliver reply data to an FS.GetVolumeStatus
1445 */ 1432 */
1446static int afs_deliver_fs_get_volume_status(struct afs_call *call, 1433static int afs_deliver_fs_get_volume_status(struct afs_call *call)
1447 struct sk_buff *skb, bool last)
1448{ 1434{
1449 const __be32 *bp; 1435 const __be32 *bp;
1450 char *p; 1436 char *p;
1451 int ret; 1437 int ret;
1452 1438
1453 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 1439 _enter("{%u}", call->unmarshall);
1454 1440
1455 switch (call->unmarshall) { 1441 switch (call->unmarshall) {
1456 case 0: 1442 case 0:
@@ -1460,8 +1446,8 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1460 /* extract the returned status record */ 1446 /* extract the returned status record */
1461 case 1: 1447 case 1:
1462 _debug("extract status"); 1448 _debug("extract status");
1463 ret = afs_extract_data(call, skb, last, call->buffer, 1449 ret = afs_extract_data(call, call->buffer,
1464 12 * 4); 1450 12 * 4, true);
1465 if (ret < 0) 1451 if (ret < 0)
1466 return ret; 1452 return ret;
1467 1453
@@ -1472,7 +1458,7 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1472 1458
1473 /* extract the volume name length */ 1459 /* extract the volume name length */
1474 case 2: 1460 case 2:
1475 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 1461 ret = afs_extract_data(call, &call->tmp, 4, true);
1476 if (ret < 0) 1462 if (ret < 0)
1477 return ret; 1463 return ret;
1478 1464
@@ -1487,8 +1473,8 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1487 case 3: 1473 case 3:
1488 _debug("extract volname"); 1474 _debug("extract volname");
1489 if (call->count > 0) { 1475 if (call->count > 0) {
1490 ret = afs_extract_data(call, skb, last, call->reply3, 1476 ret = afs_extract_data(call, call->reply3,
1491 call->count); 1477 call->count, true);
1492 if (ret < 0) 1478 if (ret < 0)
1493 return ret; 1479 return ret;
1494 } 1480 }
@@ -1508,8 +1494,8 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1508 call->count = 4 - (call->count & 3); 1494 call->count = 4 - (call->count & 3);
1509 1495
1510 case 4: 1496 case 4:
1511 ret = afs_extract_data(call, skb, last, call->buffer, 1497 ret = afs_extract_data(call, call->buffer,
1512 call->count); 1498 call->count, true);
1513 if (ret < 0) 1499 if (ret < 0)
1514 return ret; 1500 return ret;
1515 1501
@@ -1519,7 +1505,7 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1519 1505
1520 /* extract the offline message length */ 1506 /* extract the offline message length */
1521 case 5: 1507 case 5:
1522 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 1508 ret = afs_extract_data(call, &call->tmp, 4, true);
1523 if (ret < 0) 1509 if (ret < 0)
1524 return ret; 1510 return ret;
1525 1511
@@ -1534,8 +1520,8 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1534 case 6: 1520 case 6:
1535 _debug("extract offline"); 1521 _debug("extract offline");
1536 if (call->count > 0) { 1522 if (call->count > 0) {
1537 ret = afs_extract_data(call, skb, last, call->reply3, 1523 ret = afs_extract_data(call, call->reply3,
1538 call->count); 1524 call->count, true);
1539 if (ret < 0) 1525 if (ret < 0)
1540 return ret; 1526 return ret;
1541 } 1527 }
@@ -1555,8 +1541,8 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1555 call->count = 4 - (call->count & 3); 1541 call->count = 4 - (call->count & 3);
1556 1542
1557 case 7: 1543 case 7:
1558 ret = afs_extract_data(call, skb, last, call->buffer, 1544 ret = afs_extract_data(call, call->buffer,
1559 call->count); 1545 call->count, true);
1560 if (ret < 0) 1546 if (ret < 0)
1561 return ret; 1547 return ret;
1562 1548
@@ -1566,7 +1552,7 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1566 1552
1567 /* extract the message of the day length */ 1553 /* extract the message of the day length */
1568 case 8: 1554 case 8:
1569 ret = afs_extract_data(call, skb, last, &call->tmp, 4); 1555 ret = afs_extract_data(call, &call->tmp, 4, true);
1570 if (ret < 0) 1556 if (ret < 0)
1571 return ret; 1557 return ret;
1572 1558
@@ -1581,8 +1567,8 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1581 case 9: 1567 case 9:
1582 _debug("extract motd"); 1568 _debug("extract motd");
1583 if (call->count > 0) { 1569 if (call->count > 0) {
1584 ret = afs_extract_data(call, skb, last, call->reply3, 1570 ret = afs_extract_data(call, call->reply3,
1585 call->count); 1571 call->count, true);
1586 if (ret < 0) 1572 if (ret < 0)
1587 return ret; 1573 return ret;
1588 } 1574 }
@@ -1595,26 +1581,17 @@ static int afs_deliver_fs_get_volume_status(struct afs_call *call,
1595 call->unmarshall++; 1581 call->unmarshall++;
1596 1582
1597 /* extract the message of the day padding */ 1583 /* extract the message of the day padding */
1598 if ((call->count & 3) == 0) { 1584 call->count = (4 - (call->count & 3)) & 3;
1599 call->unmarshall++;
1600 goto no_motd_padding;
1601 }
1602 call->count = 4 - (call->count & 3);
1603 1585
1604 case 10: 1586 case 10:
1605 ret = afs_extract_data(call, skb, last, call->buffer, 1587 ret = afs_extract_data(call, call->buffer,
1606 call->count); 1588 call->count, false);
1607 if (ret < 0) 1589 if (ret < 0)
1608 return ret; 1590 return ret;
1609 1591
1610 call->offset = 0; 1592 call->offset = 0;
1611 call->unmarshall++; 1593 call->unmarshall++;
1612 no_motd_padding:
1613
1614 case 11: 1594 case 11:
1615 ret = afs_data_complete(call, skb, last);
1616 if (ret < 0)
1617 return ret;
1618 break; 1595 break;
1619 } 1596 }
1620 1597
@@ -1685,15 +1662,14 @@ int afs_fs_get_volume_status(struct afs_server *server,
1685/* 1662/*
1686 * deliver reply data to an FS.SetLock, FS.ExtendLock or FS.ReleaseLock 1663 * deliver reply data to an FS.SetLock, FS.ExtendLock or FS.ReleaseLock
1687 */ 1664 */
1688static int afs_deliver_fs_xxxx_lock(struct afs_call *call, 1665static int afs_deliver_fs_xxxx_lock(struct afs_call *call)
1689 struct sk_buff *skb, bool last)
1690{ 1666{
1691 const __be32 *bp; 1667 const __be32 *bp;
1692 int ret; 1668 int ret;
1693 1669
1694 _enter("{%u},{%u},%d", call->unmarshall, skb->len, last); 1670 _enter("{%u}", call->unmarshall);
1695 1671
1696 ret = afs_transfer_reply(call, skb, last); 1672 ret = afs_transfer_reply(call);
1697 if (ret < 0) 1673 if (ret < 0)
1698 return ret; 1674 return ret;
1699 1675
diff --git a/fs/afs/internal.h b/fs/afs/internal.h
index d97552de9c59..5497c8496055 100644
--- a/fs/afs/internal.h
+++ b/fs/afs/internal.h
@@ -13,7 +13,6 @@
13#include <linux/kernel.h> 13#include <linux/kernel.h>
14#include <linux/fs.h> 14#include <linux/fs.h>
15#include <linux/pagemap.h> 15#include <linux/pagemap.h>
16#include <linux/skbuff.h>
17#include <linux/rxrpc.h> 16#include <linux/rxrpc.h>
18#include <linux/key.h> 17#include <linux/key.h>
19#include <linux/workqueue.h> 18#include <linux/workqueue.h>
@@ -57,7 +56,7 @@ struct afs_mount_params {
57 */ 56 */
58struct afs_wait_mode { 57struct afs_wait_mode {
59 /* RxRPC received message notification */ 58 /* RxRPC received message notification */
60 void (*rx_wakeup)(struct afs_call *call); 59 rxrpc_notify_rx_t notify_rx;
61 60
62 /* synchronous call waiter and call dispatched notification */ 61 /* synchronous call waiter and call dispatched notification */
63 int (*wait)(struct afs_call *call); 62 int (*wait)(struct afs_call *call);
@@ -76,10 +75,8 @@ struct afs_call {
76 const struct afs_call_type *type; /* type of call */ 75 const struct afs_call_type *type; /* type of call */
77 const struct afs_wait_mode *wait_mode; /* completion wait mode */ 76 const struct afs_wait_mode *wait_mode; /* completion wait mode */
78 wait_queue_head_t waitq; /* processes awaiting completion */ 77 wait_queue_head_t waitq; /* processes awaiting completion */
79 void (*async_workfn)(struct afs_call *call); /* asynchronous work function */
80 struct work_struct async_work; /* asynchronous work processor */ 78 struct work_struct async_work; /* asynchronous work processor */
81 struct work_struct work; /* actual work processor */ 79 struct work_struct work; /* actual work processor */
82 struct sk_buff_head rx_queue; /* received packets */
83 struct rxrpc_call *rxcall; /* RxRPC call handle */ 80 struct rxrpc_call *rxcall; /* RxRPC call handle */
84 struct key *key; /* security for this call */ 81 struct key *key; /* security for this call */
85 struct afs_server *server; /* server affected by incoming CM call */ 82 struct afs_server *server; /* server affected by incoming CM call */
@@ -93,6 +90,7 @@ struct afs_call {
93 void *reply4; /* reply buffer (fourth part) */ 90 void *reply4; /* reply buffer (fourth part) */
94 pgoff_t first; /* first page in mapping to deal with */ 91 pgoff_t first; /* first page in mapping to deal with */
95 pgoff_t last; /* last page in mapping to deal with */ 92 pgoff_t last; /* last page in mapping to deal with */
93 size_t offset; /* offset into received data store */
96 enum { /* call state */ 94 enum { /* call state */
97 AFS_CALL_REQUESTING, /* request is being sent for outgoing call */ 95 AFS_CALL_REQUESTING, /* request is being sent for outgoing call */
98 AFS_CALL_AWAIT_REPLY, /* awaiting reply to outgoing call */ 96 AFS_CALL_AWAIT_REPLY, /* awaiting reply to outgoing call */
@@ -100,21 +98,18 @@ struct afs_call {
100 AFS_CALL_AWAIT_REQUEST, /* awaiting request data on incoming call */ 98 AFS_CALL_AWAIT_REQUEST, /* awaiting request data on incoming call */
101 AFS_CALL_REPLYING, /* replying to incoming call */ 99 AFS_CALL_REPLYING, /* replying to incoming call */
102 AFS_CALL_AWAIT_ACK, /* awaiting final ACK of incoming call */ 100 AFS_CALL_AWAIT_ACK, /* awaiting final ACK of incoming call */
103 AFS_CALL_COMPLETE, /* successfully completed */ 101 AFS_CALL_COMPLETE, /* Completed or failed */
104 AFS_CALL_BUSY, /* server was busy */
105 AFS_CALL_ABORTED, /* call was aborted */
106 AFS_CALL_ERROR, /* call failed due to error */
107 } state; 102 } state;
108 int error; /* error code */ 103 int error; /* error code */
104 u32 abort_code; /* Remote abort ID or 0 */
109 unsigned request_size; /* size of request data */ 105 unsigned request_size; /* size of request data */
110 unsigned reply_max; /* maximum size of reply */ 106 unsigned reply_max; /* maximum size of reply */
111 unsigned reply_size; /* current size of reply */
112 unsigned first_offset; /* offset into mapping[first] */ 107 unsigned first_offset; /* offset into mapping[first] */
113 unsigned last_to; /* amount of mapping[last] */ 108 unsigned last_to; /* amount of mapping[last] */
114 unsigned offset; /* offset into received data store */
115 unsigned char unmarshall; /* unmarshalling phase */ 109 unsigned char unmarshall; /* unmarshalling phase */
116 bool incoming; /* T if incoming call */ 110 bool incoming; /* T if incoming call */
117 bool send_pages; /* T if data from mapping should be sent */ 111 bool send_pages; /* T if data from mapping should be sent */
112 bool need_attention; /* T if RxRPC poked us */
118 u16 service_id; /* RxRPC service ID to call */ 113 u16 service_id; /* RxRPC service ID to call */
119 __be16 port; /* target UDP port */ 114 __be16 port; /* target UDP port */
120 __be32 operation_ID; /* operation ID for an incoming call */ 115 __be32 operation_ID; /* operation ID for an incoming call */
@@ -129,8 +124,7 @@ struct afs_call_type {
129 /* deliver request or reply data to an call 124 /* deliver request or reply data to an call
130 * - returning an error will cause the call to be aborted 125 * - returning an error will cause the call to be aborted
131 */ 126 */
132 int (*deliver)(struct afs_call *call, struct sk_buff *skb, 127 int (*deliver)(struct afs_call *call);
133 bool last);
134 128
135 /* map an abort code to an error number */ 129 /* map an abort code to an error number */
136 int (*abort_to_error)(u32 abort_code); 130 int (*abort_to_error)(u32 abort_code);
@@ -612,27 +606,18 @@ extern struct socket *afs_socket;
612 606
613extern int afs_open_socket(void); 607extern int afs_open_socket(void);
614extern void afs_close_socket(void); 608extern void afs_close_socket(void);
615extern void afs_data_consumed(struct afs_call *, struct sk_buff *);
616extern int afs_make_call(struct in_addr *, struct afs_call *, gfp_t, 609extern int afs_make_call(struct in_addr *, struct afs_call *, gfp_t,
617 const struct afs_wait_mode *); 610 const struct afs_wait_mode *);
618extern struct afs_call *afs_alloc_flat_call(const struct afs_call_type *, 611extern struct afs_call *afs_alloc_flat_call(const struct afs_call_type *,
619 size_t, size_t); 612 size_t, size_t);
620extern void afs_flat_call_destructor(struct afs_call *); 613extern void afs_flat_call_destructor(struct afs_call *);
621extern int afs_transfer_reply(struct afs_call *, struct sk_buff *, bool);
622extern void afs_send_empty_reply(struct afs_call *); 614extern void afs_send_empty_reply(struct afs_call *);
623extern void afs_send_simple_reply(struct afs_call *, const void *, size_t); 615extern void afs_send_simple_reply(struct afs_call *, const void *, size_t);
624extern int afs_extract_data(struct afs_call *, struct sk_buff *, bool, void *, 616extern int afs_extract_data(struct afs_call *, void *, size_t, bool);
625 size_t);
626 617
627static inline int afs_data_complete(struct afs_call *call, struct sk_buff *skb, 618static inline int afs_transfer_reply(struct afs_call *call)
628 bool last)
629{ 619{
630 if (skb->len > 0) 620 return afs_extract_data(call, call->buffer, call->reply_max, false);
631 return -EBADMSG;
632 afs_data_consumed(call, skb);
633 if (!last)
634 return -EAGAIN;
635 return 0;
636} 621}
637 622
638/* 623/*
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index 7b0d18900f50..244896baf241 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -19,31 +19,31 @@
19struct socket *afs_socket; /* my RxRPC socket */ 19struct socket *afs_socket; /* my RxRPC socket */
20static struct workqueue_struct *afs_async_calls; 20static struct workqueue_struct *afs_async_calls;
21static atomic_t afs_outstanding_calls; 21static atomic_t afs_outstanding_calls;
22static atomic_t afs_outstanding_skbs;
23 22
24static void afs_wake_up_call_waiter(struct afs_call *); 23static void afs_free_call(struct afs_call *);
24static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
25static int afs_wait_for_call_to_complete(struct afs_call *); 25static int afs_wait_for_call_to_complete(struct afs_call *);
26static void afs_wake_up_async_call(struct afs_call *); 26static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
27static int afs_dont_wait_for_call_to_complete(struct afs_call *); 27static int afs_dont_wait_for_call_to_complete(struct afs_call *);
28static void afs_process_async_call(struct afs_call *); 28static void afs_process_async_call(struct work_struct *);
29static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *); 29static void afs_rx_new_call(struct sock *);
30static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool); 30static int afs_deliver_cm_op_id(struct afs_call *);
31 31
32/* synchronous call management */ 32/* synchronous call management */
33const struct afs_wait_mode afs_sync_call = { 33const struct afs_wait_mode afs_sync_call = {
34 .rx_wakeup = afs_wake_up_call_waiter, 34 .notify_rx = afs_wake_up_call_waiter,
35 .wait = afs_wait_for_call_to_complete, 35 .wait = afs_wait_for_call_to_complete,
36}; 36};
37 37
38/* asynchronous call management */ 38/* asynchronous call management */
39const struct afs_wait_mode afs_async_call = { 39const struct afs_wait_mode afs_async_call = {
40 .rx_wakeup = afs_wake_up_async_call, 40 .notify_rx = afs_wake_up_async_call,
41 .wait = afs_dont_wait_for_call_to_complete, 41 .wait = afs_dont_wait_for_call_to_complete,
42}; 42};
43 43
44/* asynchronous incoming call management */ 44/* asynchronous incoming call management */
45static const struct afs_wait_mode afs_async_incoming_call = { 45static const struct afs_wait_mode afs_async_incoming_call = {
46 .rx_wakeup = afs_wake_up_async_call, 46 .notify_rx = afs_wake_up_async_call,
47}; 47};
48 48
49/* asynchronous incoming call initial processing */ 49/* asynchronous incoming call initial processing */
@@ -55,16 +55,8 @@ static const struct afs_call_type afs_RXCMxxxx = {
55 55
56static void afs_collect_incoming_call(struct work_struct *); 56static void afs_collect_incoming_call(struct work_struct *);
57 57
58static struct sk_buff_head afs_incoming_calls;
59static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call); 58static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
60 59
61static void afs_async_workfn(struct work_struct *work)
62{
63 struct afs_call *call = container_of(work, struct afs_call, async_work);
64
65 call->async_workfn(call);
66}
67
68static int afs_wait_atomic_t(atomic_t *p) 60static int afs_wait_atomic_t(atomic_t *p)
69{ 61{
70 schedule(); 62 schedule();
@@ -83,8 +75,6 @@ int afs_open_socket(void)
83 75
84 _enter(""); 76 _enter("");
85 77
86 skb_queue_head_init(&afs_incoming_calls);
87
88 ret = -ENOMEM; 78 ret = -ENOMEM;
89 afs_async_calls = create_singlethread_workqueue("kafsd"); 79 afs_async_calls = create_singlethread_workqueue("kafsd");
90 if (!afs_async_calls) 80 if (!afs_async_calls)
@@ -110,12 +100,12 @@ int afs_open_socket(void)
110 if (ret < 0) 100 if (ret < 0)
111 goto error_2; 101 goto error_2;
112 102
103 rxrpc_kernel_new_call_notification(socket, afs_rx_new_call);
104
113 ret = kernel_listen(socket, INT_MAX); 105 ret = kernel_listen(socket, INT_MAX);
114 if (ret < 0) 106 if (ret < 0)
115 goto error_2; 107 goto error_2;
116 108
117 rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);
118
119 afs_socket = socket; 109 afs_socket = socket;
120 _leave(" = 0"); 110 _leave(" = 0");
121 return 0; 111 return 0;
@@ -136,52 +126,20 @@ void afs_close_socket(void)
136{ 126{
137 _enter(""); 127 _enter("");
138 128
129 _debug("outstanding %u", atomic_read(&afs_outstanding_calls));
139 wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t, 130 wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
140 TASK_UNINTERRUPTIBLE); 131 TASK_UNINTERRUPTIBLE);
141 _debug("no outstanding calls"); 132 _debug("no outstanding calls");
142 133
134 flush_workqueue(afs_async_calls);
143 sock_release(afs_socket); 135 sock_release(afs_socket);
144 136
145 _debug("dework"); 137 _debug("dework");
146 destroy_workqueue(afs_async_calls); 138 destroy_workqueue(afs_async_calls);
147
148 ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
149 _leave(""); 139 _leave("");
150} 140}
151 141
152/* 142/*
153 * Note that the data in a socket buffer is now consumed.
154 */
155void afs_data_consumed(struct afs_call *call, struct sk_buff *skb)
156{
157 if (!skb) {
158 _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
159 dump_stack();
160 } else {
161 _debug("DLVR %p{%u} [%d]",
162 skb, skb->mark, atomic_read(&afs_outstanding_skbs));
163 rxrpc_kernel_data_consumed(call->rxcall, skb);
164 }
165}
166
167/*
168 * free a socket buffer
169 */
170static void afs_free_skb(struct sk_buff *skb)
171{
172 if (!skb) {
173 _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
174 dump_stack();
175 } else {
176 _debug("FREE %p{%u} [%d]",
177 skb, skb->mark, atomic_read(&afs_outstanding_skbs));
178 if (atomic_dec_return(&afs_outstanding_skbs) == -1)
179 BUG();
180 rxrpc_kernel_free_skb(skb);
181 }
182}
183
184/*
185 * free a call 143 * free a call
186 */ 144 */
187static void afs_free_call(struct afs_call *call) 145static void afs_free_call(struct afs_call *call)
@@ -191,7 +149,6 @@ static void afs_free_call(struct afs_call *call)
191 149
192 ASSERTCMP(call->rxcall, ==, NULL); 150 ASSERTCMP(call->rxcall, ==, NULL);
193 ASSERT(!work_pending(&call->async_work)); 151 ASSERT(!work_pending(&call->async_work));
194 ASSERT(skb_queue_empty(&call->rx_queue));
195 ASSERT(call->type->name != NULL); 152 ASSERT(call->type->name != NULL);
196 153
197 kfree(call->request); 154 kfree(call->request);
@@ -227,7 +184,7 @@ static void afs_end_call(struct afs_call *call)
227 * allocate a call with flat request and reply buffers 184 * allocate a call with flat request and reply buffers
228 */ 185 */
229struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type, 186struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
230 size_t request_size, size_t reply_size) 187 size_t request_size, size_t reply_max)
231{ 188{
232 struct afs_call *call; 189 struct afs_call *call;
233 190
@@ -241,7 +198,7 @@ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
241 198
242 call->type = type; 199 call->type = type;
243 call->request_size = request_size; 200 call->request_size = request_size;
244 call->reply_max = reply_size; 201 call->reply_max = reply_max;
245 202
246 if (request_size) { 203 if (request_size) {
247 call->request = kmalloc(request_size, GFP_NOFS); 204 call->request = kmalloc(request_size, GFP_NOFS);
@@ -249,14 +206,13 @@ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
249 goto nomem_free; 206 goto nomem_free;
250 } 207 }
251 208
252 if (reply_size) { 209 if (reply_max) {
253 call->buffer = kmalloc(reply_size, GFP_NOFS); 210 call->buffer = kmalloc(reply_max, GFP_NOFS);
254 if (!call->buffer) 211 if (!call->buffer)
255 goto nomem_free; 212 goto nomem_free;
256 } 213 }
257 214
258 init_waitqueue_head(&call->waitq); 215 init_waitqueue_head(&call->waitq);
259 skb_queue_head_init(&call->rx_queue);
260 return call; 216 return call;
261 217
262nomem_free: 218nomem_free:
@@ -354,7 +310,6 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
354 struct msghdr msg; 310 struct msghdr msg;
355 struct kvec iov[1]; 311 struct kvec iov[1];
356 int ret; 312 int ret;
357 struct sk_buff *skb;
358 313
359 _enter("%x,{%d},", addr->s_addr, ntohs(call->port)); 314 _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
360 315
@@ -366,8 +321,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
366 atomic_read(&afs_outstanding_calls)); 321 atomic_read(&afs_outstanding_calls));
367 322
368 call->wait_mode = wait_mode; 323 call->wait_mode = wait_mode;
369 call->async_workfn = afs_process_async_call; 324 INIT_WORK(&call->async_work, afs_process_async_call);
370 INIT_WORK(&call->async_work, afs_async_workfn);
371 325
372 memset(&srx, 0, sizeof(srx)); 326 memset(&srx, 0, sizeof(srx));
373 srx.srx_family = AF_RXRPC; 327 srx.srx_family = AF_RXRPC;
@@ -380,7 +334,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
380 334
381 /* create a call */ 335 /* create a call */
382 rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key, 336 rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
383 (unsigned long) call, gfp); 337 (unsigned long) call, gfp,
338 wait_mode->notify_rx);
384 call->key = NULL; 339 call->key = NULL;
385 if (IS_ERR(rxcall)) { 340 if (IS_ERR(rxcall)) {
386 ret = PTR_ERR(rxcall); 341 ret = PTR_ERR(rxcall);
@@ -423,8 +378,6 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
423 378
424error_do_abort: 379error_do_abort:
425 rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT); 380 rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT);
426 while ((skb = skb_dequeue(&call->rx_queue)))
427 afs_free_skb(skb);
428error_kill_call: 381error_kill_call:
429 afs_end_call(call); 382 afs_end_call(call);
430 _leave(" = %d", ret); 383 _leave(" = %d", ret);
@@ -432,141 +385,77 @@ error_kill_call:
432} 385}
433 386
434/* 387/*
435 * Handles intercepted messages that were arriving in the socket's Rx queue.
436 *
437 * Called from the AF_RXRPC call processor in waitqueue process context. For
438 * each call, it is guaranteed this will be called in order of packet to be
439 * delivered.
440 */
441static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
442 struct sk_buff *skb)
443{
444 struct afs_call *call = (struct afs_call *) user_call_ID;
445
446 _enter("%p,,%u", call, skb->mark);
447
448 _debug("ICPT %p{%u} [%d]",
449 skb, skb->mark, atomic_read(&afs_outstanding_skbs));
450
451 ASSERTCMP(sk, ==, afs_socket->sk);
452 atomic_inc(&afs_outstanding_skbs);
453
454 if (!call) {
455 /* its an incoming call for our callback service */
456 skb_queue_tail(&afs_incoming_calls, skb);
457 queue_work(afs_wq, &afs_collect_incoming_call_work);
458 } else {
459 /* route the messages directly to the appropriate call */
460 skb_queue_tail(&call->rx_queue, skb);
461 call->wait_mode->rx_wakeup(call);
462 }
463
464 _leave("");
465}
466
467/*
468 * deliver messages to a call 388 * deliver messages to a call
469 */ 389 */
470static void afs_deliver_to_call(struct afs_call *call) 390static void afs_deliver_to_call(struct afs_call *call)
471{ 391{
472 struct sk_buff *skb;
473 bool last;
474 u32 abort_code; 392 u32 abort_code;
475 int ret; 393 int ret;
476 394
477 _enter(""); 395 _enter("%s", call->type->name);
478 396
479 while ((call->state == AFS_CALL_AWAIT_REPLY || 397 while (call->state == AFS_CALL_AWAIT_REPLY ||
480 call->state == AFS_CALL_AWAIT_OP_ID || 398 call->state == AFS_CALL_AWAIT_OP_ID ||
481 call->state == AFS_CALL_AWAIT_REQUEST || 399 call->state == AFS_CALL_AWAIT_REQUEST ||
482 call->state == AFS_CALL_AWAIT_ACK) && 400 call->state == AFS_CALL_AWAIT_ACK
483 (skb = skb_dequeue(&call->rx_queue))) { 401 ) {
484 switch (skb->mark) { 402 if (call->state == AFS_CALL_AWAIT_ACK) {
485 case RXRPC_SKB_MARK_DATA: 403 size_t offset = 0;
486 _debug("Rcv DATA"); 404 ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
487 last = rxrpc_kernel_is_data_last(skb); 405 NULL, 0, &offset, false,
488 ret = call->type->deliver(call, skb, last); 406 &call->abort_code);
489 switch (ret) { 407 if (ret == -EINPROGRESS || ret == -EAGAIN)
490 case -EAGAIN: 408 return;
491 if (last) { 409 if (ret == 1) {
492 _debug("short data"); 410 call->state = AFS_CALL_COMPLETE;
493 goto unmarshal_error; 411 goto done;
494 }
495 break;
496 case 0:
497 ASSERT(last);
498 if (call->state == AFS_CALL_AWAIT_REPLY)
499 call->state = AFS_CALL_COMPLETE;
500 break;
501 case -ENOTCONN:
502 abort_code = RX_CALL_DEAD;
503 goto do_abort;
504 case -ENOTSUPP:
505 abort_code = RX_INVALID_OPERATION;
506 goto do_abort;
507 default:
508 unmarshal_error:
509 abort_code = RXGEN_CC_UNMARSHAL;
510 if (call->state != AFS_CALL_AWAIT_REPLY)
511 abort_code = RXGEN_SS_UNMARSHAL;
512 do_abort:
513 rxrpc_kernel_abort_call(afs_socket,
514 call->rxcall,
515 abort_code);
516 call->error = ret;
517 call->state = AFS_CALL_ERROR;
518 break;
519 } 412 }
520 break; 413 return;
521 case RXRPC_SKB_MARK_FINAL_ACK:
522 _debug("Rcv ACK");
523 call->state = AFS_CALL_COMPLETE;
524 break;
525 case RXRPC_SKB_MARK_BUSY:
526 _debug("Rcv BUSY");
527 call->error = -EBUSY;
528 call->state = AFS_CALL_BUSY;
529 break;
530 case RXRPC_SKB_MARK_REMOTE_ABORT:
531 abort_code = rxrpc_kernel_get_abort_code(skb);
532 call->error = call->type->abort_to_error(abort_code);
533 call->state = AFS_CALL_ABORTED;
534 _debug("Rcv ABORT %u -> %d", abort_code, call->error);
535 break;
536 case RXRPC_SKB_MARK_LOCAL_ABORT:
537 abort_code = rxrpc_kernel_get_abort_code(skb);
538 call->error = call->type->abort_to_error(abort_code);
539 call->state = AFS_CALL_ABORTED;
540 _debug("Loc ABORT %u -> %d", abort_code, call->error);
541 break;
542 case RXRPC_SKB_MARK_NET_ERROR:
543 call->error = -rxrpc_kernel_get_error_number(skb);
544 call->state = AFS_CALL_ERROR;
545 _debug("Rcv NET ERROR %d", call->error);
546 break;
547 case RXRPC_SKB_MARK_LOCAL_ERROR:
548 call->error = -rxrpc_kernel_get_error_number(skb);
549 call->state = AFS_CALL_ERROR;
550 _debug("Rcv LOCAL ERROR %d", call->error);
551 break;
552 default:
553 BUG();
554 break;
555 } 414 }
556 415
557 afs_free_skb(skb); 416 ret = call->type->deliver(call);
558 } 417 switch (ret) {
559 418 case 0:
560 /* make sure the queue is empty if the call is done with (we might have 419 if (call->state == AFS_CALL_AWAIT_REPLY)
561 * aborted the call early because of an unmarshalling error) */ 420 call->state = AFS_CALL_COMPLETE;
562 if (call->state >= AFS_CALL_COMPLETE) { 421 goto done;
563 while ((skb = skb_dequeue(&call->rx_queue))) 422 case -EINPROGRESS:
564 afs_free_skb(skb); 423 case -EAGAIN:
565 if (call->incoming) 424 goto out;
566 afs_end_call(call); 425 case -ENOTCONN:
426 abort_code = RX_CALL_DEAD;
427 rxrpc_kernel_abort_call(afs_socket, call->rxcall,
428 abort_code);
429 goto do_abort;
430 case -ENOTSUPP:
431 abort_code = RX_INVALID_OPERATION;
432 rxrpc_kernel_abort_call(afs_socket, call->rxcall,
433 abort_code);
434 goto do_abort;
435 case -ENODATA:
436 case -EBADMSG:
437 case -EMSGSIZE:
438 default:
439 abort_code = RXGEN_CC_UNMARSHAL;
440 if (call->state != AFS_CALL_AWAIT_REPLY)
441 abort_code = RXGEN_SS_UNMARSHAL;
442 rxrpc_kernel_abort_call(afs_socket, call->rxcall,
443 abort_code);
444 goto do_abort;
445 }
567 } 446 }
568 447
448done:
449 if (call->state == AFS_CALL_COMPLETE && call->incoming)
450 afs_end_call(call);
451out:
569 _leave(""); 452 _leave("");
453 return;
454
455do_abort:
456 call->error = ret;
457 call->state = AFS_CALL_COMPLETE;
458 goto done;
570} 459}
571 460
572/* 461/*
@@ -574,7 +463,6 @@ static void afs_deliver_to_call(struct afs_call *call)
574 */ 463 */
575static int afs_wait_for_call_to_complete(struct afs_call *call) 464static int afs_wait_for_call_to_complete(struct afs_call *call)
576{ 465{
577 struct sk_buff *skb;
578 int ret; 466 int ret;
579 467
580 DECLARE_WAITQUEUE(myself, current); 468 DECLARE_WAITQUEUE(myself, current);
@@ -586,14 +474,15 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
586 set_current_state(TASK_INTERRUPTIBLE); 474 set_current_state(TASK_INTERRUPTIBLE);
587 475
588 /* deliver any messages that are in the queue */ 476 /* deliver any messages that are in the queue */
589 if (!skb_queue_empty(&call->rx_queue)) { 477 if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
478 call->need_attention = false;
590 __set_current_state(TASK_RUNNING); 479 __set_current_state(TASK_RUNNING);
591 afs_deliver_to_call(call); 480 afs_deliver_to_call(call);
592 continue; 481 continue;
593 } 482 }
594 483
595 ret = call->error; 484 ret = call->error;
596 if (call->state >= AFS_CALL_COMPLETE) 485 if (call->state == AFS_CALL_COMPLETE)
597 break; 486 break;
598 ret = -EINTR; 487 ret = -EINTR;
599 if (signal_pending(current)) 488 if (signal_pending(current))
@@ -607,9 +496,8 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
607 /* kill the call */ 496 /* kill the call */
608 if (call->state < AFS_CALL_COMPLETE) { 497 if (call->state < AFS_CALL_COMPLETE) {
609 _debug("call incomplete"); 498 _debug("call incomplete");
610 rxrpc_kernel_abort_call(afs_socket, call->rxcall, RX_CALL_DEAD); 499 rxrpc_kernel_abort_call(afs_socket, call->rxcall,
611 while ((skb = skb_dequeue(&call->rx_queue))) 500 RX_CALL_DEAD);
612 afs_free_skb(skb);
613 } 501 }
614 502
615 _debug("call complete"); 503 _debug("call complete");
@@ -621,17 +509,24 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
621/* 509/*
622 * wake up a waiting call 510 * wake up a waiting call
623 */ 511 */
624static void afs_wake_up_call_waiter(struct afs_call *call) 512static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
513 unsigned long call_user_ID)
625{ 514{
515 struct afs_call *call = (struct afs_call *)call_user_ID;
516
517 call->need_attention = true;
626 wake_up(&call->waitq); 518 wake_up(&call->waitq);
627} 519}
628 520
629/* 521/*
630 * wake up an asynchronous call 522 * wake up an asynchronous call
631 */ 523 */
632static void afs_wake_up_async_call(struct afs_call *call) 524static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
525 unsigned long call_user_ID)
633{ 526{
634 _enter(""); 527 struct afs_call *call = (struct afs_call *)call_user_ID;
528
529 call->need_attention = true;
635 queue_work(afs_async_calls, &call->async_work); 530 queue_work(afs_async_calls, &call->async_work);
636} 531}
637 532
@@ -649,8 +544,10 @@ static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
649/* 544/*
650 * delete an asynchronous call 545 * delete an asynchronous call
651 */ 546 */
652static void afs_delete_async_call(struct afs_call *call) 547static void afs_delete_async_call(struct work_struct *work)
653{ 548{
549 struct afs_call *call = container_of(work, struct afs_call, async_work);
550
654 _enter(""); 551 _enter("");
655 552
656 afs_free_call(call); 553 afs_free_call(call);
@@ -660,17 +557,19 @@ static void afs_delete_async_call(struct afs_call *call)
660 557
661/* 558/*
662 * perform processing on an asynchronous call 559 * perform processing on an asynchronous call
663 * - on a multiple-thread workqueue this work item may try to run on several
664 * CPUs at the same time
665 */ 560 */
666static void afs_process_async_call(struct afs_call *call) 561static void afs_process_async_call(struct work_struct *work)
667{ 562{
563 struct afs_call *call = container_of(work, struct afs_call, async_work);
564
668 _enter(""); 565 _enter("");
669 566
670 if (!skb_queue_empty(&call->rx_queue)) 567 if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
568 call->need_attention = false;
671 afs_deliver_to_call(call); 569 afs_deliver_to_call(call);
570 }
672 571
673 if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) { 572 if (call->state == AFS_CALL_COMPLETE && call->wait_mode) {
674 if (call->wait_mode->async_complete) 573 if (call->wait_mode->async_complete)
675 call->wait_mode->async_complete(call->reply, 574 call->wait_mode->async_complete(call->reply,
676 call->error); 575 call->error);
@@ -681,7 +580,7 @@ static void afs_process_async_call(struct afs_call *call)
681 580
682 /* we can't just delete the call because the work item may be 581 /* we can't just delete the call because the work item may be
683 * queued */ 582 * queued */
684 call->async_workfn = afs_delete_async_call; 583 call->async_work.func = afs_delete_async_call;
685 queue_work(afs_async_calls, &call->async_work); 584 queue_work(afs_async_calls, &call->async_work);
686 } 585 }
687 586
@@ -689,52 +588,16 @@ static void afs_process_async_call(struct afs_call *call)
689} 588}
690 589
691/* 590/*
692 * Empty a socket buffer into a flat reply buffer.
693 */
694int afs_transfer_reply(struct afs_call *call, struct sk_buff *skb, bool last)
695{
696 size_t len = skb->len;
697
698 if (len > call->reply_max - call->reply_size) {
699 _leave(" = -EBADMSG [%zu > %u]",
700 len, call->reply_max - call->reply_size);
701 return -EBADMSG;
702 }
703
704 if (len > 0) {
705 if (skb_copy_bits(skb, 0, call->buffer + call->reply_size,
706 len) < 0)
707 BUG();
708 call->reply_size += len;
709 }
710
711 afs_data_consumed(call, skb);
712 if (!last)
713 return -EAGAIN;
714
715 if (call->reply_size != call->reply_max) {
716 _leave(" = -EBADMSG [%u != %u]",
717 call->reply_size, call->reply_max);
718 return -EBADMSG;
719 }
720 return 0;
721}
722
723/*
724 * accept the backlog of incoming calls 591 * accept the backlog of incoming calls
725 */ 592 */
726static void afs_collect_incoming_call(struct work_struct *work) 593static void afs_collect_incoming_call(struct work_struct *work)
727{ 594{
728 struct rxrpc_call *rxcall; 595 struct rxrpc_call *rxcall;
729 struct afs_call *call = NULL; 596 struct afs_call *call = NULL;
730 struct sk_buff *skb;
731
732 while ((skb = skb_dequeue(&afs_incoming_calls))) {
733 _debug("new call");
734 597
735 /* don't need the notification */ 598 _enter("");
736 afs_free_skb(skb);
737 599
600 do {
738 if (!call) { 601 if (!call) {
739 call = kzalloc(sizeof(struct afs_call), GFP_KERNEL); 602 call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
740 if (!call) { 603 if (!call) {
@@ -742,12 +605,10 @@ static void afs_collect_incoming_call(struct work_struct *work)
742 return; 605 return;
743 } 606 }
744 607
745 call->async_workfn = afs_process_async_call; 608 INIT_WORK(&call->async_work, afs_process_async_call);
746 INIT_WORK(&call->async_work, afs_async_workfn);
747 call->wait_mode = &afs_async_incoming_call; 609 call->wait_mode = &afs_async_incoming_call;
748 call->type = &afs_RXCMxxxx; 610 call->type = &afs_RXCMxxxx;
749 init_waitqueue_head(&call->waitq); 611 init_waitqueue_head(&call->waitq);
750 skb_queue_head_init(&call->rx_queue);
751 call->state = AFS_CALL_AWAIT_OP_ID; 612 call->state = AFS_CALL_AWAIT_OP_ID;
752 613
753 _debug("CALL %p{%s} [%d]", 614 _debug("CALL %p{%s} [%d]",
@@ -757,46 +618,47 @@ static void afs_collect_incoming_call(struct work_struct *work)
757 } 618 }
758 619
759 rxcall = rxrpc_kernel_accept_call(afs_socket, 620 rxcall = rxrpc_kernel_accept_call(afs_socket,
760 (unsigned long) call); 621 (unsigned long)call,
622 afs_wake_up_async_call);
761 if (!IS_ERR(rxcall)) { 623 if (!IS_ERR(rxcall)) {
762 call->rxcall = rxcall; 624 call->rxcall = rxcall;
625 call->need_attention = true;
626 queue_work(afs_async_calls, &call->async_work);
763 call = NULL; 627 call = NULL;
764 } 628 }
765 } 629 } while (!call);
766 630
767 if (call) 631 if (call)
768 afs_free_call(call); 632 afs_free_call(call);
769} 633}
770 634
771/* 635/*
636 * Notification of an incoming call.
637 */
638static void afs_rx_new_call(struct sock *sk)
639{
640 queue_work(afs_wq, &afs_collect_incoming_call_work);
641}
642
643/*
772 * Grab the operation ID from an incoming cache manager call. The socket 644 * Grab the operation ID from an incoming cache manager call. The socket
773 * buffer is discarded on error or if we don't yet have sufficient data. 645 * buffer is discarded on error or if we don't yet have sufficient data.
774 */ 646 */
775static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb, 647static int afs_deliver_cm_op_id(struct afs_call *call)
776 bool last)
777{ 648{
778 size_t len = skb->len; 649 int ret;
779 void *oibuf = (void *) &call->operation_ID;
780 650
781 _enter("{%u},{%zu},%d", call->offset, len, last); 651 _enter("{%zu}", call->offset);
782 652
783 ASSERTCMP(call->offset, <, 4); 653 ASSERTCMP(call->offset, <, 4);
784 654
785 /* the operation ID forms the first four bytes of the request data */ 655 /* the operation ID forms the first four bytes of the request data */
786 len = min_t(size_t, len, 4 - call->offset); 656 ret = afs_extract_data(call, &call->operation_ID, 4, true);
787 if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0) 657 if (ret < 0)
788 BUG(); 658 return ret;
789 if (!pskb_pull(skb, len))
790 BUG();
791 call->offset += len;
792
793 if (call->offset < 4) {
794 afs_data_consumed(call, skb);
795 _leave(" = -EAGAIN");
796 return -EAGAIN;
797 }
798 659
799 call->state = AFS_CALL_AWAIT_REQUEST; 660 call->state = AFS_CALL_AWAIT_REQUEST;
661 call->offset = 0;
800 662
801 /* ask the cache manager to route the call (it'll change the call type 663 /* ask the cache manager to route the call (it'll change the call type
802 * if successful) */ 664 * if successful) */
@@ -805,7 +667,7 @@ static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
805 667
806 /* pass responsibility for the remainer of this message off to the 668 /* pass responsibility for the remainer of this message off to the
807 * cache manager op */ 669 * cache manager op */
808 return call->type->deliver(call, skb, last); 670 return call->type->deliver(call);
809} 671}
810 672
811/* 673/*
@@ -881,25 +743,40 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
881/* 743/*
882 * Extract a piece of data from the received data socket buffers. 744 * Extract a piece of data from the received data socket buffers.
883 */ 745 */
884int afs_extract_data(struct afs_call *call, struct sk_buff *skb, 746int afs_extract_data(struct afs_call *call, void *buf, size_t count,
885 bool last, void *buf, size_t count) 747 bool want_more)
886{ 748{
887 size_t len = skb->len; 749 int ret;
888 750
889 _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count); 751 _enter("{%s,%zu},,%zu,%d",
752 call->type->name, call->offset, count, want_more);
890 753
891 ASSERTCMP(call->offset, <, count); 754 ASSERTCMP(call->offset, <=, count);
892 755
893 len = min_t(size_t, len, count - call->offset); 756 ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
894 if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 || 757 buf, count, &call->offset,
895 !pskb_pull(skb, len)) 758 want_more, &call->abort_code);
896 BUG(); 759 if (ret == 0 || ret == -EAGAIN)
897 call->offset += len; 760 return ret;
898 761
899 if (call->offset < count) { 762 if (ret == 1) {
900 afs_data_consumed(call, skb); 763 switch (call->state) {
901 _leave(" = -EAGAIN"); 764 case AFS_CALL_AWAIT_REPLY:
902 return -EAGAIN; 765 call->state = AFS_CALL_COMPLETE;
766 break;
767 case AFS_CALL_AWAIT_REQUEST:
768 call->state = AFS_CALL_REPLYING;
769 break;
770 default:
771 break;
772 }
773 return 0;
903 } 774 }
904 return 0; 775
776 if (ret == -ECONNABORTED)
777 call->error = call->type->abort_to_error(call->abort_code);
778 else
779 call->error = ret;
780 call->state = AFS_CALL_COMPLETE;
781 return ret;
905} 782}
diff --git a/fs/afs/vlclient.c b/fs/afs/vlclient.c
index f94d1abdc3eb..94bcd97d22b8 100644
--- a/fs/afs/vlclient.c
+++ b/fs/afs/vlclient.c
@@ -58,17 +58,16 @@ static int afs_vl_abort_to_error(u32 abort_code)
58/* 58/*
59 * deliver reply data to a VL.GetEntryByXXX call 59 * deliver reply data to a VL.GetEntryByXXX call
60 */ 60 */
61static int afs_deliver_vl_get_entry_by_xxx(struct afs_call *call, 61static int afs_deliver_vl_get_entry_by_xxx(struct afs_call *call)
62 struct sk_buff *skb, bool last)
63{ 62{
64 struct afs_cache_vlocation *entry; 63 struct afs_cache_vlocation *entry;
65 __be32 *bp; 64 __be32 *bp;
66 u32 tmp; 65 u32 tmp;
67 int loop, ret; 66 int loop, ret;
68 67
69 _enter(",,%u", last); 68 _enter("");
70 69
71 ret = afs_transfer_reply(call, skb, last); 70 ret = afs_transfer_reply(call);
72 if (ret < 0) 71 if (ret < 0)
73 return ret; 72 return ret;
74 73
diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index f8d8079dc058..b4b6a3664dda 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -12,7 +12,6 @@
12#ifndef _NET_RXRPC_H 12#ifndef _NET_RXRPC_H
13#define _NET_RXRPC_H 13#define _NET_RXRPC_H
14 14
15#include <linux/skbuff.h>
16#include <linux/rxrpc.h> 15#include <linux/rxrpc.h>
17 16
18struct key; 17struct key;
@@ -20,38 +19,26 @@ struct sock;
20struct socket; 19struct socket;
21struct rxrpc_call; 20struct rxrpc_call;
22 21
23/* 22typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
24 * the mark applied to socket buffers that may be intercepted 23 unsigned long);
25 */ 24typedef void (*rxrpc_notify_new_call_t)(struct sock *);
26enum rxrpc_skb_mark {
27 RXRPC_SKB_MARK_DATA, /* data message */
28 RXRPC_SKB_MARK_FINAL_ACK, /* final ACK received message */
29 RXRPC_SKB_MARK_BUSY, /* server busy message */
30 RXRPC_SKB_MARK_REMOTE_ABORT, /* remote abort message */
31 RXRPC_SKB_MARK_LOCAL_ABORT, /* local abort message */
32 RXRPC_SKB_MARK_NET_ERROR, /* network error message */
33 RXRPC_SKB_MARK_LOCAL_ERROR, /* local error message */
34 RXRPC_SKB_MARK_NEW_CALL, /* local error message */
35};
36 25
37typedef void (*rxrpc_interceptor_t)(struct sock *, unsigned long, 26void rxrpc_kernel_new_call_notification(struct socket *,
38 struct sk_buff *); 27 rxrpc_notify_new_call_t);
39void rxrpc_kernel_intercept_rx_messages(struct socket *, rxrpc_interceptor_t);
40struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *, 28struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
41 struct sockaddr_rxrpc *, 29 struct sockaddr_rxrpc *,
42 struct key *, 30 struct key *,
43 unsigned long, 31 unsigned long,
44 gfp_t); 32 gfp_t,
33 rxrpc_notify_rx_t);
45int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *, 34int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
46 struct msghdr *, size_t); 35 struct msghdr *, size_t);
47void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *); 36int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
37 void *, size_t, size_t *, bool, u32 *);
48void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *, u32); 38void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *, u32);
49void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *); 39void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
50bool rxrpc_kernel_is_data_last(struct sk_buff *); 40struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long,
51u32 rxrpc_kernel_get_abort_code(struct sk_buff *); 41 rxrpc_notify_rx_t);
52int rxrpc_kernel_get_error_number(struct sk_buff *);
53void rxrpc_kernel_free_skb(struct sk_buff *);
54struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long);
55int rxrpc_kernel_reject_call(struct socket *); 42int rxrpc_kernel_reject_call(struct socket *);
56void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *, 43void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
57 struct sockaddr_rxrpc *); 44 struct sockaddr_rxrpc *);
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index e07c91acd904..32d544995dda 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -231,6 +231,8 @@ static int rxrpc_listen(struct socket *sock, int backlog)
231 * @srx: The address of the peer to contact 231 * @srx: The address of the peer to contact
232 * @key: The security context to use (defaults to socket setting) 232 * @key: The security context to use (defaults to socket setting)
233 * @user_call_ID: The ID to use 233 * @user_call_ID: The ID to use
234 * @gfp: The allocation constraints
235 * @notify_rx: Where to send notifications instead of socket queue
234 * 236 *
235 * Allow a kernel service to begin a call on the nominated socket. This just 237 * Allow a kernel service to begin a call on the nominated socket. This just
236 * sets up all the internal tracking structures and allocates connection and 238 * sets up all the internal tracking structures and allocates connection and
@@ -243,7 +245,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
243 struct sockaddr_rxrpc *srx, 245 struct sockaddr_rxrpc *srx,
244 struct key *key, 246 struct key *key,
245 unsigned long user_call_ID, 247 unsigned long user_call_ID,
246 gfp_t gfp) 248 gfp_t gfp,
249 rxrpc_notify_rx_t notify_rx)
247{ 250{
248 struct rxrpc_conn_parameters cp; 251 struct rxrpc_conn_parameters cp;
249 struct rxrpc_call *call; 252 struct rxrpc_call *call;
@@ -270,6 +273,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
270 cp.exclusive = false; 273 cp.exclusive = false;
271 cp.service_id = srx->srx_service; 274 cp.service_id = srx->srx_service;
272 call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp); 275 call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp);
276 if (!IS_ERR(call))
277 call->notify_rx = notify_rx;
273 278
274 release_sock(&rx->sk); 279 release_sock(&rx->sk);
275 _leave(" = %p", call); 280 _leave(" = %p", call);
@@ -289,31 +294,27 @@ void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
289{ 294{
290 _enter("%d{%d}", call->debug_id, atomic_read(&call->usage)); 295 _enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
291 rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call); 296 rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
297 rxrpc_purge_queue(&call->knlrecv_queue);
292 rxrpc_put_call(call); 298 rxrpc_put_call(call);
293} 299}
294EXPORT_SYMBOL(rxrpc_kernel_end_call); 300EXPORT_SYMBOL(rxrpc_kernel_end_call);
295 301
296/** 302/**
297 * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages 303 * rxrpc_kernel_new_call_notification - Get notifications of new calls
298 * @sock: The socket to intercept received messages on 304 * @sock: The socket to intercept received messages on
299 * @interceptor: The function to pass the messages to 305 * @notify_new_call: Function to be called when new calls appear
300 * 306 *
301 * Allow a kernel service to intercept messages heading for the Rx queue on an 307 * Allow a kernel service to be given notifications about new calls.
302 * RxRPC socket. They get passed to the specified function instead.
303 * @interceptor should free the socket buffers it is given. @interceptor is
304 * called with the socket receive queue spinlock held and softirqs disabled -
305 * this ensures that the messages will be delivered in the right order.
306 */ 308 */
307void rxrpc_kernel_intercept_rx_messages(struct socket *sock, 309void rxrpc_kernel_new_call_notification(
308 rxrpc_interceptor_t interceptor) 310 struct socket *sock,
311 rxrpc_notify_new_call_t notify_new_call)
309{ 312{
310 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 313 struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
311 314
312 _enter(""); 315 rx->notify_new_call = notify_new_call;
313 rx->interceptor = interceptor;
314} 316}
315 317EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);
316EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
317 318
318/* 319/*
319 * connect an RxRPC socket 320 * connect an RxRPC socket
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 0c320b2b7b43..4e86d248dc5e 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -40,6 +40,20 @@ struct rxrpc_crypt {
40struct rxrpc_connection; 40struct rxrpc_connection;
41 41
42/* 42/*
43 * Mark applied to socket buffers.
44 */
45enum rxrpc_skb_mark {
46 RXRPC_SKB_MARK_DATA, /* data message */
47 RXRPC_SKB_MARK_FINAL_ACK, /* final ACK received message */
48 RXRPC_SKB_MARK_BUSY, /* server busy message */
49 RXRPC_SKB_MARK_REMOTE_ABORT, /* remote abort message */
50 RXRPC_SKB_MARK_LOCAL_ABORT, /* local abort message */
51 RXRPC_SKB_MARK_NET_ERROR, /* network error message */
52 RXRPC_SKB_MARK_LOCAL_ERROR, /* local error message */
53 RXRPC_SKB_MARK_NEW_CALL, /* local error message */
54};
55
56/*
43 * sk_state for RxRPC sockets 57 * sk_state for RxRPC sockets
44 */ 58 */
45enum { 59enum {
@@ -57,7 +71,7 @@ enum {
57struct rxrpc_sock { 71struct rxrpc_sock {
58 /* WARNING: sk has to be the first member */ 72 /* WARNING: sk has to be the first member */
59 struct sock sk; 73 struct sock sk;
60 rxrpc_interceptor_t interceptor; /* kernel service Rx interceptor function */ 74 rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */
61 struct rxrpc_local *local; /* local endpoint */ 75 struct rxrpc_local *local; /* local endpoint */
62 struct list_head listen_link; /* link in the local endpoint's listen list */ 76 struct list_head listen_link; /* link in the local endpoint's listen list */
63 struct list_head secureq; /* calls awaiting connection security clearance */ 77 struct list_head secureq; /* calls awaiting connection security clearance */
@@ -367,6 +381,7 @@ enum rxrpc_call_flag {
367 RXRPC_CALL_EXPECT_OOS, /* expect out of sequence packets */ 381 RXRPC_CALL_EXPECT_OOS, /* expect out of sequence packets */
368 RXRPC_CALL_IS_SERVICE, /* Call is service call */ 382 RXRPC_CALL_IS_SERVICE, /* Call is service call */
369 RXRPC_CALL_EXPOSED, /* The call was exposed to the world */ 383 RXRPC_CALL_EXPOSED, /* The call was exposed to the world */
384 RXRPC_CALL_RX_NO_MORE, /* Don't indicate MSG_MORE from recvmsg() */
370}; 385};
371 386
372/* 387/*
@@ -441,6 +456,7 @@ struct rxrpc_call {
441 struct timer_list resend_timer; /* Tx resend timer */ 456 struct timer_list resend_timer; /* Tx resend timer */
442 struct work_struct destroyer; /* call destroyer */ 457 struct work_struct destroyer; /* call destroyer */
443 struct work_struct processor; /* packet processor and ACK generator */ 458 struct work_struct processor; /* packet processor and ACK generator */
459 rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */
444 struct list_head link; /* link in master call list */ 460 struct list_head link; /* link in master call list */
445 struct list_head chan_wait_link; /* Link in conn->waiting_calls */ 461 struct list_head chan_wait_link; /* Link in conn->waiting_calls */
446 struct hlist_node error_link; /* link in error distribution list */ 462 struct hlist_node error_link; /* link in error distribution list */
@@ -448,6 +464,7 @@ struct rxrpc_call {
448 struct rb_node sock_node; /* node in socket call tree */ 464 struct rb_node sock_node; /* node in socket call tree */
449 struct sk_buff_head rx_queue; /* received packets */ 465 struct sk_buff_head rx_queue; /* received packets */
450 struct sk_buff_head rx_oos_queue; /* packets received out of sequence */ 466 struct sk_buff_head rx_oos_queue; /* packets received out of sequence */
467 struct sk_buff_head knlrecv_queue; /* Queue for kernel_recv [TODO: replace this] */
451 struct sk_buff *tx_pending; /* Tx socket buffer being filled */ 468 struct sk_buff *tx_pending; /* Tx socket buffer being filled */
452 wait_queue_head_t waitq; /* Wait queue for channel or Tx */ 469 wait_queue_head_t waitq; /* Wait queue for channel or Tx */
453 __be32 crypto_buf[2]; /* Temporary packet crypto buffer */ 470 __be32 crypto_buf[2]; /* Temporary packet crypto buffer */
@@ -512,7 +529,8 @@ extern struct workqueue_struct *rxrpc_workqueue;
512 * call_accept.c 529 * call_accept.c
513 */ 530 */
514void rxrpc_accept_incoming_calls(struct rxrpc_local *); 531void rxrpc_accept_incoming_calls(struct rxrpc_local *);
515struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long); 532struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
533 rxrpc_notify_rx_t);
516int rxrpc_reject_call(struct rxrpc_sock *); 534int rxrpc_reject_call(struct rxrpc_sock *);
517 535
518/* 536/*
@@ -874,6 +892,7 @@ int rxrpc_init_server_conn_security(struct rxrpc_connection *);
874/* 892/*
875 * skbuff.c 893 * skbuff.c
876 */ 894 */
895void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
877void rxrpc_packet_destructor(struct sk_buff *); 896void rxrpc_packet_destructor(struct sk_buff *);
878void rxrpc_new_skb(struct sk_buff *); 897void rxrpc_new_skb(struct sk_buff *);
879void rxrpc_see_skb(struct sk_buff *); 898void rxrpc_see_skb(struct sk_buff *);
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 03af88fe798b..68a439e30df1 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -286,7 +286,8 @@ security_mismatch:
286 * - assign the user call ID to the call at the front of the queue 286 * - assign the user call ID to the call at the front of the queue
287 */ 287 */
288struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, 288struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
289 unsigned long user_call_ID) 289 unsigned long user_call_ID,
290 rxrpc_notify_rx_t notify_rx)
290{ 291{
291 struct rxrpc_call *call; 292 struct rxrpc_call *call;
292 struct rb_node *parent, **pp; 293 struct rb_node *parent, **pp;
@@ -340,6 +341,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
340 } 341 }
341 342
342 /* formalise the acceptance */ 343 /* formalise the acceptance */
344 call->notify_rx = notify_rx;
343 call->user_call_ID = user_call_ID; 345 call->user_call_ID = user_call_ID;
344 rb_link_node(&call->sock_node, parent, pp); 346 rb_link_node(&call->sock_node, parent, pp);
345 rb_insert_color(&call->sock_node, &rx->calls); 347 rb_insert_color(&call->sock_node, &rx->calls);
@@ -437,17 +439,20 @@ out:
437 * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call 439 * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call
438 * @sock: The socket on which the impending call is waiting 440 * @sock: The socket on which the impending call is waiting
439 * @user_call_ID: The tag to attach to the call 441 * @user_call_ID: The tag to attach to the call
442 * @notify_rx: Where to send notifications instead of socket queue
440 * 443 *
441 * Allow a kernel service to accept an incoming call, assuming the incoming 444 * Allow a kernel service to accept an incoming call, assuming the incoming
442 * call is still valid. 445 * call is still valid. The caller should immediately trigger their own
446 * notification as there must be data waiting.
443 */ 447 */
444struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock, 448struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock,
445 unsigned long user_call_ID) 449 unsigned long user_call_ID,
450 rxrpc_notify_rx_t notify_rx)
446{ 451{
447 struct rxrpc_call *call; 452 struct rxrpc_call *call;
448 453
449 _enter(",%lx", user_call_ID); 454 _enter(",%lx", user_call_ID);
450 call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID); 455 call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID, notify_rx);
451 _leave(" = %p", call); 456 _leave(" = %p", call);
452 return call; 457 return call;
453} 458}
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 104ee8b1de06..516d8ea82f02 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -136,6 +136,7 @@ static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
136 INIT_LIST_HEAD(&call->accept_link); 136 INIT_LIST_HEAD(&call->accept_link);
137 skb_queue_head_init(&call->rx_queue); 137 skb_queue_head_init(&call->rx_queue);
138 skb_queue_head_init(&call->rx_oos_queue); 138 skb_queue_head_init(&call->rx_oos_queue);
139 skb_queue_head_init(&call->knlrecv_queue);
139 init_waitqueue_head(&call->waitq); 140 init_waitqueue_head(&call->waitq);
140 spin_lock_init(&call->lock); 141 spin_lock_init(&call->lock);
141 rwlock_init(&call->state_lock); 142 rwlock_init(&call->state_lock);
@@ -552,8 +553,6 @@ void rxrpc_release_call(struct rxrpc_call *call)
552 spin_lock_bh(&call->lock); 553 spin_lock_bh(&call->lock);
553 } 554 }
554 spin_unlock_bh(&call->lock); 555 spin_unlock_bh(&call->lock);
555
556 ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
557 } 556 }
558 557
559 del_timer_sync(&call->resend_timer); 558 del_timer_sync(&call->resend_timer);
@@ -682,6 +681,7 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
682 struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu); 681 struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
683 682
684 rxrpc_purge_queue(&call->rx_queue); 683 rxrpc_purge_queue(&call->rx_queue);
684 rxrpc_purge_queue(&call->knlrecv_queue);
685 rxrpc_put_peer(call->peer); 685 rxrpc_put_peer(call->peer);
686 kmem_cache_free(rxrpc_call_jar, call); 686 kmem_cache_free(rxrpc_call_jar, call);
687} 687}
@@ -737,6 +737,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
737 737
738 rxrpc_purge_queue(&call->rx_queue); 738 rxrpc_purge_queue(&call->rx_queue);
739 ASSERT(skb_queue_empty(&call->rx_oos_queue)); 739 ASSERT(skb_queue_empty(&call->rx_oos_queue));
740 rxrpc_purge_queue(&call->knlrecv_queue);
740 sock_put(&call->socket->sk); 741 sock_put(&call->socket->sk);
741 call_rcu(&call->rcu, rxrpc_rcu_destroy_call); 742 call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
742} 743}
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index bc9b05938ff5..9db90f4f768d 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -282,7 +282,6 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
282 case RXRPC_PACKET_TYPE_DATA: 282 case RXRPC_PACKET_TYPE_DATA:
283 case RXRPC_PACKET_TYPE_ACK: 283 case RXRPC_PACKET_TYPE_ACK:
284 rxrpc_conn_retransmit_call(conn, skb); 284 rxrpc_conn_retransmit_call(conn, skb);
285 rxrpc_free_skb(skb);
286 return 0; 285 return 0;
287 286
288 case RXRPC_PACKET_TYPE_ABORT: 287 case RXRPC_PACKET_TYPE_ABORT:
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 86bea9ad6c3d..72f016cfaaf5 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -90,9 +90,15 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
90 } 90 }
91 91
92 /* allow interception by a kernel service */ 92 /* allow interception by a kernel service */
93 if (rx->interceptor) { 93 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL &&
94 rx->interceptor(sk, call->user_call_ID, skb); 94 rx->notify_new_call) {
95 spin_unlock_bh(&sk->sk_receive_queue.lock); 95 spin_unlock_bh(&sk->sk_receive_queue.lock);
96 skb_queue_tail(&call->knlrecv_queue, skb);
97 rx->notify_new_call(&rx->sk);
98 } else if (call->notify_rx) {
99 spin_unlock_bh(&sk->sk_receive_queue.lock);
100 skb_queue_tail(&call->knlrecv_queue, skb);
101 call->notify_rx(&rx->sk, call, call->user_call_ID);
96 } else { 102 } else {
97 _net("post skb %p", skb); 103 _net("post skb %p", skb);
98 __skb_queue_tail(&sk->sk_receive_queue, skb); 104 __skb_queue_tail(&sk->sk_receive_queue, skb);
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index b1e708a12151..817ae801e769 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -190,7 +190,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
190 if (cmd == RXRPC_CMD_ACCEPT) { 190 if (cmd == RXRPC_CMD_ACCEPT) {
191 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) 191 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
192 return -EINVAL; 192 return -EINVAL;
193 call = rxrpc_accept_call(rx, user_call_ID); 193 call = rxrpc_accept_call(rx, user_call_ID, NULL);
194 if (IS_ERR(call)) 194 if (IS_ERR(call))
195 return PTR_ERR(call); 195 return PTR_ERR(call);
196 rxrpc_put_call(call); 196 rxrpc_put_call(call);
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index c9b38c7fb448..0ab7b334bab1 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -369,55 +369,178 @@ wait_error:
369 369
370} 370}
371 371
372/** 372/*
373 * rxrpc_kernel_is_data_last - Determine if data message is last one 373 * Deliver messages to a call. This keeps processing packets until the buffer
374 * @skb: Message holding data 374 * is filled and we find either more DATA (returns 0) or the end of the DATA
375 * (returns 1). If more packets are required, it returns -EAGAIN.
375 * 376 *
376 * Determine if data message is last one for the parent call. 377 * TODO: Note that this is hacked in at the moment and will be replaced.
377 */ 378 */
378bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 379static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call,
380 struct iov_iter *iter, size_t size,
381 size_t *_offset)
379{ 382{
380 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 383 struct rxrpc_skb_priv *sp;
384 struct sk_buff *skb;
385 size_t remain;
386 int ret, copy;
387
388 _enter("%d", call->debug_id);
389
390next:
391 local_bh_disable();
392 skb = skb_dequeue(&call->knlrecv_queue);
393 local_bh_enable();
394 if (!skb) {
395 if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags))
396 return 1;
397 _leave(" = -EAGAIN [empty]");
398 return -EAGAIN;
399 }
381 400
382 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 401 sp = rxrpc_skb(skb);
402 _debug("dequeued %p %u/%zu", skb, sp->offset, size);
383 403
384 return sp->hdr.flags & RXRPC_LAST_PACKET; 404 switch (skb->mark) {
385} 405 case RXRPC_SKB_MARK_DATA:
406 remain = size - *_offset;
407 if (remain > 0) {
408 copy = skb->len - sp->offset;
409 if (copy > remain)
410 copy = remain;
411 ret = skb_copy_datagram_iter(skb, sp->offset, iter,
412 copy);
413 if (ret < 0)
414 goto requeue_and_leave;
386 415
387EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 416 /* handle piecemeal consumption of data packets */
417 sp->offset += copy;
418 *_offset += copy;
419 }
388 420
389/** 421 if (sp->offset < skb->len)
390 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 422 goto partially_used_skb;
391 * @skb: Message indicating an abort 423
392 * 424 /* We consumed the whole packet */
393 * Get the abort code from an RxRPC abort message. 425 ASSERTCMP(sp->offset, ==, skb->len);
394 */ 426 if (sp->hdr.flags & RXRPC_LAST_PACKET)
395u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 427 set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags);
396{ 428 rxrpc_kernel_data_consumed(call, skb);
397 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 429 rxrpc_free_skb(skb);
430 goto next;
398 431
399 switch (skb->mark) {
400 case RXRPC_SKB_MARK_REMOTE_ABORT:
401 case RXRPC_SKB_MARK_LOCAL_ABORT:
402 return sp->call->abort_code;
403 default: 432 default:
404 BUG(); 433 rxrpc_free_skb(skb);
434 goto next;
405 } 435 }
406}
407 436
408EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 437partially_used_skb:
438 ASSERTCMP(*_offset, ==, size);
439 ret = 0;
440requeue_and_leave:
441 skb_queue_head(&call->knlrecv_queue, skb);
442 return ret;
443}
409 444
410/** 445/**
411 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 446 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
412 * @skb: Message indicating an error 447 * @sock: The socket that the call exists on
448 * @call: The call to send data through
449 * @buf: The buffer to receive into
450 * @size: The size of the buffer, including data already read
451 * @_offset: The running offset into the buffer.
452 * @want_more: True if more data is expected to be read
453 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
454 *
455 * Allow a kernel service to receive data and pick up information about the
456 * state of a call. Returns 0 if got what was asked for and there's more
457 * available, 1 if we got what was asked for and we're at the end of the data
458 * and -EAGAIN if we need more data.
459 *
460 * Note that we may return -EAGAIN to drain empty packets at the end of the
461 * data, even if we've already copied over the requested data.
413 * 462 *
414 * Get the error number from an RxRPC error message. 463 * This function adds the amount it transfers to *_offset, so this should be
464 * precleared as appropriate. Note that the amount remaining in the buffer is
465 * taken to be size - *_offset.
466 *
467 * *_abort should also be initialised to 0.
415 */ 468 */
416int rxrpc_kernel_get_error_number(struct sk_buff *skb) 469int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
470 void *buf, size_t size, size_t *_offset,
471 bool want_more, u32 *_abort)
417{ 472{
418 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 473 struct iov_iter iter;
474 struct kvec iov;
475 int ret;
419 476
420 return sp->error; 477 _enter("{%d,%s},%zu,%d",
421} 478 call->debug_id, rxrpc_call_states[call->state], size, want_more);
479
480 ASSERTCMP(*_offset, <=, size);
481 ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
422 482
423EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 483 iov.iov_base = buf + *_offset;
484 iov.iov_len = size - *_offset;
485 iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
486
487 lock_sock(sock->sk);
488
489 switch (call->state) {
490 case RXRPC_CALL_CLIENT_RECV_REPLY:
491 case RXRPC_CALL_SERVER_RECV_REQUEST:
492 case RXRPC_CALL_SERVER_ACK_REQUEST:
493 ret = temp_deliver_data(sock, call, &iter, size, _offset);
494 if (ret < 0)
495 goto out;
496
497 /* We can only reach here with a partially full buffer if we
498 * have reached the end of the data. We must otherwise have a
499 * full buffer or have been given -EAGAIN.
500 */
501 if (ret == 1) {
502 if (*_offset < size)
503 goto short_data;
504 if (!want_more)
505 goto read_phase_complete;
506 ret = 0;
507 goto out;
508 }
509
510 if (!want_more)
511 goto excess_data;
512 goto out;
513
514 case RXRPC_CALL_COMPLETE:
515 goto call_complete;
516
517 default:
518 *_offset = 0;
519 ret = -EINPROGRESS;
520 goto out;
521 }
522
523read_phase_complete:
524 ret = 1;
525out:
526 release_sock(sock->sk);
527 _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
528 return ret;
529
530short_data:
531 ret = -EBADMSG;
532 goto out;
533excess_data:
534 ret = -EMSGSIZE;
535 goto out;
536call_complete:
537 *_abort = call->abort_code;
538 ret = call->error;
539 if (call->completion == RXRPC_CALL_SUCCEEDED) {
540 ret = 1;
541 if (size > 0)
542 ret = -ECONNRESET;
543 }
544 goto out;
545}
546EXPORT_SYMBOL(rxrpc_kernel_recv_data);
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index 20529205bb8c..9752f8b1fdd0 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -127,7 +127,6 @@ void rxrpc_kernel_data_consumed(struct rxrpc_call *call, struct sk_buff *skb)
127 call->rx_data_recv = sp->hdr.seq; 127 call->rx_data_recv = sp->hdr.seq;
128 rxrpc_hard_ACK_data(call, skb); 128 rxrpc_hard_ACK_data(call, skb);
129} 129}
130EXPORT_SYMBOL(rxrpc_kernel_data_consumed);
131 130
132/* 131/*
133 * Destroy a packet that has an RxRPC control buffer 132 * Destroy a packet that has an RxRPC control buffer