Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/sched.c               |   1
-rw-r--r--  net/sunrpc/xprt.c                |  32
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c   | 327
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c   | 148
-rw-r--r--  net/sunrpc/xprtrdma/transport.c  |  83
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c      | 115
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h  |  44
-rw-r--r--  net/sunrpc/xprtsock.c            |  23
8 files changed, 441 insertions(+), 332 deletions(-)
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index f820780280b5..8a0779e963f9 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -59,6 +59,7 @@ static struct rpc_wait_queue delay_queue;
59 */ 59 */
60struct workqueue_struct *rpciod_workqueue __read_mostly; 60struct workqueue_struct *rpciod_workqueue __read_mostly;
61struct workqueue_struct *xprtiod_workqueue __read_mostly; 61struct workqueue_struct *xprtiod_workqueue __read_mostly;
62EXPORT_SYMBOL_GPL(xprtiod_workqueue);
62 63
63unsigned long 64unsigned long
64rpc_task_timeout(const struct rpc_task *task) 65rpc_task_timeout(const struct rpc_task *task)
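
A minimal sketch (not part of the patch) of why this export matters: xprtiod_workqueue is now referenced from the xprtrdma module (see xprt_rdma_connect() later in this diff), which is usually built as a loadable module and therefore needs the symbol exported. The my_* names below are illustrative; xprtiod_workqueue, DECLARE_DELAYED_WORK() and queue_delayed_work() are the real kernel interfaces.

	#include <linux/workqueue.h>
	#include <linux/sunrpc/sched.h>	/* declares xprtiod_workqueue */

	static void my_connect_worker(struct work_struct *work)
	{
		/* transport-specific (re)connect logic runs here */
	}

	static DECLARE_DELAYED_WORK(my_connect_work, my_connect_worker);

	/* Queue the connect attempt on the shared transport workqueue,
	 * optionally after a backoff delay -- the same pattern
	 * xprt_rdma_connect() adopts below.
	 */
	static void my_schedule_connect(unsigned long delay_jiffies)
	{
		queue_delayed_work(xprtiod_workqueue, &my_connect_work,
				   delay_jiffies);
	}
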
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 70d6a1f10db9..70a704c44c6d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -846,6 +846,38 @@ void xprt_connect(struct rpc_task *task)
846 xprt_release_write(xprt, task); 846 xprt_release_write(xprt, task);
847} 847}
848 848
849/**
850 * xprt_reconnect_delay - compute the wait before scheduling a connect
851 * @xprt: transport instance
852 *
853 */
854unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
855{
856 unsigned long start, now = jiffies;
857
858 start = xprt->stat.connect_start + xprt->reestablish_timeout;
859 if (time_after(start, now))
860 return start - now;
861 return 0;
862}
863EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
864
865/**
866 * xprt_reconnect_backoff - compute the new re-establish timeout
867 * @xprt: transport instance
868 * @init_to: initial reestablish timeout
869 *
870 */
871void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
872{
873 xprt->reestablish_timeout <<= 1;
874 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
875 xprt->reestablish_timeout = xprt->max_reconnect_timeout;
876 if (xprt->reestablish_timeout < init_to)
877 xprt->reestablish_timeout = init_to;
878}
879EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
880
849enum xprt_xid_rb_cmp { 881enum xprt_xid_rb_cmp {
850 XID_RB_EQUAL, 882 XID_RB_EQUAL,
851 XID_RB_LEFT, 883 XID_RB_LEFT,
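
These two helpers are hoisted out of xprtsock.c (their removal is the last hunk of this diff) so that the RDMA transport can share the same capped exponential backoff. A hedged sketch of the intended calling pattern; my_transport_connect() and MY_INIT_REEST_TO are illustrative stand-ins, while xprt_reconnect_delay(), xprt_reconnect_backoff(), xprt_connected() and queue_delayed_work() are real kernel interfaces.

	#define MY_INIT_REEST_TO	(5U * HZ)	/* hypothetical initial backoff */

	static void my_transport_connect(struct rpc_xprt *xprt,
					 struct delayed_work *connect_worker)
	{
		unsigned long delay = 0;

		if (xprt_connected(xprt)) {
			/* Reconnect case: wait out the remainder of the
			 * current backoff window...
			 */
			delay = xprt_reconnect_delay(xprt);
			/* ...then widen the window for the next attempt,
			 * clamped between MY_INIT_REEST_TO and
			 * xprt->max_reconnect_timeout.
			 */
			xprt_reconnect_backoff(xprt, MY_INIT_REEST_TO);
		}
		queue_delayed_work(xprtiod_workqueue, connect_worker, delay);
	}
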
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 794ba4ca0994..0b6dad7580a1 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -144,6 +144,26 @@ frwr_mr_recycle_worker(struct work_struct *work)
144 frwr_release_mr(mr); 144 frwr_release_mr(mr);
145} 145}
146 146
147/* frwr_reset - Place MRs back on the free list
148 * @req: request to reset
149 *
150 * Used after a failed marshal. For FRWR, this means the MRs
151 * don't have to be fully released and recreated.
152 *
153 * NB: This is safe only as long as none of @req's MRs are
154 * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
155 * Work Request.
156 */
157void frwr_reset(struct rpcrdma_req *req)
158{
159 while (!list_empty(&req->rl_registered)) {
160 struct rpcrdma_mr *mr;
161
162 mr = rpcrdma_mr_pop(&req->rl_registered);
163 rpcrdma_mr_unmap_and_put(mr);
164 }
165}
166
147/** 167/**
148 * frwr_init_mr - Initialize one MR 168 * frwr_init_mr - Initialize one MR
149 * @ia: interface adapter 169 * @ia: interface adapter
@@ -168,7 +188,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
168 goto out_list_err; 188 goto out_list_err;
169 189
170 mr->frwr.fr_mr = frmr; 190 mr->frwr.fr_mr = frmr;
171 mr->frwr.fr_state = FRWR_IS_INVALID;
172 mr->mr_dir = DMA_NONE; 191 mr->mr_dir = DMA_NONE;
173 INIT_LIST_HEAD(&mr->mr_list); 192 INIT_LIST_HEAD(&mr->mr_list);
174 INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker); 193 INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
@@ -298,65 +317,6 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
298} 317}
299 318
300/** 319/**
301 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
302 * @cq: completion queue (ignored)
303 * @wc: completed WR
304 *
305 */
306static void
307frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
308{
309 struct ib_cqe *cqe = wc->wr_cqe;
310 struct rpcrdma_frwr *frwr =
311 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
312
313 /* WARNING: Only wr_cqe and status are reliable at this point */
314 if (wc->status != IB_WC_SUCCESS)
315 frwr->fr_state = FRWR_FLUSHED_FR;
316 trace_xprtrdma_wc_fastreg(wc, frwr);
317}
318
319/**
320 * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
321 * @cq: completion queue (ignored)
322 * @wc: completed WR
323 *
324 */
325static void
326frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
327{
328 struct ib_cqe *cqe = wc->wr_cqe;
329 struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
330 fr_cqe);
331
332 /* WARNING: Only wr_cqe and status are reliable at this point */
333 if (wc->status != IB_WC_SUCCESS)
334 frwr->fr_state = FRWR_FLUSHED_LI;
335 trace_xprtrdma_wc_li(wc, frwr);
336}
337
338/**
339 * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
340 * @cq: completion queue (ignored)
341 * @wc: completed WR
342 *
343 * Awaken anyone waiting for an MR to finish being fenced.
344 */
345static void
346frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
347{
348 struct ib_cqe *cqe = wc->wr_cqe;
349 struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
350 fr_cqe);
351
352 /* WARNING: Only wr_cqe and status are reliable at this point */
353 if (wc->status != IB_WC_SUCCESS)
354 frwr->fr_state = FRWR_FLUSHED_LI;
355 trace_xprtrdma_wc_li_wake(wc, frwr);
356 complete(&frwr->fr_linv_done);
357}
358
359/**
360 * frwr_map - Register a memory region 320 * frwr_map - Register a memory region
361 * @r_xprt: controlling transport 321 * @r_xprt: controlling transport
362 * @seg: memory region co-ordinates 322 * @seg: memory region co-ordinates
@@ -378,23 +338,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
378{ 338{
379 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 339 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
380 bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; 340 bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
381 struct rpcrdma_frwr *frwr;
382 struct rpcrdma_mr *mr; 341 struct rpcrdma_mr *mr;
383 struct ib_mr *ibmr; 342 struct ib_mr *ibmr;
384 struct ib_reg_wr *reg_wr; 343 struct ib_reg_wr *reg_wr;
385 int i, n; 344 int i, n;
386 u8 key; 345 u8 key;
387 346
388 mr = NULL; 347 mr = rpcrdma_mr_get(r_xprt);
389 do { 348 if (!mr)
390 if (mr) 349 goto out_getmr_err;
391 rpcrdma_mr_recycle(mr);
392 mr = rpcrdma_mr_get(r_xprt);
393 if (!mr)
394 return ERR_PTR(-EAGAIN);
395 } while (mr->frwr.fr_state != FRWR_IS_INVALID);
396 frwr = &mr->frwr;
397 frwr->fr_state = FRWR_IS_VALID;
398 350
399 if (nsegs > ia->ri_max_frwr_depth) 351 if (nsegs > ia->ri_max_frwr_depth)
400 nsegs = ia->ri_max_frwr_depth; 352 nsegs = ia->ri_max_frwr_depth;
@@ -423,7 +375,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
423 if (!mr->mr_nents) 375 if (!mr->mr_nents)
424 goto out_dmamap_err; 376 goto out_dmamap_err;
425 377
426 ibmr = frwr->fr_mr; 378 ibmr = mr->frwr.fr_mr;
427 n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE); 379 n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
428 if (unlikely(n != mr->mr_nents)) 380 if (unlikely(n != mr->mr_nents))
429 goto out_mapmr_err; 381 goto out_mapmr_err;
@@ -433,7 +385,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
433 key = (u8)(ibmr->rkey & 0x000000FF); 385 key = (u8)(ibmr->rkey & 0x000000FF);
434 ib_update_fast_reg_key(ibmr, ++key); 386 ib_update_fast_reg_key(ibmr, ++key);
435 387
436 reg_wr = &frwr->fr_regwr; 388 reg_wr = &mr->frwr.fr_regwr;
437 reg_wr->mr = ibmr; 389 reg_wr->mr = ibmr;
438 reg_wr->key = ibmr->rkey; 390 reg_wr->key = ibmr->rkey;
439 reg_wr->access = writing ? 391 reg_wr->access = writing ?
@@ -448,6 +400,10 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
448 *out = mr; 400 *out = mr;
449 return seg; 401 return seg;
450 402
403out_getmr_err:
404 xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
405 return ERR_PTR(-EAGAIN);
406
451out_dmamap_err: 407out_dmamap_err:
452 mr->mr_dir = DMA_NONE; 408 mr->mr_dir = DMA_NONE;
453 trace_xprtrdma_frwr_sgerr(mr, i); 409 trace_xprtrdma_frwr_sgerr(mr, i);
@@ -461,6 +417,23 @@ out_mapmr_err:
461} 417}
462 418
463/** 419/**
420 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
421 * @cq: completion queue (ignored)
422 * @wc: completed WR
423 *
424 */
425static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
426{
427 struct ib_cqe *cqe = wc->wr_cqe;
428 struct rpcrdma_frwr *frwr =
429 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
430
431 /* WARNING: Only wr_cqe and status are reliable at this point */
432 trace_xprtrdma_wc_fastreg(wc, frwr);
433 /* The MR will get recycled when the associated req is retransmitted */
434}
435
436/**
464 * frwr_send - post Send WR containing the RPC Call message 437 * frwr_send - post Send WR containing the RPC Call message
465 * @ia: interface adapter 438 * @ia: interface adapter
466 * @req: Prepared RPC Call 439 * @req: Prepared RPC Call
@@ -512,31 +485,75 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
512 if (mr->mr_handle == rep->rr_inv_rkey) { 485 if (mr->mr_handle == rep->rr_inv_rkey) {
513 list_del_init(&mr->mr_list); 486 list_del_init(&mr->mr_list);
514 trace_xprtrdma_mr_remoteinv(mr); 487 trace_xprtrdma_mr_remoteinv(mr);
515 mr->frwr.fr_state = FRWR_IS_INVALID;
516 rpcrdma_mr_unmap_and_put(mr); 488 rpcrdma_mr_unmap_and_put(mr);
517 break; /* only one invalidated MR per RPC */ 489 break; /* only one invalidated MR per RPC */
518 } 490 }
519} 491}
520 492
493static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
494{
495 if (wc->status != IB_WC_SUCCESS)
496 rpcrdma_mr_recycle(mr);
497 else
498 rpcrdma_mr_unmap_and_put(mr);
499}
500
521/** 501/**
522 * frwr_unmap_sync - invalidate memory regions that were registered for @req 502 * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
523 * @r_xprt: controlling transport 503 * @cq: completion queue (ignored)
524 * @mrs: list of MRs to process 504 * @wc: completed WR
505 *
506 */
507static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
508{
509 struct ib_cqe *cqe = wc->wr_cqe;
510 struct rpcrdma_frwr *frwr =
511 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
512 struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
513
514 /* WARNING: Only wr_cqe and status are reliable at this point */
515 trace_xprtrdma_wc_li(wc, frwr);
516 __frwr_release_mr(wc, mr);
517}
518
519/**
520 * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
521 * @cq: completion queue (ignored)
522 * @wc: completed WR
525 * 523 *
526 * Sleeps until it is safe for the host CPU to access the 524 * Awaken anyone waiting for an MR to finish being fenced.
527 * previously mapped memory regions. 525 */
526static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
527{
528 struct ib_cqe *cqe = wc->wr_cqe;
529 struct rpcrdma_frwr *frwr =
530 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
531 struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
532
533 /* WARNING: Only wr_cqe and status are reliable at this point */
534 trace_xprtrdma_wc_li_wake(wc, frwr);
535 complete(&frwr->fr_linv_done);
536 __frwr_release_mr(wc, mr);
537}
538
539/**
540 * frwr_unmap_sync - invalidate memory regions that were registered for @req
541 * @r_xprt: controlling transport instance
542 * @req: rpcrdma_req with a non-empty list of MRs to process
528 * 543 *
529 * Caller ensures that @mrs is not empty before the call. This 544 * Sleeps until it is safe for the host CPU to access the previously mapped
530 * function empties the list. 545 * memory regions. This guarantees that registered MRs are properly fenced
546 * from the server before the RPC consumer accesses the data in them. It
547 * also ensures proper Send flow control: waking the next RPC waits until
548 * this RPC has relinquished all its Send Queue entries.
531 */ 549 */
532void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) 550void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
533{ 551{
534 struct ib_send_wr *first, **prev, *last; 552 struct ib_send_wr *first, **prev, *last;
535 const struct ib_send_wr *bad_wr; 553 const struct ib_send_wr *bad_wr;
536 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
537 struct rpcrdma_frwr *frwr; 554 struct rpcrdma_frwr *frwr;
538 struct rpcrdma_mr *mr; 555 struct rpcrdma_mr *mr;
539 int count, rc; 556 int rc;
540 557
541 /* ORDER: Invalidate all of the MRs first 558 /* ORDER: Invalidate all of the MRs first
542 * 559 *
@@ -544,33 +561,32 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
544 * a single ib_post_send() call. 561 * a single ib_post_send() call.
545 */ 562 */
546 frwr = NULL; 563 frwr = NULL;
547 count = 0;
548 prev = &first; 564 prev = &first;
549 list_for_each_entry(mr, mrs, mr_list) { 565 while (!list_empty(&req->rl_registered)) {
550 mr->frwr.fr_state = FRWR_IS_INVALID; 566 mr = rpcrdma_mr_pop(&req->rl_registered);
551 567
552 frwr = &mr->frwr;
553 trace_xprtrdma_mr_localinv(mr); 568 trace_xprtrdma_mr_localinv(mr);
569 r_xprt->rx_stats.local_inv_needed++;
554 570
571 frwr = &mr->frwr;
555 frwr->fr_cqe.done = frwr_wc_localinv; 572 frwr->fr_cqe.done = frwr_wc_localinv;
556 last = &frwr->fr_invwr; 573 last = &frwr->fr_invwr;
557 memset(last, 0, sizeof(*last)); 574 last->next = NULL;
558 last->wr_cqe = &frwr->fr_cqe; 575 last->wr_cqe = &frwr->fr_cqe;
576 last->sg_list = NULL;
577 last->num_sge = 0;
559 last->opcode = IB_WR_LOCAL_INV; 578 last->opcode = IB_WR_LOCAL_INV;
579 last->send_flags = IB_SEND_SIGNALED;
560 last->ex.invalidate_rkey = mr->mr_handle; 580 last->ex.invalidate_rkey = mr->mr_handle;
561 count++;
562 581
563 *prev = last; 582 *prev = last;
564 prev = &last->next; 583 prev = &last->next;
565 } 584 }
566 if (!frwr)
567 goto unmap;
568 585
569 /* Strong send queue ordering guarantees that when the 586 /* Strong send queue ordering guarantees that when the
570 * last WR in the chain completes, all WRs in the chain 587 * last WR in the chain completes, all WRs in the chain
571 * are complete. 588 * are complete.
572 */ 589 */
573 last->send_flags = IB_SEND_SIGNALED;
574 frwr->fr_cqe.done = frwr_wc_localinv_wake; 590 frwr->fr_cqe.done = frwr_wc_localinv_wake;
575 reinit_completion(&frwr->fr_linv_done); 591 reinit_completion(&frwr->fr_linv_done);
576 592
@@ -578,37 +594,126 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
578 * replaces the QP. The RPC reply handler won't call us 594 * replaces the QP. The RPC reply handler won't call us
579 * unless ri_id->qp is a valid pointer. 595 * unless ri_id->qp is a valid pointer.
580 */ 596 */
581 r_xprt->rx_stats.local_inv_needed++;
582 bad_wr = NULL; 597 bad_wr = NULL;
583 rc = ib_post_send(ia->ri_id->qp, first, &bad_wr); 598 rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
599 trace_xprtrdma_post_send(req, rc);
600
601 /* The final LOCAL_INV WR in the chain is supposed to
602 * do the wake. If it was never posted, the wake will
603 * not happen, so don't wait in that case.
604 */
584 if (bad_wr != first) 605 if (bad_wr != first)
585 wait_for_completion(&frwr->fr_linv_done); 606 wait_for_completion(&frwr->fr_linv_done);
586 if (rc) 607 if (!rc)
587 goto out_release; 608 return;
588 609
589 /* ORDER: Now DMA unmap all of the MRs, and return 610 /* Recycle MRs in the LOCAL_INV chain that did not get posted.
590 * them to the free MR list.
591 */ 611 */
592unmap: 612 while (bad_wr) {
593 while (!list_empty(mrs)) { 613 frwr = container_of(bad_wr, struct rpcrdma_frwr,
594 mr = rpcrdma_mr_pop(mrs); 614 fr_invwr);
595 rpcrdma_mr_unmap_and_put(mr); 615 mr = container_of(frwr, struct rpcrdma_mr, frwr);
616 bad_wr = bad_wr->next;
617
618 list_del_init(&mr->mr_list);
619 rpcrdma_mr_recycle(mr);
596 } 620 }
597 return; 621}
598 622
599out_release: 623/**
600 pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc); 624 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
625 * @cq: completion queue (ignored)
626 * @wc: completed WR
627 *
628 */
629static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
630{
631 struct ib_cqe *cqe = wc->wr_cqe;
632 struct rpcrdma_frwr *frwr =
633 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
634 struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
601 635
602 /* Unmap and release the MRs in the LOCAL_INV WRs that did not 636 /* WARNING: Only wr_cqe and status are reliable at this point */
603 * get posted. 637 trace_xprtrdma_wc_li_done(wc, frwr);
638 rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
639 __frwr_release_mr(wc, mr);
640}
641
642/**
643 * frwr_unmap_async - invalidate memory regions that were registered for @req
644 * @r_xprt: controlling transport instance
645 * @req: rpcrdma_req with a non-empty list of MRs to process
646 *
647 * This guarantees that registered MRs are properly fenced from the
648 * server before the RPC consumer accesses the data in them. It also
649 * ensures proper Send flow control: waking the next RPC waits until
650 * this RPC has relinquished all its Send Queue entries.
651 */
652void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
653{
654 struct ib_send_wr *first, *last, **prev;
655 const struct ib_send_wr *bad_wr;
656 struct rpcrdma_frwr *frwr;
657 struct rpcrdma_mr *mr;
658 int rc;
659
660 /* Chain the LOCAL_INV Work Requests and post them with
661 * a single ib_post_send() call.
662 */
663 frwr = NULL;
664 prev = &first;
665 while (!list_empty(&req->rl_registered)) {
666 mr = rpcrdma_mr_pop(&req->rl_registered);
667
668 trace_xprtrdma_mr_localinv(mr);
669 r_xprt->rx_stats.local_inv_needed++;
670
671 frwr = &mr->frwr;
672 frwr->fr_cqe.done = frwr_wc_localinv;
673 frwr->fr_req = req;
674 last = &frwr->fr_invwr;
675 last->next = NULL;
676 last->wr_cqe = &frwr->fr_cqe;
677 last->sg_list = NULL;
678 last->num_sge = 0;
679 last->opcode = IB_WR_LOCAL_INV;
680 last->send_flags = IB_SEND_SIGNALED;
681 last->ex.invalidate_rkey = mr->mr_handle;
682
683 *prev = last;
684 prev = &last->next;
685 }
686
687 /* Strong send queue ordering guarantees that when the
688 * last WR in the chain completes, all WRs in the chain
689 * are complete. The last completion will wake up the
690 * RPC waiter.
691 */
692 frwr->fr_cqe.done = frwr_wc_localinv_done;
693
694 /* Transport disconnect drains the receive CQ before it
695 * replaces the QP. The RPC reply handler won't call us
696 * unless ri_id->qp is a valid pointer.
697 */
698 bad_wr = NULL;
699 rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
700 trace_xprtrdma_post_send(req, rc);
701 if (!rc)
702 return;
703
704 /* Recycle MRs in the LOCAL_INV chain that did not get posted.
604 */ 705 */
605 while (bad_wr) { 706 while (bad_wr) {
606 frwr = container_of(bad_wr, struct rpcrdma_frwr, 707 frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
607 fr_invwr);
608 mr = container_of(frwr, struct rpcrdma_mr, frwr); 708 mr = container_of(frwr, struct rpcrdma_mr, frwr);
609 bad_wr = bad_wr->next; 709 bad_wr = bad_wr->next;
610 710
611 list_del_init(&mr->mr_list);
612 rpcrdma_mr_recycle(mr); 711 rpcrdma_mr_recycle(mr);
613 } 712 }
713
714 /* The final LOCAL_INV WR in the chain is supposed to
715 * do the wake. If it was never posted, the wake will
716 * not happen, so wake here in that case.
717 */
718 rpcrdma_complete_rqst(req->rl_reply);
614} 719}
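
With fr_state gone, MR disposition is decided entirely in the LOCAL_INV completion handlers, and the Work Requests for one request are chained and posted with a single verb call. A sketch (not part of the patch) of the chain-building idiom frwr_unmap_sync() and frwr_unmap_async() both use; my_post_localinv_chain() is illustrative, the rpcrdma_* fields and verbs calls are the real ones, and the fr_cqe completion handler is assumed to have been set elsewhere.

	static int my_post_localinv_chain(struct ib_qp *qp, struct list_head *mrs)
	{
		struct ib_send_wr *first, **prev = &first, *last = NULL;
		const struct ib_send_wr *bad_wr = NULL;
		struct rpcrdma_mr *mr;

		list_for_each_entry(mr, mrs, mr_list) {
			last = &mr->frwr.fr_invwr;
			last->next = NULL;
			last->wr_cqe = &mr->frwr.fr_cqe;  /* ->done set elsewhere */
			last->sg_list = NULL;
			last->num_sge = 0;
			last->opcode = IB_WR_LOCAL_INV;
			last->send_flags = IB_SEND_SIGNALED;
			last->ex.invalidate_rkey = mr->mr_handle;

			*prev = last;		/* hook onto the chain tail */
			prev = &last->next;	/* advance the tail hook */
		}
		if (!last)
			return 0;		/* nothing to invalidate */

		/* One doorbell covers the whole chain; the send queue executes
		 * WRs in order, so the final completion fences them all.
		 */
		return ib_post_send(qp, first, &bad_wr);
	}
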
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 7dc62e55f526..4345e6912392 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
366 unsigned int pos; 366 unsigned int pos;
367 int nsegs; 367 int nsegs;
368 368
369 if (rtype == rpcrdma_noch)
370 goto done;
371
369 pos = rqst->rq_snd_buf.head[0].iov_len; 372 pos = rqst->rq_snd_buf.head[0].iov_len;
370 if (rtype == rpcrdma_areadch) 373 if (rtype == rpcrdma_areadch)
371 pos = 0; 374 pos = 0;
@@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
389 nsegs -= mr->mr_nents; 392 nsegs -= mr->mr_nents;
390 } while (nsegs); 393 } while (nsegs);
391 394
392 return 0; 395done:
396 return encode_item_not_present(xdr);
393} 397}
394 398
395/* Register and XDR encode the Write list. Supports encoding a list 399/* Register and XDR encode the Write list. Supports encoding a list
@@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
417 int nsegs, nchunks; 421 int nsegs, nchunks;
418 __be32 *segcount; 422 __be32 *segcount;
419 423
424 if (wtype != rpcrdma_writech)
425 goto done;
426
420 seg = req->rl_segments; 427 seg = req->rl_segments;
421 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 428 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
422 rqst->rq_rcv_buf.head[0].iov_len, 429 rqst->rq_rcv_buf.head[0].iov_len,
@@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
451 /* Update count of segments in this Write chunk */ 458 /* Update count of segments in this Write chunk */
452 *segcount = cpu_to_be32(nchunks); 459 *segcount = cpu_to_be32(nchunks);
453 460
454 return 0; 461done:
462 return encode_item_not_present(xdr);
455} 463}
456 464
457/* Register and XDR encode the Reply chunk. Supports encoding an array 465/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
476 int nsegs, nchunks; 484 int nsegs, nchunks;
477 __be32 *segcount; 485 __be32 *segcount;
478 486
487 if (wtype != rpcrdma_replych)
488 return encode_item_not_present(xdr);
489
479 seg = req->rl_segments; 490 seg = req->rl_segments;
480 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); 491 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
481 if (nsegs < 0) 492 if (nsegs < 0)
@@ -511,6 +522,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
511 return 0; 522 return 0;
512} 523}
513 524
525static void rpcrdma_sendctx_done(struct kref *kref)
526{
527 struct rpcrdma_req *req =
528 container_of(kref, struct rpcrdma_req, rl_kref);
529 struct rpcrdma_rep *rep = req->rl_reply;
530
531 rpcrdma_complete_rqst(rep);
532 rep->rr_rxprt->rx_stats.reply_waits_for_send++;
533}
534
514/** 535/**
515 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer 536 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
516 * @sc: sendctx containing SGEs to unmap 537 * @sc: sendctx containing SGEs to unmap
@@ -520,6 +541,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
520{ 541{
521 struct ib_sge *sge; 542 struct ib_sge *sge;
522 543
544 if (!sc->sc_unmap_count)
545 return;
546
523 /* The first two SGEs contain the transport header and 547 /* The first two SGEs contain the transport header and
524 * the inline buffer. These are always left mapped so 548 * the inline buffer. These are always left mapped so
525 * they can be cheaply re-used. 549 * they can be cheaply re-used.
@@ -529,9 +553,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
529 ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, 553 ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
530 DMA_TO_DEVICE); 554 DMA_TO_DEVICE);
531 555
532 if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, 556 kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
533 &sc->sc_req->rl_flags))
534 wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
535} 557}
536 558
537/* Prepare an SGE for the RPC-over-RDMA transport header. 559/* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +688,7 @@ map_tail:
666out: 688out:
667 sc->sc_wr.num_sge += sge_no; 689 sc->sc_wr.num_sge += sge_no;
668 if (sc->sc_unmap_count) 690 if (sc->sc_unmap_count)
669 __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); 691 kref_get(&req->rl_kref);
670 return true; 692 return true;
671 693
672out_regbuf: 694out_regbuf:
@@ -699,22 +721,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
699 struct rpcrdma_req *req, u32 hdrlen, 721 struct rpcrdma_req *req, u32 hdrlen,
700 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) 722 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
701{ 723{
724 int ret;
725
726 ret = -EAGAIN;
702 req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); 727 req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
703 if (!req->rl_sendctx) 728 if (!req->rl_sendctx)
704 return -EAGAIN; 729 goto err;
705 req->rl_sendctx->sc_wr.num_sge = 0; 730 req->rl_sendctx->sc_wr.num_sge = 0;
706 req->rl_sendctx->sc_unmap_count = 0; 731 req->rl_sendctx->sc_unmap_count = 0;
707 req->rl_sendctx->sc_req = req; 732 req->rl_sendctx->sc_req = req;
708 __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); 733 kref_init(&req->rl_kref);
709 734
735 ret = -EIO;
710 if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) 736 if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
711 return -EIO; 737 goto err;
712
713 if (rtype != rpcrdma_areadch) 738 if (rtype != rpcrdma_areadch)
714 if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype)) 739 if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
715 return -EIO; 740 goto err;
716
717 return 0; 741 return 0;
742
743err:
744 trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
745 return ret;
718} 746}
719 747
720/** 748/**
@@ -842,50 +870,28 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
842 * send a Call message with a Position Zero Read chunk and a 870 * send a Call message with a Position Zero Read chunk and a
843 * regular Read chunk at the same time. 871 * regular Read chunk at the same time.
844 */ 872 */
845 if (rtype != rpcrdma_noch) { 873 ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
846 ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
847 if (ret)
848 goto out_err;
849 }
850 ret = encode_item_not_present(xdr);
851 if (ret) 874 if (ret)
852 goto out_err; 875 goto out_err;
853 876 ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
854 if (wtype == rpcrdma_writech) {
855 ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
856 if (ret)
857 goto out_err;
858 }
859 ret = encode_item_not_present(xdr);
860 if (ret) 877 if (ret)
861 goto out_err; 878 goto out_err;
862 879 ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
863 if (wtype != rpcrdma_replych)
864 ret = encode_item_not_present(xdr);
865 else
866 ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
867 if (ret) 880 if (ret)
868 goto out_err; 881 goto out_err;
869 882
870 trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype); 883 ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
871
872 ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
873 &rqst->rq_snd_buf, rtype); 884 &rqst->rq_snd_buf, rtype);
874 if (ret) 885 if (ret)
875 goto out_err; 886 goto out_err;
887
888 trace_xprtrdma_marshal(req, rtype, wtype);
876 return 0; 889 return 0;
877 890
878out_err: 891out_err:
879 trace_xprtrdma_marshal_failed(rqst, ret); 892 trace_xprtrdma_marshal_failed(rqst, ret);
880 switch (ret) { 893 r_xprt->rx_stats.failed_marshal_count++;
881 case -EAGAIN: 894 frwr_reset(req);
882 xprt_wait_for_buffer_space(rqst->rq_xprt);
883 break;
884 case -ENOBUFS:
885 break;
886 default:
887 r_xprt->rx_stats.failed_marshal_count++;
888 }
889 return ret; 895 return ret;
890} 896}
891 897
@@ -1269,51 +1275,17 @@ out_badheader:
1269 goto out; 1275 goto out;
1270} 1276}
1271 1277
1272void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) 1278static void rpcrdma_reply_done(struct kref *kref)
1273{
1274 /* Invalidate and unmap the data payloads before waking
1275 * the waiting application. This guarantees the memory
1276 * regions are properly fenced from the server before the
1277 * application accesses the data. It also ensures proper
1278 * send flow control: waking the next RPC waits until this
1279 * RPC has relinquished all its Send Queue entries.
1280 */
1281 if (!list_empty(&req->rl_registered))
1282 frwr_unmap_sync(r_xprt, &req->rl_registered);
1283
1284 /* Ensure that any DMA mapped pages associated with
1285 * the Send of the RPC Call have been unmapped before
1286 * allowing the RPC to complete. This protects argument
1287 * memory not controlled by the RPC client from being
1288 * re-used before we're done with it.
1289 */
1290 if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1291 r_xprt->rx_stats.reply_waits_for_send++;
1292 out_of_line_wait_on_bit(&req->rl_flags,
1293 RPCRDMA_REQ_F_TX_RESOURCES,
1294 bit_wait,
1295 TASK_UNINTERRUPTIBLE);
1296 }
1297}
1298
1299/* Reply handling runs in the poll worker thread. Anything that
1300 * might wait is deferred to a separate workqueue.
1301 */
1302void rpcrdma_deferred_completion(struct work_struct *work)
1303{ 1279{
1304 struct rpcrdma_rep *rep = 1280 struct rpcrdma_req *req =
1305 container_of(work, struct rpcrdma_rep, rr_work); 1281 container_of(kref, struct rpcrdma_req, rl_kref);
1306 struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1307 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1308 1282
1309 trace_xprtrdma_defer_cmp(rep); 1283 rpcrdma_complete_rqst(req->rl_reply);
1310 if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1311 frwr_reminv(rep, &req->rl_registered);
1312 rpcrdma_release_rqst(r_xprt, req);
1313 rpcrdma_complete_rqst(rep);
1314} 1284}
1315 1285
1316/* Process received RPC/RDMA messages. 1286/**
1287 * rpcrdma_reply_handler - Process received RPC/RDMA messages
1288 * @rep: Incoming rpcrdma_rep object to process
1317 * 1289 *
1318 * Errors must result in the RPC task either being awakened, or 1290 * Errors must result in the RPC task either being awakened, or
1319 * allowed to timeout, to discover the errors at that time. 1291 * allowed to timeout, to discover the errors at that time.
@@ -1373,10 +1345,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1373 } 1345 }
1374 req->rl_reply = rep; 1346 req->rl_reply = rep;
1375 rep->rr_rqst = rqst; 1347 rep->rr_rqst = rqst;
1376 clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
1377 1348
1378 trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); 1349 trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
1379 queue_work(buf->rb_completion_wq, &rep->rr_work); 1350
1351 if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1352 frwr_reminv(rep, &req->rl_registered);
1353 if (!list_empty(&req->rl_registered))
1354 frwr_unmap_async(r_xprt, req);
1355 /* LocalInv completion will complete the RPC */
1356 else
1357 kref_put(&req->rl_kref, rpcrdma_reply_done);
1380 return; 1358 return;
1381 1359
1382out_badversion: 1360out_badversion:
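
The flag-and-wait scheme (RPCRDMA_REQ_F_TX_RESOURCES plus rpcrdma_release_rqst() and the deferred-completion workqueue) is replaced by a reference count on the request: the reply path always holds one reference, the Send DMA-unmap path holds a second one only when payload SGEs were mapped, and whichever side drops the last reference completes the RPC. A stripped-down sketch of that handoff, using illustrative my_* names around the real kref API:

	#include <linux/kernel.h>
	#include <linux/kref.h>

	struct my_req {
		struct kref	done_ref;
		/* reply pointer, send context, ... */
	};

	static void my_complete(struct kref *kref)
	{
		struct my_req *req = container_of(kref, struct my_req, done_ref);

		/* corresponds to rpcrdma_complete_rqst(req->rl_reply) */
		pr_info("req %p fully complete\n", req);
	}

	static void my_marshal(struct my_req *req, bool send_sges_mapped)
	{
		kref_init(&req->done_ref);		/* ref #1: reply/LocalInv path */
		if (send_sges_mapped)
			kref_get(&req->done_ref);	/* ref #2: Send completion path */
	}

	static void my_send_unmapped(struct my_req *req)	/* Send completion */
	{
		kref_put(&req->done_ref, my_complete);
	}

	static void my_reply_done(struct my_req *req)	/* Reply or LOCAL_INV done */
	{
		kref_put(&req->done_ref, my_complete);
	}
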
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 1f73a6a7e43c..4993aa49ecbe 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
298 module_put(THIS_MODULE); 298 module_put(THIS_MODULE);
299} 299}
300 300
301/* 60 second timeout, no retries */
301static const struct rpc_timeout xprt_rdma_default_timeout = { 302static const struct rpc_timeout xprt_rdma_default_timeout = {
302 .to_initval = 60 * HZ, 303 .to_initval = 60 * HZ,
303 .to_maxval = 60 * HZ, 304 .to_maxval = 60 * HZ,
@@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args)
323 if (!xprt) 324 if (!xprt)
324 return ERR_PTR(-ENOMEM); 325 return ERR_PTR(-ENOMEM);
325 326
326 /* 60 second timeout, no retries */
327 xprt->timeout = &xprt_rdma_default_timeout; 327 xprt->timeout = &xprt_rdma_default_timeout;
328 xprt->connect_timeout = xprt->timeout->to_initval;
329 xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
328 xprt->bind_timeout = RPCRDMA_BIND_TO; 330 xprt->bind_timeout = RPCRDMA_BIND_TO;
329 xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; 331 xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
330 xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; 332 xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
@@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
487} 489}
488 490
489/** 491/**
490 * xprt_rdma_connect - try to establish a transport connection 492 * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
493 * @xprt: controlling transport instance
494 * @connect_timeout: reconnect timeout after client disconnects
495 * @reconnect_timeout: reconnect timeout after server disconnects
496 *
497 */
498static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
499 unsigned long connect_timeout,
500 unsigned long reconnect_timeout)
501{
502 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
503
504 trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
505
506 spin_lock(&xprt->transport_lock);
507
508 if (connect_timeout < xprt->connect_timeout) {
509 struct rpc_timeout to;
510 unsigned long initval;
511
512 to = *xprt->timeout;
513 initval = connect_timeout;
514 if (initval < RPCRDMA_INIT_REEST_TO << 1)
515 initval = RPCRDMA_INIT_REEST_TO << 1;
516 to.to_initval = initval;
517 to.to_maxval = initval;
518 r_xprt->rx_timeout = to;
519 xprt->timeout = &r_xprt->rx_timeout;
520 xprt->connect_timeout = connect_timeout;
521 }
522
523 if (reconnect_timeout < xprt->max_reconnect_timeout)
524 xprt->max_reconnect_timeout = reconnect_timeout;
525
526 spin_unlock(&xprt->transport_lock);
527}
528
529/**
530 * xprt_rdma_connect - schedule an attempt to reconnect
491 * @xprt: transport state 531 * @xprt: transport state
492 * @task: RPC scheduler context 532 * @task: RPC scheduler context (unused)
493 * 533 *
494 */ 534 */
495static void 535static void
496xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) 536xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
497{ 537{
498 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 538 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
539 unsigned long delay;
499 540
500 trace_xprtrdma_op_connect(r_xprt); 541 trace_xprtrdma_op_connect(r_xprt);
542
543 delay = 0;
501 if (r_xprt->rx_ep.rep_connected != 0) { 544 if (r_xprt->rx_ep.rep_connected != 0) {
502 /* Reconnect */ 545 delay = xprt_reconnect_delay(xprt);
503 schedule_delayed_work(&r_xprt->rx_connect_worker, 546 xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
504 xprt->reestablish_timeout);
505 xprt->reestablish_timeout <<= 1;
506 if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
507 xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
508 else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
509 xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
510 } else {
511 schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
512 if (!RPC_IS_ASYNC(task))
513 flush_delayed_work(&r_xprt->rx_connect_worker);
514 } 547 }
548 queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
549 delay);
515} 550}
516 551
517/** 552/**
@@ -550,8 +585,11 @@ out_sleep:
550static void 585static void
551xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) 586xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
552{ 587{
588 struct rpcrdma_xprt *r_xprt =
589 container_of(xprt, struct rpcrdma_xprt, rx_xprt);
590
553 memset(rqst, 0, sizeof(*rqst)); 591 memset(rqst, 0, sizeof(*rqst));
554 rpcrdma_buffer_put(rpcr_to_rdmar(rqst)); 592 rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
555 rpc_wake_up_next(&xprt->backlog); 593 rpc_wake_up_next(&xprt->backlog);
556} 594}
557 595
@@ -618,9 +656,16 @@ xprt_rdma_free(struct rpc_task *task)
618 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); 656 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
619 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 657 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
620 658
621 if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
622 rpcrdma_release_rqst(r_xprt, req);
623 trace_xprtrdma_op_free(task, req); 659 trace_xprtrdma_op_free(task, req);
660
661 if (!list_empty(&req->rl_registered))
662 frwr_unmap_sync(r_xprt, req);
663
664 /* XXX: If the RPC is completing because of a signal and
665 * not because a reply was received, we ought to ensure
666 * that the Send completion has fired, so that memory
667 * involved with the Send is not still visible to the NIC.
668 */
624} 669}
625 670
626/** 671/**
@@ -667,7 +712,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
667 goto drop_connection; 712 goto drop_connection;
668 rqst->rq_xtime = ktime_get(); 713 rqst->rq_xtime = ktime_get();
669 714
670 __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
671 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) 715 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
672 goto drop_connection; 716 goto drop_connection;
673 717
@@ -760,6 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
760 .send_request = xprt_rdma_send_request, 804 .send_request = xprt_rdma_send_request,
761 .close = xprt_rdma_close, 805 .close = xprt_rdma_close,
762 .destroy = xprt_rdma_destroy, 806 .destroy = xprt_rdma_destroy,
807 .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout,
763 .print_stats = xprt_rdma_print_stats, 808 .print_stats = xprt_rdma_print_stats,
764 .enable_swap = xprt_rdma_enable_swap, 809 .enable_swap = xprt_rdma_enable_swap,
765 .disable_swap = xprt_rdma_disable_swap, 810 .disable_swap = xprt_rdma_disable_swap,
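
The new .set_connect_timeout op lets the upper layer tighten (but never loosen) the connect timeout and cap the reconnect backoff per transport instance. A sketch of how a caller would reach it; in-tree this normally happens through the generic SUNRPC client API rather than a direct op call, so treat the direct invocation below as illustrative only.

	static void my_tighten_timeouts(struct rpc_xprt *xprt)
	{
		/* Ask for reconnects within 15 seconds after a client-side
		 * disconnect, and cap server-disconnect backoff at 30 seconds.
		 * The RDMA implementation above only ever shortens the existing
		 * values, so a larger request is a no-op.
		 */
		if (xprt->ops->set_connect_timeout)
			xprt->ops->set_connect_timeout(xprt, 15 * HZ, 30 * HZ);
	}
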
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 84bb37924540..805b1f35e1ca 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
89 */ 89 */
90static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) 90static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
91{ 91{
92 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
93 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 92 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
94 93
95 /* Flush Receives, then wait for deferred Reply work 94 /* Flush Receives, then wait for deferred Reply work
96 * to complete. 95 * to complete.
97 */ 96 */
98 ib_drain_rq(ia->ri_id->qp); 97 ib_drain_rq(ia->ri_id->qp);
99 drain_workqueue(buf->rb_completion_wq);
100 98
101 /* Deferred Reply processing might have scheduled 99 /* Deferred Reply processing might have scheduled
102 * local invalidations. 100 * local invalidations.
@@ -901,7 +899,7 @@ out_emptyq:
901 * completions recently. This is a sign the Send Queue is 899 * completions recently. This is a sign the Send Queue is
902 * backing up. Cause the caller to pause and try again. 900 * backing up. Cause the caller to pause and try again.
903 */ 901 */
904 set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags); 902 xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
905 r_xprt->rx_stats.empty_sendctx_q++; 903 r_xprt->rx_stats.empty_sendctx_q++;
906 return NULL; 904 return NULL;
907} 905}
@@ -936,10 +934,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
936 /* Paired with READ_ONCE */ 934 /* Paired with READ_ONCE */
937 smp_store_release(&buf->rb_sc_tail, next_tail); 935 smp_store_release(&buf->rb_sc_tail, next_tail);
938 936
939 if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) { 937 xprt_write_space(&sc->sc_xprt->rx_xprt);
940 smp_mb__after_atomic();
941 xprt_write_space(&sc->sc_xprt->rx_xprt);
942 }
943} 938}
944 939
945static void 940static void
@@ -977,8 +972,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
977 r_xprt->rx_stats.mrs_allocated += count; 972 r_xprt->rx_stats.mrs_allocated += count;
978 spin_unlock(&buf->rb_mrlock); 973 spin_unlock(&buf->rb_mrlock);
979 trace_xprtrdma_createmrs(r_xprt, count); 974 trace_xprtrdma_createmrs(r_xprt, count);
980
981 xprt_write_space(&r_xprt->rx_xprt);
982} 975}
983 976
984static void 977static void
@@ -990,6 +983,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
990 rx_buf); 983 rx_buf);
991 984
992 rpcrdma_mrs_create(r_xprt); 985 rpcrdma_mrs_create(r_xprt);
986 xprt_write_space(&r_xprt->rx_xprt);
993} 987}
994 988
995/** 989/**
@@ -1025,7 +1019,6 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
1025 if (!req->rl_recvbuf) 1019 if (!req->rl_recvbuf)
1026 goto out4; 1020 goto out4;
1027 1021
1028 req->rl_buffer = buffer;
1029 INIT_LIST_HEAD(&req->rl_registered); 1022 INIT_LIST_HEAD(&req->rl_registered);
1030 spin_lock(&buffer->rb_lock); 1023 spin_lock(&buffer->rb_lock);
1031 list_add(&req->rl_all, &buffer->rb_allreqs); 1024 list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1042,9 +1035,9 @@ out1:
1042 return NULL; 1035 return NULL;
1043} 1036}
1044 1037
1045static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) 1038static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
1039 bool temp)
1046{ 1040{
1047 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1048 struct rpcrdma_rep *rep; 1041 struct rpcrdma_rep *rep;
1049 1042
1050 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 1043 rep = kzalloc(sizeof(*rep), GFP_KERNEL);
@@ -1055,27 +1048,22 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
1055 DMA_FROM_DEVICE, GFP_KERNEL); 1048 DMA_FROM_DEVICE, GFP_KERNEL);
1056 if (!rep->rr_rdmabuf) 1049 if (!rep->rr_rdmabuf)
1057 goto out_free; 1050 goto out_free;
1051
1058 xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf), 1052 xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
1059 rdmab_length(rep->rr_rdmabuf)); 1053 rdmab_length(rep->rr_rdmabuf));
1060
1061 rep->rr_cqe.done = rpcrdma_wc_receive; 1054 rep->rr_cqe.done = rpcrdma_wc_receive;
1062 rep->rr_rxprt = r_xprt; 1055 rep->rr_rxprt = r_xprt;
1063 INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
1064 rep->rr_recv_wr.next = NULL; 1056 rep->rr_recv_wr.next = NULL;
1065 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; 1057 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1066 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; 1058 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1067 rep->rr_recv_wr.num_sge = 1; 1059 rep->rr_recv_wr.num_sge = 1;
1068 rep->rr_temp = temp; 1060 rep->rr_temp = temp;
1069 1061 return rep;
1070 spin_lock(&buf->rb_lock);
1071 list_add(&rep->rr_list, &buf->rb_recv_bufs);
1072 spin_unlock(&buf->rb_lock);
1073 return true;
1074 1062
1075out_free: 1063out_free:
1076 kfree(rep); 1064 kfree(rep);
1077out: 1065out:
1078 return false; 1066 return NULL;
1079} 1067}
1080 1068
1081/** 1069/**
@@ -1089,7 +1077,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1089 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1077 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1090 int i, rc; 1078 int i, rc;
1091 1079
1092 buf->rb_flags = 0;
1093 buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; 1080 buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
1094 buf->rb_bc_srv_max_requests = 0; 1081 buf->rb_bc_srv_max_requests = 0;
1095 spin_lock_init(&buf->rb_mrlock); 1082 spin_lock_init(&buf->rb_mrlock);
@@ -1122,15 +1109,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1122 if (rc) 1109 if (rc)
1123 goto out; 1110 goto out;
1124 1111
1125 buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
1126 WQ_MEM_RECLAIM | WQ_HIGHPRI,
1127 0,
1128 r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
1129 if (!buf->rb_completion_wq) {
1130 rc = -ENOMEM;
1131 goto out;
1132 }
1133
1134 return 0; 1112 return 0;
1135out: 1113out:
1136 rpcrdma_buffer_destroy(buf); 1114 rpcrdma_buffer_destroy(buf);
@@ -1204,11 +1182,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1204{ 1182{
1205 cancel_delayed_work_sync(&buf->rb_refresh_worker); 1183 cancel_delayed_work_sync(&buf->rb_refresh_worker);
1206 1184
1207 if (buf->rb_completion_wq) {
1208 destroy_workqueue(buf->rb_completion_wq);
1209 buf->rb_completion_wq = NULL;
1210 }
1211
1212 rpcrdma_sendctxs_destroy(buf); 1185 rpcrdma_sendctxs_destroy(buf);
1213 1186
1214 while (!list_empty(&buf->rb_recv_bufs)) { 1187 while (!list_empty(&buf->rb_recv_bufs)) {
@@ -1325,13 +1298,12 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1325 1298
1326/** 1299/**
1327 * rpcrdma_buffer_put - Put request/reply buffers back into pool 1300 * rpcrdma_buffer_put - Put request/reply buffers back into pool
1301 * @buffers: buffer pool
1328 * @req: object to return 1302 * @req: object to return
1329 * 1303 *
1330 */ 1304 */
1331void 1305void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1332rpcrdma_buffer_put(struct rpcrdma_req *req)
1333{ 1306{
1334 struct rpcrdma_buffer *buffers = req->rl_buffer;
1335 struct rpcrdma_rep *rep = req->rl_reply; 1307 struct rpcrdma_rep *rep = req->rl_reply;
1336 1308
1337 req->rl_reply = NULL; 1309 req->rl_reply = NULL;
@@ -1484,8 +1456,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
1484 struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; 1456 struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1485 int rc; 1457 int rc;
1486 1458
1487 if (!ep->rep_send_count || 1459 if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
1488 test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1489 send_wr->send_flags |= IB_SEND_SIGNALED; 1460 send_wr->send_flags |= IB_SEND_SIGNALED;
1490 ep->rep_send_count = ep->rep_send_batch; 1461 ep->rep_send_count = ep->rep_send_batch;
1491 } else { 1462 } else {
@@ -1505,11 +1476,13 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1505{ 1476{
1506 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1477 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1507 struct rpcrdma_ep *ep = &r_xprt->rx_ep; 1478 struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1508 struct ib_recv_wr *wr, *bad_wr; 1479 struct ib_recv_wr *i, *wr, *bad_wr;
1480 struct rpcrdma_rep *rep;
1509 int needed, count, rc; 1481 int needed, count, rc;
1510 1482
1511 rc = 0; 1483 rc = 0;
1512 count = 0; 1484 count = 0;
1485
1513 needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); 1486 needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1514 if (ep->rep_receive_count > needed) 1487 if (ep->rep_receive_count > needed)
1515 goto out; 1488 goto out;
@@ -1517,51 +1490,65 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1517 if (!temp) 1490 if (!temp)
1518 needed += RPCRDMA_MAX_RECV_BATCH; 1491 needed += RPCRDMA_MAX_RECV_BATCH;
1519 1492
1520 count = 0; 1493 /* fast path: all needed reps can be found on the free list */
1521 wr = NULL; 1494 wr = NULL;
1495 spin_lock(&buf->rb_lock);
1522 while (needed) { 1496 while (needed) {
1523 struct rpcrdma_regbuf *rb;
1524 struct rpcrdma_rep *rep;
1525
1526 spin_lock(&buf->rb_lock);
1527 rep = list_first_entry_or_null(&buf->rb_recv_bufs, 1497 rep = list_first_entry_or_null(&buf->rb_recv_bufs,
1528 struct rpcrdma_rep, rr_list); 1498 struct rpcrdma_rep, rr_list);
1529 if (likely(rep)) 1499 if (!rep)
1530 list_del(&rep->rr_list); 1500 break;
1531 spin_unlock(&buf->rb_lock);
1532 if (!rep) {
1533 if (!rpcrdma_rep_create(r_xprt, temp))
1534 break;
1535 continue;
1536 }
1537 1501
1538 rb = rep->rr_rdmabuf; 1502 list_del(&rep->rr_list);
1539 if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) { 1503 rep->rr_recv_wr.next = wr;
1540 rpcrdma_recv_buffer_put(rep); 1504 wr = &rep->rr_recv_wr;
1505 --needed;
1506 }
1507 spin_unlock(&buf->rb_lock);
1508
1509 while (needed) {
1510 rep = rpcrdma_rep_create(r_xprt, temp);
1511 if (!rep)
1541 break; 1512 break;
1542 }
1543 1513
1544 trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
1545 rep->rr_recv_wr.next = wr; 1514 rep->rr_recv_wr.next = wr;
1546 wr = &rep->rr_recv_wr; 1515 wr = &rep->rr_recv_wr;
1547 ++count;
1548 --needed; 1516 --needed;
1549 } 1517 }
1550 if (!count) 1518 if (!wr)
1551 goto out; 1519 goto out;
1552 1520
1521 for (i = wr; i; i = i->next) {
1522 rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1523
1524 if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
1525 goto release_wrs;
1526
1527 trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
1528 ++count;
1529 }
1530
1553 rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, 1531 rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
1554 (const struct ib_recv_wr **)&bad_wr); 1532 (const struct ib_recv_wr **)&bad_wr);
1533out:
1534 trace_xprtrdma_post_recvs(r_xprt, count, rc);
1555 if (rc) { 1535 if (rc) {
1556 for (wr = bad_wr; wr; wr = wr->next) { 1536 for (wr = bad_wr; wr;) {
1557 struct rpcrdma_rep *rep; 1537 struct rpcrdma_rep *rep;
1558 1538
1559 rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); 1539 rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1540 wr = wr->next;
1560 rpcrdma_recv_buffer_put(rep); 1541 rpcrdma_recv_buffer_put(rep);
1561 --count; 1542 --count;
1562 } 1543 }
1563 } 1544 }
1564 ep->rep_receive_count += count; 1545 ep->rep_receive_count += count;
1565out: 1546 return;
1566 trace_xprtrdma_post_recvs(r_xprt, count, rc); 1547
1548release_wrs:
1549 for (i = wr; i;) {
1550 rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1551 i = i->next;
1552 rpcrdma_recv_buffer_put(rep);
1553 }
1567} 1554}
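
rpcrdma_post_recvs() now harvests as many rpcrdma_rep buffers as it can under one rb_lock acquisition, allocates the remainder outside the lock, and posts the whole batch with a single ib_post_recv(); on a posting error, everything from bad_wr onward is returned to the pool. A reduced sketch of that batch-and-unwind idiom with illustrative my_* types around the real verbs calls:

	struct my_rep {
		struct ib_recv_wr	recv_wr;	/* wr_cqe/sg_list setup elided */
		/* receive buffer, DMA state, ... */
	};

	static void my_rep_put(struct my_rep *rep)
	{
		/* return @rep to the free pool (stand-in for
		 * rpcrdma_recv_buffer_put())
		 */
	}

	static int my_post_recv_batch(struct ib_qp *qp, struct my_rep **reps, int n)
	{
		struct ib_recv_wr *chain = NULL;
		const struct ib_recv_wr *bad_wr;
		int i, rc;

		for (i = 0; i < n; i++) {
			reps[i]->recv_wr.next = chain;	/* build the chain */
			chain = &reps[i]->recv_wr;
		}
		if (!chain)
			return 0;

		rc = ib_post_recv(qp, chain, &bad_wr);
		if (rc) {
			/* WRs before bad_wr were accepted; unwind the rest.
			 * Advance before putting, since the put may recycle the
			 * containing rep (mirrors the fix in the hunk above).
			 */
			while (bad_wr) {
				struct my_rep *rep = container_of(bad_wr,
							struct my_rep, recv_wr);

				bad_wr = bad_wr->next;
				my_rep_put(rep);
			}
		}
		return rc;
	}
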
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d1e0749bcbc4..8378f45d2da7 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,7 +44,8 @@
44 44
45#include <linux/wait.h> /* wait_queue_head_t, etc */ 45#include <linux/wait.h> /* wait_queue_head_t, etc */
46#include <linux/spinlock.h> /* spinlock_t, etc */ 46#include <linux/spinlock.h> /* spinlock_t, etc */
47#include <linux/atomic.h> /* atomic_t, etc */ 47#include <linux/atomic.h> /* atomic_t, etc */
48#include <linux/kref.h> /* struct kref */
48#include <linux/workqueue.h> /* struct work_struct */ 49#include <linux/workqueue.h> /* struct work_struct */
49 50
50#include <rdma/rdma_cm.h> /* RDMA connection api */ 51#include <rdma/rdma_cm.h> /* RDMA connection api */
@@ -202,10 +203,9 @@ struct rpcrdma_rep {
202 bool rr_temp; 203 bool rr_temp;
203 struct rpcrdma_regbuf *rr_rdmabuf; 204 struct rpcrdma_regbuf *rr_rdmabuf;
204 struct rpcrdma_xprt *rr_rxprt; 205 struct rpcrdma_xprt *rr_rxprt;
205 struct work_struct rr_work; 206 struct rpc_rqst *rr_rqst;
206 struct xdr_buf rr_hdrbuf; 207 struct xdr_buf rr_hdrbuf;
207 struct xdr_stream rr_stream; 208 struct xdr_stream rr_stream;
208 struct rpc_rqst *rr_rqst;
209 struct list_head rr_list; 209 struct list_head rr_list;
210 struct ib_recv_wr rr_recv_wr; 210 struct ib_recv_wr rr_recv_wr;
211}; 211};
@@ -240,18 +240,12 @@ struct rpcrdma_sendctx {
240 * An external memory region is any buffer or page that is registered 240 * An external memory region is any buffer or page that is registered
241 * on the fly (ie, not pre-registered). 241 * on the fly (ie, not pre-registered).
242 */ 242 */
243enum rpcrdma_frwr_state { 243struct rpcrdma_req;
244 FRWR_IS_INVALID, /* ready to be used */
245 FRWR_IS_VALID, /* in use */
246 FRWR_FLUSHED_FR, /* flushed FASTREG WR */
247 FRWR_FLUSHED_LI, /* flushed LOCALINV WR */
248};
249
250struct rpcrdma_frwr { 244struct rpcrdma_frwr {
251 struct ib_mr *fr_mr; 245 struct ib_mr *fr_mr;
252 struct ib_cqe fr_cqe; 246 struct ib_cqe fr_cqe;
253 enum rpcrdma_frwr_state fr_state;
254 struct completion fr_linv_done; 247 struct completion fr_linv_done;
248 struct rpcrdma_req *fr_req;
255 union { 249 union {
256 struct ib_reg_wr fr_regwr; 250 struct ib_reg_wr fr_regwr;
257 struct ib_send_wr fr_invwr; 251 struct ib_send_wr fr_invwr;
@@ -326,7 +320,6 @@ struct rpcrdma_buffer;
326struct rpcrdma_req { 320struct rpcrdma_req {
327 struct list_head rl_list; 321 struct list_head rl_list;
328 struct rpc_rqst rl_slot; 322 struct rpc_rqst rl_slot;
329 struct rpcrdma_buffer *rl_buffer;
330 struct rpcrdma_rep *rl_reply; 323 struct rpcrdma_rep *rl_reply;
331 struct xdr_stream rl_stream; 324 struct xdr_stream rl_stream;
332 struct xdr_buf rl_hdrbuf; 325 struct xdr_buf rl_hdrbuf;
@@ -336,18 +329,12 @@ struct rpcrdma_req {
336 struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ 329 struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
337 330
338 struct list_head rl_all; 331 struct list_head rl_all;
339 unsigned long rl_flags; 332 struct kref rl_kref;
340 333
341 struct list_head rl_registered; /* registered segments */ 334 struct list_head rl_registered; /* registered segments */
342 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; 335 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
343}; 336};
344 337
345/* rl_flags */
346enum {
347 RPCRDMA_REQ_F_PENDING = 0,
348 RPCRDMA_REQ_F_TX_RESOURCES,
349};
350
351static inline struct rpcrdma_req * 338static inline struct rpcrdma_req *
352rpcr_to_rdmar(const struct rpc_rqst *rqst) 339rpcr_to_rdmar(const struct rpc_rqst *rqst)
353{ 340{
@@ -391,22 +378,15 @@ struct rpcrdma_buffer {
391 struct list_head rb_recv_bufs; 378 struct list_head rb_recv_bufs;
392 struct list_head rb_allreqs; 379 struct list_head rb_allreqs;
393 380
394 unsigned long rb_flags;
395 u32 rb_max_requests; 381 u32 rb_max_requests;
396 u32 rb_credits; /* most recent credit grant */ 382 u32 rb_credits; /* most recent credit grant */
397 383
398 u32 rb_bc_srv_max_requests; 384 u32 rb_bc_srv_max_requests;
399 u32 rb_bc_max_requests; 385 u32 rb_bc_max_requests;
400 386
401 struct workqueue_struct *rb_completion_wq;
402 struct delayed_work rb_refresh_worker; 387 struct delayed_work rb_refresh_worker;
403}; 388};
404 389
405/* rb_flags */
406enum {
407 RPCRDMA_BUF_F_EMPTY_SCQ = 0,
408};
409
410/* 390/*
411 * Statistics for RPCRDMA 391 * Statistics for RPCRDMA
412 */ 392 */
@@ -452,6 +432,7 @@ struct rpcrdma_xprt {
452 struct rpcrdma_ep rx_ep; 432 struct rpcrdma_ep rx_ep;
453 struct rpcrdma_buffer rx_buf; 433 struct rpcrdma_buffer rx_buf;
454 struct delayed_work rx_connect_worker; 434 struct delayed_work rx_connect_worker;
435 struct rpc_timeout rx_timeout;
455 struct rpcrdma_stats rx_stats; 436 struct rpcrdma_stats rx_stats;
456}; 437};
457 438
@@ -518,7 +499,8 @@ rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
518} 499}
519 500
520struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); 501struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
521void rpcrdma_buffer_put(struct rpcrdma_req *); 502void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
503 struct rpcrdma_req *req);
522void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); 504void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
523 505
524bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, 506bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
@@ -564,6 +546,7 @@ rpcrdma_data_dir(bool writing)
564/* Memory registration calls xprtrdma/frwr_ops.c 546/* Memory registration calls xprtrdma/frwr_ops.c
565 */ 547 */
566bool frwr_is_supported(struct ib_device *device); 548bool frwr_is_supported(struct ib_device *device);
549void frwr_reset(struct rpcrdma_req *req);
567int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); 550int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
568int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); 551int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
569void frwr_release_mr(struct rpcrdma_mr *mr); 552void frwr_release_mr(struct rpcrdma_mr *mr);
@@ -574,8 +557,8 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
574 struct rpcrdma_mr **mr); 557 struct rpcrdma_mr **mr);
575int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); 558int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
576void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); 559void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
577void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, 560void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
578 struct list_head *mrs); 561void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
579 562
580/* 563/*
581 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c 564 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
@@ -598,9 +581,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
598void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); 581void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
599void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); 582void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
600void rpcrdma_reply_handler(struct rpcrdma_rep *rep); 583void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
601void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
602 struct rpcrdma_req *req);
603void rpcrdma_deferred_completion(struct work_struct *work);
604 584
605static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) 585static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
606{ 586{
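
One structural point worth keeping in mind while reading the header changes: the generic rpc_rqst lives inside rpcrdma_req as rl_slot, so code such as trace_xprtrdma_prepsend_failed(&req->rl_slot, ...) and xprt_rdma_free_slot() can hop between the two views without a lookup. A sketch of that helper, written as the container_of() pattern the in-tree rpcr_to_rdmar() (shown above) is assumed to follow:

	static inline struct rpcrdma_req *
	my_rpcr_to_rdmar(const struct rpc_rqst *rqst)
	{
		/* rl_slot is the embedded rpc_rqst, so subtracting its offset
		 * recovers the enclosing rpcrdma_req in constant time.
		 */
		return container_of(rqst, struct rpcrdma_req, rl_slot);
	}
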
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 97c15d47f343..3c2cc96afcaa 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2414,25 +2414,6 @@ out:
2414 xprt_wake_pending_tasks(xprt, status); 2414 xprt_wake_pending_tasks(xprt, status);
2415} 2415}
2416 2416
2417static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt)
2418{
2419 unsigned long start, now = jiffies;
2420
2421 start = xprt->stat.connect_start + xprt->reestablish_timeout;
2422 if (time_after(start, now))
2423 return start - now;
2424 return 0;
2425}
2426
2427static void xs_reconnect_backoff(struct rpc_xprt *xprt)
2428{
2429 xprt->reestablish_timeout <<= 1;
2430 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
2431 xprt->reestablish_timeout = xprt->max_reconnect_timeout;
2432 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2433 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2434}
2435
2436/** 2417/**
2437 * xs_connect - connect a socket to a remote endpoint 2418 * xs_connect - connect a socket to a remote endpoint
2438 * @xprt: pointer to transport structure 2419 * @xprt: pointer to transport structure
@@ -2462,8 +2443,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
2462 /* Start by resetting any existing state */ 2443 /* Start by resetting any existing state */
2463 xs_reset_transport(transport); 2444 xs_reset_transport(transport);
2464 2445
2465 delay = xs_reconnect_delay(xprt); 2446 delay = xprt_reconnect_delay(xprt);
2466 xs_reconnect_backoff(xprt); 2447 xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
2467 2448
2468 } else 2449 } else
2469 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 2450 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);