about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@primarydata.com>2017-08-16 15:30:35 -0400
committerTrond Myklebust <trond.myklebust@primarydata.com>2017-08-18 14:45:04 -0400
commitce7c252a8c741aba7c38f817b86e34361f561e42 (patch)
tree4e90fa24a74ac7f8ecfcf1ddc1be735e64aa43cc
parent040249dfbeed9dd553fd323ef4b42c7a3270898b (diff)
SUNRPC: Add a separate spinlock to protect the RPC request receive list
This further reduces contention with the transport_lock, and allows us to convert to using a non-bh-safe spinlock, since the list is now never accessed from a bh context. Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
-rw-r--r--  include/linux/sunrpc/xprt.h                   |  1
-rw-r--r--  net/sunrpc/svcsock.c                          |  6
-rw-r--r--  net/sunrpc/xprt.c                             | 20
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c                |  8
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_backchannel.c    |  7
-rw-r--r--  net/sunrpc/xprtsock.c                         | 30
6 files changed, 41 insertions(+), 31 deletions(-)
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 65b9e0224753..a97e6de5f9f2 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -232,6 +232,7 @@ struct rpc_xprt {
232 */ 232 */
233 spinlock_t transport_lock; /* lock transport info */ 233 spinlock_t transport_lock; /* lock transport info */
234 spinlock_t reserve_lock; /* lock slot table */ 234 spinlock_t reserve_lock; /* lock slot table */
235 spinlock_t recv_lock; /* lock receive list */
235 u32 xid; /* Next XID value to use */ 236 u32 xid; /* Next XID value to use */
236 struct rpc_task * snd_task; /* Task blocked in send */ 237 struct rpc_task * snd_task; /* Task blocked in send */
237 struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */ 238 struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 2b720fa35c4f..272063ca81e8 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1001,7 +1001,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
1001 1001
1002 if (!bc_xprt) 1002 if (!bc_xprt)
1003 return -EAGAIN; 1003 return -EAGAIN;
1004 spin_lock_bh(&bc_xprt->transport_lock); 1004 spin_lock(&bc_xprt->recv_lock);
1005 req = xprt_lookup_rqst(bc_xprt, xid); 1005 req = xprt_lookup_rqst(bc_xprt, xid);
1006 if (!req) 1006 if (!req)
1007 goto unlock_notfound; 1007 goto unlock_notfound;
@@ -1019,7 +1019,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
1019 memcpy(dst->iov_base, src->iov_base, src->iov_len); 1019 memcpy(dst->iov_base, src->iov_base, src->iov_len);
1020 xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len); 1020 xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
1021 rqstp->rq_arg.len = 0; 1021 rqstp->rq_arg.len = 0;
1022 spin_unlock_bh(&bc_xprt->transport_lock); 1022 spin_unlock(&bc_xprt->recv_lock);
1023 return 0; 1023 return 0;
1024unlock_notfound: 1024unlock_notfound:
1025 printk(KERN_NOTICE 1025 printk(KERN_NOTICE
@@ -1028,7 +1028,7 @@ unlock_notfound:
1028 __func__, ntohl(calldir), 1028 __func__, ntohl(calldir),
1029 bc_xprt, ntohl(xid)); 1029 bc_xprt, ntohl(xid));
1030unlock_eagain: 1030unlock_eagain:
1031 spin_unlock_bh(&bc_xprt->transport_lock); 1031 spin_unlock(&bc_xprt->recv_lock);
1032 return -EAGAIN; 1032 return -EAGAIN;
1033} 1033}
1034 1034
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3eb9ec16eec4..2af189c5ac3e 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -872,17 +872,17 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
872} 872}
873 873
874static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 874static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
875__must_hold(&req->rq_xprt->transport_lock) 875__must_hold(&req->rq_xprt->recv_lock)
876{ 876{
877 struct rpc_task *task = req->rq_task; 877 struct rpc_task *task = req->rq_task;
878 878
879 if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) { 879 if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
880 spin_unlock_bh(&req->rq_xprt->transport_lock); 880 spin_unlock(&req->rq_xprt->recv_lock);
881 set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); 881 set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
882 wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV, 882 wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
883 TASK_UNINTERRUPTIBLE); 883 TASK_UNINTERRUPTIBLE);
884 clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); 884 clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
885 spin_lock_bh(&req->rq_xprt->transport_lock); 885 spin_lock(&req->rq_xprt->recv_lock);
886 } 886 }
887} 887}
888 888
@@ -1008,13 +1008,13 @@ void xprt_transmit(struct rpc_task *task)
1008 /* 1008 /*
1009 * Add to the list only if we're expecting a reply 1009 * Add to the list only if we're expecting a reply
1010 */ 1010 */
1011 spin_lock_bh(&xprt->transport_lock);
1012 /* Update the softirq receive buffer */ 1011 /* Update the softirq receive buffer */
1013 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1012 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1014 sizeof(req->rq_private_buf)); 1013 sizeof(req->rq_private_buf));
1015 /* Add request to the receive list */ 1014 /* Add request to the receive list */
1015 spin_lock(&xprt->recv_lock);
1016 list_add_tail(&req->rq_list, &xprt->recv); 1016 list_add_tail(&req->rq_list, &xprt->recv);
1017 spin_unlock_bh(&xprt->transport_lock); 1017 spin_unlock(&xprt->recv_lock);
1018 xprt_reset_majortimeo(req); 1018 xprt_reset_majortimeo(req);
1019 /* Turn off autodisconnect */ 1019 /* Turn off autodisconnect */
1020 del_singleshot_timer_sync(&xprt->timer); 1020 del_singleshot_timer_sync(&xprt->timer);
@@ -1329,15 +1329,18 @@ void xprt_release(struct rpc_task *task)
1329 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1329 task->tk_ops->rpc_count_stats(task, task->tk_calldata);
1330 else if (task->tk_client) 1330 else if (task->tk_client)
1331 rpc_count_iostats(task, task->tk_client->cl_metrics); 1331 rpc_count_iostats(task, task->tk_client->cl_metrics);
1332 spin_lock(&xprt->recv_lock);
1333 if (!list_empty(&req->rq_list)) {
1334 list_del(&req->rq_list);
1335 xprt_wait_on_pinned_rqst(req);
1336 }
1337 spin_unlock(&xprt->recv_lock);
1332 spin_lock_bh(&xprt->transport_lock); 1338 spin_lock_bh(&xprt->transport_lock);
1333 xprt->ops->release_xprt(xprt, task); 1339 xprt->ops->release_xprt(xprt, task);
1334 if (xprt->ops->release_request) 1340 if (xprt->ops->release_request)
1335 xprt->ops->release_request(task); 1341 xprt->ops->release_request(task);
1336 if (!list_empty(&req->rq_list))
1337 list_del(&req->rq_list);
1338 xprt->last_used = jiffies; 1342 xprt->last_used = jiffies;
1339 xprt_schedule_autodisconnect(xprt); 1343 xprt_schedule_autodisconnect(xprt);
1340 xprt_wait_on_pinned_rqst(req);
1341 spin_unlock_bh(&xprt->transport_lock); 1344 spin_unlock_bh(&xprt->transport_lock);
1342 if (req->rq_buffer) 1345 if (req->rq_buffer)
1343 xprt->ops->buf_free(task); 1346 xprt->ops->buf_free(task);
@@ -1361,6 +1364,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
1361 1364
1362 spin_lock_init(&xprt->transport_lock); 1365 spin_lock_init(&xprt->transport_lock);
1363 spin_lock_init(&xprt->reserve_lock); 1366 spin_lock_init(&xprt->reserve_lock);
1367 spin_lock_init(&xprt->recv_lock);
1364 1368
1365 INIT_LIST_HEAD(&xprt->free); 1369 INIT_LIST_HEAD(&xprt->free);
1366 INIT_LIST_HEAD(&xprt->recv); 1370 INIT_LIST_HEAD(&xprt->recv);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ca4d6e4528f3..dfa748a0c8de 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1051,7 +1051,7 @@ rpcrdma_reply_handler(struct work_struct *work)
1051 * RPC completion while holding the transport lock to ensure 1051 * RPC completion while holding the transport lock to ensure
1052 * the rep, rqst, and rq_task pointers remain stable. 1052 * the rep, rqst, and rq_task pointers remain stable.
1053 */ 1053 */
1054 spin_lock_bh(&xprt->transport_lock); 1054 spin_lock(&xprt->recv_lock);
1055 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); 1055 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
1056 if (!rqst) 1056 if (!rqst)
1057 goto out_norqst; 1057 goto out_norqst;
@@ -1136,7 +1136,7 @@ out:
1136 xprt_release_rqst_cong(rqst->rq_task); 1136 xprt_release_rqst_cong(rqst->rq_task);
1137 1137
1138 xprt_complete_rqst(rqst->rq_task, status); 1138 xprt_complete_rqst(rqst->rq_task, status);
1139 spin_unlock_bh(&xprt->transport_lock); 1139 spin_unlock(&xprt->recv_lock);
1140 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", 1140 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
1141 __func__, xprt, rqst, status); 1141 __func__, xprt, rqst, status);
1142 return; 1142 return;
@@ -1187,12 +1187,12 @@ out_rdmaerr:
1187 r_xprt->rx_stats.bad_reply_count++; 1187 r_xprt->rx_stats.bad_reply_count++;
1188 goto out; 1188 goto out;
1189 1189
1190/* The req was still available, but by the time the transport_lock 1190/* The req was still available, but by the time the recv_lock
1191 * was acquired, the rqst and task had been released. Thus the RPC 1191 * was acquired, the rqst and task had been released. Thus the RPC
1192 * has already been terminated. 1192 * has already been terminated.
1193 */ 1193 */
1194out_norqst: 1194out_norqst:
1195 spin_unlock_bh(&xprt->transport_lock); 1195 spin_unlock(&xprt->recv_lock);
1196 rpcrdma_buffer_put(req); 1196 rpcrdma_buffer_put(req);
1197 dprintk("RPC: %s: race, no rqst left for req %p\n", 1197 dprintk("RPC: %s: race, no rqst left for req %p\n",
1198 __func__, req); 1198 __func__, req);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index c676ed0efb5a..0d574cda242d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
52 if (src->iov_len < 24) 52 if (src->iov_len < 24)
53 goto out_shortreply; 53 goto out_shortreply;
54 54
55 spin_lock_bh(&xprt->transport_lock); 55 spin_lock(&xprt->recv_lock);
56 req = xprt_lookup_rqst(xprt, xid); 56 req = xprt_lookup_rqst(xprt, xid);
57 if (!req) 57 if (!req)
58 goto out_notfound; 58 goto out_notfound;
@@ -69,17 +69,20 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
69 else if (credits > r_xprt->rx_buf.rb_bc_max_requests) 69 else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
70 credits = r_xprt->rx_buf.rb_bc_max_requests; 70 credits = r_xprt->rx_buf.rb_bc_max_requests;
71 71
72 spin_lock_bh(&xprt->transport_lock);
72 cwnd = xprt->cwnd; 73 cwnd = xprt->cwnd;
73 xprt->cwnd = credits << RPC_CWNDSHIFT; 74 xprt->cwnd = credits << RPC_CWNDSHIFT;
74 if (xprt->cwnd > cwnd) 75 if (xprt->cwnd > cwnd)
75 xprt_release_rqst_cong(req->rq_task); 76 xprt_release_rqst_cong(req->rq_task);
77 spin_unlock_bh(&xprt->transport_lock);
78
76 79
77 ret = 0; 80 ret = 0;
78 xprt_complete_rqst(req->rq_task, rcvbuf->len); 81 xprt_complete_rqst(req->rq_task, rcvbuf->len);
79 rcvbuf->len = 0; 82 rcvbuf->len = 0;
80 83
81out_unlock: 84out_unlock:
82 spin_unlock_bh(&xprt->transport_lock); 85 spin_unlock(&xprt->recv_lock);
83out: 86out:
84 return ret; 87 return ret;
85 88
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index a344bea15fc7..2b918137aaa0 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -969,12 +969,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
969 return; 969 return;
970 970
971 /* Look up and lock the request corresponding to the given XID */ 971 /* Look up and lock the request corresponding to the given XID */
972 spin_lock_bh(&xprt->transport_lock); 972 spin_lock(&xprt->recv_lock);
973 rovr = xprt_lookup_rqst(xprt, *xp); 973 rovr = xprt_lookup_rqst(xprt, *xp);
974 if (!rovr) 974 if (!rovr)
975 goto out_unlock; 975 goto out_unlock;
976 xprt_pin_rqst(rovr); 976 xprt_pin_rqst(rovr);
977 spin_unlock_bh(&xprt->transport_lock); 977 spin_unlock(&xprt->recv_lock);
978 task = rovr->rq_task; 978 task = rovr->rq_task;
979 979
980 copied = rovr->rq_private_buf.buflen; 980 copied = rovr->rq_private_buf.buflen;
@@ -983,16 +983,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
983 983
984 if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { 984 if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
985 dprintk("RPC: sk_buff copy failed\n"); 985 dprintk("RPC: sk_buff copy failed\n");
986 spin_lock_bh(&xprt->transport_lock); 986 spin_lock(&xprt->recv_lock);
987 goto out_unpin; 987 goto out_unpin;
988 } 988 }
989 989
990 spin_lock_bh(&xprt->transport_lock); 990 spin_lock(&xprt->recv_lock);
991 xprt_complete_rqst(task, copied); 991 xprt_complete_rqst(task, copied);
992out_unpin: 992out_unpin:
993 xprt_unpin_rqst(rovr); 993 xprt_unpin_rqst(rovr);
994 out_unlock: 994 out_unlock:
995 spin_unlock_bh(&xprt->transport_lock); 995 spin_unlock(&xprt->recv_lock);
996} 996}
997 997
998static void xs_local_data_receive(struct sock_xprt *transport) 998static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1055,12 +1055,12 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
1055 return; 1055 return;
1056 1056
1057 /* Look up and lock the request corresponding to the given XID */ 1057 /* Look up and lock the request corresponding to the given XID */
1058 spin_lock_bh(&xprt->transport_lock); 1058 spin_lock(&xprt->recv_lock);
1059 rovr = xprt_lookup_rqst(xprt, *xp); 1059 rovr = xprt_lookup_rqst(xprt, *xp);
1060 if (!rovr) 1060 if (!rovr)
1061 goto out_unlock; 1061 goto out_unlock;
1062 xprt_pin_rqst(rovr); 1062 xprt_pin_rqst(rovr);
1063 spin_unlock_bh(&xprt->transport_lock); 1063 spin_unlock(&xprt->recv_lock);
1064 task = rovr->rq_task; 1064 task = rovr->rq_task;
1065 1065
1066 if ((copied = rovr->rq_private_buf.buflen) > repsize) 1066 if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1069,7 +1069,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
1069 /* Suck it into the iovec, verify checksum if not done by hw. */ 1069 /* Suck it into the iovec, verify checksum if not done by hw. */
1070 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { 1070 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1071 __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); 1071 __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
1072 spin_lock_bh(&xprt->transport_lock); 1072 spin_lock(&xprt->recv_lock);
1073 goto out_unpin; 1073 goto out_unpin;
1074 } 1074 }
1075 1075
@@ -1077,11 +1077,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
1077 1077
1078 spin_lock_bh(&xprt->transport_lock); 1078 spin_lock_bh(&xprt->transport_lock);
1079 xprt_adjust_cwnd(xprt, task, copied); 1079 xprt_adjust_cwnd(xprt, task, copied);
1080 spin_unlock_bh(&xprt->transport_lock);
1081 spin_lock(&xprt->recv_lock);
1080 xprt_complete_rqst(task, copied); 1082 xprt_complete_rqst(task, copied);
1081out_unpin: 1083out_unpin:
1082 xprt_unpin_rqst(rovr); 1084 xprt_unpin_rqst(rovr);
1083 out_unlock: 1085 out_unlock:
1084 spin_unlock_bh(&xprt->transport_lock); 1086 spin_unlock(&xprt->recv_lock);
1085} 1087}
1086 1088
1087static void xs_udp_data_receive(struct sock_xprt *transport) 1089static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1344,24 +1346,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1344 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); 1346 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
1345 1347
1346 /* Find and lock the request corresponding to this xid */ 1348 /* Find and lock the request corresponding to this xid */
1347 spin_lock_bh(&xprt->transport_lock); 1349 spin_lock(&xprt->recv_lock);
1348 req = xprt_lookup_rqst(xprt, transport->tcp_xid); 1350 req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1349 if (!req) { 1351 if (!req) {
1350 dprintk("RPC: XID %08x request not found!\n", 1352 dprintk("RPC: XID %08x request not found!\n",
1351 ntohl(transport->tcp_xid)); 1353 ntohl(transport->tcp_xid));
1352 spin_unlock_bh(&xprt->transport_lock); 1354 spin_unlock(&xprt->recv_lock);
1353 return -1; 1355 return -1;
1354 } 1356 }
1355 xprt_pin_rqst(req); 1357 xprt_pin_rqst(req);
1356 spin_unlock_bh(&xprt->transport_lock); 1358 spin_unlock(&xprt->recv_lock);
1357 1359
1358 xs_tcp_read_common(xprt, desc, req); 1360 xs_tcp_read_common(xprt, desc, req);
1359 1361
1360 spin_lock_bh(&xprt->transport_lock); 1362 spin_lock(&xprt->recv_lock);
1361 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1363 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1362 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1364 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1363 xprt_unpin_rqst(req); 1365 xprt_unpin_rqst(req);
1364 spin_unlock_bh(&xprt->transport_lock); 1366 spin_unlock(&xprt->recv_lock);
1365 return 0; 1367 return 0;
1366} 1368}
1367 1369