author	Chuck Lever <chuck.lever@oracle.com>	2016-03-01 13:07:22 -0500
committer	J. Bruce Fields <bfields@redhat.com>	2016-03-01 16:06:43 -0500
commit	be99bb11400ce02552c35a6d3bf054de393ce30e (patch)
tree	fa4d144a53e4a919b83c2a073f6840650c6cb4f7
parent	8bd5ba86d9ba7169e137fc4f32c553080c056a02 (diff)
svcrdma: Use new CQ API for RPC-over-RDMA server send CQs
Calling ib_poll_cq() to sort through WCs during a completion is a common
pattern amongst RDMA consumers. Since commit 14d3a3b2498e ("IB: add a proper
completion queue abstraction"), WC sorting can be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to other
RDMA consumers, as it allows the core to schedule the delivery of completions
more fairly amongst all active consumers.

This new API also aims each completion at a function that is specific to the
WR's opcode. Thus the ctxt->wr_op field and the switch in process_context()
are replaced by a set of methods that handle each completion type.

Because each ib_cqe carries a pointer to a completion method, the core can
now post operations on a consumer's QP and handle the completions itself.

The server's rdma_stat_sq_poll and rdma_stat_sq_prod metrics are no longer
updated.

As a clean-up, the cq_event_handler, the dto_tasklet, and all associated
locking are removed, as they are no longer referenced or used.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
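For reference, a minimal sketch of the consumer pattern the new API expects
(an illustration only, not code taken from this patch; the example_* names are
hypothetical, while ib_alloc_cq(), ib_free_cq(), ib_post_send() and struct
ib_cqe are the IB core interfaces the patch converts svcrdma to use):

struct example_ctxt {
	struct ib_cqe cqe;	/* embedded completion entry */
	/* ... per-WR state ... */
};

/* Opcode-specific completion method; the IB core invokes this for each WC. */
static void example_send_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct example_ctxt *ctxt =
		container_of(wc->wr_cqe, struct example_ctxt, cqe);

	if (wc->status != IB_WC_SUCCESS)
		pr_err("example: Send failed (%d)\n", wc->status);
	/* release ctxt resources here */
}

static int example_post_send(struct ib_qp *qp, struct example_ctxt *ctxt,
			     struct ib_sge *sge)
{
	struct ib_send_wr wr, *bad_wr;

	memset(&wr, 0, sizeof(wr));
	ctxt->cqe.done = example_send_done;	/* replaces the wr_op switch */
	wr.wr_cqe = &ctxt->cqe;			/* replaces wr.wr_id */
	wr.sg_list = sge;
	wr.num_sge = 1;
	wr.opcode = IB_WR_SEND;
	wr.send_flags = IB_SEND_SIGNALED;
	return ib_post_send(qp, &wr, &bad_wr);
}

/* The CQ is allocated with a polling context instead of comp/event handlers,
 * and freed with ib_free_cq():
 *
 *	cq = ib_alloc_cq(device, private, nr_cqe, comp_vector, IB_POLL_SOFTIRQ);
 *	...
 *	ib_free_cq(cq);
 */

Each send-path WR touched by the patch below follows this shape, which is why
the wr_id-based completion demultiplexing can be deleted.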
-rw-r--r--	include/linux/sunrpc/svc_rdma.h	|   9
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_backchannel.c	|   4
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_recvfrom.c	|  14
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_sendto.c	|  12
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	| 259
5 files changed, 121 insertions, 177 deletions
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index cf79ab86d3d4..3081339968c3 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -76,8 +76,9 @@ struct svc_rdma_op_ctxt {
 	int hdr_count;
 	struct xdr_buf arg;
 	struct ib_cqe cqe;
+	struct ib_cqe reg_cqe;
+	struct ib_cqe inv_cqe;
 	struct list_head dto_q;
-	enum ib_wr_opcode wr_op;
 	enum ib_wc_status wc_status;
 	u32 byte_len;
 	u32 position;
@@ -175,7 +176,6 @@ struct svcxprt_rdma {
 	struct work_struct sc_work;
 };
 /* sc_flags */
-#define RDMAXPRT_SQ_PENDING	2
 #define RDMAXPRT_CONN_PENDING	3

 #define RPCRDMA_LISTEN_BACKLOG	10
@@ -232,6 +232,11 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
 				int);

 /* svc_rdma_transport.c */
+extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
 extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
 extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
 extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 254be8661981..a2a7519b0f23 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -119,7 +119,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
 	ctxt->count = 1;

-	ctxt->wr_op = IB_WR_SEND;
 	ctxt->direction = DMA_TO_DEVICE;
 	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
 	ctxt->sge[0].length = sndbuf->len;
@@ -133,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	atomic_inc(&rdma->sc_dma_used);

 	memset(&send_wr, 0, sizeof(send_wr));
-	send_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = 1;
 	send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index d3718e94c169..3b24a646eb46 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
 	clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

 	memset(&read_wr, 0, sizeof(read_wr));
-	read_wr.wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_read;
+	read_wr.wr.wr_cqe = &ctxt->cqe;
 	read_wr.wr.opcode = IB_WR_RDMA_READ;
-	ctxt->wr_op = read_wr.wr.opcode;
 	read_wr.wr.send_flags = IB_SEND_SIGNALED;
 	read_wr.rkey = rs_handle;
 	read_wr.remote_addr = rs_offset;
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 	ctxt->read_hdr = head;

 	/* Prepare REG WR */
+	ctxt->reg_cqe.done = svc_rdma_wc_reg;
+	reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
 	reg_wr.wr.opcode = IB_WR_REG_MR;
-	reg_wr.wr.wr_id = 0;
 	reg_wr.wr.send_flags = IB_SEND_SIGNALED;
 	reg_wr.wr.num_sge = 0;
 	reg_wr.mr = frmr->mr;
@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,

 	/* Prepare RDMA_READ */
 	memset(&read_wr, 0, sizeof(read_wr));
+	ctxt->cqe.done = svc_rdma_wc_read;
+	read_wr.wr.wr_cqe = &ctxt->cqe;
 	read_wr.wr.send_flags = IB_SEND_SIGNALED;
 	read_wr.rkey = rs_handle;
 	read_wr.remote_addr = rs_offset;
@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 	read_wr.wr.num_sge = 1;
 	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
 		read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
-		read_wr.wr.wr_id = (unsigned long)ctxt;
 		read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
 	} else {
 		read_wr.wr.opcode = IB_WR_RDMA_READ;
 		read_wr.wr.next = &inv_wr;
 		/* Prepare invalidate */
 		memset(&inv_wr, 0, sizeof(inv_wr));
-		inv_wr.wr_id = (unsigned long)ctxt;
+		ctxt->inv_cqe.done = svc_rdma_wc_inv;
+		inv_wr.wr_cqe = &ctxt->inv_cqe;
 		inv_wr.opcode = IB_WR_LOCAL_INV;
 		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
 		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
 	}
-	ctxt->wr_op = read_wr.wr.opcode;

 	/* Post the chain */
 	ret = svc_rdma_send(xprt, &reg_wr.wr);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index a26ca569f257..4f1b1c4f45f9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -297,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,

 	/* Prepare WRITE WR */
 	memset(&write_wr, 0, sizeof write_wr);
-	ctxt->wr_op = IB_WR_RDMA_WRITE;
-	write_wr.wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_write;
+	write_wr.wr.wr_cqe = &ctxt->cqe;
 	write_wr.wr.sg_list = &sge[0];
 	write_wr.wr.num_sge = sge_no;
 	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -549,8 +549,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		goto err;
 	}
 	memset(&send_wr, 0, sizeof send_wr);
-	ctxt->wr_op = IB_WR_SEND;
-	send_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = sge_no;
 	send_wr.opcode = IB_WR_SEND;
@@ -698,8 +698,8 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,

 	/* Prepare SEND WR */
 	memset(&err_wr, 0, sizeof(err_wr));
-	ctxt->wr_op = IB_WR_SEND;
-	err_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	err_wr.wr_cqe = &ctxt->cqe;
 	err_wr.sg_list = ctxt->sge;
 	err_wr.num_sge = 1;
 	err_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5dfa1b6bf0c2..90668969d559 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -63,16 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 					int flags);
 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
 static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
 static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);

 static struct svc_xprt_ops svc_rdma_ops = {
 	.xpo_create = svc_rdma_create,
@@ -351,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
 	}
 }

-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
-	struct svc_xprt *xprt = context;
-	dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
-		ib_event_msg(event->event), event->event, context);
-	set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -391,35 +376,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
 	}
 }

-/*
- * Data Transfer Operation Tasklet
- *
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
- */
-static void dto_tasklet_func(unsigned long data)
-{
-	struct svcxprt_rdma *xprt;
-	unsigned long flags;
-
-	spin_lock_irqsave(&dto_lock, flags);
-	while (!list_empty(&dto_xprt_q)) {
-		xprt = list_entry(dto_xprt_q.next,
-				  struct svcxprt_rdma, sc_dto_q);
-		list_del_init(&xprt->sc_dto_q);
-		spin_unlock_irqrestore(&dto_lock, flags);
-
-		sq_cq_reap(xprt);
-
-		svc_xprt_put(&xprt->sc_xprt);
-		spin_lock_irqsave(&dto_lock, flags);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-}
-
 /**
  * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
  * @cq: completion queue
@@ -464,132 +420,127 @@ out:
 	svc_xprt_put(&xprt->sc_xprt);
 }

-/*
- * Process a completion context
- */
-static void process_context(struct svcxprt_rdma *xprt,
-			    struct svc_rdma_op_ctxt *ctxt)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+				    struct ib_wc *wc,
+				    const char *opname)
 {
-	struct svc_rdma_op_ctxt *read_hdr;
-	int free_pages = 0;
-
-	svc_rdma_unmap_dma(ctxt);
-
-	switch (ctxt->wr_op) {
-	case IB_WR_SEND:
-		free_pages = 1;
-		break;
+	if (wc->status != IB_WC_SUCCESS)
+		goto err;

-	case IB_WR_RDMA_WRITE:
-		break;
+out:
+	atomic_dec(&xprt->sc_sq_count);
+	wake_up(&xprt->sc_send_wait);
+	return;

-	case IB_WR_RDMA_READ:
-	case IB_WR_RDMA_READ_WITH_INV:
-		svc_rdma_put_frmr(xprt, ctxt->frmr);
+err:
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
+		       opname, ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
+	goto out;
+}

-		if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
-			break;
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
+					const char *opname)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;

-		read_hdr = ctxt->read_hdr;
-		svc_rdma_put_context(ctxt, 0);
+	svc_rdma_send_wc_common(xprt, wc, opname);
+	svc_xprt_put(&xprt->sc_xprt);
+}

-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-		list_add_tail(&read_hdr->dto_q,
-			      &xprt->sc_read_complete_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_enqueue(&xprt->sc_xprt);
-		return;
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;

-	default:
-		dprintk("svcrdma: unexpected completion opcode=%d\n",
-			ctxt->wr_op);
-		break;
-	}
+	svc_rdma_send_wc_common_put(cq, wc, "send");

-	svc_rdma_put_context(ctxt, free_pages);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 1);
 }

-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
+ * @cq: completion queue
+ * @wc: completed WR
  *
- * Note that caller must hold a transport reference.
  */
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-	struct ib_wc wc_a[6];
-	struct ib_wc *wc;
-	struct ib_cq *cq = xprt->sc_sq_cq;
-	int ret;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;

-	memset(wc_a, 0, sizeof(wc_a));
+	svc_rdma_send_wc_common_put(cq, wc, "write");

-	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
-		return;
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 0);
+}

-	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_sq_poll);
-	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
-		int i;
+/**
+ * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
+{
+	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
+}

-		for (i = 0; i < ret; i++) {
-			wc = &wc_a[i];
-			if (wc->status != IB_WC_SUCCESS) {
-				dprintk("svcrdma: sq wc err status %s (%d)\n",
-					ib_wc_status_msg(wc->status),
-					wc->status);
+/**
+ * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;

-				/* Close the transport */
-				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			}
+	svc_rdma_send_wc_common(xprt, wc, "read");

-			/* Decrement used SQ WR count */
-			atomic_dec(&xprt->sc_sq_count);
-			wake_up(&xprt->sc_send_wait);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_frmr(xprt, ctxt->frmr);

-			ctxt = (struct svc_rdma_op_ctxt *)
-				(unsigned long)wc->wr_id;
-			if (ctxt)
-				process_context(xprt, ctxt);
+	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+		struct svc_rdma_op_ctxt *read_hdr;

-			svc_xprt_put(&xprt->sc_xprt);
-		}
+		read_hdr = ctxt->read_hdr;
+		spin_lock(&xprt->sc_rq_dto_lock);
+		list_add_tail(&read_hdr->dto_q,
+			      &xprt->sc_read_complete_q);
+		spin_unlock(&xprt->sc_rq_dto_lock);
+
+		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&xprt->sc_xprt);
 	}

-	if (ctxt)
-		atomic_inc(&rdma_stat_sq_prod);
+	svc_rdma_put_context(ctxt, 0);
+	svc_xprt_put(&xprt->sc_xprt);
 }

-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+/**
+ * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
-
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an RQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
+	svc_rdma_send_wc_common_put(cq, wc, "localInv");
 }

 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -980,7 +931,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	struct svcxprt_rdma *listen_rdma;
 	struct svcxprt_rdma *newxprt = NULL;
 	struct rdma_conn_param conn_param;
-	struct ib_cq_init_attr cq_attr = {};
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device *dev;
 	unsigned int i;
@@ -1038,12 +988,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		dprintk("svcrdma: error creating PD for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_sq_depth;
-	newxprt->sc_sq_cq = ib_create_cq(dev,
-					 sq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_sq_cq)) {
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
@@ -1138,12 +1084,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

-	/*
-	 * Arm the CQs for the SQ and RQ before accepting so we can't
-	 * miss the first message
-	 */
-	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 	memset(&conn_param, 0, sizeof conn_param);
@@ -1283,7 +1223,7 @@ static void __svc_rdma_free(struct work_struct *work)
 		ib_destroy_qp(rdma->sc_qp);

 	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
-		ib_destroy_cq(rdma->sc_sq_cq);
+		ib_free_cq(rdma->sc_sq_cq);

 	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
 		ib_free_cq(rdma->sc_rq_cq);
@@ -1347,9 +1287,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);

-			/* See if we can opportunistically reap SQ WR to make room */
-			sq_cq_reap(xprt);
-
 			/* Wait until SQ WR available if SQ still full */
 			wait_event(xprt->sc_send_wait,
 				   atomic_read(&xprt->sc_sq_count) <