aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c281
1 files changed, 150 insertions, 131 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index ae09d850cd11..53de72d2dded 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -50,6 +50,7 @@
50#include <linux/bvec.h> 50#include <linux/bvec.h>
51#include <linux/highmem.h> 51#include <linux/highmem.h>
52#include <linux/uio.h> 52#include <linux/uio.h>
53#include <linux/sched/mm.h>
53 54
54#include <trace/events/sunrpc.h> 55#include <trace/events/sunrpc.h>
55 56
@@ -404,8 +405,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
404 size_t want, seek_init = seek, offset = 0; 405 size_t want, seek_init = seek, offset = 0;
405 ssize_t ret; 406 ssize_t ret;
406 407
407 if (seek < buf->head[0].iov_len) { 408 want = min_t(size_t, count, buf->head[0].iov_len);
408 want = min_t(size_t, count, buf->head[0].iov_len); 409 if (seek < want) {
409 ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek); 410 ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
410 if (ret <= 0) 411 if (ret <= 0)
411 goto sock_err; 412 goto sock_err;
@@ -416,8 +417,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
416 goto out; 417 goto out;
417 seek = 0; 418 seek = 0;
418 } else { 419 } else {
419 seek -= buf->head[0].iov_len; 420 seek -= want;
420 offset += buf->head[0].iov_len; 421 offset += want;
421 } 422 }
422 423
423 want = xs_alloc_sparse_pages(buf, 424 want = xs_alloc_sparse_pages(buf,
@@ -442,8 +443,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
442 offset += want; 443 offset += want;
443 } 444 }
444 445
445 if (seek < buf->tail[0].iov_len) { 446 want = min_t(size_t, count - offset, buf->tail[0].iov_len);
446 want = min_t(size_t, count - offset, buf->tail[0].iov_len); 447 if (seek < want) {
447 ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek); 448 ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
448 if (ret <= 0) 449 if (ret <= 0)
449 goto sock_err; 450 goto sock_err;
@@ -453,7 +454,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
453 if (ret != want) 454 if (ret != want)
454 goto out; 455 goto out;
455 } else 456 } else
456 offset += buf->tail[0].iov_len; 457 offset = seek_init;
457 ret = -EMSGSIZE; 458 ret = -EMSGSIZE;
458out: 459out:
459 *read = offset - seek_init; 460 *read = offset - seek_init;
@@ -481,6 +482,14 @@ xs_read_stream_request_done(struct sock_xprt *transport)
481 return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); 482 return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
482} 483}
483 484
485static void
486xs_read_stream_check_eor(struct sock_xprt *transport,
487 struct msghdr *msg)
488{
489 if (xs_read_stream_request_done(transport))
490 msg->msg_flags |= MSG_EOR;
491}
492
484static ssize_t 493static ssize_t
485xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, 494xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
486 int flags, struct rpc_rqst *req) 495 int flags, struct rpc_rqst *req)
@@ -492,17 +501,21 @@ xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
492 xs_read_header(transport, buf); 501 xs_read_header(transport, buf);
493 502
494 want = transport->recv.len - transport->recv.offset; 503 want = transport->recv.len - transport->recv.offset;
495 ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, 504 if (want != 0) {
496 transport->recv.copied + want, transport->recv.copied, 505 ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
497 &read); 506 transport->recv.copied + want,
498 transport->recv.offset += read; 507 transport->recv.copied,
499 transport->recv.copied += read; 508 &read);
500 if (transport->recv.offset == transport->recv.len) { 509 transport->recv.offset += read;
501 if (xs_read_stream_request_done(transport)) 510 transport->recv.copied += read;
502 msg->msg_flags |= MSG_EOR;
503 return read;
504 } 511 }
505 512
513 if (transport->recv.offset == transport->recv.len)
514 xs_read_stream_check_eor(transport, msg);
515
516 if (want == 0)
517 return 0;
518
506 switch (ret) { 519 switch (ret) {
507 default: 520 default:
508 break; 521 break;
@@ -655,13 +668,34 @@ out_err:
655 return ret != 0 ? ret : -ESHUTDOWN; 668 return ret != 0 ? ret : -ESHUTDOWN;
656} 669}
657 670
671static __poll_t xs_poll_socket(struct sock_xprt *transport)
672{
673 return transport->sock->ops->poll(NULL, transport->sock, NULL);
674}
675
676static bool xs_poll_socket_readable(struct sock_xprt *transport)
677{
678 __poll_t events = xs_poll_socket(transport);
679
680 return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
681}
682
683static void xs_poll_check_readable(struct sock_xprt *transport)
684{
685
686 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
687 if (!xs_poll_socket_readable(transport))
688 return;
689 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
690 queue_work(xprtiod_workqueue, &transport->recv_worker);
691}
692
658static void xs_stream_data_receive(struct sock_xprt *transport) 693static void xs_stream_data_receive(struct sock_xprt *transport)
659{ 694{
660 size_t read = 0; 695 size_t read = 0;
661 ssize_t ret = 0; 696 ssize_t ret = 0;
662 697
663 mutex_lock(&transport->recv_mutex); 698 mutex_lock(&transport->recv_mutex);
664 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
665 if (transport->sock == NULL) 699 if (transport->sock == NULL)
666 goto out; 700 goto out;
667 for (;;) { 701 for (;;) {
@@ -671,6 +705,10 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
671 read += ret; 705 read += ret;
672 cond_resched(); 706 cond_resched();
673 } 707 }
708 if (ret == -ESHUTDOWN)
709 kernel_sock_shutdown(transport->sock, SHUT_RDWR);
710 else
711 xs_poll_check_readable(transport);
674out: 712out:
675 mutex_unlock(&transport->recv_mutex); 713 mutex_unlock(&transport->recv_mutex);
676 trace_xs_stream_read_data(&transport->xprt, ret, read); 714 trace_xs_stream_read_data(&transport->xprt, ret, read);
@@ -680,7 +718,10 @@ static void xs_stream_data_receive_workfn(struct work_struct *work)
680{ 718{
681 struct sock_xprt *transport = 719 struct sock_xprt *transport =
682 container_of(work, struct sock_xprt, recv_worker); 720 container_of(work, struct sock_xprt, recv_worker);
721 unsigned int pflags = memalloc_nofs_save();
722
683 xs_stream_data_receive(transport); 723 xs_stream_data_receive(transport);
724 memalloc_nofs_restore(pflags);
684} 725}
685 726
686static void 727static void
@@ -690,99 +731,65 @@ xs_stream_reset_connect(struct sock_xprt *transport)
690 transport->recv.len = 0; 731 transport->recv.len = 0;
691 transport->recv.copied = 0; 732 transport->recv.copied = 0;
692 transport->xmit.offset = 0; 733 transport->xmit.offset = 0;
734}
735
736static void
737xs_stream_start_connect(struct sock_xprt *transport)
738{
693 transport->xprt.stat.connect_count++; 739 transport->xprt.stat.connect_count++;
694 transport->xprt.stat.connect_start = jiffies; 740 transport->xprt.stat.connect_start = jiffies;
695} 741}
696 742
697#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 743#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
698 744
745static int xs_sendmsg(struct socket *sock, struct msghdr *msg, size_t seek)
746{
747 if (seek)
748 iov_iter_advance(&msg->msg_iter, seek);
749 return sock_sendmsg(sock, msg);
750}
751
752static int xs_send_kvec(struct socket *sock, struct msghdr *msg, struct kvec *vec, size_t seek)
753{
754 iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
755 return xs_sendmsg(sock, msg, seek);
756}
757
758static int xs_send_pagedata(struct socket *sock, struct msghdr *msg, struct xdr_buf *xdr, size_t base)
759{
760 int err;
761
762 err = xdr_alloc_bvec(xdr, GFP_KERNEL);
763 if (err < 0)
764 return err;
765
766 iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec,
767 xdr_buf_pagecount(xdr),
768 xdr->page_len + xdr->page_base);
769 return xs_sendmsg(sock, msg, base + xdr->page_base);
770}
771
772#define xs_record_marker_len() sizeof(rpc_fraghdr)
773
699/* Common case: 774/* Common case:
700 * - stream transport 775 * - stream transport
701 * - sending from byte 0 of the message 776 * - sending from byte 0 of the message
702 * - the message is wholly contained in @xdr's head iovec 777 * - the message is wholly contained in @xdr's head iovec
703 */ 778 */
704static int xs_send_rm_and_kvec(struct socket *sock, struct xdr_buf *xdr, 779static int xs_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
705 unsigned int remainder) 780 rpc_fraghdr marker, struct kvec *vec, size_t base)
706{ 781{
707 struct msghdr msg = {
708 .msg_flags = XS_SENDMSG_FLAGS | (remainder ? MSG_MORE : 0)
709 };
710 rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
711 (u32)xdr->len);
712 struct kvec iov[2] = { 782 struct kvec iov[2] = {
713 { 783 [0] = {
714 .iov_base = &marker, 784 .iov_base = &marker,
715 .iov_len = sizeof(marker) 785 .iov_len = sizeof(marker)
716 }, 786 },
717 { 787 [1] = *vec,
718 .iov_base = xdr->head[0].iov_base,
719 .iov_len = xdr->head[0].iov_len
720 },
721 };
722 int ret;
723
724 ret = kernel_sendmsg(sock, &msg, iov, 2,
725 iov[0].iov_len + iov[1].iov_len);
726 if (ret < 0)
727 return ret;
728 if (ret < iov[0].iov_len)
729 return -EPIPE;
730 return ret - iov[0].iov_len;
731}
732
733static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
734{
735 struct msghdr msg = {
736 .msg_name = addr,
737 .msg_namelen = addrlen,
738 .msg_flags = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
739 };
740 struct kvec iov = {
741 .iov_base = vec->iov_base + base,
742 .iov_len = vec->iov_len - base,
743 }; 788 };
789 size_t len = iov[0].iov_len + iov[1].iov_len;
744 790
745 if (iov.iov_len != 0) 791 iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
746 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); 792 return xs_sendmsg(sock, msg, base);
747 return kernel_sendmsg(sock, &msg, NULL, 0, 0);
748}
749
750static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
751{
752 ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
753 int offset, size_t size, int flags);
754 struct page **ppage;
755 unsigned int remainder;
756 int err;
757
758 remainder = xdr->page_len - base;
759 base += xdr->page_base;
760 ppage = xdr->pages + (base >> PAGE_SHIFT);
761 base &= ~PAGE_MASK;
762 do_sendpage = sock->ops->sendpage;
763 if (!zerocopy)
764 do_sendpage = sock_no_sendpage;
765 for(;;) {
766 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
767 int flags = XS_SENDMSG_FLAGS;
768
769 remainder -= len;
770 if (more)
771 flags |= MSG_MORE;
772 if (remainder != 0)
773 flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
774 err = do_sendpage(sock, *ppage, base, len, flags);
775 if (remainder == 0 || err != len)
776 break;
777 *sent_p += err;
778 ppage++;
779 base = 0;
780 }
781 if (err > 0) {
782 *sent_p += err;
783 err = 0;
784 }
785 return err;
786} 793}
787 794
788/** 795/**
@@ -792,53 +799,60 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
792 * @addrlen: UDP only -- length of destination address 799 * @addrlen: UDP only -- length of destination address
793 * @xdr: buffer containing this request 800 * @xdr: buffer containing this request
794 * @base: starting position in the buffer 801 * @base: starting position in the buffer
795 * @zerocopy: true if it is safe to use sendpage() 802 * @rm: stream record marker field
796 * @sent_p: return the total number of bytes successfully queued for sending 803 * @sent_p: return the total number of bytes successfully queued for sending
797 * 804 *
798 */ 805 */
799static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p) 806static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, rpc_fraghdr rm, int *sent_p)
800{ 807{
801 unsigned int remainder = xdr->len - base; 808 struct msghdr msg = {
809 .msg_name = addr,
810 .msg_namelen = addrlen,
811 .msg_flags = XS_SENDMSG_FLAGS | MSG_MORE,
812 };
813 unsigned int rmsize = rm ? sizeof(rm) : 0;
814 unsigned int remainder = rmsize + xdr->len - base;
815 unsigned int want;
802 int err = 0; 816 int err = 0;
803 int sent = 0;
804 817
805 if (unlikely(!sock)) 818 if (unlikely(!sock))
806 return -ENOTSOCK; 819 return -ENOTSOCK;
807 820
808 if (base != 0) { 821 want = xdr->head[0].iov_len + rmsize;
809 addr = NULL; 822 if (base < want) {
810 addrlen = 0; 823 unsigned int len = want - base;
811 }
812
813 if (base < xdr->head[0].iov_len || addr != NULL) {
814 unsigned int len = xdr->head[0].iov_len - base;
815 remainder -= len; 824 remainder -= len;
816 if (!base && !addr) 825 if (remainder == 0)
817 err = xs_send_rm_and_kvec(sock, xdr, remainder); 826 msg.msg_flags &= ~MSG_MORE;
827 if (rmsize)
828 err = xs_send_rm_and_kvec(sock, &msg, rm,
829 &xdr->head[0], base);
818 else 830 else
819 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], 831 err = xs_send_kvec(sock, &msg, &xdr->head[0], base);
820 base, remainder != 0);
821 if (remainder == 0 || err != len) 832 if (remainder == 0 || err != len)
822 goto out; 833 goto out;
823 *sent_p += err; 834 *sent_p += err;
824 base = 0; 835 base = 0;
825 } else 836 } else
826 base -= xdr->head[0].iov_len; 837 base -= want;
827 838
828 if (base < xdr->page_len) { 839 if (base < xdr->page_len) {
829 unsigned int len = xdr->page_len - base; 840 unsigned int len = xdr->page_len - base;
830 remainder -= len; 841 remainder -= len;
831 err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent); 842 if (remainder == 0)
832 *sent_p += sent; 843 msg.msg_flags &= ~MSG_MORE;
833 if (remainder == 0 || sent != len) 844 err = xs_send_pagedata(sock, &msg, xdr, base);
845 if (remainder == 0 || err != len)
834 goto out; 846 goto out;
847 *sent_p += err;
835 base = 0; 848 base = 0;
836 } else 849 } else
837 base -= xdr->page_len; 850 base -= xdr->page_len;
838 851
839 if (base >= xdr->tail[0].iov_len) 852 if (base >= xdr->tail[0].iov_len)
840 return 0; 853 return 0;
841 err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0); 854 msg.msg_flags &= ~MSG_MORE;
855 err = xs_send_kvec(sock, &msg, &xdr->tail[0], base);
842out: 856out:
843 if (err > 0) { 857 if (err > 0) {
844 *sent_p += err; 858 *sent_p += err;
@@ -907,6 +921,17 @@ xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
907 return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; 921 return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
908} 922}
909 923
924/*
925 * Return the stream record marker field for a record of length < 2^31-1
926 */
927static rpc_fraghdr
928xs_stream_record_marker(struct xdr_buf *xdr)
929{
930 if (!xdr->len)
931 return 0;
932 return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
933}
934
910/** 935/**
911 * xs_local_send_request - write an RPC request to an AF_LOCAL socket 936 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
912 * @req: pointer to RPC request 937 * @req: pointer to RPC request
@@ -939,7 +964,8 @@ static int xs_local_send_request(struct rpc_rqst *req)
939 req->rq_xtime = ktime_get(); 964 req->rq_xtime = ktime_get();
940 status = xs_sendpages(transport->sock, NULL, 0, xdr, 965 status = xs_sendpages(transport->sock, NULL, 0, xdr,
941 transport->xmit.offset, 966 transport->xmit.offset,
942 true, &sent); 967 xs_stream_record_marker(xdr),
968 &sent);
943 dprintk("RPC: %s(%u) = %d\n", 969 dprintk("RPC: %s(%u) = %d\n",
944 __func__, xdr->len - transport->xmit.offset, status); 970 __func__, xdr->len - transport->xmit.offset, status);
945 971
@@ -951,7 +977,6 @@ static int xs_local_send_request(struct rpc_rqst *req)
951 req->rq_bytes_sent = transport->xmit.offset; 977 req->rq_bytes_sent = transport->xmit.offset;
952 if (likely(req->rq_bytes_sent >= req->rq_slen)) { 978 if (likely(req->rq_bytes_sent >= req->rq_slen)) {
953 req->rq_xmit_bytes_sent += transport->xmit.offset; 979 req->rq_xmit_bytes_sent += transport->xmit.offset;
954 req->rq_bytes_sent = 0;
955 transport->xmit.offset = 0; 980 transport->xmit.offset = 0;
956 return 0; 981 return 0;
957 } 982 }
@@ -1007,7 +1032,7 @@ static int xs_udp_send_request(struct rpc_rqst *req)
1007 1032
1008 req->rq_xtime = ktime_get(); 1033 req->rq_xtime = ktime_get();
1009 status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, 1034 status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
1010 xdr, 0, true, &sent); 1035 xdr, 0, 0, &sent);
1011 1036
1012 dprintk("RPC: xs_udp_send_request(%u) = %d\n", 1037 dprintk("RPC: xs_udp_send_request(%u) = %d\n",
1013 xdr->len, status); 1038 xdr->len, status);
@@ -1071,7 +1096,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
1071 struct rpc_xprt *xprt = req->rq_xprt; 1096 struct rpc_xprt *xprt = req->rq_xprt;
1072 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1097 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1073 struct xdr_buf *xdr = &req->rq_snd_buf; 1098 struct xdr_buf *xdr = &req->rq_snd_buf;
1074 bool zerocopy = true;
1075 bool vm_wait = false; 1099 bool vm_wait = false;
1076 int status; 1100 int status;
1077 int sent; 1101 int sent;
@@ -1086,12 +1110,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
1086 xs_pktdump("packet data:", 1110 xs_pktdump("packet data:",
1087 req->rq_svec->iov_base, 1111 req->rq_svec->iov_base,
1088 req->rq_svec->iov_len); 1112 req->rq_svec->iov_len);
1089 /* Don't use zero copy if this is a resend. If the RPC call
1090 * completes while the socket holds a reference to the pages,
1091 * then we may end up resending corrupted data.
1092 */
1093 if (req->rq_task->tk_flags & RPC_TASK_SENT)
1094 zerocopy = false;
1095 1113
1096 if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state)) 1114 if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
1097 xs_tcp_set_socket_timeouts(xprt, transport->sock); 1115 xs_tcp_set_socket_timeouts(xprt, transport->sock);
@@ -1104,7 +1122,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
1104 sent = 0; 1122 sent = 0;
1105 status = xs_sendpages(transport->sock, NULL, 0, xdr, 1123 status = xs_sendpages(transport->sock, NULL, 0, xdr,
1106 transport->xmit.offset, 1124 transport->xmit.offset,
1107 zerocopy, &sent); 1125 xs_stream_record_marker(xdr),
1126 &sent);
1108 1127
1109 dprintk("RPC: xs_tcp_send_request(%u) = %d\n", 1128 dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
1110 xdr->len - transport->xmit.offset, status); 1129 xdr->len - transport->xmit.offset, status);
@@ -1115,7 +1134,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
1115 req->rq_bytes_sent = transport->xmit.offset; 1134 req->rq_bytes_sent = transport->xmit.offset;
1116 if (likely(req->rq_bytes_sent >= req->rq_slen)) { 1135 if (likely(req->rq_bytes_sent >= req->rq_slen)) {
1117 req->rq_xmit_bytes_sent += transport->xmit.offset; 1136 req->rq_xmit_bytes_sent += transport->xmit.offset;
1118 req->rq_bytes_sent = 0;
1119 transport->xmit.offset = 0; 1137 transport->xmit.offset = 0;
1120 return 0; 1138 return 0;
1121 } 1139 }
@@ -1255,6 +1273,8 @@ static void xs_reset_transport(struct sock_xprt *transport)
1255 xprt_clear_connected(xprt); 1273 xprt_clear_connected(xprt);
1256 write_unlock_bh(&sk->sk_callback_lock); 1274 write_unlock_bh(&sk->sk_callback_lock);
1257 xs_sock_reset_connection_flags(xprt); 1275 xs_sock_reset_connection_flags(xprt);
1276 /* Reset stream record info */
1277 xs_stream_reset_connect(transport);
1258 mutex_unlock(&transport->recv_mutex); 1278 mutex_unlock(&transport->recv_mutex);
1259 1279
1260 trace_rpc_socket_close(xprt, sock); 1280 trace_rpc_socket_close(xprt, sock);
@@ -1382,7 +1402,6 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
1382 int err; 1402 int err;
1383 1403
1384 mutex_lock(&transport->recv_mutex); 1404 mutex_lock(&transport->recv_mutex);
1385 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
1386 sk = transport->inet; 1405 sk = transport->inet;
1387 if (sk == NULL) 1406 if (sk == NULL)
1388 goto out; 1407 goto out;
@@ -1394,6 +1413,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
1394 consume_skb(skb); 1413 consume_skb(skb);
1395 cond_resched(); 1414 cond_resched();
1396 } 1415 }
1416 xs_poll_check_readable(transport);
1397out: 1417out:
1398 mutex_unlock(&transport->recv_mutex); 1418 mutex_unlock(&transport->recv_mutex);
1399} 1419}
@@ -1402,7 +1422,10 @@ static void xs_udp_data_receive_workfn(struct work_struct *work)
1402{ 1422{
1403 struct sock_xprt *transport = 1423 struct sock_xprt *transport =
1404 container_of(work, struct sock_xprt, recv_worker); 1424 container_of(work, struct sock_xprt, recv_worker);
1425 unsigned int pflags = memalloc_nofs_save();
1426
1405 xs_udp_data_receive(transport); 1427 xs_udp_data_receive(transport);
1428 memalloc_nofs_restore(pflags);
1406} 1429}
1407 1430
1408/** 1431/**
@@ -1893,7 +1916,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1893 sk->sk_write_space = xs_udp_write_space; 1916 sk->sk_write_space = xs_udp_write_space;
1894 sock_set_flag(sk, SOCK_FASYNC); 1917 sock_set_flag(sk, SOCK_FASYNC);
1895 sk->sk_error_report = xs_error_report; 1918 sk->sk_error_report = xs_error_report;
1896 sk->sk_allocation = GFP_NOIO;
1897 1919
1898 xprt_clear_connected(xprt); 1920 xprt_clear_connected(xprt);
1899 1921
@@ -1904,7 +1926,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1904 write_unlock_bh(&sk->sk_callback_lock); 1926 write_unlock_bh(&sk->sk_callback_lock);
1905 } 1927 }
1906 1928
1907 xs_stream_reset_connect(transport); 1929 xs_stream_start_connect(transport);
1908 1930
1909 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); 1931 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1910} 1932}
@@ -2081,7 +2103,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2081 sk->sk_data_ready = xs_data_ready; 2103 sk->sk_data_ready = xs_data_ready;
2082 sk->sk_write_space = xs_udp_write_space; 2104 sk->sk_write_space = xs_udp_write_space;
2083 sock_set_flag(sk, SOCK_FASYNC); 2105 sock_set_flag(sk, SOCK_FASYNC);
2084 sk->sk_allocation = GFP_NOIO;
2085 2106
2086 xprt_set_connected(xprt); 2107 xprt_set_connected(xprt);
2087 2108
@@ -2244,7 +2265,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2244 sk->sk_write_space = xs_tcp_write_space; 2265 sk->sk_write_space = xs_tcp_write_space;
2245 sock_set_flag(sk, SOCK_FASYNC); 2266 sock_set_flag(sk, SOCK_FASYNC);
2246 sk->sk_error_report = xs_error_report; 2267 sk->sk_error_report = xs_error_report;
2247 sk->sk_allocation = GFP_NOIO;
2248 2268
2249 /* socket options */ 2269 /* socket options */
2250 sock_reset_flag(sk, SOCK_LINGER); 2270 sock_reset_flag(sk, SOCK_LINGER);
@@ -2264,8 +2284,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2264 2284
2265 xs_set_memalloc(xprt); 2285 xs_set_memalloc(xprt);
2266 2286
2267 /* Reset TCP record info */ 2287 xs_stream_start_connect(transport);
2268 xs_stream_reset_connect(transport);
2269 2288
2270 /* Tell the socket layer to start connecting... */ 2289 /* Tell the socket layer to start connecting... */
2271 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); 2290 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);