aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@primarydata.com>2015-10-08 10:46:19 -0400
committerTrond Myklebust <trond.myklebust@primarydata.com>2015-10-08 10:46:19 -0400
commit120bf961b90adb8e76be827b1a68efe3d1019419 (patch)
tree3d08a757172e9982bce628044399aebc638be41a /net
parent037fc9808a777f6fb6a54c6510b9656716e0c8c8 (diff)
parent31303d6cbb24ba94e8b82170213bd2fde6365d9a (diff)
Merge branch 'sunrpc'
* sunrpc:
  SUNRPC: Use MSG_SENDPAGE_NOTLAST in xs_send_pagedata()
  SUNRPC: Move AF_LOCAL receive data path into a workqueue context
  SUNRPC: Move UDP receive data path into a workqueue context
  SUNRPC: Move TCP receive data path into a workqueue context
  SUNRPC: Refactor TCP receive
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/xprtsock.c241
1 files changed, 164 insertions, 77 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1a85e0ed0b48..e71aff251ac1 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
360 int flags = XS_SENDMSG_FLAGS; 360 int flags = XS_SENDMSG_FLAGS;
361 361
362 remainder -= len; 362 remainder -= len;
363 if (remainder != 0 || more) 363 if (more)
364 flags |= MSG_MORE; 364 flags |= MSG_MORE;
365 if (remainder != 0)
366 flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
365 err = do_sendpage(sock, *ppage, base, len, flags); 367 err = do_sendpage(sock, *ppage, base, len, flags);
366 if (remainder == 0 || err != len) 368 if (remainder == 0 || err != len)
367 break; 369 break;
@@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
823 825
824 kernel_sock_shutdown(sock, SHUT_RDWR); 826 kernel_sock_shutdown(sock, SHUT_RDWR);
825 827
828 mutex_lock(&transport->recv_mutex);
826 write_lock_bh(&sk->sk_callback_lock); 829 write_lock_bh(&sk->sk_callback_lock);
827 transport->inet = NULL; 830 transport->inet = NULL;
828 transport->sock = NULL; 831 transport->sock = NULL;
@@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
833 xprt_clear_connected(xprt); 836 xprt_clear_connected(xprt);
834 write_unlock_bh(&sk->sk_callback_lock); 837 write_unlock_bh(&sk->sk_callback_lock);
835 xs_sock_reset_connection_flags(xprt); 838 xs_sock_reset_connection_flags(xprt);
839 mutex_unlock(&transport->recv_mutex);
836 840
837 trace_rpc_socket_close(xprt, sock); 841 trace_rpc_socket_close(xprt, sock);
838 sock_release(sock); 842 sock_release(sock);
@@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
886 890
887 cancel_delayed_work_sync(&transport->connect_worker); 891 cancel_delayed_work_sync(&transport->connect_worker);
888 xs_close(xprt); 892 xs_close(xprt);
893 cancel_work_sync(&transport->recv_worker);
889 xs_xprt_free(xprt); 894 xs_xprt_free(xprt);
890 module_put(THIS_MODULE); 895 module_put(THIS_MODULE);
891} 896}
@@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
906} 911}
907 912
908/** 913/**
909 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets 914 * xs_local_data_read_skb
910 * @sk: socket with data to read 915 * @xprt: transport
916 * @sk: socket
917 * @skb: skbuff
911 * 918 *
912 * Currently this assumes we can read the whole reply in a single gulp. 919 * Currently this assumes we can read the whole reply in a single gulp.
913 */ 920 */
914static void xs_local_data_ready(struct sock *sk) 921static void xs_local_data_read_skb(struct rpc_xprt *xprt,
922 struct sock *sk,
923 struct sk_buff *skb)
915{ 924{
916 struct rpc_task *task; 925 struct rpc_task *task;
917 struct rpc_xprt *xprt;
918 struct rpc_rqst *rovr; 926 struct rpc_rqst *rovr;
919 struct sk_buff *skb; 927 int repsize, copied;
920 int err, repsize, copied;
921 u32 _xid; 928 u32 _xid;
922 __be32 *xp; 929 __be32 *xp;
923 930
924 read_lock_bh(&sk->sk_callback_lock);
925 dprintk("RPC: %s...\n", __func__);
926 xprt = xprt_from_sock(sk);
927 if (xprt == NULL)
928 goto out;
929
930 skb = skb_recv_datagram(sk, 0, 1, &err);
931 if (skb == NULL)
932 goto out;
933
934 repsize = skb->len - sizeof(rpc_fraghdr); 931 repsize = skb->len - sizeof(rpc_fraghdr);
935 if (repsize < 4) { 932 if (repsize < 4) {
936 dprintk("RPC: impossible RPC reply size %d\n", repsize); 933 dprintk("RPC: impossible RPC reply size %d\n", repsize);
937 goto dropit; 934 return;
938 } 935 }
939 936
940 /* Copy the XID from the skb... */ 937 /* Copy the XID from the skb... */
941 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); 938 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
942 if (xp == NULL) 939 if (xp == NULL)
943 goto dropit; 940 return;
944 941
945 /* Look up and lock the request corresponding to the given XID */ 942 /* Look up and lock the request corresponding to the given XID */
946 spin_lock(&xprt->transport_lock); 943 spin_lock_bh(&xprt->transport_lock);
947 rovr = xprt_lookup_rqst(xprt, *xp); 944 rovr = xprt_lookup_rqst(xprt, *xp);
948 if (!rovr) 945 if (!rovr)
949 goto out_unlock; 946 goto out_unlock;
@@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk)
961 xprt_complete_rqst(task, copied); 958 xprt_complete_rqst(task, copied);
962 959
963 out_unlock: 960 out_unlock:
964 spin_unlock(&xprt->transport_lock); 961 spin_unlock_bh(&xprt->transport_lock);
965 dropit: 962}
966 skb_free_datagram(sk, skb); 963
967 out: 964static void xs_local_data_receive(struct sock_xprt *transport)
968 read_unlock_bh(&sk->sk_callback_lock); 965{
966 struct sk_buff *skb;
967 struct sock *sk;
968 int err;
969
970 mutex_lock(&transport->recv_mutex);
971 sk = transport->inet;
972 if (sk == NULL)
973 goto out;
974 for (;;) {
975 skb = skb_recv_datagram(sk, 0, 1, &err);
976 if (skb == NULL)
977 break;
978 xs_local_data_read_skb(&transport->xprt, sk, skb);
979 skb_free_datagram(sk, skb);
980 }
981out:
982 mutex_unlock(&transport->recv_mutex);
983}
984
985static void xs_local_data_receive_workfn(struct work_struct *work)
986{
987 struct sock_xprt *transport =
988 container_of(work, struct sock_xprt, recv_worker);
989 xs_local_data_receive(transport);
969} 990}
970 991
971/** 992/**
972 * xs_udp_data_ready - "data ready" callback for UDP sockets 993 * xs_udp_data_read_skb - receive callback for UDP sockets
973 * @sk: socket with data to read 994 * @xprt: transport
995 * @sk: socket
996 * @skb: skbuff
974 * 997 *
975 */ 998 */
976static void xs_udp_data_ready(struct sock *sk) 999static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
1000 struct sock *sk,
1001 struct sk_buff *skb)
977{ 1002{
978 struct rpc_task *task; 1003 struct rpc_task *task;
979 struct rpc_xprt *xprt;
980 struct rpc_rqst *rovr; 1004 struct rpc_rqst *rovr;
981 struct sk_buff *skb; 1005 int repsize, copied;
982 int err, repsize, copied;
983 u32 _xid; 1006 u32 _xid;
984 __be32 *xp; 1007 __be32 *xp;
985 1008
986 read_lock_bh(&sk->sk_callback_lock);
987 dprintk("RPC: xs_udp_data_ready...\n");
988 if (!(xprt = xprt_from_sock(sk)))
989 goto out;
990
991 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
992 goto out;
993
994 repsize = skb->len - sizeof(struct udphdr); 1009 repsize = skb->len - sizeof(struct udphdr);
995 if (repsize < 4) { 1010 if (repsize < 4) {
996 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 1011 dprintk("RPC: impossible RPC reply size %d!\n", repsize);
997 goto dropit; 1012 return;
998 } 1013 }
999 1014
1000 /* Copy the XID from the skb... */ 1015 /* Copy the XID from the skb... */
1001 xp = skb_header_pointer(skb, sizeof(struct udphdr), 1016 xp = skb_header_pointer(skb, sizeof(struct udphdr),
1002 sizeof(_xid), &_xid); 1017 sizeof(_xid), &_xid);
1003 if (xp == NULL) 1018 if (xp == NULL)
1004 goto dropit; 1019 return;
1005 1020
1006 /* Look up and lock the request corresponding to the given XID */ 1021 /* Look up and lock the request corresponding to the given XID */
1007 spin_lock(&xprt->transport_lock); 1022 spin_lock_bh(&xprt->transport_lock);
1008 rovr = xprt_lookup_rqst(xprt, *xp); 1023 rovr = xprt_lookup_rqst(xprt, *xp);
1009 if (!rovr) 1024 if (!rovr)
1010 goto out_unlock; 1025 goto out_unlock;
@@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk)
1025 xprt_complete_rqst(task, copied); 1040 xprt_complete_rqst(task, copied);
1026 1041
1027 out_unlock: 1042 out_unlock:
1028 spin_unlock(&xprt->transport_lock); 1043 spin_unlock_bh(&xprt->transport_lock);
1029 dropit: 1044}
1030 skb_free_datagram(sk, skb); 1045
1031 out: 1046static void xs_udp_data_receive(struct sock_xprt *transport)
1047{
1048 struct sk_buff *skb;
1049 struct sock *sk;
1050 int err;
1051
1052 mutex_lock(&transport->recv_mutex);
1053 sk = transport->inet;
1054 if (sk == NULL)
1055 goto out;
1056 for (;;) {
1057 skb = skb_recv_datagram(sk, 0, 1, &err);
1058 if (skb == NULL)
1059 break;
1060 xs_udp_data_read_skb(&transport->xprt, sk, skb);
1061 skb_free_datagram(sk, skb);
1062 }
1063out:
1064 mutex_unlock(&transport->recv_mutex);
1065}
1066
1067static void xs_udp_data_receive_workfn(struct work_struct *work)
1068{
1069 struct sock_xprt *transport =
1070 container_of(work, struct sock_xprt, recv_worker);
1071 xs_udp_data_receive(transport);
1072}
1073
1074/**
1075 * xs_data_ready - "data ready" callback for UDP sockets
1076 * @sk: socket with data to read
1077 *
1078 */
1079static void xs_data_ready(struct sock *sk)
1080{
1081 struct rpc_xprt *xprt;
1082
1083 read_lock_bh(&sk->sk_callback_lock);
1084 dprintk("RPC: xs_data_ready...\n");
1085 xprt = xprt_from_sock(sk);
1086 if (xprt != NULL) {
1087 struct sock_xprt *transport = container_of(xprt,
1088 struct sock_xprt, xprt);
1089 queue_work(rpciod_workqueue, &transport->recv_worker);
1090 }
1032 read_unlock_bh(&sk->sk_callback_lock); 1091 read_unlock_bh(&sk->sk_callback_lock);
1033} 1092}
1034 1093
@@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1243 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); 1302 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
1244 1303
1245 /* Find and lock the request corresponding to this xid */ 1304 /* Find and lock the request corresponding to this xid */
1246 spin_lock(&xprt->transport_lock); 1305 spin_lock_bh(&xprt->transport_lock);
1247 req = xprt_lookup_rqst(xprt, transport->tcp_xid); 1306 req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1248 if (!req) { 1307 if (!req) {
1249 dprintk("RPC: XID %08x request not found!\n", 1308 dprintk("RPC: XID %08x request not found!\n",
1250 ntohl(transport->tcp_xid)); 1309 ntohl(transport->tcp_xid));
1251 spin_unlock(&xprt->transport_lock); 1310 spin_unlock_bh(&xprt->transport_lock);
1252 return -1; 1311 return -1;
1253 } 1312 }
1254 1313
@@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1257 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1316 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1258 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1317 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1259 1318
1260 spin_unlock(&xprt->transport_lock); 1319 spin_unlock_bh(&xprt->transport_lock);
1261 return 0; 1320 return 0;
1262} 1321}
1263 1322
@@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1277 struct rpc_rqst *req; 1336 struct rpc_rqst *req;
1278 1337
1279 /* Look up and lock the request corresponding to the given XID */ 1338 /* Look up and lock the request corresponding to the given XID */
1280 spin_lock(&xprt->transport_lock); 1339 spin_lock_bh(&xprt->transport_lock);
1281 req = xprt_lookup_bc_request(xprt, transport->tcp_xid); 1340 req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
1282 if (req == NULL) { 1341 if (req == NULL) {
1283 spin_unlock(&xprt->transport_lock); 1342 spin_unlock_bh(&xprt->transport_lock);
1284 printk(KERN_WARNING "Callback slot table overflowed\n"); 1343 printk(KERN_WARNING "Callback slot table overflowed\n");
1285 xprt_force_disconnect(xprt); 1344 xprt_force_disconnect(xprt);
1286 return -1; 1345 return -1;
@@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1291 1350
1292 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1351 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1293 xprt_complete_bc_request(req, transport->tcp_copied); 1352 xprt_complete_bc_request(req, transport->tcp_copied);
1294 spin_unlock(&xprt->transport_lock); 1353 spin_unlock_bh(&xprt->transport_lock);
1295 1354
1296 return 0; 1355 return 0;
1297} 1356}
@@ -1391,6 +1450,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1391 return len - desc.count; 1450 return len - desc.count;
1392} 1451}
1393 1452
1453static void xs_tcp_data_receive(struct sock_xprt *transport)
1454{
1455 struct rpc_xprt *xprt = &transport->xprt;
1456 struct sock *sk;
1457 read_descriptor_t rd_desc = {
1458 .count = 2*1024*1024,
1459 .arg.data = xprt,
1460 };
1461 unsigned long total = 0;
1462 int read = 0;
1463
1464 mutex_lock(&transport->recv_mutex);
1465 sk = transport->inet;
1466 if (sk == NULL)
1467 goto out;
1468
1469 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1470 for (;;) {
1471 lock_sock(sk);
1472 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1473 release_sock(sk);
1474 if (read <= 0)
1475 break;
1476 total += read;
1477 rd_desc.count = 65536;
1478 }
1479out:
1480 mutex_unlock(&transport->recv_mutex);
1481 trace_xs_tcp_data_ready(xprt, read, total);
1482}
1483
1484static void xs_tcp_data_receive_workfn(struct work_struct *work)
1485{
1486 struct sock_xprt *transport =
1487 container_of(work, struct sock_xprt, recv_worker);
1488 xs_tcp_data_receive(transport);
1489}
1490
1394/** 1491/**
1395 * xs_tcp_data_ready - "data ready" callback for TCP sockets 1492 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1396 * @sk: socket with data to read 1493 * @sk: socket with data to read
@@ -1398,34 +1495,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1398 */ 1495 */
1399static void xs_tcp_data_ready(struct sock *sk) 1496static void xs_tcp_data_ready(struct sock *sk)
1400{ 1497{
1498 struct sock_xprt *transport;
1401 struct rpc_xprt *xprt; 1499 struct rpc_xprt *xprt;
1402 read_descriptor_t rd_desc;
1403 int read;
1404 unsigned long total = 0;
1405 1500
1406 dprintk("RPC: xs_tcp_data_ready...\n"); 1501 dprintk("RPC: xs_tcp_data_ready...\n");
1407 1502
1408 read_lock_bh(&sk->sk_callback_lock); 1503 read_lock_bh(&sk->sk_callback_lock);
1409 if (!(xprt = xprt_from_sock(sk))) { 1504 if (!(xprt = xprt_from_sock(sk)))
1410 read = 0;
1411 goto out; 1505 goto out;
1412 } 1506 transport = container_of(xprt, struct sock_xprt, xprt);
1507
1413 /* Any data means we had a useful conversation, so 1508 /* Any data means we had a useful conversation, so
1414 * the we don't need to delay the next reconnect 1509 * the we don't need to delay the next reconnect
1415 */ 1510 */
1416 if (xprt->reestablish_timeout) 1511 if (xprt->reestablish_timeout)
1417 xprt->reestablish_timeout = 0; 1512 xprt->reestablish_timeout = 0;
1513 queue_work(rpciod_workqueue, &transport->recv_worker);
1418 1514
1419 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1420 rd_desc.arg.data = xprt;
1421 do {
1422 rd_desc.count = 65536;
1423 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1424 if (read > 0)
1425 total += read;
1426 } while (read > 0);
1427out: 1515out:
1428 trace_xs_tcp_data_ready(xprt, read, total);
1429 read_unlock_bh(&sk->sk_callback_lock); 1516 read_unlock_bh(&sk->sk_callback_lock);
1430} 1517}
1431 1518
@@ -1873,7 +1960,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1873 xs_save_old_callbacks(transport, sk); 1960 xs_save_old_callbacks(transport, sk);
1874 1961
1875 sk->sk_user_data = xprt; 1962 sk->sk_user_data = xprt;
1876 sk->sk_data_ready = xs_local_data_ready; 1963 sk->sk_data_ready = xs_data_ready;
1877 sk->sk_write_space = xs_udp_write_space; 1964 sk->sk_write_space = xs_udp_write_space;
1878 sk->sk_error_report = xs_error_report; 1965 sk->sk_error_report = xs_error_report;
1879 sk->sk_allocation = GFP_NOIO; 1966 sk->sk_allocation = GFP_NOIO;
@@ -2059,7 +2146,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2059 xs_save_old_callbacks(transport, sk); 2146 xs_save_old_callbacks(transport, sk);
2060 2147
2061 sk->sk_user_data = xprt; 2148 sk->sk_user_data = xprt;
2062 sk->sk_data_ready = xs_udp_data_ready; 2149 sk->sk_data_ready = xs_data_ready;
2063 sk->sk_write_space = xs_udp_write_space; 2150 sk->sk_write_space = xs_udp_write_space;
2064 sk->sk_allocation = GFP_NOIO; 2151 sk->sk_allocation = GFP_NOIO;
2065 2152
@@ -2650,6 +2737,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2650 } 2737 }
2651 2738
2652 new = container_of(xprt, struct sock_xprt, xprt); 2739 new = container_of(xprt, struct sock_xprt, xprt);
2740 mutex_init(&new->recv_mutex);
2653 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 2741 memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2654 xprt->addrlen = args->addrlen; 2742 xprt->addrlen = args->addrlen;
2655 if (args->srcaddr) 2743 if (args->srcaddr)
@@ -2703,6 +2791,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2703 xprt->ops = &xs_local_ops; 2791 xprt->ops = &xs_local_ops;
2704 xprt->timeout = &xs_local_default_timeout; 2792 xprt->timeout = &xs_local_default_timeout;
2705 2793
2794 INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
2706 INIT_DELAYED_WORK(&transport->connect_worker, 2795 INIT_DELAYED_WORK(&transport->connect_worker,
2707 xs_dummy_setup_socket); 2796 xs_dummy_setup_socket);
2708 2797
@@ -2774,21 +2863,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2774 2863
2775 xprt->timeout = &xs_udp_default_timeout; 2864 xprt->timeout = &xs_udp_default_timeout;
2776 2865
2866 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
2867 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
2868
2777 switch (addr->sa_family) { 2869 switch (addr->sa_family) {
2778 case AF_INET: 2870 case AF_INET:
2779 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2871 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2780 xprt_set_bound(xprt); 2872 xprt_set_bound(xprt);
2781 2873
2782 INIT_DELAYED_WORK(&transport->connect_worker,
2783 xs_udp_setup_socket);
2784 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 2874 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2785 break; 2875 break;
2786 case AF_INET6: 2876 case AF_INET6:
2787 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2877 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2788 xprt_set_bound(xprt); 2878 xprt_set_bound(xprt);
2789 2879
2790 INIT_DELAYED_WORK(&transport->connect_worker,
2791 xs_udp_setup_socket);
2792 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 2880 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2793 break; 2881 break;
2794 default: 2882 default:
@@ -2853,21 +2941,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2853 xprt->ops = &xs_tcp_ops; 2941 xprt->ops = &xs_tcp_ops;
2854 xprt->timeout = &xs_tcp_default_timeout; 2942 xprt->timeout = &xs_tcp_default_timeout;
2855 2943
2944 INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
2945 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
2946
2856 switch (addr->sa_family) { 2947 switch (addr->sa_family) {
2857 case AF_INET: 2948 case AF_INET:
2858 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2949 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2859 xprt_set_bound(xprt); 2950 xprt_set_bound(xprt);
2860 2951
2861 INIT_DELAYED_WORK(&transport->connect_worker,
2862 xs_tcp_setup_socket);
2863 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 2952 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2864 break; 2953 break;
2865 case AF_INET6: 2954 case AF_INET6:
2866 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2955 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2867 xprt_set_bound(xprt); 2956 xprt_set_bound(xprt);
2868 2957
2869 INIT_DELAYED_WORK(&transport->connect_worker,
2870 xs_tcp_setup_socket);
2871 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 2958 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2872 break; 2959 break;
2873 default: 2960 default: