author		Chuck Lever <chuck.lever@oracle.com>	2016-03-01 13:07:13 -0500
committer	J. Bruce Fields <bfields@redhat.com>	2016-03-01 16:06:42 -0500
commit		8bd5ba86d9ba7169e137fc4f32c553080c056a02 (patch)
tree		09463b98f411458f698ac72c51747e3c7fec51d4
parent		ec705fd4d09be5c76178d8ac875cb4a8e91558a5 (diff)
svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
Calling ib_poll_cq() to sort through WCs during a completion is a common pattern amongst RDMA consumers. Since commit 14d3a3b2498e ("IB: add a proper completion queue abstraction"), WC sorting can be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to other RDMA consumers, as it allows the core to schedule the delivery of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the core can now post operations on a consumer's QP, and handle the completions itself.

svcrdma receive completions no longer use the dto_tasklet. Each polled Receive WC is now handled individually in soft IRQ context.

The server transport's rdma_stat_rq_poll and rdma_stat_rq_prod metrics are no longer updated.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
-rw-r--r--	include/linux/sunrpc/svc_rdma.h			  2
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	129
2 files changed, 40 insertions, 91 deletions
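
For readers unfamiliar with the new completion-queue API, the sketch below shows the general consumer pattern this patch adopts: embed a struct ib_cqe in the per-WR context, point its ->done member at a completion method, and post the WR with wr_cqe instead of a wr_id cookie. This is a minimal illustration only; the my_* names are invented for the example, and svcrdma's real equivalents (svc_rdma_op_ctxt, svc_rdma_wc_receive, svc_rdma_post_recv) appear in the diff that follows.

	/* Illustrative sketch of a new-style CQ API consumer; not svcrdma code. */
	#include <rdma/ib_verbs.h>

	struct my_recv_ctxt {
		struct ib_cqe	cqe;	/* embedded; replaces the old wr_id cookie */
		struct ib_sge	sge;	/* describes the posted receive buffer */
	};

	/* Completion method: the IB core invokes this from its own polling
	 * context (softirq when the CQ is allocated with IB_POLL_SOFTIRQ),
	 * once per completed Receive WR.
	 */
	static void my_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
	{
		struct my_recv_ctxt *ctxt =
			container_of(wc->wr_cqe, struct my_recv_ctxt, cqe);

		if (wc->status != IB_WC_SUCCESS) {
			/* on error/flush, only wr_cqe and status are reliable */
			return;
		}
		pr_info("received %u bytes into ctxt %p\n", wc->byte_len, ctxt);
	}

	/* CQ allocation is done once; the core handles notification and polling. */
	static struct ib_cq *my_alloc_recv_cq(struct ib_device *dev, void *priv,
					      int depth)
	{
		return ib_alloc_cq(dev, priv, depth, 0, IB_POLL_SOFTIRQ);
	}

	static int my_post_recv(struct ib_qp *qp, struct my_recv_ctxt *ctxt)
	{
		struct ib_recv_wr wr, *bad_wr;

		ctxt->cqe.done = my_wc_receive;	/* per-WR completion method */
		wr.next    = NULL;
		wr.sg_list = &ctxt->sge;
		wr.num_sge = 1;
		wr.wr_cqe  = &ctxt->cqe;	/* instead of wr.wr_id */

		return ib_post_recv(qp, &wr, &bad_wr);
	}

A CQ allocated this way is released with ib_free_cq() rather than ib_destroy_cq(), as the svc_rdma_accept() and __svc_rdma_free() hunks below show.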
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c2b0d95602d8..cf79ab86d3d4 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt {
 	struct svc_rdma_fastreg_mr *frmr;
 	int hdr_count;
 	struct xdr_buf arg;
+	struct ib_cqe cqe;
 	struct list_head dto_q;
 	enum ib_wr_opcode wr_op;
 	enum ib_wc_status wc_status;
@@ -174,7 +175,6 @@ struct svcxprt_rdma {
 	struct work_struct sc_work;
 };
 /* sc_flags */
-#define RDMAXPRT_RQ_PENDING	1
 #define RDMAXPRT_SQ_PENDING	2
 #define RDMAXPRT_CONN_PENDING	3
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 15c8fa3ee794..5dfa1b6bf0c2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
 static void sq_cq_reap(struct svcxprt_rdma *xprt);
 
 static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
@@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data)
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		rq_cq_reap(xprt);
 		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
@@ -422,93 +420,48 @@ static void dto_tasklet_func(unsigned long data)
 	spin_unlock_irqrestore(&dto_lock, flags);
 }
 
-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
-{
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
-
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an SQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
-
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
-}
-
-/*
- * rq_cq_reap - Process the RQ CQ.
- *
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq:	completion queue
+ * @wc:	completed WR
  *
- * Note that caller must hold a transport reference.
  */
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
-	int ret;
-	struct ib_wc wc;
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-
-	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
-		return;
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_rq_poll);
+	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	ctxt->wc_status = wc->status;
+	svc_rdma_unmap_dma(ctxt);
 
-	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
-		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
-		ctxt->wc_status = wc.status;
-		ctxt->byte_len = wc.byte_len;
-		svc_rdma_unmap_dma(ctxt);
-		if (wc.status != IB_WC_SUCCESS) {
-			/* Close the transport */
-			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			svc_rdma_put_context(ctxt, 1);
-			svc_xprt_put(&xprt->sc_xprt);
-			continue;
-		}
-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_put(&xprt->sc_xprt);
-	}
+	if (wc->status != IB_WC_SUCCESS)
+		goto flushed;
 
-	if (ctxt)
-		atomic_inc(&rdma_stat_rq_prod);
+	/* All wc fields are now known to be valid */
+	ctxt->byte_len = wc->byte_len;
+	spin_lock(&xprt->sc_rq_dto_lock);
+	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+	spin_unlock(&xprt->sc_rq_dto_lock);
 
 	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-	/*
-	 * If data arrived before established event,
-	 * don't enqueue. This defers RPC I/O until the
-	 * RDMA connection is complete.
-	 */
-	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-		svc_xprt_enqueue(&xprt->sc_xprt);
+	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		goto out;
+	svc_xprt_enqueue(&xprt->sc_xprt);
+	goto out;
+
+flushed:
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+			ib_wc_status_msg(wc->status),
+			wc->status, wc->vendor_err);
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	svc_rdma_put_context(ctxt, 1);
+
+out:
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
 /*
@@ -681,6 +634,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	ctxt = svc_rdma_get_context(xprt);
 	buflen = 0;
 	ctxt->direction = DMA_FROM_DEVICE;
+	ctxt->cqe.done = svc_rdma_wc_receive;
 	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 		if (sge_no >= xprt->sc_max_sge) {
 			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +659,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	recv_wr.next = NULL;
 	recv_wr.sg_list = &ctxt->sge[0];
 	recv_wr.num_sge = ctxt->count;
-	recv_wr.wr_id = (u64)(unsigned long)ctxt;
+	recv_wr.wr_cqe = &ctxt->cqe;
 
 	svc_xprt_get(&xprt->sc_xprt);
 	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -1094,12 +1048,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_rq_depth;
-	newxprt->sc_rq_cq = ib_create_cq(dev,
-					 rq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_rq_cq)) {
 		dprintk("svcrdma: error creating RQ CQ for connect request\n");
 		goto errout;
@@ -1193,7 +1143,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	 * miss the first message
 	 */
 	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
@@ -1337,7 +1286,7 @@ static void __svc_rdma_free(struct work_struct *work)
 		ib_destroy_cq(rdma->sc_sq_cq);
 
 	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
-		ib_destroy_cq(rdma->sc_rq_cq);
+		ib_free_cq(rdma->sc_rq_cq);
 
 	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
 		ib_dealloc_pd(rdma->sc_pd);