aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@primarydata.com>2015-10-06 16:26:05 -0400
committerTrond Myklebust <trond.myklebust@primarydata.com>2015-10-08 08:27:04 -0400
commitf9b2ee714c5c945cda27e9cbca5f60d5199fb93f (patch)
tree61a50451255fc74e28638f4cd7890290c65b7f62
parentedc1b01cd3b20a5fff049e98f82a2b0d24a34c89 (diff)
SUNRPC: Move UDP receive data path into a workqueue context
Now that we've done it for TCP, let's convert UDP as well. Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
-rw-r--r--net/sunrpc/xprtsock.c84
1 files changed, 61 insertions, 23 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 58dc90ccebb6..df8bdcc10640 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -972,42 +972,36 @@ static void xs_local_data_ready(struct sock *sk)
972} 972}
973 973
974/** 974/**
975 * xs_udp_data_ready - "data ready" callback for UDP sockets 975 * xs_udp_data_read_skb - receive callback for UDP sockets
976 * @sk: socket with data to read 976 * @xprt: transport
977 * @sk: socket
978 * @skb: skbuff
977 * 979 *
978 */ 980 */
979static void xs_udp_data_ready(struct sock *sk) 981static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
982 struct sock *sk,
983 struct sk_buff *skb)
980{ 984{
981 struct rpc_task *task; 985 struct rpc_task *task;
982 struct rpc_xprt *xprt;
983 struct rpc_rqst *rovr; 986 struct rpc_rqst *rovr;
984 struct sk_buff *skb; 987 int repsize, copied;
985 int err, repsize, copied;
986 u32 _xid; 988 u32 _xid;
987 __be32 *xp; 989 __be32 *xp;
988 990
989 read_lock_bh(&sk->sk_callback_lock);
990 dprintk("RPC: xs_udp_data_ready...\n");
991 if (!(xprt = xprt_from_sock(sk)))
992 goto out;
993
994 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
995 goto out;
996
997 repsize = skb->len - sizeof(struct udphdr); 991 repsize = skb->len - sizeof(struct udphdr);
998 if (repsize < 4) { 992 if (repsize < 4) {
999 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 993 dprintk("RPC: impossible RPC reply size %d!\n", repsize);
1000 goto dropit; 994 return;
1001 } 995 }
1002 996
1003 /* Copy the XID from the skb... */ 997 /* Copy the XID from the skb... */
1004 xp = skb_header_pointer(skb, sizeof(struct udphdr), 998 xp = skb_header_pointer(skb, sizeof(struct udphdr),
1005 sizeof(_xid), &_xid); 999 sizeof(_xid), &_xid);
1006 if (xp == NULL) 1000 if (xp == NULL)
1007 goto dropit; 1001 return;
1008 1002
1009 /* Look up and lock the request corresponding to the given XID */ 1003 /* Look up and lock the request corresponding to the given XID */
1010 spin_lock(&xprt->transport_lock); 1004 spin_lock_bh(&xprt->transport_lock);
1011 rovr = xprt_lookup_rqst(xprt, *xp); 1005 rovr = xprt_lookup_rqst(xprt, *xp);
1012 if (!rovr) 1006 if (!rovr)
1013 goto out_unlock; 1007 goto out_unlock;
@@ -1028,10 +1022,54 @@ static void xs_udp_data_ready(struct sock *sk)
1028 xprt_complete_rqst(task, copied); 1022 xprt_complete_rqst(task, copied);
1029 1023
1030 out_unlock: 1024 out_unlock:
1031 spin_unlock(&xprt->transport_lock); 1025 spin_unlock_bh(&xprt->transport_lock);
1032 dropit: 1026}
1033 skb_free_datagram(sk, skb); 1027
1034 out: 1028static void xs_udp_data_receive(struct sock_xprt *transport)
1029{
1030 struct sk_buff *skb;
1031 struct sock *sk;
1032 int err;
1033
1034 mutex_lock(&transport->recv_mutex);
1035 sk = transport->inet;
1036 if (sk == NULL)
1037 goto out;
1038 for (;;) {
1039 skb = skb_recv_datagram(sk, 0, 1, &err);
1040 if (skb == NULL)
1041 break;
1042 xs_udp_data_read_skb(&transport->xprt, sk, skb);
1043 skb_free_datagram(sk, skb);
1044 }
1045out:
1046 mutex_unlock(&transport->recv_mutex);
1047}
1048
1049static void xs_udp_data_receive_workfn(struct work_struct *work)
1050{
1051 struct sock_xprt *transport =
1052 container_of(work, struct sock_xprt, recv_worker);
1053 xs_udp_data_receive(transport);
1054}
1055
1056/**
1057 * xs_data_ready - "data ready" callback for UDP sockets
1058 * @sk: socket with data to read
1059 *
1060 */
1061static void xs_data_ready(struct sock *sk)
1062{
1063 struct rpc_xprt *xprt;
1064
1065 read_lock_bh(&sk->sk_callback_lock);
1066 dprintk("RPC: xs_data_ready...\n");
1067 xprt = xprt_from_sock(sk);
1068 if (xprt != NULL) {
1069 struct sock_xprt *transport = container_of(xprt,
1070 struct sock_xprt, xprt);
1071 queue_work(rpciod_workqueue, &transport->recv_worker);
1072 }
1035 read_unlock_bh(&sk->sk_callback_lock); 1073 read_unlock_bh(&sk->sk_callback_lock);
1036} 1074}
1037 1075
@@ -2094,7 +2132,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2094 xs_save_old_callbacks(transport, sk); 2132 xs_save_old_callbacks(transport, sk);
2095 2133
2096 sk->sk_user_data = xprt; 2134 sk->sk_user_data = xprt;
2097 sk->sk_data_ready = xs_udp_data_ready; 2135 sk->sk_data_ready = xs_data_ready;
2098 sk->sk_write_space = xs_udp_write_space; 2136 sk->sk_write_space = xs_udp_write_space;
2099 sk->sk_allocation = GFP_NOIO; 2137 sk->sk_allocation = GFP_NOIO;
2100 2138
@@ -2811,7 +2849,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2811 2849
2812 xprt->timeout = &xs_udp_default_timeout; 2850 xprt->timeout = &xs_udp_default_timeout;
2813 2851
2814 INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); 2852 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
2815 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); 2853 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
2816 2854
2817 switch (addr->sa_family) { 2855 switch (addr->sa_family) {