author     Chuck Lever <chuck.lever@oracle.com>       2016-03-01 13:07:13 -0500
committer  J. Bruce Fields <bfields@redhat.com>       2016-03-01 16:06:42 -0500
commit     8bd5ba86d9ba7169e137fc4f32c553080c056a02
tree       09463b98f411458f698ac72c51747e3c7fec51d4
parent     ec705fd4d09be5c76178d8ac875cb4a8e91558a5
svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.
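
For reference, the consumer-driven pattern being lifted into the core looks
roughly like the sketch below. This is illustrative only, not code from this
patch; the function name is made up:

	static void example_reap_cq(struct ib_cq *cq)
	{
		struct ib_wc wc;

		/* re-arm the CQ so the next completion raises another event */
		ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);

		/* drain completed work requests one WC at a time */
		while (ib_poll_cq(cq, 1, &wc) > 0) {
			/* dispatch on wc.wr_id / wc.opcode ... */
		}
	}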
By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.
Because each ib_cqe carries a pointer to a completion method, the
core can now post operations on a consumer's QP, and handle the
completions itself.
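
To make that concrete, here is a minimal sketch of the consumer-side pattern,
under the assumption of a hypothetical ULP; the struct and function names are
illustrative and only mirror the svcrdma code below. The consumer embeds a
struct ib_cqe in its per-Receive context, points the work request's wr_cqe at
it, and recovers the context with container_of() when the core invokes the
->done method:

	#include <rdma/ib_verbs.h>

	/* hypothetical per-Receive context with an embedded ib_cqe */
	struct example_rx_ctxt {
		struct ib_cqe	cqe;	/* carries the completion method */
		struct ib_sge	sge;	/* describes the posted buffer */
	};

	/* invoked by the IB core for each polled Receive WC */
	static void example_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
	{
		struct example_rx_ctxt *ctxt =
			container_of(wc->wr_cqe, struct example_rx_ctxt, cqe);

		/* on flush, only wc->wr_cqe and wc->status are reliable */
		if (wc->status != IB_WC_SUCCESS)
			return;

		/* wc->byte_len is valid here; hand ctxt off for processing */
	}

	/* posting a Receive that the core completes via ->done */
	static int example_post_recv(struct ib_qp *qp,
				     struct example_rx_ctxt *ctxt)
	{
		struct ib_recv_wr wr, *bad_wr;

		ctxt->cqe.done = example_wc_receive;
		wr.next = NULL;
		wr.wr_cqe = &ctxt->cqe;
		wr.sg_list = &ctxt->sge;
		wr.num_sge = 1;
		return ib_post_recv(qp, &wr, &bad_wr);
	}

The CQ itself is allocated once with ib_alloc_cq(), naming a polling context
(IB_POLL_SOFTIRQ in this patch), so the core rather than the consumer decides
when and where completions are processed.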
svcrdma receive completions no longer use the dto_tasklet. Each
polled Receive WC is now handled individually in soft IRQ context.
The server transport's rdma_stat_rq_poll and rdma_stat_rq_prod
metrics are no longer updated.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
 include/linux/sunrpc/svc_rdma.h          |   2
 net/sunrpc/xprtrdma/svc_rdma_transport.c | 129
 2 files changed, 40 insertions(+), 91 deletions(-)
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c2b0d95602d8..cf79ab86d3d4 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt {
 	struct svc_rdma_fastreg_mr *frmr;
 	int hdr_count;
 	struct xdr_buf arg;
+	struct ib_cqe cqe;
 	struct list_head dto_q;
 	enum ib_wr_opcode wr_op;
 	enum ib_wc_status wc_status;
@@ -174,7 +175,6 @@ struct svcxprt_rdma {
 	struct work_struct sc_work;
 };
 /* sc_flags */
-#define RDMAXPRT_RQ_PENDING	1
 #define RDMAXPRT_SQ_PENDING	2
 #define RDMAXPRT_CONN_PENDING	3
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 15c8fa3ee794..5dfa1b6bf0c2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
 static void sq_cq_reap(struct svcxprt_rdma *xprt);
 
 static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
@@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data)
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		rq_cq_reap(xprt);
 		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
@@ -422,93 +420,48 @@ static void dto_tasklet_func(unsigned long data)
 	spin_unlock_irqrestore(&dto_lock, flags);
 }
 
-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
-{
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
-
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an SQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
-
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
-}
-
-/*
- * rq_cq_reap - Process the RQ CQ.
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue
+ * @wc: completed WR
  *
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
- *
- * Note that caller must hold a transport reference.
  */
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
-	int ret;
-	struct ib_wc wc;
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-
-	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
-		return;
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_rq_poll);
+	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	ctxt->wc_status = wc->status;
+	svc_rdma_unmap_dma(ctxt);
 
-	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
-		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
-		ctxt->wc_status = wc.status;
-		ctxt->byte_len = wc.byte_len;
-		svc_rdma_unmap_dma(ctxt);
-		if (wc.status != IB_WC_SUCCESS) {
-			/* Close the transport */
-			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			svc_rdma_put_context(ctxt, 1);
-			svc_xprt_put(&xprt->sc_xprt);
-			continue;
-		}
-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_put(&xprt->sc_xprt);
-	}
+	if (wc->status != IB_WC_SUCCESS)
+		goto flushed;
 
-	if (ctxt)
-		atomic_inc(&rdma_stat_rq_prod);
+	/* All wc fields are now known to be valid */
+	ctxt->byte_len = wc->byte_len;
+	spin_lock(&xprt->sc_rq_dto_lock);
+	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+	spin_unlock(&xprt->sc_rq_dto_lock);
 
 	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-	/*
-	 * If data arrived before established event,
-	 * don't enqueue. This defers RPC I/O until the
-	 * RDMA connection is complete.
-	 */
-	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-		svc_xprt_enqueue(&xprt->sc_xprt);
+	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		goto out;
+	svc_xprt_enqueue(&xprt->sc_xprt);
+	goto out;
+
+flushed:
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+			ib_wc_status_msg(wc->status),
+			wc->status, wc->vendor_err);
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	svc_rdma_put_context(ctxt, 1);
+
+out:
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
 /*
@@ -681,6 +634,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	ctxt = svc_rdma_get_context(xprt);
 	buflen = 0;
 	ctxt->direction = DMA_FROM_DEVICE;
+	ctxt->cqe.done = svc_rdma_wc_receive;
 	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 		if (sge_no >= xprt->sc_max_sge) {
 			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +659,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	recv_wr.next = NULL;
 	recv_wr.sg_list = &ctxt->sge[0];
 	recv_wr.num_sge = ctxt->count;
-	recv_wr.wr_id = (u64)(unsigned long)ctxt;
+	recv_wr.wr_cqe = &ctxt->cqe;
 
 	svc_xprt_get(&xprt->sc_xprt);
 	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -1094,12 +1048,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_rq_depth;
-	newxprt->sc_rq_cq = ib_create_cq(dev,
-					 rq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_rq_cq)) {
 		dprintk("svcrdma: error creating RQ CQ for connect request\n");
 		goto errout;
@@ -1193,7 +1143,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	 * miss the first message
 	 */
 	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
@@ -1337,7 +1286,7 @@ static void __svc_rdma_free(struct work_struct *work)
 		ib_destroy_cq(rdma->sc_sq_cq);
 
 	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
-		ib_destroy_cq(rdma->sc_rq_cq);
+		ib_free_cq(rdma->sc_rq_cq);
 
 	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
 		ib_dealloc_pd(rdma->sc_pd);