author	Chuck Lever <chuck.lever@oracle.com>	2017-10-16 15:01:30 -0400
committer	Anna Schumaker <Anna.Schumaker@Netapp.com>	2017-11-17 13:47:54 -0500
commit	d8f532d20ee43a0117284798d486bc4f98e3b196 (patch)
tree	539a169db11e0e8fe33c5de882410622c8032991 /net/sunrpc
parent	e1352c9610e3235f5e1b159038762d0c01c6ef36 (diff)
xprtrdma: Invoke rpcrdma_reply_handler directly from RECV completion
I noticed that the soft IRQ thread looked pretty busy under heavy I/O workloads. perf suggested one area that was expensive was the queue_work() call in rpcrdma_wc_receive. That gave me some ideas.

Instead of scheduling a separate worker to process RPC Replies, promote the Receive completion handler to IB_POLL_WORKQUEUE, and invoke rpcrdma_reply_handler directly.

Note that the poll workqueue is single-threaded. In order to keep memory invalidation from serializing all RPC Replies, handle any necessary invalidation tasks in a separate multi-threaded workqueue.

This provides a two-tier scheme, similar to OS I/O interrupt handlers: A fast interrupt handler that schedules the slow handler and re-enables the interrupt, and a slower handler that is invoked for any needed heavy lifting.

Benefits include:
- One less context switch for RPCs that don't register memory
- Receive completion handling is moved out of soft IRQ context to make room for other users of soft IRQ
- The same CPU core now DMA syncs and XDR decodes the Receive buffer

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
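As a reading aid only, here is a minimal user-space sketch of the two-tier scheme the commit message describes: a fast handler that completes simple replies inline and defers replies needing invalidation to a worker pool. All names and the standalone structure are illustrative analogues, not the xprtrdma code itself.

	/* Hypothetical, self-contained model of the two-tier completion scheme.
	 * Not kernel code: names and types are invented for illustration.
	 */
	#include <stdbool.h>
	#include <stdio.h>

	struct reply {
		int xid;
		bool needs_invalidation;	/* reply has registered memory regions */
	};

	/* Slow path: invalidate and unmap payloads, then complete the RPC.
	 * In the real code this runs on a separate multi-threaded workqueue.
	 */
	static void deferred_completion(struct reply *rep)
	{
		printf("xid %d: invalidate, unmap, then complete (worker pool)\n",
		       rep->xid);
	}

	/* Fast path: runs in the single-threaded poll context, so it must not
	 * block; anything that might wait is handed off.
	 */
	static void reply_handler(struct reply *rep)
	{
		if (!rep->needs_invalidation)
			printf("xid %d: complete inline (poll thread)\n", rep->xid);
		else
			deferred_completion(rep);	/* queue_work() in the real code */
	}

	int main(void)
	{
		struct reply a = { .xid = 1, .needs_invalidation = false };
		struct reply b = { .xid = 2, .needs_invalidation = true };

		reply_handler(&a);
		reply_handler(&b);
		return 0;
	}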
Diffstat (limited to 'net/sunrpc')
-rw-r--r--	net/sunrpc/xprtrdma/rpc_rdma.c	46
-rw-r--r--	net/sunrpc/xprtrdma/verbs.c	8
-rw-r--r--	net/sunrpc/xprtrdma/xprt_rdma.h	5
3 files changed, 36 insertions, 23 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 418bcc6b3e1d..430f8b5a8c43 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1265,16 +1265,36 @@ out_badheader:
 	goto out;
 }
 
+/* Reply handling runs in the poll worker thread. Anything that
+ * might wait is deferred to a separate workqueue.
+ */
+void rpcrdma_deferred_completion(struct work_struct *work)
+{
+	struct rpcrdma_rep *rep =
+			container_of(work, struct rpcrdma_rep, rr_work);
+	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+
+	/* Invalidate and unmap the data payloads before waking
+	 * the waiting application. This guarantees the memory
+	 * regions are properly fenced from the server before the
+	 * application accesses the data. It also ensures proper
+	 * send flow control: waking the next RPC waits until this
+	 * RPC has relinquished all its Send Queue entries.
+	 */
+	rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+	r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered);
+
+	rpcrdma_complete_rqst(rep);
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
-void
-rpcrdma_reply_handler(struct work_struct *work)
+void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 {
-	struct rpcrdma_rep *rep =
-			container_of(work, struct rpcrdma_rep, rr_work);
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_req *req;
@@ -1320,20 +1340,10 @@ rpcrdma_reply_handler(struct work_struct *work)
 	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(rep->rr_xid));
 
-	/* Invalidate and unmap the data payloads before waking the
-	 * waiting application. This guarantees the memory regions
-	 * are properly fenced from the server before the application
-	 * accesses the data. It also ensures proper send flow control:
-	 * waking the next RPC waits until this RPC has relinquished
-	 * all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered)) {
-		rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
-						    &req->rl_registered);
-	}
-
-	rpcrdma_complete_rqst(rep);
+	if (list_empty(&req->rl_registered))
+		rpcrdma_complete_rqst(rep);
+	else
+		queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
 
 out_badstatus:
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 11a1fbf7e59e..d45695408df3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -73,7 +73,7 @@ static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 
-static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
+struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
 
 int
 rpcrdma_alloc_wq(void)
@@ -185,7 +185,7 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 		rpcrdma_update_granted_credits(rep);
 
 out_schedule:
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	rpcrdma_reply_handler(rep);
 	return;
 
 out_fail:
@@ -583,7 +583,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
 	recvcq = ib_alloc_cq(ia->ri_device, NULL,
 			     ep->rep_attr.cap.max_recv_wr + 1,
-			     0, IB_POLL_SOFTIRQ);
+			     0, IB_POLL_WORKQUEUE);
 	if (IS_ERR(recvcq)) {
 		rc = PTR_ERR(recvcq);
 		dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -974,7 +974,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
 	rep->rr_cqe.done = rpcrdma_wc_receive;
 	rep->rr_rxprt = r_xprt;
-	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
+	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
 	rep->rr_recv_wr.next = NULL;
 	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d68a1351d95e..a85bcd19b37a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -533,6 +533,8 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
+extern struct workqueue_struct *rpcrdma_receive_wq;
+
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
@@ -617,7 +619,8 @@ void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
-void rpcrdma_reply_handler(struct work_struct *work);
+void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
+void rpcrdma_deferred_completion(struct work_struct *work);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {