aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/svcsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/svcsock.c')
-rw-r--r--net/sunrpc/svcsock.c505
1 files changed, 307 insertions, 198 deletions
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 7e534dd09077..af04f779ce9f 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -64,7 +64,15 @@ static void svc_tcp_sock_detach(struct svc_xprt *);
64static void svc_sock_free(struct svc_xprt *); 64static void svc_sock_free(struct svc_xprt *);
65 65
66static struct svc_xprt *svc_create_socket(struct svc_serv *, int, 66static struct svc_xprt *svc_create_socket(struct svc_serv *, int,
67 struct sockaddr *, int, int); 67 struct net *, struct sockaddr *,
68 int, int);
69#if defined(CONFIG_NFS_V4_1)
70static struct svc_xprt *svc_bc_create_socket(struct svc_serv *, int,
71 struct net *, struct sockaddr *,
72 int, int);
73static void svc_bc_sock_free(struct svc_xprt *xprt);
74#endif /* CONFIG_NFS_V4_1 */
75
68#ifdef CONFIG_DEBUG_LOCK_ALLOC 76#ifdef CONFIG_DEBUG_LOCK_ALLOC
69static struct lock_class_key svc_key[2]; 77static struct lock_class_key svc_key[2];
70static struct lock_class_key svc_slock_key[2]; 78static struct lock_class_key svc_slock_key[2];
@@ -323,19 +331,21 @@ int svc_sock_names(struct svc_serv *serv, char *buf, const size_t buflen,
323 len = onelen; 331 len = onelen;
324 break; 332 break;
325 } 333 }
326 if (toclose && strcmp(toclose, buf + len) == 0) 334 if (toclose && strcmp(toclose, buf + len) == 0) {
327 closesk = svsk; 335 closesk = svsk;
328 else 336 svc_xprt_get(&closesk->sk_xprt);
337 } else
329 len += onelen; 338 len += onelen;
330 } 339 }
331 spin_unlock_bh(&serv->sv_lock); 340 spin_unlock_bh(&serv->sv_lock);
332 341
333 if (closesk) 342 if (closesk) {
334 /* Should unregister with portmap, but you cannot 343 /* Should unregister with portmap, but you cannot
335 * unregister just one protocol... 344 * unregister just one protocol...
336 */ 345 */
337 svc_close_xprt(&closesk->sk_xprt); 346 svc_close_xprt(&closesk->sk_xprt);
338 else if (toclose) 347 svc_xprt_put(&closesk->sk_xprt);
348 } else if (toclose)
339 return -ENOENT; 349 return -ENOENT;
340 return len; 350 return len;
341} 351}
@@ -377,6 +387,33 @@ static int svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr,
377 return len; 387 return len;
378} 388}
379 389
390static int svc_partial_recvfrom(struct svc_rqst *rqstp,
391 struct kvec *iov, int nr,
392 int buflen, unsigned int base)
393{
394 size_t save_iovlen;
395 void __user *save_iovbase;
396 unsigned int i;
397 int ret;
398
399 if (base == 0)
400 return svc_recvfrom(rqstp, iov, nr, buflen);
401
402 for (i = 0; i < nr; i++) {
403 if (iov[i].iov_len > base)
404 break;
405 base -= iov[i].iov_len;
406 }
407 save_iovlen = iov[i].iov_len;
408 save_iovbase = iov[i].iov_base;
409 iov[i].iov_len -= base;
410 iov[i].iov_base += base;
411 ret = svc_recvfrom(rqstp, &iov[i], nr - i, buflen);
412 iov[i].iov_len = save_iovlen;
413 iov[i].iov_base = save_iovbase;
414 return ret;
415}
416
380/* 417/*
381 * Set socket snd and rcv buffer lengths 418 * Set socket snd and rcv buffer lengths
382 */ 419 */
@@ -399,7 +436,6 @@ static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
399 lock_sock(sock->sk); 436 lock_sock(sock->sk);
400 sock->sk->sk_sndbuf = snd * 2; 437 sock->sk->sk_sndbuf = snd * 2;
401 sock->sk->sk_rcvbuf = rcv * 2; 438 sock->sk->sk_rcvbuf = rcv * 2;
402 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
403 sock->sk->sk_write_space(sock->sk); 439 sock->sk->sk_write_space(sock->sk);
404 release_sock(sock->sk); 440 release_sock(sock->sk);
405#endif 441#endif
@@ -410,6 +446,7 @@ static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
410static void svc_udp_data_ready(struct sock *sk, int count) 446static void svc_udp_data_ready(struct sock *sk, int count)
411{ 447{
412 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; 448 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
449 wait_queue_head_t *wq = sk_sleep(sk);
413 450
414 if (svsk) { 451 if (svsk) {
415 dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n", 452 dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
@@ -418,8 +455,8 @@ static void svc_udp_data_ready(struct sock *sk, int count)
418 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); 455 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
419 svc_xprt_enqueue(&svsk->sk_xprt); 456 svc_xprt_enqueue(&svsk->sk_xprt);
420 } 457 }
421 if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) 458 if (wq && waitqueue_active(wq))
422 wake_up_interruptible(sk_sleep(sk)); 459 wake_up_interruptible(wq);
423} 460}
424 461
425/* 462/*
@@ -428,6 +465,7 @@ static void svc_udp_data_ready(struct sock *sk, int count)
428static void svc_write_space(struct sock *sk) 465static void svc_write_space(struct sock *sk)
429{ 466{
430 struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data); 467 struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
468 wait_queue_head_t *wq = sk_sleep(sk);
431 469
432 if (svsk) { 470 if (svsk) {
433 dprintk("svc: socket %p(inet %p), write_space busy=%d\n", 471 dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
@@ -435,10 +473,10 @@ static void svc_write_space(struct sock *sk)
435 svc_xprt_enqueue(&svsk->sk_xprt); 473 svc_xprt_enqueue(&svsk->sk_xprt);
436 } 474 }
437 475
438 if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) { 476 if (wq && waitqueue_active(wq)) {
439 dprintk("RPC svc_write_space: someone sleeping on %p\n", 477 dprintk("RPC svc_write_space: someone sleeping on %p\n",
440 svsk); 478 svsk);
441 wake_up_interruptible(sk_sleep(sk)); 479 wake_up_interruptible(wq);
442 } 480 }
443} 481}
444 482
@@ -657,10 +695,11 @@ static struct svc_xprt *svc_udp_accept(struct svc_xprt *xprt)
657} 695}
658 696
659static struct svc_xprt *svc_udp_create(struct svc_serv *serv, 697static struct svc_xprt *svc_udp_create(struct svc_serv *serv,
698 struct net *net,
660 struct sockaddr *sa, int salen, 699 struct sockaddr *sa, int salen,
661 int flags) 700 int flags)
662{ 701{
663 return svc_create_socket(serv, IPPROTO_UDP, sa, salen, flags); 702 return svc_create_socket(serv, IPPROTO_UDP, net, sa, salen, flags);
664} 703}
665 704
666static struct svc_xprt_ops svc_udp_ops = { 705static struct svc_xprt_ops svc_udp_ops = {
@@ -728,6 +767,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
728static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused) 767static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
729{ 768{
730 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; 769 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
770 wait_queue_head_t *wq;
731 771
732 dprintk("svc: socket %p TCP (listen) state change %d\n", 772 dprintk("svc: socket %p TCP (listen) state change %d\n",
733 sk, sk->sk_state); 773 sk, sk->sk_state);
@@ -750,8 +790,9 @@ static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
750 printk("svc: socket %p: no user data\n", sk); 790 printk("svc: socket %p: no user data\n", sk);
751 } 791 }
752 792
753 if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) 793 wq = sk_sleep(sk);
754 wake_up_interruptible_all(sk_sleep(sk)); 794 if (wq && waitqueue_active(wq))
795 wake_up_interruptible_all(wq);
755} 796}
756 797
757/* 798/*
@@ -760,6 +801,7 @@ static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
760static void svc_tcp_state_change(struct sock *sk) 801static void svc_tcp_state_change(struct sock *sk)
761{ 802{
762 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; 803 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
804 wait_queue_head_t *wq = sk_sleep(sk);
763 805
764 dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n", 806 dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
765 sk, sk->sk_state, sk->sk_user_data); 807 sk, sk->sk_state, sk->sk_user_data);
@@ -770,13 +812,14 @@ static void svc_tcp_state_change(struct sock *sk)
770 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); 812 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
771 svc_xprt_enqueue(&svsk->sk_xprt); 813 svc_xprt_enqueue(&svsk->sk_xprt);
772 } 814 }
773 if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) 815 if (wq && waitqueue_active(wq))
774 wake_up_interruptible_all(sk_sleep(sk)); 816 wake_up_interruptible_all(wq);
775} 817}
776 818
777static void svc_tcp_data_ready(struct sock *sk, int count) 819static void svc_tcp_data_ready(struct sock *sk, int count)
778{ 820{
779 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; 821 struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
822 wait_queue_head_t *wq = sk_sleep(sk);
780 823
781 dprintk("svc: socket %p TCP data ready (svsk %p)\n", 824 dprintk("svc: socket %p TCP data ready (svsk %p)\n",
782 sk, sk->sk_user_data); 825 sk, sk->sk_user_data);
@@ -784,8 +827,8 @@ static void svc_tcp_data_ready(struct sock *sk, int count)
784 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); 827 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
785 svc_xprt_enqueue(&svsk->sk_xprt); 828 svc_xprt_enqueue(&svsk->sk_xprt);
786 } 829 }
787 if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) 830 if (wq && waitqueue_active(wq))
788 wake_up_interruptible(sk_sleep(sk)); 831 wake_up_interruptible(wq);
789} 832}
790 833
791/* 834/*
@@ -867,6 +910,56 @@ failed:
867 return NULL; 910 return NULL;
868} 911}
869 912
913static unsigned int svc_tcp_restore_pages(struct svc_sock *svsk, struct svc_rqst *rqstp)
914{
915 unsigned int i, len, npages;
916
917 if (svsk->sk_tcplen <= sizeof(rpc_fraghdr))
918 return 0;
919 len = svsk->sk_tcplen - sizeof(rpc_fraghdr);
920 npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
921 for (i = 0; i < npages; i++) {
922 if (rqstp->rq_pages[i] != NULL)
923 put_page(rqstp->rq_pages[i]);
924 BUG_ON(svsk->sk_pages[i] == NULL);
925 rqstp->rq_pages[i] = svsk->sk_pages[i];
926 svsk->sk_pages[i] = NULL;
927 }
928 rqstp->rq_arg.head[0].iov_base = page_address(rqstp->rq_pages[0]);
929 return len;
930}
931
932static void svc_tcp_save_pages(struct svc_sock *svsk, struct svc_rqst *rqstp)
933{
934 unsigned int i, len, npages;
935
936 if (svsk->sk_tcplen <= sizeof(rpc_fraghdr))
937 return;
938 len = svsk->sk_tcplen - sizeof(rpc_fraghdr);
939 npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
940 for (i = 0; i < npages; i++) {
941 svsk->sk_pages[i] = rqstp->rq_pages[i];
942 rqstp->rq_pages[i] = NULL;
943 }
944}
945
946static void svc_tcp_clear_pages(struct svc_sock *svsk)
947{
948 unsigned int i, len, npages;
949
950 if (svsk->sk_tcplen <= sizeof(rpc_fraghdr))
951 goto out;
952 len = svsk->sk_tcplen - sizeof(rpc_fraghdr);
953 npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
954 for (i = 0; i < npages; i++) {
955 BUG_ON(svsk->sk_pages[i] == NULL);
956 put_page(svsk->sk_pages[i]);
957 svsk->sk_pages[i] = NULL;
958 }
959out:
960 svsk->sk_tcplen = 0;
961}
962
870/* 963/*
871 * Receive data. 964 * Receive data.
872 * If we haven't gotten the record length yet, get the next four bytes. 965 * If we haven't gotten the record length yet, get the next four bytes.
@@ -876,31 +969,15 @@ failed:
876static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp) 969static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
877{ 970{
878 struct svc_serv *serv = svsk->sk_xprt.xpt_server; 971 struct svc_serv *serv = svsk->sk_xprt.xpt_server;
972 unsigned int want;
879 int len; 973 int len;
880 974
881 if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
882 /* sndbuf needs to have room for one request
883 * per thread, otherwise we can stall even when the
884 * network isn't a bottleneck.
885 *
886 * We count all threads rather than threads in a
887 * particular pool, which provides an upper bound
888 * on the number of threads which will access the socket.
889 *
890 * rcvbuf just needs to be able to hold a few requests.
891 * Normally they will be removed from the queue
892 * as soon a a complete request arrives.
893 */
894 svc_sock_setbufsize(svsk->sk_sock,
895 (serv->sv_nrthreads+3) * serv->sv_max_mesg,
896 3 * serv->sv_max_mesg);
897
898 clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); 975 clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
899 976
900 if (svsk->sk_tcplen < sizeof(rpc_fraghdr)) { 977 if (svsk->sk_tcplen < sizeof(rpc_fraghdr)) {
901 int want = sizeof(rpc_fraghdr) - svsk->sk_tcplen;
902 struct kvec iov; 978 struct kvec iov;
903 979
980 want = sizeof(rpc_fraghdr) - svsk->sk_tcplen;
904 iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen; 981 iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
905 iov.iov_len = want; 982 iov.iov_len = want;
906 if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0) 983 if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
@@ -910,7 +987,7 @@ static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
910 if (len < want) { 987 if (len < want) {
911 dprintk("svc: short recvfrom while reading record " 988 dprintk("svc: short recvfrom while reading record "
912 "length (%d of %d)\n", len, want); 989 "length (%d of %d)\n", len, want);
913 goto err_again; /* record header not complete */ 990 return -EAGAIN;
914 } 991 }
915 992
916 svsk->sk_reclen = ntohl(svsk->sk_reclen); 993 svsk->sk_reclen = ntohl(svsk->sk_reclen);
@@ -937,81 +1014,75 @@ static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
937 } 1014 }
938 } 1015 }
939 1016
940 /* Check whether enough data is available */ 1017 if (svsk->sk_reclen < 8)
941 len = svc_recv_available(svsk); 1018 goto err_delete; /* client is nuts. */
942 if (len < 0)
943 goto error;
944 1019
945 if (len < svsk->sk_reclen) {
946 dprintk("svc: incomplete TCP record (%d of %d)\n",
947 len, svsk->sk_reclen);
948 goto err_again; /* record not complete */
949 }
950 len = svsk->sk_reclen; 1020 len = svsk->sk_reclen;
951 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
952 1021
953 return len; 1022 return len;
954 error: 1023error:
955 if (len == -EAGAIN) 1024 dprintk("RPC: TCP recv_record got %d\n", len);
956 dprintk("RPC: TCP recv_record got EAGAIN\n");
957 return len; 1025 return len;
958 err_delete: 1026err_delete:
959 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); 1027 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
960 err_again:
961 return -EAGAIN; 1028 return -EAGAIN;
962} 1029}
963 1030
964static int svc_process_calldir(struct svc_sock *svsk, struct svc_rqst *rqstp, 1031static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
965 struct rpc_rqst **reqpp, struct kvec *vec)
966{ 1032{
1033 struct rpc_xprt *bc_xprt = svsk->sk_xprt.xpt_bc_xprt;
967 struct rpc_rqst *req = NULL; 1034 struct rpc_rqst *req = NULL;
968 u32 *p; 1035 struct kvec *src, *dst;
969 u32 xid; 1036 __be32 *p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
970 u32 calldir; 1037 __be32 xid;
971 int len; 1038 __be32 calldir;
972 1039
973 len = svc_recvfrom(rqstp, vec, 1, 8);
974 if (len < 0)
975 goto error;
976
977 p = (u32 *)rqstp->rq_arg.head[0].iov_base;
978 xid = *p++; 1040 xid = *p++;
979 calldir = *p; 1041 calldir = *p;
980 1042
981 if (calldir == 0) { 1043 if (bc_xprt)
982 /* REQUEST is the most common case */ 1044 req = xprt_lookup_rqst(bc_xprt, xid);
983 vec[0] = rqstp->rq_arg.head[0];
984 } else {
985 /* REPLY */
986 if (svsk->sk_bc_xprt)
987 req = xprt_lookup_rqst(svsk->sk_bc_xprt, xid);
988
989 if (!req) {
990 printk(KERN_NOTICE
991 "%s: Got unrecognized reply: "
992 "calldir 0x%x sk_bc_xprt %p xid %08x\n",
993 __func__, ntohl(calldir),
994 svsk->sk_bc_xprt, xid);
995 vec[0] = rqstp->rq_arg.head[0];
996 goto out;
997 }
998 1045
999 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1046 if (!req) {
1000 sizeof(struct xdr_buf)); 1047 printk(KERN_NOTICE
1001 /* copy the xid and call direction */ 1048 "%s: Got unrecognized reply: "
1002 memcpy(req->rq_private_buf.head[0].iov_base, 1049 "calldir 0x%x xpt_bc_xprt %p xid %08x\n",
1003 rqstp->rq_arg.head[0].iov_base, 8); 1050 __func__, ntohl(calldir),
1004 vec[0] = req->rq_private_buf.head[0]; 1051 bc_xprt, xid);
1052 return -EAGAIN;
1005 } 1053 }
1006 out: 1054
1007 vec[0].iov_base += 8; 1055 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
1008 vec[0].iov_len -= 8; 1056 /*
1009 len = svsk->sk_reclen - 8; 1057 * XXX!: cheating for now! Only copying HEAD.
1010 error: 1058 * But we know this is good enough for now (in fact, for any
1011 *reqpp = req; 1059 * callback reply in the forseeable future).
1012 return len; 1060 */
1061 dst = &req->rq_private_buf.head[0];
1062 src = &rqstp->rq_arg.head[0];
1063 if (dst->iov_len < src->iov_len)
1064 return -EAGAIN; /* whatever; just giving up. */
1065 memcpy(dst->iov_base, src->iov_base, src->iov_len);
1066 xprt_complete_rqst(req->rq_task, svsk->sk_reclen);
1067 rqstp->rq_arg.len = 0;
1068 return 0;
1069}
1070
1071static int copy_pages_to_kvecs(struct kvec *vec, struct page **pages, int len)
1072{
1073 int i = 0;
1074 int t = 0;
1075
1076 while (t < len) {
1077 vec[i].iov_base = page_address(pages[i]);
1078 vec[i].iov_len = PAGE_SIZE;
1079 i++;
1080 t += PAGE_SIZE;
1081 }
1082 return i;
1013} 1083}
1014 1084
1085
1015/* 1086/*
1016 * Receive data from a TCP socket. 1087 * Receive data from a TCP socket.
1017 */ 1088 */
@@ -1022,8 +1093,10 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
1022 struct svc_serv *serv = svsk->sk_xprt.xpt_server; 1093 struct svc_serv *serv = svsk->sk_xprt.xpt_server;
1023 int len; 1094 int len;
1024 struct kvec *vec; 1095 struct kvec *vec;
1025 int pnum, vlen; 1096 unsigned int want, base;
1026 struct rpc_rqst *req = NULL; 1097 __be32 *p;
1098 __be32 calldir;
1099 int pnum;
1027 1100
1028 dprintk("svc: tcp_recv %p data %d conn %d close %d\n", 1101 dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
1029 svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags), 1102 svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
@@ -1034,87 +1107,73 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
1034 if (len < 0) 1107 if (len < 0)
1035 goto error; 1108 goto error;
1036 1109
1110 base = svc_tcp_restore_pages(svsk, rqstp);
1111 want = svsk->sk_reclen - base;
1112
1037 vec = rqstp->rq_vec; 1113 vec = rqstp->rq_vec;
1038 vec[0] = rqstp->rq_arg.head[0];
1039 vlen = PAGE_SIZE;
1040 1114
1041 /* 1115 pnum = copy_pages_to_kvecs(&vec[0], &rqstp->rq_pages[0],
1042 * We have enough data for the whole tcp record. Let's try and read the 1116 svsk->sk_reclen);
1043 * first 8 bytes to get the xid and the call direction. We can use this
1044 * to figure out if this is a call or a reply to a callback. If
1045 * sk_reclen is < 8 (xid and calldir), then this is a malformed packet.
1046 * In that case, don't bother with the calldir and just read the data.
1047 * It will be rejected in svc_process.
1048 */
1049 if (len >= 8) {
1050 len = svc_process_calldir(svsk, rqstp, &req, vec);
1051 if (len < 0)
1052 goto err_again;
1053 vlen -= 8;
1054 }
1055 1117
1056 pnum = 1;
1057 while (vlen < len) {
1058 vec[pnum].iov_base = (req) ?
1059 page_address(req->rq_private_buf.pages[pnum - 1]) :
1060 page_address(rqstp->rq_pages[pnum]);
1061 vec[pnum].iov_len = PAGE_SIZE;
1062 pnum++;
1063 vlen += PAGE_SIZE;
1064 }
1065 rqstp->rq_respages = &rqstp->rq_pages[pnum]; 1118 rqstp->rq_respages = &rqstp->rq_pages[pnum];
1066 1119
1067 /* Now receive data */ 1120 /* Now receive data */
1068 len = svc_recvfrom(rqstp, vec, pnum, len); 1121 len = svc_partial_recvfrom(rqstp, vec, pnum, want, base);
1069 if (len < 0) 1122 if (len >= 0)
1070 goto err_again; 1123 svsk->sk_tcplen += len;
1071 1124 if (len != want) {
1072 /* 1125 if (len < 0 && len != -EAGAIN)
1073 * Account for the 8 bytes we read earlier 1126 goto err_other;
1074 */ 1127 svc_tcp_save_pages(svsk, rqstp);
1075 len += 8; 1128 dprintk("svc: incomplete TCP record (%d of %d)\n",
1076 1129 svsk->sk_tcplen, svsk->sk_reclen);
1077 if (req) { 1130 goto err_noclose;
1078 xprt_complete_rqst(req->rq_task, len);
1079 len = 0;
1080 goto out;
1081 } 1131 }
1082 dprintk("svc: TCP complete record (%d bytes)\n", len); 1132
1083 rqstp->rq_arg.len = len; 1133 rqstp->rq_arg.len = svsk->sk_reclen;
1084 rqstp->rq_arg.page_base = 0; 1134 rqstp->rq_arg.page_base = 0;
1085 if (len <= rqstp->rq_arg.head[0].iov_len) { 1135 if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
1086 rqstp->rq_arg.head[0].iov_len = len; 1136 rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
1087 rqstp->rq_arg.page_len = 0; 1137 rqstp->rq_arg.page_len = 0;
1088 } else { 1138 } else
1089 rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len; 1139 rqstp->rq_arg.page_len = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
1090 }
1091 1140
1092 rqstp->rq_xprt_ctxt = NULL; 1141 rqstp->rq_xprt_ctxt = NULL;
1093 rqstp->rq_prot = IPPROTO_TCP; 1142 rqstp->rq_prot = IPPROTO_TCP;
1094 1143
1095out: 1144 p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
1145 calldir = p[1];
1146 if (calldir)
1147 len = receive_cb_reply(svsk, rqstp);
1148
1096 /* Reset TCP read info */ 1149 /* Reset TCP read info */
1097 svsk->sk_reclen = 0; 1150 svsk->sk_reclen = 0;
1098 svsk->sk_tcplen = 0; 1151 svsk->sk_tcplen = 0;
1152 /* If we have more data, signal svc_xprt_enqueue() to try again */
1153 if (svc_recv_available(svsk) > sizeof(rpc_fraghdr))
1154 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
1155
1156 if (len < 0)
1157 goto error;
1099 1158
1100 svc_xprt_copy_addrs(rqstp, &svsk->sk_xprt); 1159 svc_xprt_copy_addrs(rqstp, &svsk->sk_xprt);
1101 if (serv->sv_stats) 1160 if (serv->sv_stats)
1102 serv->sv_stats->nettcpcnt++; 1161 serv->sv_stats->nettcpcnt++;
1103 1162
1104 return len; 1163 dprintk("svc: TCP complete record (%d bytes)\n", rqstp->rq_arg.len);
1164 return rqstp->rq_arg.len;
1105 1165
1106err_again:
1107 if (len == -EAGAIN) {
1108 dprintk("RPC: TCP recvfrom got EAGAIN\n");
1109 return len;
1110 }
1111error: 1166error:
1112 if (len != -EAGAIN) { 1167 if (len != -EAGAIN)
1113 printk(KERN_NOTICE "%s: recvfrom returned errno %d\n", 1168 goto err_other;
1114 svsk->sk_xprt.xpt_server->sv_name, -len); 1169 dprintk("RPC: TCP recvfrom got EAGAIN\n");
1115 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
1116 }
1117 return -EAGAIN; 1170 return -EAGAIN;
1171err_other:
1172 printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
1173 svsk->sk_xprt.xpt_server->sv_name, -len);
1174 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
1175err_noclose:
1176 return -EAGAIN; /* record not complete */
1118} 1177}
1119 1178
1120/* 1179/*
@@ -1133,9 +1192,6 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
1133 reclen = htonl(0x80000000|((xbufp->len ) - 4)); 1192 reclen = htonl(0x80000000|((xbufp->len ) - 4));
1134 memcpy(xbufp->head[0].iov_base, &reclen, 4); 1193 memcpy(xbufp->head[0].iov_base, &reclen, 4);
1135 1194
1136 if (test_bit(XPT_DEAD, &rqstp->rq_xprt->xpt_flags))
1137 return -ENOTCONN;
1138
1139 sent = svc_sendto(rqstp, &rqstp->rq_res); 1195 sent = svc_sendto(rqstp, &rqstp->rq_res);
1140 if (sent != xbufp->len) { 1196 if (sent != xbufp->len) {
1141 printk(KERN_NOTICE 1197 printk(KERN_NOTICE
@@ -1178,11 +1234,63 @@ static int svc_tcp_has_wspace(struct svc_xprt *xprt)
1178} 1234}
1179 1235
1180static struct svc_xprt *svc_tcp_create(struct svc_serv *serv, 1236static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
1237 struct net *net,
1181 struct sockaddr *sa, int salen, 1238 struct sockaddr *sa, int salen,
1182 int flags) 1239 int flags)
1183{ 1240{
1184 return svc_create_socket(serv, IPPROTO_TCP, sa, salen, flags); 1241 return svc_create_socket(serv, IPPROTO_TCP, net, sa, salen, flags);
1242}
1243
1244#if defined(CONFIG_NFS_V4_1)
1245static struct svc_xprt *svc_bc_create_socket(struct svc_serv *, int,
1246 struct net *, struct sockaddr *,
1247 int, int);
1248static void svc_bc_sock_free(struct svc_xprt *xprt);
1249
1250static struct svc_xprt *svc_bc_tcp_create(struct svc_serv *serv,
1251 struct net *net,
1252 struct sockaddr *sa, int salen,
1253 int flags)
1254{
1255 return svc_bc_create_socket(serv, IPPROTO_TCP, net, sa, salen, flags);
1256}
1257
1258static void svc_bc_tcp_sock_detach(struct svc_xprt *xprt)
1259{
1260}
1261
1262static struct svc_xprt_ops svc_tcp_bc_ops = {
1263 .xpo_create = svc_bc_tcp_create,
1264 .xpo_detach = svc_bc_tcp_sock_detach,
1265 .xpo_free = svc_bc_sock_free,
1266 .xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,
1267};
1268
1269static struct svc_xprt_class svc_tcp_bc_class = {
1270 .xcl_name = "tcp-bc",
1271 .xcl_owner = THIS_MODULE,
1272 .xcl_ops = &svc_tcp_bc_ops,
1273 .xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
1274};
1275
1276static void svc_init_bc_xprt_sock(void)
1277{
1278 svc_reg_xprt_class(&svc_tcp_bc_class);
1279}
1280
1281static void svc_cleanup_bc_xprt_sock(void)
1282{
1283 svc_unreg_xprt_class(&svc_tcp_bc_class);
1284}
1285#else /* CONFIG_NFS_V4_1 */
1286static void svc_init_bc_xprt_sock(void)
1287{
1288}
1289
1290static void svc_cleanup_bc_xprt_sock(void)
1291{
1185} 1292}
1293#endif /* CONFIG_NFS_V4_1 */
1186 1294
1187static struct svc_xprt_ops svc_tcp_ops = { 1295static struct svc_xprt_ops svc_tcp_ops = {
1188 .xpo_create = svc_tcp_create, 1296 .xpo_create = svc_tcp_create,
@@ -1207,12 +1315,14 @@ void svc_init_xprt_sock(void)
1207{ 1315{
1208 svc_reg_xprt_class(&svc_tcp_class); 1316 svc_reg_xprt_class(&svc_tcp_class);
1209 svc_reg_xprt_class(&svc_udp_class); 1317 svc_reg_xprt_class(&svc_udp_class);
1318 svc_init_bc_xprt_sock();
1210} 1319}
1211 1320
1212void svc_cleanup_xprt_sock(void) 1321void svc_cleanup_xprt_sock(void)
1213{ 1322{
1214 svc_unreg_xprt_class(&svc_tcp_class); 1323 svc_unreg_xprt_class(&svc_tcp_class);
1215 svc_unreg_xprt_class(&svc_udp_class); 1324 svc_unreg_xprt_class(&svc_udp_class);
1325 svc_cleanup_bc_xprt_sock();
1216} 1326}
1217 1327
1218static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv) 1328static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
@@ -1234,18 +1344,10 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
1234 1344
1235 svsk->sk_reclen = 0; 1345 svsk->sk_reclen = 0;
1236 svsk->sk_tcplen = 0; 1346 svsk->sk_tcplen = 0;
1347 memset(&svsk->sk_pages[0], 0, sizeof(svsk->sk_pages));
1237 1348
1238 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; 1349 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1239 1350
1240 /* initialise setting must have enough space to
1241 * receive and respond to one request.
1242 * svc_tcp_recvfrom will re-adjust if necessary
1243 */
1244 svc_sock_setbufsize(svsk->sk_sock,
1245 3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
1246 3 * svsk->sk_xprt.xpt_server->sv_max_mesg);
1247
1248 set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
1249 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); 1351 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
1250 if (sk->sk_state != TCP_ESTABLISHED) 1352 if (sk->sk_state != TCP_ESTABLISHED)
1251 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); 1353 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
@@ -1258,19 +1360,13 @@ void svc_sock_update_bufs(struct svc_serv *serv)
1258 * The number of server threads has changed. Update 1360 * The number of server threads has changed. Update
1259 * rcvbuf and sndbuf accordingly on all sockets 1361 * rcvbuf and sndbuf accordingly on all sockets
1260 */ 1362 */
1261 struct list_head *le; 1363 struct svc_sock *svsk;
1262 1364
1263 spin_lock_bh(&serv->sv_lock); 1365 spin_lock_bh(&serv->sv_lock);
1264 list_for_each(le, &serv->sv_permsocks) { 1366 list_for_each_entry(svsk, &serv->sv_permsocks, sk_xprt.xpt_list)
1265 struct svc_sock *svsk =
1266 list_entry(le, struct svc_sock, sk_xprt.xpt_list);
1267 set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags); 1367 set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
1268 } 1368 list_for_each_entry(svsk, &serv->sv_tempsocks, sk_xprt.xpt_list)
1269 list_for_each(le, &serv->sv_tempsocks) {
1270 struct svc_sock *svsk =
1271 list_entry(le, struct svc_sock, sk_xprt.xpt_list);
1272 set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags); 1369 set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
1273 }
1274 spin_unlock_bh(&serv->sv_lock); 1370 spin_unlock_bh(&serv->sv_lock);
1275} 1371}
1276EXPORT_SYMBOL_GPL(svc_sock_update_bufs); 1372EXPORT_SYMBOL_GPL(svc_sock_update_bufs);
@@ -1315,8 +1411,14 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
1315 /* Initialize the socket */ 1411 /* Initialize the socket */
1316 if (sock->type == SOCK_DGRAM) 1412 if (sock->type == SOCK_DGRAM)
1317 svc_udp_init(svsk, serv); 1413 svc_udp_init(svsk, serv);
1318 else 1414 else {
1415 /* initialise setting must have enough space to
1416 * receive and respond to one request.
1417 */
1418 svc_sock_setbufsize(svsk->sk_sock, 4 * serv->sv_max_mesg,
1419 4 * serv->sv_max_mesg);
1319 svc_tcp_init(svsk, serv); 1420 svc_tcp_init(svsk, serv);
1421 }
1320 1422
1321 dprintk("svc: svc_setup_socket created %p (inet %p)\n", 1423 dprintk("svc: svc_setup_socket created %p (inet %p)\n",
1322 svsk, svsk->sk_sk); 1424 svsk, svsk->sk_sk);
@@ -1385,6 +1487,7 @@ EXPORT_SYMBOL_GPL(svc_addsock);
1385 */ 1487 */
1386static struct svc_xprt *svc_create_socket(struct svc_serv *serv, 1488static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
1387 int protocol, 1489 int protocol,
1490 struct net *net,
1388 struct sockaddr *sin, int len, 1491 struct sockaddr *sin, int len,
1389 int flags) 1492 int flags)
1390{ 1493{
@@ -1421,7 +1524,7 @@ static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
1421 return ERR_PTR(-EINVAL); 1524 return ERR_PTR(-EINVAL);
1422 } 1525 }
1423 1526
1424 error = sock_create_kern(family, type, protocol, &sock); 1527 error = __sock_create(net, family, type, protocol, &sock, 1);
1425 if (error < 0) 1528 if (error < 0)
1426 return ERR_PTR(error); 1529 return ERR_PTR(error);
1427 1530
@@ -1472,6 +1575,7 @@ static void svc_sock_detach(struct svc_xprt *xprt)
1472{ 1575{
1473 struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); 1576 struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
1474 struct sock *sk = svsk->sk_sk; 1577 struct sock *sk = svsk->sk_sk;
1578 wait_queue_head_t *wq;
1475 1579
1476 dprintk("svc: svc_sock_detach(%p)\n", svsk); 1580 dprintk("svc: svc_sock_detach(%p)\n", svsk);
1477 1581
@@ -1480,8 +1584,9 @@ static void svc_sock_detach(struct svc_xprt *xprt)
1480 sk->sk_data_ready = svsk->sk_odata; 1584 sk->sk_data_ready = svsk->sk_odata;
1481 sk->sk_write_space = svsk->sk_owspace; 1585 sk->sk_write_space = svsk->sk_owspace;
1482 1586
1483 if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) 1587 wq = sk_sleep(sk);
1484 wake_up_interruptible(sk_sleep(sk)); 1588 if (wq && waitqueue_active(wq))
1589 wake_up_interruptible(wq);
1485} 1590}
1486 1591
1487/* 1592/*
@@ -1495,8 +1600,10 @@ static void svc_tcp_sock_detach(struct svc_xprt *xprt)
1495 1600
1496 svc_sock_detach(xprt); 1601 svc_sock_detach(xprt);
1497 1602
1498 if (!test_bit(XPT_LISTENER, &xprt->xpt_flags)) 1603 if (!test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
1604 svc_tcp_clear_pages(svsk);
1499 kernel_sock_shutdown(svsk->sk_sock, SHUT_RDWR); 1605 kernel_sock_shutdown(svsk->sk_sock, SHUT_RDWR);
1606 }
1500} 1607}
1501 1608
1502/* 1609/*
@@ -1514,41 +1621,43 @@ static void svc_sock_free(struct svc_xprt *xprt)
1514 kfree(svsk); 1621 kfree(svsk);
1515} 1622}
1516 1623
1624#if defined(CONFIG_NFS_V4_1)
1517/* 1625/*
1518 * Create a svc_xprt. 1626 * Create a back channel svc_xprt which shares the fore channel socket.
1519 *
1520 * For internal use only (e.g. nfsv4.1 backchannel).
1521 * Callers should typically use the xpo_create() method.
1522 */ 1627 */
1523struct svc_xprt *svc_sock_create(struct svc_serv *serv, int prot) 1628static struct svc_xprt *svc_bc_create_socket(struct svc_serv *serv,
1629 int protocol,
1630 struct net *net,
1631 struct sockaddr *sin, int len,
1632 int flags)
1524{ 1633{
1525 struct svc_sock *svsk; 1634 struct svc_sock *svsk;
1526 struct svc_xprt *xprt = NULL; 1635 struct svc_xprt *xprt;
1636
1637 if (protocol != IPPROTO_TCP) {
1638 printk(KERN_WARNING "svc: only TCP sockets"
1639 " supported on shared back channel\n");
1640 return ERR_PTR(-EINVAL);
1641 }
1527 1642
1528 dprintk("svc: %s\n", __func__);
1529 svsk = kzalloc(sizeof(*svsk), GFP_KERNEL); 1643 svsk = kzalloc(sizeof(*svsk), GFP_KERNEL);
1530 if (!svsk) 1644 if (!svsk)
1531 goto out; 1645 return ERR_PTR(-ENOMEM);
1532 1646
1533 xprt = &svsk->sk_xprt; 1647 xprt = &svsk->sk_xprt;
1534 if (prot == IPPROTO_TCP) 1648 svc_xprt_init(&svc_tcp_bc_class, xprt, serv);
1535 svc_xprt_init(&svc_tcp_class, xprt, serv); 1649
1536 else if (prot == IPPROTO_UDP) 1650 serv->sv_bc_xprt = xprt;
1537 svc_xprt_init(&svc_udp_class, xprt, serv); 1651
1538 else
1539 BUG();
1540out:
1541 dprintk("svc: %s return %p\n", __func__, xprt);
1542 return xprt; 1652 return xprt;
1543} 1653}
1544EXPORT_SYMBOL_GPL(svc_sock_create);
1545 1654
1546/* 1655/*
1547 * Destroy a svc_sock. 1656 * Free a back channel svc_sock.
1548 */ 1657 */
1549void svc_sock_destroy(struct svc_xprt *xprt) 1658static void svc_bc_sock_free(struct svc_xprt *xprt)
1550{ 1659{
1551 if (xprt) 1660 if (xprt)
1552 kfree(container_of(xprt, struct svc_sock, sk_xprt)); 1661 kfree(container_of(xprt, struct svc_sock, sk_xprt));
1553} 1662}
1554EXPORT_SYMBOL_GPL(svc_sock_destroy); 1663#endif /* CONFIG_NFS_V4_1 */