author    Chuck Lever <chuck.lever@oracle.com>          2019-06-19 10:33:10 -0400
committer Anna Schumaker <Anna.Schumaker@Netapp.com>    2019-07-09 10:30:25 -0400
commit    d8099feda4833bab96b1bf312e9e6aad6b771570
tree      ec6af94bc83c935b9a7ad96819184990ed97da92 /net/sunrpc
parent    40088f0e9b62d7fa033918b54ef45f8bf7d1ad1c
xprtrdma: Reduce context switching due to Local Invalidation
Since commit ba69cd122ece ("xprtrdma: Remove support for FMR memory
registration"), FRWR is the only supported memory registration mode.

We can take advantage of the asynchronous nature of FRWR's LOCAL_INV
Work Requests to get rid of the completion wait by having the LOCAL_INV
completion handler take care of DMA unmapping MRs and waking the upper
layer RPC waiter. This eliminates two context switches when local
invalidation is necessary. As a side benefit, we will no longer need
the per-xprt deferred completion work queue.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
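As a rough illustration of the two completion styles the commit message contrasts (a hypothetical sketch, not the xprtrdma code itself: the demo_* names are invented for illustration, and only the ib_verbs and completion APIs are real; it assumes a connected QP, a registered rkey, and a CQ whose completion handlers run in softirq or workqueue context):

/* Minimal sketch only. The sync pattern parks a thread on a completion
 * until the LOCAL_INV finishes; the async pattern lets the completion
 * handler finish the work itself, so no thread sleeps.
 */
#include <linux/completion.h>
#include <linux/string.h>
#include <rdma/ib_verbs.h>

struct demo_li {
        struct ib_cqe           cqe;    /* carries the completion callback */
        struct completion       done;   /* used only by the sync variant */
        struct ib_send_wr       wr;     /* the LOCAL_INV Work Request */
        void (*finish)(struct demo_li *li);     /* DMA-unmap the MR, wake the RPC */
};

static void demo_li_init(struct demo_li *li, u32 rkey)
{
        memset(&li->wr, 0, sizeof(li->wr));
        li->wr.opcode = IB_WR_LOCAL_INV;
        li->wr.send_flags = IB_SEND_SIGNALED;
        li->wr.ex.invalidate_rkey = rkey;
        li->wr.wr_cqe = &li->cqe;       /* lets the handler find this context */
        init_completion(&li->done);
}

/* Synchronous pattern: the handler only wakes a sleeping thread, and
 * that thread then unmaps and wakes the RPC waiter -- extra context
 * switches on every invalidation.
 */
static void demo_li_wake(struct ib_cq *cq, struct ib_wc *wc)
{
        struct demo_li *li = container_of(wc->wr_cqe, struct demo_li, cqe);

        complete(&li->done);
}

static int demo_unmap_sync(struct ib_qp *qp, struct demo_li *li)
{
        const struct ib_send_wr *bad_wr;
        int rc;

        li->cqe.done = demo_li_wake;
        rc = ib_post_send(qp, &li->wr, &bad_wr);
        if (rc)
                return rc;
        wait_for_completion(&li->done); /* sleep until LOCAL_INV completes */
        li->finish(li);                 /* unmap + wake, in this thread */
        return 0;
}

/* Asynchronous pattern (what this patch moves to): the completion
 * handler finishes the work directly, so the caller never sleeps.
 */
static void demo_li_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct demo_li *li = container_of(wc->wr_cqe, struct demo_li, cqe);

        li->finish(li);                 /* unmap + wake, right in the handler */
}

static int demo_unmap_async(struct ib_qp *qp, struct demo_li *li)
{
        const struct ib_send_wr *bad_wr;

        li->cqe.done = demo_li_done;
        return ib_post_send(qp, &li->wr, &bad_wr);      /* returns immediately */
}

In the patch itself, frwr_unmap_async() chains one LOCAL_INV WR per registered MR, posts the whole chain with a single ib_post_send(), and points only the final WR's completion handler at frwr_wc_localinv_done(), which completes the RPC; Send Queue ordering guarantees the earlier WRs in the chain have already completed by then.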
Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c   103
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c    61
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c       17
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h    8
4 files changed, 136 insertions, 53 deletions
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 524cac0a0715..0b6dad7580a1 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -542,7 +542,10 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
  * @req: rpcrdma_req with a non-empty list of MRs to process
  *
  * Sleeps until it is safe for the host CPU to access the previously mapped
- * memory regions.
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
  */
 void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
@@ -616,3 +619,101 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
                 rpcrdma_mr_recycle(mr);
         }
 }
+
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+        struct ib_cqe *cqe = wc->wr_cqe;
+        struct rpcrdma_frwr *frwr =
+                container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+        struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+        /* WARNING: Only wr_cqe and status are reliable at this point */
+        trace_xprtrdma_wc_li_done(wc, frwr);
+        rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
+        __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+        struct ib_send_wr *first, *last, **prev;
+        const struct ib_send_wr *bad_wr;
+        struct rpcrdma_frwr *frwr;
+        struct rpcrdma_mr *mr;
+        int rc;
+
+        /* Chain the LOCAL_INV Work Requests and post them with
+         * a single ib_post_send() call.
+         */
+        frwr = NULL;
+        prev = &first;
+        while (!list_empty(&req->rl_registered)) {
+                mr = rpcrdma_mr_pop(&req->rl_registered);
+
+                trace_xprtrdma_mr_localinv(mr);
+                r_xprt->rx_stats.local_inv_needed++;
+
+                frwr = &mr->frwr;
+                frwr->fr_cqe.done = frwr_wc_localinv;
+                frwr->fr_req = req;
+                last = &frwr->fr_invwr;
+                last->next = NULL;
+                last->wr_cqe = &frwr->fr_cqe;
+                last->sg_list = NULL;
+                last->num_sge = 0;
+                last->opcode = IB_WR_LOCAL_INV;
+                last->send_flags = IB_SEND_SIGNALED;
+                last->ex.invalidate_rkey = mr->mr_handle;
+
+                *prev = last;
+                prev = &last->next;
+        }
+
+        /* Strong send queue ordering guarantees that when the
+         * last WR in the chain completes, all WRs in the chain
+         * are complete. The last completion will wake up the
+         * RPC waiter.
+         */
+        frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+        /* Transport disconnect drains the receive CQ before it
+         * replaces the QP. The RPC reply handler won't call us
+         * unless ri_id->qp is a valid pointer.
+         */
+        bad_wr = NULL;
+        rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+        trace_xprtrdma_post_send(req, rc);
+        if (!rc)
+                return;
+
+        /* Recycle MRs in the LOCAL_INV chain that did not get posted.
+         */
+        while (bad_wr) {
+                frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
+                mr = container_of(frwr, struct rpcrdma_mr, frwr);
+                bad_wr = bad_wr->next;
+
+                rpcrdma_mr_recycle(mr);
+        }
+
+        /* The final LOCAL_INV WR in the chain is supposed to
+         * do the wake. If it was never posted, the wake will
+         * not happen, so wake here in that case.
+         */
+        rpcrdma_complete_rqst(req->rl_reply);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 67d72d68ca6c..33b6e6a03f68 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1268,24 +1268,15 @@ out_badheader:
         goto out;
 }
 
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+/* Ensure that any DMA mapped pages associated with
+ * the Send of the RPC Call have been unmapped before
+ * allowing the RPC to complete. This protects argument
+ * memory not controlled by the RPC client from being
+ * re-used before we're done with it.
+ */
+static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt,
+                               struct rpcrdma_req *req)
 {
-        /* Invalidate and unmap the data payloads before waking
-         * the waiting application. This guarantees the memory
-         * regions are properly fenced from the server before the
-         * application accesses the data. It also ensures proper
-         * send flow control: waking the next RPC waits until this
-         * RPC has relinquished all its Send Queue entries.
-         */
-        if (!list_empty(&req->rl_registered))
-                frwr_unmap_sync(r_xprt, req);
-
-        /* Ensure that any DMA mapped pages associated with
-         * the Send of the RPC Call have been unmapped before
-         * allowing the RPC to complete. This protects argument
-         * memory not controlled by the RPC client from being
-         * re-used before we're done with it.
-         */
         if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
                 r_xprt->rx_stats.reply_waits_for_send++;
                 out_of_line_wait_on_bit(&req->rl_flags,
@@ -1295,24 +1286,23 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
         }
 }
 
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
+/**
+ * rpcrdma_release_rqst - Release hardware resources
+ * @r_xprt: controlling transport instance
+ * @req: request with resources to release
+ *
  */
-void rpcrdma_deferred_completion(struct work_struct *work)
+void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
-        struct rpcrdma_rep *rep =
-                container_of(work, struct rpcrdma_rep, rr_work);
-        struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
-        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+        if (!list_empty(&req->rl_registered))
+                frwr_unmap_sync(r_xprt, req);
 
-        trace_xprtrdma_defer_cmp(rep);
-        if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-                frwr_reminv(rep, &req->rl_registered);
-        rpcrdma_release_rqst(r_xprt, req);
-        rpcrdma_complete_rqst(rep);
+        rpcrdma_release_tx(r_xprt, req);
 }
 
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
@@ -1374,7 +1364,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
         rep->rr_rqst = rqst;
 
         trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-        queue_work(buf->rb_completion_wq, &rep->rr_work);
+
+        if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+                frwr_reminv(rep, &req->rl_registered);
+        if (!list_empty(&req->rl_registered)) {
+                frwr_unmap_async(r_xprt, req);
+                /* LocalInv completion will complete the RPC */
+        } else {
+                rpcrdma_release_tx(r_xprt, req);
+                rpcrdma_complete_rqst(rep);
+        }
         return;
 
 out_badversion:
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 0be5a36cacb6..c50a4b295bd7 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
  */
 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
         /* Flush Receives, then wait for deferred Reply work
          * to complete.
          */
         ib_drain_rq(ia->ri_id->qp);
-        drain_workqueue(buf->rb_completion_wq);
 
         /* Deferred Reply processing might have scheduled
          * local invalidations.
@@ -1056,7 +1054,6 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
 
         rep->rr_cqe.done = rpcrdma_wc_receive;
         rep->rr_rxprt = r_xprt;
-        INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
         rep->rr_recv_wr.next = NULL;
         rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
         rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
@@ -1117,15 +1114,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
         if (rc)
                 goto out;
 
-        buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
-                                                WQ_MEM_RECLAIM | WQ_HIGHPRI,
-                                                0,
-                        r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
-        if (!buf->rb_completion_wq) {
-                rc = -ENOMEM;
-                goto out;
-        }
-
         return 0;
 out:
         rpcrdma_buffer_destroy(buf);
@@ -1199,11 +1187,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
         cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
-        if (buf->rb_completion_wq) {
-                destroy_workqueue(buf->rb_completion_wq);
-                buf->rb_completion_wq = NULL;
-        }
-
         rpcrdma_sendctxs_destroy(buf);
 
         while (!list_empty(&buf->rb_recv_bufs)) {
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a39652884308..e465221c9c96 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -202,10 +202,9 @@ struct rpcrdma_rep {
         bool rr_temp;
         struct rpcrdma_regbuf *rr_rdmabuf;
         struct rpcrdma_xprt *rr_rxprt;
-        struct work_struct rr_work;
+        struct rpc_rqst *rr_rqst;
         struct xdr_buf rr_hdrbuf;
         struct xdr_stream rr_stream;
-        struct rpc_rqst *rr_rqst;
         struct list_head rr_list;
         struct ib_recv_wr rr_recv_wr;
 };
@@ -240,10 +239,12 @@ struct rpcrdma_sendctx {
  * An external memory region is any buffer or page that is registered
  * on the fly (ie, not pre-registered).
  */
+struct rpcrdma_req;
 struct rpcrdma_frwr {
         struct ib_mr *fr_mr;
         struct ib_cqe fr_cqe;
         struct completion fr_linv_done;
+        struct rpcrdma_req *fr_req;
         union {
                 struct ib_reg_wr fr_regwr;
                 struct ib_send_wr fr_invwr;
@@ -388,7 +389,6 @@ struct rpcrdma_buffer {
         u32 rb_bc_srv_max_requests;
         u32 rb_bc_max_requests;
 
-        struct workqueue_struct *rb_completion_wq;
         struct delayed_work rb_refresh_worker;
 };
 
@@ -561,6 +561,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
 void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
 void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
 
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
@@ -585,7 +586,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
 void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
                           struct rpcrdma_req *req);
-void rpcrdma_deferred_completion(struct work_struct *work);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {