path: root/net/sunrpc
author    Linus Torvalds <torvalds@linux-foundation.org>  2017-11-17 17:18:00 -0500
committer Linus Torvalds <torvalds@linux-foundation.org>  2017-11-17 17:18:00 -0500
commit    c3e9c04b89059a4c93c792da883ca284de182da5 (patch)
tree      8cb58f19e0329f040e6c5bd2269572d8bbe58c16 /net/sunrpc
parent    e0bcb42e602816415f6fe07313b6fc84932244b7 (diff)
parent    fcfa447062b2061e11f68b846d61cbfe60d0d604 (diff)
Merge tag 'nfs-for-4.15-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:

 "Stable bugfixes:
   - Revalidate "." and ".." correctly on open
   - Avoid RCU usage in tracepoints
   - Fix ugly referral attributes
   - Fix a typo in nomigration mount option
   - Revert "NFS: Move the flock open mode check into nfs_flock()"

  Features:
   - Implement a stronger send queue accounting system for NFS over RDMA
   - Switch some atomics to the new refcount_t type

  Other bugfixes and cleanups:
   - Clean up access mode bits
   - Remove special-case revalidations in nfs_opendir()
   - Improve invalidating NFS over RDMA memory for async operations that time out
   - Handle NFS over RDMA replies with a workqueue
   - Handle NFS over RDMA sends with a workqueue
   - Fix up replaying interrupted requests
   - Remove dead NFS over RDMA definitions
   - Update NFS over RDMA copyright information
   - Be more consistent with bool initialization and comparisons
   - Mark expected switch fall throughs
   - Various sunrpc tracepoint cleanups
   - Fix various OPEN races
   - Fix a typo in nfs_rename()
   - Use common error handling code in nfs_lock_and_join_requests()
   - Check that some structures are properly cleaned up during net_exit()
   - Remove net pointer from dprintk()s"

* tag 'nfs-for-4.15-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (62 commits)
  NFS: Revert "NFS: Move the flock open mode check into nfs_flock()"
  NFS: Fix typo in nomigration mount option
  nfs: Fix ugly referral attributes
  NFS: super: mark expected switch fall-throughs
  sunrpc: remove net pointer from messages
  nfs: remove net pointer from messages
  sunrpc: exit_net cleanup check added
  nfs client: exit_net cleanup check added
  nfs/write: Use common error handling code in nfs_lock_and_join_requests()
  NFSv4: Replace closed stateids with the "invalid special stateid"
  NFSv4: nfs_set_open_stateid must not trigger state recovery for closed state
  NFSv4: Check the open stateid when searching for expired state
  NFSv4: Clean up nfs4_delegreturn_done
  NFSv4: cleanup nfs4_close_done
  NFSv4: Retry NFS4ERR_OLD_STATEID errors in layoutreturn
  pNFS: Retry NFS4ERR_OLD_STATEID errors in layoutreturn-on-close
  NFSv4: Don't try to CLOSE if the stateid 'other' field has changed
  NFSv4: Retry CLOSE and DELEGRETURN on NFS4ERR_OLD_STATEID.
  NFS: Fix a typo in nfs_rename()
  NFSv4: Fix open create exclusive when the server reboots
  ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/clnt.c                   14
-rw-r--r--  net/sunrpc/rpc_pipe.c                8
-rw-r--r--  net/sunrpc/rpcb_clnt.c               6
-rw-r--r--  net/sunrpc/sched.c                   3
-rw-r--r--  net/sunrpc/sunrpc_syms.c             3
-rw-r--r--  net/sunrpc/xprt.c                    1
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c    6
-rw-r--r--  net/sunrpc/xprtrdma/fmr_ops.c       19
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c      27
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c     363
-rw-r--r--  net/sunrpc/xprtrdma/transport.c     19
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c        236
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h    119
-rw-r--r--  net/sunrpc/xprtsock.c                4
14 files changed, 531 insertions, 297 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 2ad827db2704..a801da812f86 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1491,7 +1491,6 @@ rpc_restart_call(struct rpc_task *task)
1491} 1491}
1492EXPORT_SYMBOL_GPL(rpc_restart_call); 1492EXPORT_SYMBOL_GPL(rpc_restart_call);
1493 1493
1494#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1495const char 1494const char
1496*rpc_proc_name(const struct rpc_task *task) 1495*rpc_proc_name(const struct rpc_task *task)
1497{ 1496{
@@ -1505,7 +1504,6 @@ const char
1505 } else 1504 } else
1506 return "no proc"; 1505 return "no proc";
1507} 1506}
1508#endif
1509 1507
1510/* 1508/*
1511 * 0. Initial state 1509 * 0. Initial state
@@ -1519,6 +1517,7 @@ call_start(struct rpc_task *task)
1519 struct rpc_clnt *clnt = task->tk_client; 1517 struct rpc_clnt *clnt = task->tk_client;
1520 int idx = task->tk_msg.rpc_proc->p_statidx; 1518 int idx = task->tk_msg.rpc_proc->p_statidx;
1521 1519
1520 trace_rpc_request(task);
1522 dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid, 1521 dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1523 clnt->cl_program->name, clnt->cl_vers, 1522 clnt->cl_program->name, clnt->cl_vers,
1524 rpc_proc_name(task), 1523 rpc_proc_name(task),
@@ -1586,6 +1585,7 @@ call_reserveresult(struct rpc_task *task)
1586 switch (status) { 1585 switch (status) {
1587 case -ENOMEM: 1586 case -ENOMEM:
1588 rpc_delay(task, HZ >> 2); 1587 rpc_delay(task, HZ >> 2);
1588 /* fall through */
1589 case -EAGAIN: /* woken up; retry */ 1589 case -EAGAIN: /* woken up; retry */
1590 task->tk_action = call_retry_reserve; 1590 task->tk_action = call_retry_reserve;
1591 return; 1591 return;
@@ -1647,10 +1647,13 @@ call_refreshresult(struct rpc_task *task)
1647 /* Use rate-limiting and a max number of retries if refresh 1647 /* Use rate-limiting and a max number of retries if refresh
1648 * had status 0 but failed to update the cred. 1648 * had status 0 but failed to update the cred.
1649 */ 1649 */
1650 /* fall through */
1650 case -ETIMEDOUT: 1651 case -ETIMEDOUT:
1651 rpc_delay(task, 3*HZ); 1652 rpc_delay(task, 3*HZ);
1653 /* fall through */
1652 case -EAGAIN: 1654 case -EAGAIN:
1653 status = -EACCES; 1655 status = -EACCES;
1656 /* fall through */
1654 case -EKEYEXPIRED: 1657 case -EKEYEXPIRED:
1655 if (!task->tk_cred_retry) 1658 if (!task->tk_cred_retry)
1656 break; 1659 break;
@@ -1911,6 +1914,7 @@ call_connect_status(struct rpc_task *task)
1911 task->tk_action = call_bind; 1914 task->tk_action = call_bind;
1912 return; 1915 return;
1913 } 1916 }
1917 /* fall through */
1914 case -ECONNRESET: 1918 case -ECONNRESET:
1915 case -ECONNABORTED: 1919 case -ECONNABORTED:
1916 case -ENETUNREACH: 1920 case -ENETUNREACH:
@@ -1924,6 +1928,7 @@ call_connect_status(struct rpc_task *task)
1924 break; 1928 break;
1925 /* retry with existing socket, after a delay */ 1929 /* retry with existing socket, after a delay */
1926 rpc_delay(task, 3*HZ); 1930 rpc_delay(task, 3*HZ);
1931 /* fall through */
1927 case -EAGAIN: 1932 case -EAGAIN:
1928 /* Check for timeouts before looping back to call_bind */ 1933 /* Check for timeouts before looping back to call_bind */
1929 case -ETIMEDOUT: 1934 case -ETIMEDOUT:
@@ -2025,6 +2030,7 @@ call_transmit_status(struct rpc_task *task)
2025 rpc_exit(task, task->tk_status); 2030 rpc_exit(task, task->tk_status);
2026 break; 2031 break;
2027 } 2032 }
2033 /* fall through */
2028 case -ECONNRESET: 2034 case -ECONNRESET:
2029 case -ECONNABORTED: 2035 case -ECONNABORTED:
2030 case -EADDRINUSE: 2036 case -EADDRINUSE:
@@ -2145,6 +2151,7 @@ call_status(struct rpc_task *task)
2145 * were a timeout. 2151 * were a timeout.
2146 */ 2152 */
2147 rpc_delay(task, 3*HZ); 2153 rpc_delay(task, 3*HZ);
2154 /* fall through */
2148 case -ETIMEDOUT: 2155 case -ETIMEDOUT:
2149 task->tk_action = call_timeout; 2156 task->tk_action = call_timeout;
2150 break; 2157 break;
@@ -2152,14 +2159,17 @@ call_status(struct rpc_task *task)
2152 case -ECONNRESET: 2159 case -ECONNRESET:
2153 case -ECONNABORTED: 2160 case -ECONNABORTED:
2154 rpc_force_rebind(clnt); 2161 rpc_force_rebind(clnt);
2162 /* fall through */
2155 case -EADDRINUSE: 2163 case -EADDRINUSE:
2156 rpc_delay(task, 3*HZ); 2164 rpc_delay(task, 3*HZ);
2165 /* fall through */
2157 case -EPIPE: 2166 case -EPIPE:
2158 case -ENOTCONN: 2167 case -ENOTCONN:
2159 task->tk_action = call_bind; 2168 task->tk_action = call_bind;
2160 break; 2169 break;
2161 case -ENOBUFS: 2170 case -ENOBUFS:
2162 rpc_delay(task, HZ>>2); 2171 rpc_delay(task, HZ>>2);
2172 /* fall through */
2163 case -EAGAIN: 2173 case -EAGAIN:
2164 task->tk_action = call_transmit; 2174 task->tk_action = call_transmit;
2165 break; 2175 break;
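
The "/* fall through */" comments added above (and in xprt.c further down) do not change behavior: they mark each deliberate case fall-through so static checkers, for example GCC's -Wimplicit-fallthrough, which accepts such comments as annotations, stop warning about it. A minimal, self-contained sketch of the convention (plain integers stand in for the kernel's errno values; this is not code from the patch):

#include <stdio.h>

static const char *next_action(int status)
{
	switch (status) {
	case -12:		/* think -ENOMEM: back off first... */
		puts("delay");
		/* fall through */
	case -11:		/* ...then handle exactly like -EAGAIN */
		return "retry";
	default:
		return "fail";
	}
}

int main(void)
{
	printf("%s\n", next_action(-12));	/* prints "delay", then "retry" */
	return 0;
}
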
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 61a504fb1ae2..7803f3b6aa53 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -1410,8 +1410,8 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)
1410 return PTR_ERR(gssd_dentry); 1410 return PTR_ERR(gssd_dentry);
1411 } 1411 }
1412 1412
1413 dprintk("RPC: sending pipefs MOUNT notification for net %p%s\n", 1413 dprintk("RPC: sending pipefs MOUNT notification for net %x%s\n",
1414 net, NET_NAME(net)); 1414 net->ns.inum, NET_NAME(net));
1415 mutex_lock(&sn->pipefs_sb_lock); 1415 mutex_lock(&sn->pipefs_sb_lock);
1416 sn->pipefs_sb = sb; 1416 sn->pipefs_sb = sb;
1417 err = blocking_notifier_call_chain(&rpc_pipefs_notifier_list, 1417 err = blocking_notifier_call_chain(&rpc_pipefs_notifier_list,
@@ -1462,8 +1462,8 @@ static void rpc_kill_sb(struct super_block *sb)
1462 goto out; 1462 goto out;
1463 } 1463 }
1464 sn->pipefs_sb = NULL; 1464 sn->pipefs_sb = NULL;
1465 dprintk("RPC: sending pipefs UMOUNT notification for net %p%s\n", 1465 dprintk("RPC: sending pipefs UMOUNT notification for net %x%s\n",
1466 net, NET_NAME(net)); 1466 net->ns.inum, NET_NAME(net));
1467 blocking_notifier_call_chain(&rpc_pipefs_notifier_list, 1467 blocking_notifier_call_chain(&rpc_pipefs_notifier_list,
1468 RPC_PIPEFS_UMOUNT, 1468 RPC_PIPEFS_UMOUNT,
1469 sb); 1469 sb);
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index ea0676f199c8..c526f8fb37c9 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -216,9 +216,9 @@ static void rpcb_set_local(struct net *net, struct rpc_clnt *clnt,
216 smp_wmb(); 216 smp_wmb();
217 sn->rpcb_users = 1; 217 sn->rpcb_users = 1;
218 dprintk("RPC: created new rpcb local clients (rpcb_local_clnt: " 218 dprintk("RPC: created new rpcb local clients (rpcb_local_clnt: "
219 "%p, rpcb_local_clnt4: %p) for net %p%s\n", 219 "%p, rpcb_local_clnt4: %p) for net %x%s\n",
220 sn->rpcb_local_clnt, sn->rpcb_local_clnt4, 220 sn->rpcb_local_clnt, sn->rpcb_local_clnt4,
221 net, (net == &init_net) ? " (init_net)" : ""); 221 net->ns.inum, (net == &init_net) ? " (init_net)" : "");
222} 222}
223 223
224/* 224/*
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 5dea47eb31bb..b1b49edd7c4d 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -274,10 +274,9 @@ static inline void rpc_task_set_debuginfo(struct rpc_task *task)
274 274
275static void rpc_set_active(struct rpc_task *task) 275static void rpc_set_active(struct rpc_task *task)
276{ 276{
277 trace_rpc_task_begin(task->tk_client, task, NULL);
278
279 rpc_task_set_debuginfo(task); 277 rpc_task_set_debuginfo(task);
280 set_bit(RPC_TASK_ACTIVE, &task->tk_runstate); 278 set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
279 trace_rpc_task_begin(task->tk_client, task, NULL);
281} 280}
282 281
283/* 282/*
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index c73de181467a..56f9eff74150 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -65,10 +65,13 @@ err_proc:
65 65
66static __net_exit void sunrpc_exit_net(struct net *net) 66static __net_exit void sunrpc_exit_net(struct net *net)
67{ 67{
68 struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
69
68 rpc_pipefs_exit_net(net); 70 rpc_pipefs_exit_net(net);
69 unix_gid_cache_destroy(net); 71 unix_gid_cache_destroy(net);
70 ip_map_cache_destroy(net); 72 ip_map_cache_destroy(net);
71 rpc_proc_exit(net); 73 rpc_proc_exit(net);
74 WARN_ON_ONCE(!list_empty(&sn->all_clients));
72} 75}
73 76
74static struct pernet_operations sunrpc_net_ops = { 77static struct pernet_operations sunrpc_net_ops = {
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 6160d17a31c4..333b9d697ae5 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1139,6 +1139,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1139 case -EAGAIN: 1139 case -EAGAIN:
1140 xprt_add_backlog(xprt, task); 1140 xprt_add_backlog(xprt, task);
1141 dprintk("RPC: waiting for request slot\n"); 1141 dprintk("RPC: waiting for request slot\n");
1142 /* fall through */
1142 default: 1143 default:
1143 task->tk_status = -EAGAIN; 1144 task->tk_status = -EAGAIN;
1144 } 1145 }
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 823a781ec89c..8b818bb3518a 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -43,7 +43,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
43 req = rpcrdma_create_req(r_xprt); 43 req = rpcrdma_create_req(r_xprt);
44 if (IS_ERR(req)) 44 if (IS_ERR(req))
45 return PTR_ERR(req); 45 return PTR_ERR(req);
46 req->rl_backchannel = true; 46 __set_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags);
47 47
48 rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE, 48 rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
49 DMA_TO_DEVICE, GFP_KERNEL); 49 DMA_TO_DEVICE, GFP_KERNEL);
@@ -223,8 +223,8 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
223 *p++ = xdr_zero; 223 *p++ = xdr_zero;
224 *p = xdr_zero; 224 *p = xdr_zero;
225 225
226 if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN, 226 if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
227 &rqst->rq_snd_buf, rpcrdma_noch)) 227 &rqst->rq_snd_buf, rpcrdma_noch))
228 return -EIO; 228 return -EIO;
229 return 0; 229 return 0;
230} 230}
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index fa759dd2b0f3..29fc84c7ff98 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -306,28 +306,9 @@ out_reset:
306 } 306 }
307} 307}
308 308
309/* Use a slow, safe mechanism to invalidate all memory regions
310 * that were registered for "req".
311 */
312static void
313fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
314 bool sync)
315{
316 struct rpcrdma_mw *mw;
317
318 while (!list_empty(&req->rl_registered)) {
319 mw = rpcrdma_pop_mw(&req->rl_registered);
320 if (sync)
321 fmr_op_recover_mr(mw);
322 else
323 rpcrdma_defer_mr_recovery(mw);
324 }
325}
326
327const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { 309const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
328 .ro_map = fmr_op_map, 310 .ro_map = fmr_op_map,
329 .ro_unmap_sync = fmr_op_unmap_sync, 311 .ro_unmap_sync = fmr_op_unmap_sync,
330 .ro_unmap_safe = fmr_op_unmap_safe,
331 .ro_recover_mr = fmr_op_recover_mr, 312 .ro_recover_mr = fmr_op_recover_mr,
332 .ro_open = fmr_op_open, 313 .ro_open = fmr_op_open,
333 .ro_maxpages = fmr_op_maxpages, 314 .ro_maxpages = fmr_op_maxpages,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 35d7517ef0e6..773e66e10a15 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -420,7 +420,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
420 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : 420 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
421 IB_ACCESS_REMOTE_READ; 421 IB_ACCESS_REMOTE_READ;
422 422
423 rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
424 rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr); 423 rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
425 if (rc) 424 if (rc)
426 goto out_senderr; 425 goto out_senderr;
@@ -508,12 +507,6 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
508 f->fr_cqe.done = frwr_wc_localinv_wake; 507 f->fr_cqe.done = frwr_wc_localinv_wake;
509 reinit_completion(&f->fr_linv_done); 508 reinit_completion(&f->fr_linv_done);
510 509
511 /* Initialize CQ count, since there is always a signaled
512 * WR being posted here. The new cqcount depends on how
513 * many SQEs are about to be consumed.
514 */
515 rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
516
517 /* Transport disconnect drains the receive CQ before it 510 /* Transport disconnect drains the receive CQ before it
518 * replaces the QP. The RPC reply handler won't call us 511 * replaces the QP. The RPC reply handler won't call us
519 * unless ri_id->qp is a valid pointer. 512 * unless ri_id->qp is a valid pointer.
@@ -546,7 +539,6 @@ reset_mrs:
546 /* Find and reset the MRs in the LOCAL_INV WRs that did not 539 /* Find and reset the MRs in the LOCAL_INV WRs that did not
547 * get posted. 540 * get posted.
548 */ 541 */
549 rpcrdma_init_cqcount(&r_xprt->rx_ep, -count);
550 while (bad_wr) { 542 while (bad_wr) {
551 f = container_of(bad_wr, struct rpcrdma_frmr, 543 f = container_of(bad_wr, struct rpcrdma_frmr,
552 fr_invwr); 544 fr_invwr);
@@ -559,28 +551,9 @@ reset_mrs:
559 goto unmap; 551 goto unmap;
560} 552}
561 553
562/* Use a slow, safe mechanism to invalidate all memory regions
563 * that were registered for "req".
564 */
565static void
566frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
567 bool sync)
568{
569 struct rpcrdma_mw *mw;
570
571 while (!list_empty(&req->rl_registered)) {
572 mw = rpcrdma_pop_mw(&req->rl_registered);
573 if (sync)
574 frwr_op_recover_mr(mw);
575 else
576 rpcrdma_defer_mr_recovery(mw);
577 }
578}
579
580const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { 554const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
581 .ro_map = frwr_op_map, 555 .ro_map = frwr_op_map,
582 .ro_unmap_sync = frwr_op_unmap_sync, 556 .ro_unmap_sync = frwr_op_unmap_sync,
583 .ro_unmap_safe = frwr_op_unmap_safe,
584 .ro_recover_mr = frwr_op_recover_mr, 557 .ro_recover_mr = frwr_op_recover_mr,
585 .ro_open = frwr_op_open, 558 .ro_open = frwr_op_open,
586 .ro_maxpages = frwr_op_maxpages, 559 .ro_maxpages = frwr_op_maxpages,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index f1889f4d4803..ed34dc0f144c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1,4 +1,5 @@
1/* 1/*
2 * Copyright (c) 2014-2017 Oracle. All rights reserved.
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 * 4 *
4 * This software is available to you under a choice of one of two 5 * This software is available to you under a choice of one of two
@@ -75,11 +76,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
75 76
76 /* Maximum Read list size */ 77 /* Maximum Read list size */
77 maxsegs += 2; /* segment for head and tail buffers */ 78 maxsegs += 2; /* segment for head and tail buffers */
78 size = maxsegs * sizeof(struct rpcrdma_read_chunk); 79 size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
79 80
80 /* Minimal Read chunk size */ 81 /* Minimal Read chunk size */
81 size += sizeof(__be32); /* segment count */ 82 size += sizeof(__be32); /* segment count */
82 size += sizeof(struct rpcrdma_segment); 83 size += rpcrdma_segment_maxsz * sizeof(__be32);
83 size += sizeof(__be32); /* list discriminator */ 84 size += sizeof(__be32); /* list discriminator */
84 85
85 dprintk("RPC: %s: max call header size = %u\n", 86 dprintk("RPC: %s: max call header size = %u\n",
@@ -102,7 +103,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
102 /* Maximum Write list size */ 103 /* Maximum Write list size */
103 maxsegs += 2; /* segment for head and tail buffers */ 104 maxsegs += 2; /* segment for head and tail buffers */
104 size = sizeof(__be32); /* segment count */ 105 size = sizeof(__be32); /* segment count */
105 size += maxsegs * sizeof(struct rpcrdma_segment); 106 size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
106 size += sizeof(__be32); /* list discriminator */ 107 size += sizeof(__be32); /* list discriminator */
107 108
108 dprintk("RPC: %s: max reply header size = %u\n", 109 dprintk("RPC: %s: max reply header size = %u\n",
@@ -511,27 +512,60 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
511 return 0; 512 return 0;
512} 513}
513 514
514/* Prepare the RPC-over-RDMA header SGE. 515/**
516 * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
517 * @sc: sendctx containing SGEs to unmap
518 *
519 */
520void
521rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
522{
523 struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
524 struct ib_sge *sge;
525 unsigned int count;
526
527 dprintk("RPC: %s: unmapping %u sges for sc=%p\n",
528 __func__, sc->sc_unmap_count, sc);
529
530 /* The first two SGEs contain the transport header and
531 * the inline buffer. These are always left mapped so
532 * they can be cheaply re-used.
533 */
534 sge = &sc->sc_sges[2];
535 for (count = sc->sc_unmap_count; count; ++sge, --count)
536 ib_dma_unmap_page(ia->ri_device,
537 sge->addr, sge->length, DMA_TO_DEVICE);
538
539 if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
540 smp_mb__after_atomic();
541 wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
542 }
543}
544
545/* Prepare an SGE for the RPC-over-RDMA transport header.
515 */ 546 */
516static bool 547static bool
517rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req, 548rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
518 u32 len) 549 u32 len)
519{ 550{
551 struct rpcrdma_sendctx *sc = req->rl_sendctx;
520 struct rpcrdma_regbuf *rb = req->rl_rdmabuf; 552 struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
521 struct ib_sge *sge = &req->rl_send_sge[0]; 553 struct ib_sge *sge = sc->sc_sges;
522 554
523 if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) { 555 if (!rpcrdma_dma_map_regbuf(ia, rb))
524 if (!__rpcrdma_dma_map_regbuf(ia, rb)) 556 goto out_regbuf;
525 return false; 557 sge->addr = rdmab_addr(rb);
526 sge->addr = rdmab_addr(rb);
527 sge->lkey = rdmab_lkey(rb);
528 }
529 sge->length = len; 558 sge->length = len;
559 sge->lkey = rdmab_lkey(rb);
530 560
531 ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, 561 ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
532 sge->length, DMA_TO_DEVICE); 562 sge->length, DMA_TO_DEVICE);
533 req->rl_send_wr.num_sge++; 563 sc->sc_wr.num_sge++;
534 return true; 564 return true;
565
566out_regbuf:
567 pr_err("rpcrdma: failed to DMA map a Send buffer\n");
568 return false;
535} 569}
536 570
537/* Prepare the Send SGEs. The head and tail iovec, and each entry 571/* Prepare the Send SGEs. The head and tail iovec, and each entry
@@ -541,10 +575,11 @@ static bool
541rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, 575rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
542 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) 576 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
543{ 577{
578 struct rpcrdma_sendctx *sc = req->rl_sendctx;
544 unsigned int sge_no, page_base, len, remaining; 579 unsigned int sge_no, page_base, len, remaining;
545 struct rpcrdma_regbuf *rb = req->rl_sendbuf; 580 struct rpcrdma_regbuf *rb = req->rl_sendbuf;
546 struct ib_device *device = ia->ri_device; 581 struct ib_device *device = ia->ri_device;
547 struct ib_sge *sge = req->rl_send_sge; 582 struct ib_sge *sge = sc->sc_sges;
548 u32 lkey = ia->ri_pd->local_dma_lkey; 583 u32 lkey = ia->ri_pd->local_dma_lkey;
549 struct page *page, **ppages; 584 struct page *page, **ppages;
550 585
@@ -552,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
552 * DMA-mapped. Sync the content that has changed. 587 * DMA-mapped. Sync the content that has changed.
553 */ 588 */
554 if (!rpcrdma_dma_map_regbuf(ia, rb)) 589 if (!rpcrdma_dma_map_regbuf(ia, rb))
555 return false; 590 goto out_regbuf;
556 sge_no = 1; 591 sge_no = 1;
557 sge[sge_no].addr = rdmab_addr(rb); 592 sge[sge_no].addr = rdmab_addr(rb);
558 sge[sge_no].length = xdr->head[0].iov_len; 593 sge[sge_no].length = xdr->head[0].iov_len;
@@ -607,7 +642,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
607 sge[sge_no].length = len; 642 sge[sge_no].length = len;
608 sge[sge_no].lkey = lkey; 643 sge[sge_no].lkey = lkey;
609 644
610 req->rl_mapped_sges++; 645 sc->sc_unmap_count++;
611 ppages++; 646 ppages++;
612 remaining -= len; 647 remaining -= len;
613 page_base = 0; 648 page_base = 0;
@@ -633,56 +668,61 @@ map_tail:
633 goto out_mapping_err; 668 goto out_mapping_err;
634 sge[sge_no].length = len; 669 sge[sge_no].length = len;
635 sge[sge_no].lkey = lkey; 670 sge[sge_no].lkey = lkey;
636 req->rl_mapped_sges++; 671 sc->sc_unmap_count++;
637 } 672 }
638 673
639out: 674out:
640 req->rl_send_wr.num_sge = sge_no + 1; 675 sc->sc_wr.num_sge += sge_no;
676 if (sc->sc_unmap_count)
677 __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
641 return true; 678 return true;
642 679
680out_regbuf:
681 pr_err("rpcrdma: failed to DMA map a Send buffer\n");
682 return false;
683
643out_mapping_overflow: 684out_mapping_overflow:
685 rpcrdma_unmap_sendctx(sc);
644 pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); 686 pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
645 return false; 687 return false;
646 688
647out_mapping_err: 689out_mapping_err:
690 rpcrdma_unmap_sendctx(sc);
648 pr_err("rpcrdma: Send mapping error\n"); 691 pr_err("rpcrdma: Send mapping error\n");
649 return false; 692 return false;
650} 693}
651 694
652bool 695/**
653rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, 696 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
654 u32 hdrlen, struct xdr_buf *xdr, 697 * @r_xprt: controlling transport
655 enum rpcrdma_chunktype rtype) 698 * @req: context of RPC Call being marshalled
699 * @hdrlen: size of transport header, in bytes
700 * @xdr: xdr_buf containing RPC Call
701 * @rtype: chunk type being encoded
702 *
703 * Returns 0 on success; otherwise a negative errno is returned.
704 */
705int
706rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
707 struct rpcrdma_req *req, u32 hdrlen,
708 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
656{ 709{
657 req->rl_send_wr.num_sge = 0; 710 req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
658 req->rl_mapped_sges = 0; 711 if (!req->rl_sendctx)
659 712 return -ENOBUFS;
660 if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen)) 713 req->rl_sendctx->sc_wr.num_sge = 0;
661 goto out_map; 714 req->rl_sendctx->sc_unmap_count = 0;
715 req->rl_sendctx->sc_req = req;
716 __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
717
718 if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
719 return -EIO;
662 720
663 if (rtype != rpcrdma_areadch) 721 if (rtype != rpcrdma_areadch)
664 if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype)) 722 if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
665 goto out_map; 723 return -EIO;
666
667 return true;
668
669out_map:
670 pr_err("rpcrdma: failed to DMA map a Send buffer\n");
671 return false;
672}
673
674void
675rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
676{
677 struct ib_device *device = ia->ri_device;
678 struct ib_sge *sge;
679 int count;
680 724
681 sge = &req->rl_send_sge[2]; 725 return 0;
682 for (count = req->rl_mapped_sges; count--; sge++)
683 ib_dma_unmap_page(device, sge->addr, sge->length,
684 DMA_TO_DEVICE);
685 req->rl_mapped_sges = 0;
686} 726}
687 727
688/** 728/**
@@ -833,12 +873,10 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
833 transfertypes[rtype], transfertypes[wtype], 873 transfertypes[rtype], transfertypes[wtype],
834 xdr_stream_pos(xdr)); 874 xdr_stream_pos(xdr));
835 875
836 if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, 876 ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
837 xdr_stream_pos(xdr), 877 &rqst->rq_snd_buf, rtype);
838 &rqst->rq_snd_buf, rtype)) { 878 if (ret)
839 ret = -EIO;
840 goto out_err; 879 goto out_err;
841 }
842 return 0; 880 return 0;
843 881
844out_err: 882out_err:
@@ -970,14 +1008,13 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
970 * straightforward to check the RPC header's direction field. 1008 * straightforward to check the RPC header's direction field.
971 */ 1009 */
972static bool 1010static bool
973rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep, 1011rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
974 __be32 xid, __be32 proc)
975#if defined(CONFIG_SUNRPC_BACKCHANNEL) 1012#if defined(CONFIG_SUNRPC_BACKCHANNEL)
976{ 1013{
977 struct xdr_stream *xdr = &rep->rr_stream; 1014 struct xdr_stream *xdr = &rep->rr_stream;
978 __be32 *p; 1015 __be32 *p;
979 1016
980 if (proc != rdma_msg) 1017 if (rep->rr_proc != rdma_msg)
981 return false; 1018 return false;
982 1019
983 /* Peek at stream contents without advancing. */ 1020 /* Peek at stream contents without advancing. */
@@ -992,7 +1029,7 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
992 return false; 1029 return false;
993 1030
994 /* RPC header */ 1031 /* RPC header */
995 if (*p++ != xid) 1032 if (*p++ != rep->rr_xid)
996 return false; 1033 return false;
997 if (*p != cpu_to_be32(RPC_CALL)) 1034 if (*p != cpu_to_be32(RPC_CALL))
998 return false; 1035 return false;
@@ -1212,105 +1249,170 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1212 return -EREMOTEIO; 1249 return -EREMOTEIO;
1213} 1250}
1214 1251
1252/* Perform XID lookup, reconstruction of the RPC reply, and
1253 * RPC completion while holding the transport lock to ensure
1254 * the rep, rqst, and rq_task pointers remain stable.
1255 */
1256void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1257{
1258 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1259 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1260 struct rpc_rqst *rqst = rep->rr_rqst;
1261 unsigned long cwnd;
1262 int status;
1263
1264 xprt->reestablish_timeout = 0;
1265
1266 switch (rep->rr_proc) {
1267 case rdma_msg:
1268 status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1269 break;
1270 case rdma_nomsg:
1271 status = rpcrdma_decode_nomsg(r_xprt, rep);
1272 break;
1273 case rdma_error:
1274 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1275 break;
1276 default:
1277 status = -EIO;
1278 }
1279 if (status < 0)
1280 goto out_badheader;
1281
1282out:
1283 spin_lock(&xprt->recv_lock);
1284 cwnd = xprt->cwnd;
1285 xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
1286 if (xprt->cwnd > cwnd)
1287 xprt_release_rqst_cong(rqst->rq_task);
1288
1289 xprt_complete_rqst(rqst->rq_task, status);
1290 xprt_unpin_rqst(rqst);
1291 spin_unlock(&xprt->recv_lock);
1292 return;
1293
1294/* If the incoming reply terminated a pending RPC, the next
1295 * RPC call will post a replacement receive buffer as it is
1296 * being marshaled.
1297 */
1298out_badheader:
1299 dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1300 rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
1301 r_xprt->rx_stats.bad_reply_count++;
1302 status = -EIO;
1303 goto out;
1304}
1305
1306void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1307{
1308 /* Invalidate and unmap the data payloads before waking
1309 * the waiting application. This guarantees the memory
1310 * regions are properly fenced from the server before the
1311 * application accesses the data. It also ensures proper
1312 * send flow control: waking the next RPC waits until this
1313 * RPC has relinquished all its Send Queue entries.
1314 */
1315 if (!list_empty(&req->rl_registered))
1316 r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
1317 &req->rl_registered);
1318
1319 /* Ensure that any DMA mapped pages associated with
1320 * the Send of the RPC Call have been unmapped before
1321 * allowing the RPC to complete. This protects argument
1322 * memory not controlled by the RPC client from being
1323 * re-used before we're done with it.
1324 */
1325 if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1326 r_xprt->rx_stats.reply_waits_for_send++;
1327 out_of_line_wait_on_bit(&req->rl_flags,
1328 RPCRDMA_REQ_F_TX_RESOURCES,
1329 bit_wait,
1330 TASK_UNINTERRUPTIBLE);
1331 }
1332}
1333
1334/* Reply handling runs in the poll worker thread. Anything that
1335 * might wait is deferred to a separate workqueue.
1336 */
1337void rpcrdma_deferred_completion(struct work_struct *work)
1338{
1339 struct rpcrdma_rep *rep =
1340 container_of(work, struct rpcrdma_rep, rr_work);
1341 struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1342
1343 rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
1344 rpcrdma_release_rqst(rep->rr_rxprt, req);
1345 rpcrdma_complete_rqst(rep);
1346}
1347
1215/* Process received RPC/RDMA messages. 1348/* Process received RPC/RDMA messages.
1216 * 1349 *
1217 * Errors must result in the RPC task either being awakened, or 1350 * Errors must result in the RPC task either being awakened, or
1218 * allowed to timeout, to discover the errors at that time. 1351 * allowed to timeout, to discover the errors at that time.
1219 */ 1352 */
1220void 1353void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1221rpcrdma_reply_handler(struct work_struct *work)
1222{ 1354{
1223 struct rpcrdma_rep *rep =
1224 container_of(work, struct rpcrdma_rep, rr_work);
1225 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; 1355 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1226 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 1356 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1227 struct xdr_stream *xdr = &rep->rr_stream; 1357 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1228 struct rpcrdma_req *req; 1358 struct rpcrdma_req *req;
1229 struct rpc_rqst *rqst; 1359 struct rpc_rqst *rqst;
1230 __be32 *p, xid, vers, proc; 1360 u32 credits;
1231 unsigned long cwnd; 1361 __be32 *p;
1232 int status;
1233 1362
1234 dprintk("RPC: %s: incoming rep %p\n", __func__, rep); 1363 dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
1235 1364
1236 if (rep->rr_hdrbuf.head[0].iov_len == 0) 1365 if (rep->rr_hdrbuf.head[0].iov_len == 0)
1237 goto out_badstatus; 1366 goto out_badstatus;
1238 1367
1239 xdr_init_decode(xdr, &rep->rr_hdrbuf, 1368 xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1240 rep->rr_hdrbuf.head[0].iov_base); 1369 rep->rr_hdrbuf.head[0].iov_base);
1241 1370
1242 /* Fixed transport header fields */ 1371 /* Fixed transport header fields */
1243 p = xdr_inline_decode(xdr, 4 * sizeof(*p)); 1372 p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1244 if (unlikely(!p)) 1373 if (unlikely(!p))
1245 goto out_shortreply; 1374 goto out_shortreply;
1246 xid = *p++; 1375 rep->rr_xid = *p++;
1247 vers = *p++; 1376 rep->rr_vers = *p++;
1248 p++; /* credits */ 1377 credits = be32_to_cpu(*p++);
1249 proc = *p++; 1378 rep->rr_proc = *p++;
1379
1380 if (rep->rr_vers != rpcrdma_version)
1381 goto out_badversion;
1250 1382
1251 if (rpcrdma_is_bcall(r_xprt, rep, xid, proc)) 1383 if (rpcrdma_is_bcall(r_xprt, rep))
1252 return; 1384 return;
1253 1385
1254 /* Match incoming rpcrdma_rep to an rpcrdma_req to 1386 /* Match incoming rpcrdma_rep to an rpcrdma_req to
1255 * get context for handling any incoming chunks. 1387 * get context for handling any incoming chunks.
1256 */ 1388 */
1257 spin_lock(&xprt->recv_lock); 1389 spin_lock(&xprt->recv_lock);
1258 rqst = xprt_lookup_rqst(xprt, xid); 1390 rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1259 if (!rqst) 1391 if (!rqst)
1260 goto out_norqst; 1392 goto out_norqst;
1261 xprt_pin_rqst(rqst); 1393 xprt_pin_rqst(rqst);
1394
1395 if (credits == 0)
1396 credits = 1; /* don't deadlock */
1397 else if (credits > buf->rb_max_requests)
1398 credits = buf->rb_max_requests;
1399 buf->rb_credits = credits;
1400
1262 spin_unlock(&xprt->recv_lock); 1401 spin_unlock(&xprt->recv_lock);
1402
1263 req = rpcr_to_rdmar(rqst); 1403 req = rpcr_to_rdmar(rqst);
1264 req->rl_reply = rep; 1404 req->rl_reply = rep;
1405 rep->rr_rqst = rqst;
1406 clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
1265 1407
1266 dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", 1408 dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
1267 __func__, rep, req, be32_to_cpu(xid)); 1409 __func__, rep, req, be32_to_cpu(rep->rr_xid));
1268
1269 /* Invalidate and unmap the data payloads before waking the
1270 * waiting application. This guarantees the memory regions
1271 * are properly fenced from the server before the application
1272 * accesses the data. It also ensures proper send flow control:
1273 * waking the next RPC waits until this RPC has relinquished
1274 * all its Send Queue entries.
1275 */
1276 if (!list_empty(&req->rl_registered)) {
1277 rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
1278 r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
1279 &req->rl_registered);
1280 }
1281
1282 xprt->reestablish_timeout = 0;
1283 if (vers != rpcrdma_version)
1284 goto out_badversion;
1285 1410
1286 switch (proc) { 1411 if (list_empty(&req->rl_registered) &&
1287 case rdma_msg: 1412 !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
1288 status = rpcrdma_decode_msg(r_xprt, rep, rqst); 1413 rpcrdma_complete_rqst(rep);
1289 break; 1414 else
1290 case rdma_nomsg: 1415 queue_work(rpcrdma_receive_wq, &rep->rr_work);
1291 status = rpcrdma_decode_nomsg(r_xprt, rep);
1292 break;
1293 case rdma_error:
1294 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1295 break;
1296 default:
1297 status = -EIO;
1298 }
1299 if (status < 0)
1300 goto out_badheader;
1301
1302out:
1303 spin_lock(&xprt->recv_lock);
1304 cwnd = xprt->cwnd;
1305 xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1306 if (xprt->cwnd > cwnd)
1307 xprt_release_rqst_cong(rqst->rq_task);
1308
1309 xprt_complete_rqst(rqst->rq_task, status);
1310 xprt_unpin_rqst(rqst);
1311 spin_unlock(&xprt->recv_lock);
1312 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
1313 __func__, xprt, rqst, status);
1314 return; 1416 return;
1315 1417
1316out_badstatus: 1418out_badstatus:
@@ -1321,37 +1423,22 @@ out_badstatus:
1321 } 1423 }
1322 return; 1424 return;
1323 1425
1324/* If the incoming reply terminated a pending RPC, the next
1325 * RPC call will post a replacement receive buffer as it is
1326 * being marshaled.
1327 */
1328out_badversion: 1426out_badversion:
1329 dprintk("RPC: %s: invalid version %d\n", 1427 dprintk("RPC: %s: invalid version %d\n",
1330 __func__, be32_to_cpu(vers)); 1428 __func__, be32_to_cpu(rep->rr_vers));
1331 status = -EIO; 1429 goto repost;
1332 r_xprt->rx_stats.bad_reply_count++;
1333 goto out;
1334
1335out_badheader:
1336 dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1337 rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
1338 r_xprt->rx_stats.bad_reply_count++;
1339 status = -EIO;
1340 goto out;
1341 1430
1342/* The req was still available, but by the time the recv_lock 1431/* The RPC transaction has already been terminated, or the header
1343 * was acquired, the rqst and task had been released. Thus the RPC 1432 * is corrupt.
1344 * has already been terminated.
1345 */ 1433 */
1346out_norqst: 1434out_norqst:
1347 spin_unlock(&xprt->recv_lock); 1435 spin_unlock(&xprt->recv_lock);
1348 dprintk("RPC: %s: no match for incoming xid 0x%08x\n", 1436 dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
1349 __func__, be32_to_cpu(xid)); 1437 __func__, be32_to_cpu(rep->rr_xid));
1350 goto repost; 1438 goto repost;
1351 1439
1352out_shortreply: 1440out_shortreply:
1353 dprintk("RPC: %s: short/invalid reply\n", __func__); 1441 dprintk("RPC: %s: short/invalid reply\n", __func__);
1354 goto repost;
1355 1442
1356/* If no pending RPC transaction was matched, post a replacement 1443/* If no pending RPC transaction was matched, post a replacement
1357 * receive buffer before returning. 1444 * receive buffer before returning.
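
Two details of the rpc_rdma.c rework above are easy to miss. The reply handler now pulls the credit grant straight out of the fixed transport header and clamps it before the congestion window is recomputed (replacing rpcrdma_update_granted_credits(), which is removed from verbs.c below), and a reply is only handed to rpcrdma_receive_wq for deferred completion when it still has registered MRs to invalidate or DMA-mapped Send SGEs to wait on; otherwise it completes inline in the poll context. A rough userspace sketch of just the credit clamp and cwnd arithmetic, with made-up values standing in for RPC_CWNDSHIFT and rb_max_requests:

#include <stdio.h>

#define CWND_SHIFT	10U	/* stand-in for RPC_CWNDSHIFT */

/* Clamp a server-granted credit value the way the reply handler does:
 * never 0 (a zero grant would deadlock the transport), and never more
 * than the number of request slots this side actually has.
 */
static unsigned int clamp_credits(unsigned int granted, unsigned int max_requests)
{
	if (granted == 0)
		return 1;
	if (granted > max_requests)
		return max_requests;
	return granted;
}

int main(void)
{
	unsigned int max_requests = 128;	/* stand-in for rb_max_requests */

	/* A grant of 0 still yields a usable window of one request. */
	printf("cwnd = %u\n", clamp_credits(0, max_requests) << CWND_SHIFT);
	/* An oversized grant is capped at max_requests. */
	printf("cwnd = %u\n", clamp_credits(1000, max_requests) << CWND_SHIFT);
	return 0;
}
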
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index c84e2b644e13..646c24494ea7 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -1,4 +1,5 @@
1/* 1/*
2 * Copyright (c) 2014-2017 Oracle. All rights reserved.
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 * 4 *
4 * This software is available to you under a choice of one of two 5 * This software is available to you under a choice of one of two
@@ -678,16 +679,14 @@ xprt_rdma_free(struct rpc_task *task)
678 struct rpc_rqst *rqst = task->tk_rqstp; 679 struct rpc_rqst *rqst = task->tk_rqstp;
679 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); 680 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
680 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 681 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
681 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
682 682
683 if (req->rl_backchannel) 683 if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags))
684 return; 684 return;
685 685
686 dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); 686 dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
687 687
688 if (!list_empty(&req->rl_registered)) 688 if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
689 ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task)); 689 rpcrdma_release_rqst(r_xprt, req);
690 rpcrdma_unmap_sges(ia, req);
691 rpcrdma_buffer_put(req); 690 rpcrdma_buffer_put(req);
692} 691}
693 692
@@ -728,7 +727,8 @@ xprt_rdma_send_request(struct rpc_task *task)
728 727
729 /* On retransmit, remove any previously registered chunks */ 728 /* On retransmit, remove any previously registered chunks */
730 if (unlikely(!list_empty(&req->rl_registered))) 729 if (unlikely(!list_empty(&req->rl_registered)))
731 r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); 730 r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
731 &req->rl_registered);
732 732
733 rc = rpcrdma_marshal_req(r_xprt, rqst); 733 rc = rpcrdma_marshal_req(r_xprt, rqst);
734 if (rc < 0) 734 if (rc < 0)
@@ -742,6 +742,7 @@ xprt_rdma_send_request(struct rpc_task *task)
742 goto drop_connection; 742 goto drop_connection;
743 req->rl_connect_cookie = xprt->connect_cookie; 743 req->rl_connect_cookie = xprt->connect_cookie;
744 744
745 set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
745 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) 746 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
746 goto drop_connection; 747 goto drop_connection;
747 748
@@ -789,11 +790,13 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
789 r_xprt->rx_stats.failed_marshal_count, 790 r_xprt->rx_stats.failed_marshal_count,
790 r_xprt->rx_stats.bad_reply_count, 791 r_xprt->rx_stats.bad_reply_count,
791 r_xprt->rx_stats.nomsg_call_count); 792 r_xprt->rx_stats.nomsg_call_count);
792 seq_printf(seq, "%lu %lu %lu %lu\n", 793 seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
793 r_xprt->rx_stats.mrs_recovered, 794 r_xprt->rx_stats.mrs_recovered,
794 r_xprt->rx_stats.mrs_orphaned, 795 r_xprt->rx_stats.mrs_orphaned,
795 r_xprt->rx_stats.mrs_allocated, 796 r_xprt->rx_stats.mrs_allocated,
796 r_xprt->rx_stats.local_inv_needed); 797 r_xprt->rx_stats.local_inv_needed,
798 r_xprt->rx_stats.empty_sendctx_q,
799 r_xprt->rx_stats.reply_waits_for_send);
797} 800}
798 801
799static int 802static int
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 11a1fbf7e59e..710b3f77db82 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1,4 +1,5 @@
1/* 1/*
2 * Copyright (c) 2014-2017 Oracle. All rights reserved.
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 * 4 *
4 * This software is available to you under a choice of one of two 5 * This software is available to you under a choice of one of two
@@ -49,9 +50,10 @@
49 50
50#include <linux/interrupt.h> 51#include <linux/interrupt.h>
51#include <linux/slab.h> 52#include <linux/slab.h>
52#include <linux/prefetch.h>
53#include <linux/sunrpc/addr.h> 53#include <linux/sunrpc/addr.h>
54#include <linux/sunrpc/svc_rdma.h> 54#include <linux/sunrpc/svc_rdma.h>
55
56#include <asm-generic/barrier.h>
55#include <asm/bitops.h> 57#include <asm/bitops.h>
56 58
57#include <rdma/ib_cm.h> 59#include <rdma/ib_cm.h>
@@ -73,7 +75,7 @@ static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
73static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf); 75static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
74static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb); 76static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
75 77
76static struct workqueue_struct *rpcrdma_receive_wq __read_mostly; 78struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
77 79
78int 80int
79rpcrdma_alloc_wq(void) 81rpcrdma_alloc_wq(void)
@@ -126,30 +128,17 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
126static void 128static void
127rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 129rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
128{ 130{
131 struct ib_cqe *cqe = wc->wr_cqe;
132 struct rpcrdma_sendctx *sc =
133 container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
134
129 /* WARNING: Only wr_cqe and status are reliable at this point */ 135 /* WARNING: Only wr_cqe and status are reliable at this point */
130 if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) 136 if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
131 pr_err("rpcrdma: Send: %s (%u/0x%x)\n", 137 pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
132 ib_wc_status_msg(wc->status), 138 ib_wc_status_msg(wc->status),
133 wc->status, wc->vendor_err); 139 wc->status, wc->vendor_err);
134}
135
136/* Perform basic sanity checking to avoid using garbage
137 * to update the credit grant value.
138 */
139static void
140rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
141{
142 struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
143 __be32 *p = rep->rr_rdmabuf->rg_base;
144 u32 credits;
145 140
146 credits = be32_to_cpup(p + 2); 141 rpcrdma_sendctx_put_locked(sc);
147 if (credits == 0)
148 credits = 1; /* don't deadlock */
149 else if (credits > buffer->rb_max_requests)
150 credits = buffer->rb_max_requests;
151
152 atomic_set(&buffer->rb_credits, credits);
153} 142}
154 143
155/** 144/**
@@ -181,11 +170,8 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
181 rdmab_addr(rep->rr_rdmabuf), 170 rdmab_addr(rep->rr_rdmabuf),
182 wc->byte_len, DMA_FROM_DEVICE); 171 wc->byte_len, DMA_FROM_DEVICE);
183 172
184 if (wc->byte_len >= RPCRDMA_HDRLEN_ERR)
185 rpcrdma_update_granted_credits(rep);
186
187out_schedule: 173out_schedule:
188 queue_work(rpcrdma_receive_wq, &rep->rr_work); 174 rpcrdma_reply_handler(rep);
189 return; 175 return;
190 176
191out_fail: 177out_fail:
@@ -295,7 +281,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
295 case RDMA_CM_EVENT_DISCONNECTED: 281 case RDMA_CM_EVENT_DISCONNECTED:
296 connstate = -ECONNABORTED; 282 connstate = -ECONNABORTED;
297connected: 283connected:
298 atomic_set(&xprt->rx_buf.rb_credits, 1); 284 xprt->rx_buf.rb_credits = 1;
299 ep->rep_connected = connstate; 285 ep->rep_connected = connstate;
300 rpcrdma_conn_func(ep); 286 rpcrdma_conn_func(ep);
301 wake_up_all(&ep->rep_connect_wait); 287 wake_up_all(&ep->rep_connect_wait);
@@ -564,16 +550,15 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
564 ep->rep_attr.cap.max_recv_sge); 550 ep->rep_attr.cap.max_recv_sge);
565 551
566 /* set trigger for requesting send completion */ 552 /* set trigger for requesting send completion */
567 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1; 553 ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
568 if (ep->rep_cqinit <= 2) 554 cdata->max_requests >> 2);
569 ep->rep_cqinit = 0; /* always signal? */ 555 ep->rep_send_count = ep->rep_send_batch;
570 rpcrdma_init_cqcount(ep, 0);
571 init_waitqueue_head(&ep->rep_connect_wait); 556 init_waitqueue_head(&ep->rep_connect_wait);
572 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); 557 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
573 558
574 sendcq = ib_alloc_cq(ia->ri_device, NULL, 559 sendcq = ib_alloc_cq(ia->ri_device, NULL,
575 ep->rep_attr.cap.max_send_wr + 1, 560 ep->rep_attr.cap.max_send_wr + 1,
576 0, IB_POLL_SOFTIRQ); 561 1, IB_POLL_WORKQUEUE);
577 if (IS_ERR(sendcq)) { 562 if (IS_ERR(sendcq)) {
578 rc = PTR_ERR(sendcq); 563 rc = PTR_ERR(sendcq);
579 dprintk("RPC: %s: failed to create send CQ: %i\n", 564 dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -583,7 +568,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
583 568
584 recvcq = ib_alloc_cq(ia->ri_device, NULL, 569 recvcq = ib_alloc_cq(ia->ri_device, NULL,
585 ep->rep_attr.cap.max_recv_wr + 1, 570 ep->rep_attr.cap.max_recv_wr + 1,
586 0, IB_POLL_SOFTIRQ); 571 0, IB_POLL_WORKQUEUE);
587 if (IS_ERR(recvcq)) { 572 if (IS_ERR(recvcq)) {
588 rc = PTR_ERR(recvcq); 573 rc = PTR_ERR(recvcq);
589 dprintk("RPC: %s: failed to create recv CQ: %i\n", 574 dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -846,6 +831,168 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
846 ib_drain_qp(ia->ri_id->qp); 831 ib_drain_qp(ia->ri_id->qp);
847} 832}
848 833
834/* Fixed-size circular FIFO queue. This implementation is wait-free and
835 * lock-free.
836 *
837 * Consumer is the code path that posts Sends. This path dequeues a
838 * sendctx for use by a Send operation. Multiple consumer threads
839 * are serialized by the RPC transport lock, which allows only one
840 * ->send_request call at a time.
841 *
842 * Producer is the code path that handles Send completions. This path
843 * enqueues a sendctx that has been completed. Multiple producer
844 * threads are serialized by the ib_poll_cq() function.
845 */
846
847/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
848 * queue activity, and ib_drain_qp has flushed all remaining Send
849 * requests.
850 */
851static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
852{
853 unsigned long i;
854
855 for (i = 0; i <= buf->rb_sc_last; i++)
856 kfree(buf->rb_sc_ctxs[i]);
857 kfree(buf->rb_sc_ctxs);
858}
859
860static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
861{
862 struct rpcrdma_sendctx *sc;
863
864 sc = kzalloc(sizeof(*sc) +
865 ia->ri_max_send_sges * sizeof(struct ib_sge),
866 GFP_KERNEL);
867 if (!sc)
868 return NULL;
869
870 sc->sc_wr.wr_cqe = &sc->sc_cqe;
871 sc->sc_wr.sg_list = sc->sc_sges;
872 sc->sc_wr.opcode = IB_WR_SEND;
873 sc->sc_cqe.done = rpcrdma_wc_send;
874 return sc;
875}
876
877static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
878{
879 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
880 struct rpcrdma_sendctx *sc;
881 unsigned long i;
882
883 /* Maximum number of concurrent outstanding Send WRs. Capping
884 * the circular queue size stops Send Queue overflow by causing
885 * the ->send_request call to fail temporarily before too many
886 * Sends are posted.
887 */
888 i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
889 dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
890 buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
891 if (!buf->rb_sc_ctxs)
892 return -ENOMEM;
893
894 buf->rb_sc_last = i - 1;
895 for (i = 0; i <= buf->rb_sc_last; i++) {
896 sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
897 if (!sc)
898 goto out_destroy;
899
900 sc->sc_xprt = r_xprt;
901 buf->rb_sc_ctxs[i] = sc;
902 }
903
904 return 0;
905
906out_destroy:
907 rpcrdma_sendctxs_destroy(buf);
908 return -ENOMEM;
909}
910
911/* The sendctx queue is not guaranteed to have a size that is a
912 * power of two, thus the helpers in circ_buf.h cannot be used.
913 * The other option is to use modulus (%), which can be expensive.
914 */
915static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
916 unsigned long item)
917{
918 return likely(item < buf->rb_sc_last) ? item + 1 : 0;
919}
920
921/**
922 * rpcrdma_sendctx_get_locked - Acquire a send context
923 * @buf: transport buffers from which to acquire an unused context
924 *
925 * Returns pointer to a free send completion context; or NULL if
926 * the queue is empty.
927 *
928 * Usage: Called to acquire an SGE array before preparing a Send WR.
929 *
930 * The caller serializes calls to this function (per rpcrdma_buffer),
931 * and provides an effective memory barrier that flushes the new value
932 * of rb_sc_head.
933 */
934struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
935{
936 struct rpcrdma_xprt *r_xprt;
937 struct rpcrdma_sendctx *sc;
938 unsigned long next_head;
939
940 next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
941
942 if (next_head == READ_ONCE(buf->rb_sc_tail))
943 goto out_emptyq;
944
945 /* ORDER: item must be accessed _before_ head is updated */
946 sc = buf->rb_sc_ctxs[next_head];
947
948 /* Releasing the lock in the caller acts as a memory
949 * barrier that flushes rb_sc_head.
950 */
951 buf->rb_sc_head = next_head;
952
953 return sc;
954
955out_emptyq:
956 /* The queue is "empty" if there have not been enough Send
957 * completions recently. This is a sign the Send Queue is
958 * backing up. Cause the caller to pause and try again.
959 */
960 dprintk("RPC: %s: empty sendctx queue\n", __func__);
961 r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
962 r_xprt->rx_stats.empty_sendctx_q++;
963 return NULL;
964}
965
966/**
967 * rpcrdma_sendctx_put_locked - Release a send context
968 * @sc: send context to release
969 *
970 * Usage: Called from Send completion to return a sendctxt
971 * to the queue.
972 *
973 * The caller serializes calls to this function (per rpcrdma_buffer).
974 */
975void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
976{
977 struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
978 unsigned long next_tail;
979
980 /* Unmap SGEs of previously completed by unsignaled
981 * Sends by walking up the queue until @sc is found.
982 */
983 next_tail = buf->rb_sc_tail;
984 do {
985 next_tail = rpcrdma_sendctx_next(buf, next_tail);
986
987 /* ORDER: item must be accessed _before_ tail is updated */
988 rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
989
990 } while (buf->rb_sc_ctxs[next_tail] != sc);
991
992 /* Paired with READ_ONCE */
993 smp_store_release(&buf->rb_sc_tail, next_tail);
994}
995
849static void 996static void
850rpcrdma_mr_recovery_worker(struct work_struct *work) 997rpcrdma_mr_recovery_worker(struct work_struct *work)
851{ 998{
@@ -941,13 +1088,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
941 spin_lock(&buffer->rb_reqslock); 1088 spin_lock(&buffer->rb_reqslock);
942 list_add(&req->rl_all, &buffer->rb_allreqs); 1089 list_add(&req->rl_all, &buffer->rb_allreqs);
943 spin_unlock(&buffer->rb_reqslock); 1090 spin_unlock(&buffer->rb_reqslock);
944 req->rl_cqe.done = rpcrdma_wc_send;
945 req->rl_buffer = &r_xprt->rx_buf; 1091 req->rl_buffer = &r_xprt->rx_buf;
946 INIT_LIST_HEAD(&req->rl_registered); 1092 INIT_LIST_HEAD(&req->rl_registered);
947 req->rl_send_wr.next = NULL;
948 req->rl_send_wr.wr_cqe = &req->rl_cqe;
949 req->rl_send_wr.sg_list = req->rl_send_sge;
950 req->rl_send_wr.opcode = IB_WR_SEND;
951 return req; 1093 return req;
952} 1094}
953 1095
@@ -974,7 +1116,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
974 1116
975 rep->rr_cqe.done = rpcrdma_wc_receive; 1117 rep->rr_cqe.done = rpcrdma_wc_receive;
976 rep->rr_rxprt = r_xprt; 1118 rep->rr_rxprt = r_xprt;
977 INIT_WORK(&rep->rr_work, rpcrdma_reply_handler); 1119 INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
978 rep->rr_recv_wr.next = NULL; 1120 rep->rr_recv_wr.next = NULL;
979 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; 1121 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
980 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; 1122 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
@@ -995,7 +1137,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
995 1137
996 buf->rb_max_requests = r_xprt->rx_data.max_requests; 1138 buf->rb_max_requests = r_xprt->rx_data.max_requests;
997 buf->rb_bc_srv_max_requests = 0; 1139 buf->rb_bc_srv_max_requests = 0;
998 atomic_set(&buf->rb_credits, 1);
999 spin_lock_init(&buf->rb_mwlock); 1140 spin_lock_init(&buf->rb_mwlock);
1000 spin_lock_init(&buf->rb_lock); 1141 spin_lock_init(&buf->rb_lock);
1001 spin_lock_init(&buf->rb_recovery_lock); 1142 spin_lock_init(&buf->rb_recovery_lock);
@@ -1022,7 +1163,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1022 rc = PTR_ERR(req); 1163 rc = PTR_ERR(req);
1023 goto out; 1164 goto out;
1024 } 1165 }
1025 req->rl_backchannel = false;
1026 list_add(&req->rl_list, &buf->rb_send_bufs); 1166 list_add(&req->rl_list, &buf->rb_send_bufs);
1027 } 1167 }
1028 1168
@@ -1040,6 +1180,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1040 list_add(&rep->rr_list, &buf->rb_recv_bufs); 1180 list_add(&rep->rr_list, &buf->rb_recv_bufs);
1041 } 1181 }
1042 1182
1183 rc = rpcrdma_sendctxs_create(r_xprt);
1184 if (rc)
1185 goto out;
1186
1043 return 0; 1187 return 0;
1044out: 1188out:
1045 rpcrdma_buffer_destroy(buf); 1189 rpcrdma_buffer_destroy(buf);
@@ -1116,6 +1260,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1116 cancel_delayed_work_sync(&buf->rb_recovery_worker); 1260 cancel_delayed_work_sync(&buf->rb_recovery_worker);
1117 cancel_delayed_work_sync(&buf->rb_refresh_worker); 1261 cancel_delayed_work_sync(&buf->rb_refresh_worker);
1118 1262
1263 rpcrdma_sendctxs_destroy(buf);
1264
1119 while (!list_empty(&buf->rb_recv_bufs)) { 1265 while (!list_empty(&buf->rb_recv_bufs)) {
1120 struct rpcrdma_rep *rep; 1266 struct rpcrdma_rep *rep;
1121 1267
@@ -1231,7 +1377,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
1231 struct rpcrdma_buffer *buffers = req->rl_buffer; 1377 struct rpcrdma_buffer *buffers = req->rl_buffer;
1232 struct rpcrdma_rep *rep = req->rl_reply; 1378 struct rpcrdma_rep *rep = req->rl_reply;
1233 1379
1234 req->rl_send_wr.num_sge = 0;
1235 req->rl_reply = NULL; 1380 req->rl_reply = NULL;
1236 1381
1237 spin_lock(&buffers->rb_lock); 1382 spin_lock(&buffers->rb_lock);
@@ -1363,7 +1508,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
1363 struct rpcrdma_ep *ep, 1508 struct rpcrdma_ep *ep,
1364 struct rpcrdma_req *req) 1509 struct rpcrdma_req *req)
1365{ 1510{
1366 struct ib_send_wr *send_wr = &req->rl_send_wr; 1511 struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1367 struct ib_send_wr *send_wr_fail; 1512 struct ib_send_wr *send_wr_fail;
1368 int rc; 1513 int rc;
1369 1514
@@ -1377,7 +1522,14 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
1377 dprintk("RPC: %s: posting %d s/g entries\n", 1522 dprintk("RPC: %s: posting %d s/g entries\n",
1378 __func__, send_wr->num_sge); 1523 __func__, send_wr->num_sge);
1379 1524
1380 rpcrdma_set_signaled(ep, send_wr); 1525 if (!ep->rep_send_count ||
1526 test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1527 send_wr->send_flags |= IB_SEND_SIGNALED;
1528 ep->rep_send_count = ep->rep_send_batch;
1529 } else {
1530 send_wr->send_flags &= ~IB_SEND_SIGNALED;
1531 --ep->rep_send_count;
1532 }
1381 rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail); 1533 rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
1382 if (rc) 1534 if (rc)
1383 goto out_postsend_err; 1535 goto out_postsend_err;
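The open-coded test that replaces rpcrdma_set_signaled() asks for a completion on the first Send after the countdown expires, or whenever the request holds TX resources that the completion must release; otherwise the completion is suppressed and rep_send_count ticks down, so the provider sees roughly one signaled Send per rep_send_batch posts. A compact restatement of just that decision (field names follow the hunk, but the helper itself is illustrative):

#include <stdbool.h>

/* Return true when the next Send should set IB_SEND_SIGNALED. */
static bool send_needs_signal(unsigned int *send_count,
			      unsigned int send_batch,
			      bool holds_tx_resources)
{
	if (*send_count == 0 || holds_tx_resources) {
		*send_count = send_batch;	/* restart the countdown */
		return true;
	}
	--(*send_count);
	return false;
}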
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e26a97d2f922..51686d9eac5f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -1,4 +1,5 @@
1/* 1/*
2 * Copyright (c) 2014-2017 Oracle. All rights reserved.
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 * 4 *
4 * This software is available to you under a choice of one of two 5 * This software is available to you under a choice of one of two
@@ -93,8 +94,8 @@ enum {
93 */ 94 */
94 95
95struct rpcrdma_ep { 96struct rpcrdma_ep {
96 atomic_t rep_cqcount; 97 unsigned int rep_send_count;
97 int rep_cqinit; 98 unsigned int rep_send_batch;
98 int rep_connected; 99 int rep_connected;
99 struct ib_qp_init_attr rep_attr; 100 struct ib_qp_init_attr rep_attr;
100 wait_queue_head_t rep_connect_wait; 101 wait_queue_head_t rep_connect_wait;
@@ -104,25 +105,6 @@ struct rpcrdma_ep {
104 struct delayed_work rep_connect_worker; 105 struct delayed_work rep_connect_worker;
105}; 106};
106 107
107static inline void
108rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
109{
110 atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
111}
112
113/* To update send queue accounting, provider must take a
114 * send completion every now and then.
115 */
116static inline void
117rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
118{
119 send_wr->send_flags = 0;
120 if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
121 rpcrdma_init_cqcount(ep, 0);
122 send_wr->send_flags = IB_SEND_SIGNALED;
123 }
124}
125
126/* Pre-allocate extra Work Requests for handling backward receives 108/* Pre-allocate extra Work Requests for handling backward receives
127 * and sends. This is a fixed value because the Work Queues are 109 * and sends. This is a fixed value because the Work Queues are
128 * allocated when the forward channel is set up. 110 * allocated when the forward channel is set up.
@@ -164,12 +146,6 @@ rdmab_lkey(struct rpcrdma_regbuf *rb)
164 return rb->rg_iov.lkey; 146 return rb->rg_iov.lkey;
165} 147}
166 148
167static inline struct rpcrdma_msg *
168rdmab_to_msg(struct rpcrdma_regbuf *rb)
169{
170 return (struct rpcrdma_msg *)rb->rg_base;
171}
172
173static inline struct ib_device * 149static inline struct ib_device *
174rdmab_device(struct rpcrdma_regbuf *rb) 150rdmab_device(struct rpcrdma_regbuf *rb)
175{ 151{
@@ -202,22 +178,24 @@ enum {
202}; 178};
203 179
204/* 180/*
205 * struct rpcrdma_rep -- this structure encapsulates state required to recv 181 * struct rpcrdma_rep -- this structure encapsulates state required
206 * and complete a reply, asynchronously. It needs several pieces of 182 * to receive and complete an RPC Reply, asynchronously. It needs
207 * state: 183 * several pieces of state:
208 * o recv buffer (posted to provider)
209 * o ib_sge (also donated to provider)
210 * o status of reply (length, success or not)
211 * o bookkeeping state to get run by reply handler (list, etc)
212 * 184 *
213 * These are allocated during initialization, per-transport instance. 185 * o receive buffer and ib_sge (donated to provider)
186 * o status of receive (success or not, length, inv rkey)
187 * o bookkeeping state to get run by reply handler (XDR stream)
214 * 188 *
215 * N of these are associated with a transport instance, and stored in 189 * These structures are allocated during transport initialization.
216 * struct rpcrdma_buffer. N is the max number of outstanding requests. 190 * N of these are associated with a transport instance, managed by
191 * struct rpcrdma_buffer. N is the max number of outstanding RPCs.
217 */ 192 */
218 193
219struct rpcrdma_rep { 194struct rpcrdma_rep {
220 struct ib_cqe rr_cqe; 195 struct ib_cqe rr_cqe;
196 __be32 rr_xid;
197 __be32 rr_vers;
198 __be32 rr_proc;
221 int rr_wc_flags; 199 int rr_wc_flags;
222 u32 rr_inv_rkey; 200 u32 rr_inv_rkey;
223 struct rpcrdma_regbuf *rr_rdmabuf; 201 struct rpcrdma_regbuf *rr_rdmabuf;
@@ -225,10 +203,34 @@ struct rpcrdma_rep {
225 struct work_struct rr_work; 203 struct work_struct rr_work;
226 struct xdr_buf rr_hdrbuf; 204 struct xdr_buf rr_hdrbuf;
227 struct xdr_stream rr_stream; 205 struct xdr_stream rr_stream;
206 struct rpc_rqst *rr_rqst;
228 struct list_head rr_list; 207 struct list_head rr_list;
229 struct ib_recv_wr rr_recv_wr; 208 struct ib_recv_wr rr_recv_wr;
230}; 209};
231 210
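The three new __be32 fields cache the transaction ID, protocol version, and procedure from the incoming RPC-over-RDMA header so later matching and sanity checks do not have to re-parse the receive buffer. The fixed part of that header is four XDR words: xid, version, credit grant, procedure (RFC 8166). A hedged sketch of caching them through the rep's xdr_stream (the helper below is illustrative, not the patch's reply handler):

/* Illustrative: pull the fixed header words into the rr_* cache. */
static int cache_rdma_header(struct rpcrdma_rep *rep)
{
	__be32 *p;

	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	p++;			/* rdma_credit, consumed separately */
	rep->rr_proc = *p;
	return 0;
}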
211/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
212 */
213struct rpcrdma_req;
214struct rpcrdma_xprt;
215struct rpcrdma_sendctx {
216 struct ib_send_wr sc_wr;
217 struct ib_cqe sc_cqe;
218 struct rpcrdma_xprt *sc_xprt;
219 struct rpcrdma_req *sc_req;
220 unsigned int sc_unmap_count;
221 struct ib_sge sc_sges[];
222};
223
224/* Limit the number of SGEs that can be unmapped during one
225 * Send completion. This caps the amount of work a single
226 * completion can do before returning to the provider.
227 *
228 * Setting this to zero disables Send completion batching.
229 */
230enum {
231 RPCRDMA_MAX_SEND_BATCH = 7,
232};
233
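The sc_sges[] flexible array means each context is sized at allocation time for however many SGEs a Send may carry, and the per-request WR fields that rpcrdma_create_req() used to initialize (wr_cqe, sg_list, opcode) now live with the context. The allocation path is not in this excerpt; a kernel-style sketch of what such setup typically looks like (the helper name and the use of struct_size() are assumptions for illustration):

static struct rpcrdma_sendctx *alloc_sendctx(unsigned int max_sges)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(struct_size(sc, sc_sges, max_sges), GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.next = NULL;
	sc->sc_wr.wr_cqe = &sc->sc_cqe;	/* completion routes back here */
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	return sc;
}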
232/* 234/*
233 * struct rpcrdma_mw - external memory region metadata 235 * struct rpcrdma_mw - external memory region metadata
234 * 236 *
@@ -340,26 +342,30 @@ enum {
340struct rpcrdma_buffer; 342struct rpcrdma_buffer;
341struct rpcrdma_req { 343struct rpcrdma_req {
342 struct list_head rl_list; 344 struct list_head rl_list;
343 unsigned int rl_mapped_sges;
344 unsigned int rl_connect_cookie; 345 unsigned int rl_connect_cookie;
345 struct rpcrdma_buffer *rl_buffer; 346 struct rpcrdma_buffer *rl_buffer;
346 struct rpcrdma_rep *rl_reply; 347 struct rpcrdma_rep *rl_reply;
347 struct xdr_stream rl_stream; 348 struct xdr_stream rl_stream;
348 struct xdr_buf rl_hdrbuf; 349 struct xdr_buf rl_hdrbuf;
349 struct ib_send_wr rl_send_wr; 350 struct rpcrdma_sendctx *rl_sendctx;
350 struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES];
351 struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ 351 struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
352 struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */ 352 struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
353 struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ 353 struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
354 354
355 struct ib_cqe rl_cqe;
356 struct list_head rl_all; 355 struct list_head rl_all;
357 bool rl_backchannel; 356 unsigned long rl_flags;
358 357
359 struct list_head rl_registered; /* registered segments */ 358 struct list_head rl_registered; /* registered segments */
360 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; 359 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
361}; 360};
362 361
362/* rl_flags */
363enum {
364 RPCRDMA_REQ_F_BACKCHANNEL = 0,
365 RPCRDMA_REQ_F_PENDING,
366 RPCRDMA_REQ_F_TX_RESOURCES,
367};
368
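Replacing the rl_backchannel bool with a bit-number enum lets several request states share one unsigned long and be tested and updated with the atomic bitops, as the test_bit(RPCRDMA_REQ_F_TX_RESOURCES, ...) check in rpcrdma_ep_post() above already does. For example (illustrative helpers, not from the patch):

static void rpcrdma_req_mark_pending(struct rpcrdma_req *req)
{
	set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
}

static bool rpcrdma_req_take_pending(struct rpcrdma_req *req)
{
	return test_and_clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
}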
363static inline void 369static inline void
364rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req) 370rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req)
365{ 371{
@@ -399,12 +405,17 @@ struct rpcrdma_buffer {
399 struct list_head rb_mws; 405 struct list_head rb_mws;
400 struct list_head rb_all; 406 struct list_head rb_all;
401 407
408 unsigned long rb_sc_head;
409 unsigned long rb_sc_tail;
410 unsigned long rb_sc_last;
411 struct rpcrdma_sendctx **rb_sc_ctxs;
412
402 spinlock_t rb_lock; /* protect buf lists */ 413 spinlock_t rb_lock; /* protect buf lists */
403 int rb_send_count, rb_recv_count; 414 int rb_send_count, rb_recv_count;
404 struct list_head rb_send_bufs; 415 struct list_head rb_send_bufs;
405 struct list_head rb_recv_bufs; 416 struct list_head rb_recv_bufs;
406 u32 rb_max_requests; 417 u32 rb_max_requests;
407 atomic_t rb_credits; /* most recent credit grant */ 418 u32 rb_credits; /* most recent credit grant */
408 419
409 u32 rb_bc_srv_max_requests; 420 u32 rb_bc_srv_max_requests;
410 spinlock_t rb_reqslock; /* protect rb_allreqs */ 421 spinlock_t rb_reqslock; /* protect rb_allreqs */
@@ -453,10 +464,12 @@ struct rpcrdma_stats {
453 unsigned long mrs_recovered; 464 unsigned long mrs_recovered;
454 unsigned long mrs_orphaned; 465 unsigned long mrs_orphaned;
455 unsigned long mrs_allocated; 466 unsigned long mrs_allocated;
467 unsigned long empty_sendctx_q;
456 468
457 /* accessed when receiving a reply */ 469 /* accessed when receiving a reply */
458 unsigned long long total_rdma_reply; 470 unsigned long long total_rdma_reply;
459 unsigned long long fixup_copy_count; 471 unsigned long long fixup_copy_count;
472 unsigned long reply_waits_for_send;
460 unsigned long local_inv_needed; 473 unsigned long local_inv_needed;
461 unsigned long nomsg_call_count; 474 unsigned long nomsg_call_count;
462 unsigned long bcall_count; 475 unsigned long bcall_count;
@@ -473,8 +486,6 @@ struct rpcrdma_memreg_ops {
473 struct rpcrdma_mw **); 486 struct rpcrdma_mw **);
474 void (*ro_unmap_sync)(struct rpcrdma_xprt *, 487 void (*ro_unmap_sync)(struct rpcrdma_xprt *,
475 struct list_head *); 488 struct list_head *);
476 void (*ro_unmap_safe)(struct rpcrdma_xprt *,
477 struct rpcrdma_req *, bool);
478 void (*ro_recover_mr)(struct rpcrdma_mw *); 489 void (*ro_recover_mr)(struct rpcrdma_mw *);
479 int (*ro_open)(struct rpcrdma_ia *, 490 int (*ro_open)(struct rpcrdma_ia *,
480 struct rpcrdma_ep *, 491 struct rpcrdma_ep *,
@@ -532,6 +543,8 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
532bool frwr_is_supported(struct rpcrdma_ia *); 543bool frwr_is_supported(struct rpcrdma_ia *);
533bool fmr_is_supported(struct rpcrdma_ia *); 544bool fmr_is_supported(struct rpcrdma_ia *);
534 545
546extern struct workqueue_struct *rpcrdma_receive_wq;
547
535/* 548/*
536 * Endpoint calls - xprtrdma/verbs.c 549 * Endpoint calls - xprtrdma/verbs.c
537 */ 550 */
@@ -554,6 +567,8 @@ struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
554void rpcrdma_destroy_req(struct rpcrdma_req *); 567void rpcrdma_destroy_req(struct rpcrdma_req *);
555int rpcrdma_buffer_create(struct rpcrdma_xprt *); 568int rpcrdma_buffer_create(struct rpcrdma_xprt *);
556void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); 569void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
570struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
571void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
557 572
558struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *); 573struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
559void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *); 574void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
@@ -610,12 +625,18 @@ enum rpcrdma_chunktype {
610 rpcrdma_replych 625 rpcrdma_replych
611}; 626};
612 627
613bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *, 628int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
614 u32, struct xdr_buf *, enum rpcrdma_chunktype); 629 struct rpcrdma_req *req, u32 hdrlen,
615void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *); 630 struct xdr_buf *xdr,
631 enum rpcrdma_chunktype rtype);
632void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
616int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); 633int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
617void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); 634void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
618void rpcrdma_reply_handler(struct work_struct *work); 635void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
636void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
637void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
638 struct rpcrdma_req *req);
639void rpcrdma_deferred_completion(struct work_struct *work);
619 640
620static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) 641static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
621{ 642{
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 4dad5da388d6..9cc850c2719e 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -552,6 +552,7 @@ static int xs_local_send_request(struct rpc_task *task)
552 default: 552 default:
553 dprintk("RPC: sendmsg returned unrecognized error %d\n", 553 dprintk("RPC: sendmsg returned unrecognized error %d\n",
554 -status); 554 -status);
555 /* fall through */
555 case -EPIPE: 556 case -EPIPE:
556 xs_close(xprt); 557 xs_close(xprt);
557 status = -ENOTCONN; 558 status = -ENOTCONN;
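This and the three xprtsock.c hunks that follow all apply the same convention: an intentional drop from one case label into the next is annotated with a /* fall through */ comment, which GCC's -Wimplicit-fallthrough recognizes as a deliberate fall-through. In isolation the pattern looks like this (error values are illustrative):

static int handle_send_error(int status)
{
	switch (status) {
	default:
		/* log the unexpected error, then ... */
		/* fall through */
	case -32:		/* EPIPE: treat as a lost connection */
		status = -107;	/* ENOTCONN */
		break;
	case -11:		/* EAGAIN: caller retries */
		break;
	}
	return status;
}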
@@ -1611,6 +1612,7 @@ static void xs_tcp_state_change(struct sock *sk)
1611 xprt->connect_cookie++; 1612 xprt->connect_cookie++;
1612 clear_bit(XPRT_CONNECTED, &xprt->state); 1613 clear_bit(XPRT_CONNECTED, &xprt->state);
1613 xs_tcp_force_close(xprt); 1614 xs_tcp_force_close(xprt);
1615 /* fall through */
1614 case TCP_CLOSING: 1616 case TCP_CLOSING:
1615 /* 1617 /*
1616 * If the server closed down the connection, make sure that 1618 * If the server closed down the connection, make sure that
@@ -2368,6 +2370,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2368 switch (ret) { 2370 switch (ret) {
2369 case 0: 2371 case 0:
2370 xs_set_srcport(transport, sock); 2372 xs_set_srcport(transport, sock);
2373 /* fall through */
2371 case -EINPROGRESS: 2374 case -EINPROGRESS:
2372 /* SYN_SENT! */ 2375 /* SYN_SENT! */
2373 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 2376 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
@@ -2419,6 +2422,7 @@ static void xs_tcp_setup_socket(struct work_struct *work)
2419 default: 2422 default:
2420 printk("%s: connect returned unhandled error %d\n", 2423 printk("%s: connect returned unhandled error %d\n",
2421 __func__, status); 2424 __func__, status);
2425 /* fall through */
2422 case -EADDRNOTAVAIL: 2426 case -EADDRNOTAVAIL:
2423 /* We're probably in TIME_WAIT. Get rid of existing socket, 2427 /* We're probably in TIME_WAIT. Get rid of existing socket,
2424 * and retry 2428 * and retry