author		Chuck Lever <chuck.lever@oracle.com>	2017-10-20 10:48:12 -0400
committer	Anna Schumaker <Anna.Schumaker@Netapp.com>	2017-11-17 13:47:56 -0500
commit		ae72950abf99fb250aca972b3451b6e06a096c68 (patch)
tree		101982d10909e16b45ff8491881e6490b5b18635 /net/sunrpc
parent		a062a2a3efc5fece106d96d4a5165f3f23b5cbda (diff)
xprtrdma: Add data structure to manage RDMA Send arguments
Problem statement:

Recently Sagi Grimberg <sagi@grimberg.me> observed that kernel RDMA-enabled storage initiators don't handle delayed Send completion correctly. If Send completion is delayed beyond the end of a ULP transaction, the ULP may release resources that are still being used by the HCA to complete a long-running Send operation.

This is a common design trait amongst our initiators. Most Send operations are faster than the ULP transaction they are part of. Waiting for a completion for these is typically unnecessary.

Infrequently, a network partition or some other problem crops up where an ordering problem can occur. In NFS parlance, the RPC Reply arrives and completes the RPC, but the HCA is still retrying the Send WR that conveyed the RPC Call. In this case, the HCA can try to use memory that has been invalidated or DMA unmapped, and the connection is lost. If that memory has been re-used for something else (possibly not related to NFS), the Send retransmission exposes that data on the wire.

Thus we cannot assume that it is safe to release Send-related resources just because a ULP reply has arrived.

After some analysis, we have determined that the completion housekeeping will not be difficult for xprtrdma:

 - Inline Send buffers are registered via the local DMA key, and are already left DMA mapped for the lifetime of a transport connection, thus no additional handling is necessary for those

 - Gathered Sends involving page cache pages _will_ need to DMA unmap those pages after the Send completes. But like inline Send buffers, they are registered via the local DMA key, and thus will not need to be invalidated

In addition, RPC completion will need to wait for Send completion in the latter case. However, nearly always, the Send that conveys the RPC Call will have completed long before the RPC Reply arrives, and thus no additional latency will be accrued.

Design notes:

In this patch, the rpcrdma_sendctx object is introduced, and a lock-free circular queue is added to manage a set of them per transport.

The RPC client's send path already prevents sending more than one RPC Call at the same time. This allows us to treat the consumer side of the queue (rpcrdma_sendctx_get_locked) as if there is a single consumer thread.

The producer side of the queue (rpcrdma_sendctx_put_locked) is invoked only from the Send completion handler, which is a single thread of execution (soft IRQ).

The only care that needs to be taken is with the tail index, which is shared between the producer and consumer. Only the producer updates the tail index. The consumer compares the head with the tail to ensure that a sendctx that is in use is never handed out again (or, expressed more conventionally, that the queue is empty).

When the sendctx queue empties completely, there are enough Sends outstanding that posting more Send operations can result in a Send Queue overflow. In this case, the ULP is told to wait and try again. This introduces strong Send Queue accounting to xprtrdma.

As a final touch, Jason Gunthorpe <jgunthorpe@obsidianresearch.com> suggested a mechanism that does not require signaling every Send. We signal once every N Sends, and perform SGE unmapping of N Send operations during that one completion.

Reported-by: Sagi Grimberg <sagi@grimberg.me>
Suggested-by: Jason Gunthorpe <jgunthorpe@obsidianresearch.com>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
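To make the head/tail scheme above concrete, the following is a minimal, self-contained user-space model of the circular queue. It is not the kernel code: the names sendctx_queue, sendctx_get, sendctx_put, and QUEUE_SIZE are hypothetical, and C11 acquire/release atomics stand in for the kernel's READ_ONCE() and smp_store_release(). The real implementation appears in the verbs.c hunks below.

/* Minimal user-space model of the sendctx circular queue (hypothetical
 * names; C11 atomics stand in for READ_ONCE()/smp_store_release()).
 * One consumer (->send_request) advances head; one producer (Send
 * completion) advances tail.
 */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

#define QUEUE_SIZE 8			/* models rb_max_requests + backchannel slots */

struct sendctx {
	unsigned int unmap_count;	/* models sc_unmap_count */
};

struct sendctx_queue {
	unsigned long head;		/* consumer-owned, models rb_sc_head */
	_Atomic unsigned long tail;	/* producer-owned, models rb_sc_tail */
	unsigned long last;		/* highest valid index, models rb_sc_last */
	struct sendctx *ctxs[QUEUE_SIZE];
};

/* The queue size need not be a power of two, so wrap explicitly. */
static unsigned long queue_next(const struct sendctx_queue *q, unsigned long i)
{
	return i < q->last ? i + 1 : 0;
}

/* Consumer: NULL means not enough Sends have completed yet ("empty"). */
static struct sendctx *sendctx_get(struct sendctx_queue *q)
{
	unsigned long next_head = queue_next(q, q->head);
	struct sendctx *sc;

	if (next_head == atomic_load_explicit(&q->tail, memory_order_acquire))
		return NULL;		/* caller pauses and retries */

	sc = q->ctxs[next_head];	/* read the item before publishing head */
	q->head = next_head;
	return sc;
}

/* Producer: walk from tail up to the signaled context, retiring each one. */
static void sendctx_put(struct sendctx_queue *q, struct sendctx *done)
{
	unsigned long next_tail = atomic_load_explicit(&q->tail,
						       memory_order_relaxed);

	do {
		next_tail = queue_next(q, next_tail);
		q->ctxs[next_tail]->unmap_count = 0;	/* stands in for DMA unmap */
	} while (q->ctxs[next_tail] != done);

	/* Publish the new tail only after the items have been retired. */
	atomic_store_explicit(&q->tail, next_tail, memory_order_release);
}

int main(void)
{
	struct sendctx_queue q = { .last = QUEUE_SIZE - 1 };
	struct sendctx *sc;
	unsigned long i;

	for (i = 0; i < QUEUE_SIZE; i++)
		q.ctxs[i] = calloc(1, sizeof(*q.ctxs[i]));

	sc = sendctx_get(&q);			/* acquire a context for a Send */
	printf("got ctx, head=%lu\n", q.head);
	sendctx_put(&q, sc);			/* the completion hands it back */
	printf("tail=%lu\n", (unsigned long)atomic_load(&q.tail));

	for (i = 0; i < QUEUE_SIZE; i++)
		free(q.ctxs[i]);
	return 0;
}

Because only the producer writes tail and only the consumer writes head, no lock or compare-and-swap is needed; the acquire load pairs with the release store in the same way READ_ONCE() pairs with smp_store_release() in the patch.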
Diffstat (limited to 'net/sunrpc')
-rw-r--r--	net/sunrpc/xprtrdma/rpc_rdma.c	 40
-rw-r--r--	net/sunrpc/xprtrdma/transport.c	  6
-rw-r--r--	net/sunrpc/xprtrdma/verbs.c	195
-rw-r--r--	net/sunrpc/xprtrdma/xprt_rdma.h	 38
4 files changed, 247 insertions, 32 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 7fd102960a81..9951c81b82ed 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -512,23 +512,26 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 }
 
 /**
- * rpcrdma_unmap_sges - DMA-unmap Send buffers
- * @ia: interface adapter (device)
- * @req: req with possibly some SGEs to be DMA unmapped
+ * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * @sc: sendctx containing SGEs to unmap
  *
  */
 void
-rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
 {
+	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
 	struct ib_sge *sge;
 	unsigned int count;
 
+	dprintk("RPC: %s: unmapping %u sges for sc=%p\n",
+		__func__, sc->sc_unmap_count, sc);
+
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
 	 */
-	sge = &req->rl_send_sge[2];
-	for (count = req->rl_mapped_sges; count--; sge++)
+	sge = &sc->sc_sges[2];
+	for (count = sc->sc_unmap_count; count; ++sge, --count)
 		ib_dma_unmap_page(ia->ri_device,
 			sge->addr, sge->length, DMA_TO_DEVICE);
 }
@@ -539,8 +542,9 @@ static bool
 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			u32 len)
 {
+	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
-	struct ib_sge *sge = &req->rl_send_sge[0];
+	struct ib_sge *sge = sc->sc_sges;
 
 	if (!rpcrdma_dma_map_regbuf(ia, rb))
 		goto out_regbuf;
@@ -550,7 +554,7 @@ rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 
 	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
 				      sge->length, DMA_TO_DEVICE);
-	req->rl_send_wr.num_sge++;
+	sc->sc_wr.num_sge++;
 	return true;
 
 out_regbuf:
@@ -565,10 +569,11 @@ static bool
 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
+	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	unsigned int sge_no, page_base, len, remaining;
 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 	struct ib_device *device = ia->ri_device;
-	struct ib_sge *sge = req->rl_send_sge;
+	struct ib_sge *sge = sc->sc_sges;
 	u32 lkey = ia->ri_pd->local_dma_lkey;
 	struct page *page, **ppages;
 
@@ -631,7 +636,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			sge[sge_no].length = len;
 			sge[sge_no].lkey = lkey;
 
-			req->rl_mapped_sges++;
+			sc->sc_unmap_count++;
 			ppages++;
 			remaining -= len;
 			page_base = 0;
@@ -657,11 +662,11 @@ map_tail:
 			goto out_mapping_err;
 		sge[sge_no].length = len;
 		sge[sge_no].lkey = lkey;
-		req->rl_mapped_sges++;
+		sc->sc_unmap_count++;
 	}
 
 out:
-	req->rl_send_wr.num_sge += sge_no;
+	sc->sc_wr.num_sge += sge_no;
 	return true;
 
 out_regbuf:
@@ -669,12 +674,12 @@ out_regbuf:
 	return false;
 
 out_mapping_overflow:
-	rpcrdma_unmap_sges(ia, req);
+	rpcrdma_unmap_sendctx(sc);
 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 	return false;
 
 out_mapping_err:
-	rpcrdma_unmap_sges(ia, req);
+	rpcrdma_unmap_sendctx(sc);
 	pr_err("rpcrdma: Send mapping error\n");
 	return false;
 }
@@ -694,8 +699,11 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 			  struct rpcrdma_req *req, u32 hdrlen,
 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
-	req->rl_send_wr.num_sge = 0;
-	req->rl_mapped_sges = 0;
+	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+	if (!req->rl_sendctx)
+		return -ENOBUFS;
+	req->rl_sendctx->sc_wr.num_sge = 0;
+	req->rl_sendctx->sc_unmap_count = 0;
 
 	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
 		return -EIO;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index eb46d2479b09..7be6e2519197 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -687,7 +687,6 @@ xprt_rdma_free(struct rpc_task *task)
 
 	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered);
-	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
 }
 
@@ -790,11 +789,12 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
-	seq_printf(seq, "%lu %lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %lu %lu\n",
 		   r_xprt->rx_stats.mrs_recovered,
 		   r_xprt->rx_stats.mrs_orphaned,
 		   r_xprt->rx_stats.mrs_allocated,
-		   r_xprt->rx_stats.local_inv_needed);
+		   r_xprt->rx_stats.local_inv_needed,
+		   r_xprt->rx_stats.empty_sendctx_q);
 }
 
 static int
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 247b00b715c2..1bf7b1ee5699 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -52,6 +52,8 @@
 #include <linux/prefetch.h>
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
+
+#include <asm-generic/barrier.h>
 #include <asm/bitops.h>
 
 #include <rdma/ib_cm.h>
@@ -126,11 +128,17 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 static void
 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_sendctx *sc =
+		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
+
 	/* WARNING: Only wr_cqe and status are reliable at this point */
 	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 		       ib_wc_status_msg(wc->status),
 		       wc->status, wc->vendor_err);
+
+	rpcrdma_sendctx_put_locked(sc);
 }
 
 /**
@@ -542,6 +550,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		ep->rep_attr.cap.max_recv_sge);
 
 	/* set trigger for requesting send completion */
+	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
+				   cdata->max_requests >> 2);
+	ep->rep_send_count = ep->rep_send_batch;
 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 	if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;	/* always signal? */
@@ -824,6 +835,168 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	ib_drain_qp(ia->ri_id->qp);
 }
 
+/* Fixed-size circular FIFO queue. This implementation is wait-free and
+ * lock-free.
+ *
+ * Consumer is the code path that posts Sends. This path dequeues a
+ * sendctx for use by a Send operation. Multiple consumer threads
+ * are serialized by the RPC transport lock, which allows only one
+ * ->send_request call at a time.
+ *
+ * Producer is the code path that handles Send completions. This path
+ * enqueues a sendctx that has been completed. Multiple producer
+ * threads are serialized by the ib_poll_cq() function.
+ */
+
+/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
+ * queue activity, and ib_drain_qp has flushed all remaining Send
+ * requests.
+ */
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
+{
+	unsigned long i;
+
+	for (i = 0; i <= buf->rb_sc_last; i++)
+		kfree(buf->rb_sc_ctxs[i]);
+	kfree(buf->rb_sc_ctxs);
+}
+
+static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
+{
+	struct rpcrdma_sendctx *sc;
+
+	sc = kzalloc(sizeof(*sc) +
+		     ia->ri_max_send_sges * sizeof(struct ib_sge),
+		     GFP_KERNEL);
+	if (!sc)
+		return NULL;
+
+	sc->sc_wr.wr_cqe = &sc->sc_cqe;
+	sc->sc_wr.sg_list = sc->sc_sges;
+	sc->sc_wr.opcode = IB_WR_SEND;
+	sc->sc_cqe.done = rpcrdma_wc_send;
+	return sc;
+}
+
+static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_sendctx *sc;
+	unsigned long i;
+
+	/* Maximum number of concurrent outstanding Send WRs. Capping
+	 * the circular queue size stops Send Queue overflow by causing
+	 * the ->send_request call to fail temporarily before too many
+	 * Sends are posted.
+	 */
+	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
+	dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
+	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
+	if (!buf->rb_sc_ctxs)
+		return -ENOMEM;
+
+	buf->rb_sc_last = i - 1;
+	for (i = 0; i <= buf->rb_sc_last; i++) {
+		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
+		if (!sc)
+			goto out_destroy;
+
+		sc->sc_xprt = r_xprt;
+		buf->rb_sc_ctxs[i] = sc;
+	}
+
+	return 0;
+
+out_destroy:
+	rpcrdma_sendctxs_destroy(buf);
+	return -ENOMEM;
+}
+
+/* The sendctx queue is not guaranteed to have a size that is a
+ * power of two, thus the helpers in circ_buf.h cannot be used.
+ * The other option is to use modulus (%), which can be expensive.
+ */
+static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
+					  unsigned long item)
+{
+	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
+}
+
+/**
+ * rpcrdma_sendctx_get_locked - Acquire a send context
+ * @buf: transport buffers from which to acquire an unused context
+ *
+ * Returns pointer to a free send completion context; or NULL if
+ * the queue is empty.
+ *
+ * Usage: Called to acquire an SGE array before preparing a Send WR.
+ *
+ * The caller serializes calls to this function (per rpcrdma_buffer),
+ * and provides an effective memory barrier that flushes the new value
+ * of rb_sc_head.
+ */
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_xprt *r_xprt;
+	struct rpcrdma_sendctx *sc;
+	unsigned long next_head;
+
+	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
+
+	if (next_head == READ_ONCE(buf->rb_sc_tail))
+		goto out_emptyq;
+
+	/* ORDER: item must be accessed _before_ head is updated */
+	sc = buf->rb_sc_ctxs[next_head];
+
+	/* Releasing the lock in the caller acts as a memory
+	 * barrier that flushes rb_sc_head.
+	 */
+	buf->rb_sc_head = next_head;
+
+	return sc;
+
+out_emptyq:
+	/* The queue is "empty" if there have not been enough Send
+	 * completions recently. This is a sign the Send Queue is
+	 * backing up. Cause the caller to pause and try again.
+	 */
+	dprintk("RPC: %s: empty sendctx queue\n", __func__);
+	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
+	r_xprt->rx_stats.empty_sendctx_q++;
+	return NULL;
+}
+
+/**
+ * rpcrdma_sendctx_put_locked - Release a send context
+ * @sc: send context to release
+ *
+ * Usage: Called from Send completion to return a sendctx
+ * to the queue.
+ *
+ * The caller serializes calls to this function (per rpcrdma_buffer).
+ */
+void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+{
+	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+	unsigned long next_tail;
+
+	/* Unmap SGEs of previously completed but unsignaled
+	 * Sends by walking up the queue until @sc is found.
+	 */
+	next_tail = buf->rb_sc_tail;
+	do {
+		next_tail = rpcrdma_sendctx_next(buf, next_tail);
+
+		/* ORDER: item must be accessed _before_ tail is updated */
+		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+
+	} while (buf->rb_sc_ctxs[next_tail] != sc);
+
+	/* Paired with READ_ONCE */
+	smp_store_release(&buf->rb_sc_tail, next_tail);
+}
+
 static void
 rpcrdma_mr_recovery_worker(struct work_struct *work)
 {
@@ -919,13 +1092,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 	spin_lock(&buffer->rb_reqslock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
 	spin_unlock(&buffer->rb_reqslock);
-	req->rl_cqe.done = rpcrdma_wc_send;
 	req->rl_buffer = &r_xprt->rx_buf;
 	INIT_LIST_HEAD(&req->rl_registered);
-	req->rl_send_wr.next = NULL;
-	req->rl_send_wr.wr_cqe = &req->rl_cqe;
-	req->rl_send_wr.sg_list = req->rl_send_sge;
-	req->rl_send_wr.opcode = IB_WR_SEND;
 	return req;
 }
 
@@ -1017,6 +1185,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 	}
 
+	rc = rpcrdma_sendctxs_create(r_xprt);
+	if (rc)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1093,6 +1265,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 	cancel_delayed_work_sync(&buf->rb_recovery_worker);
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	rpcrdma_sendctxs_destroy(buf);
+
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
 
@@ -1208,7 +1382,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
 	struct rpcrdma_rep *rep = req->rl_reply;
 
-	req->rl_send_wr.num_sge = 0;
 	req->rl_reply = NULL;
 
 	spin_lock(&buffers->rb_lock);
@@ -1340,7 +1513,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 		struct rpcrdma_ep *ep,
 		struct rpcrdma_req *req)
 {
-	struct ib_send_wr *send_wr = &req->rl_send_wr;
+	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
 	struct ib_send_wr *send_wr_fail;
 	int rc;
 
@@ -1354,7 +1527,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	dprintk("RPC: %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);
 
-	rpcrdma_set_signaled(ep, send_wr);
+	if (!ep->rep_send_count) {
+		send_wr->send_flags |= IB_SEND_SIGNALED;
+		ep->rep_send_count = ep->rep_send_batch;
+	} else {
+		send_wr->send_flags &= ~IB_SEND_SIGNALED;
+		--ep->rep_send_count;
+	}
 	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
 	if (rc)
 		goto out_postsend_err;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 0b8ca5e5c706..537cfabe47d1 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -93,6 +93,8 @@ enum {
  */
 
 struct rpcrdma_ep {
+	unsigned int		rep_send_count;
+	unsigned int		rep_send_batch;
 	atomic_t		rep_cqcount;
 	int			rep_cqinit;
 	int			rep_connected;
@@ -232,6 +234,27 @@ struct rpcrdma_rep {
 	struct ib_recv_wr	rr_recv_wr;
 };
 
+/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
+ */
+struct rpcrdma_xprt;
+struct rpcrdma_sendctx {
+	struct ib_send_wr	sc_wr;
+	struct ib_cqe		sc_cqe;
+	struct rpcrdma_xprt	*sc_xprt;
+	unsigned int		sc_unmap_count;
+	struct ib_sge		sc_sges[];
+};
+
+/* Limit the number of SGEs that can be unmapped during one
+ * Send completion. This caps the amount of work a single
+ * completion can do before returning to the provider.
+ *
+ * Setting this to zero disables Send completion batching.
+ */
+enum {
+	RPCRDMA_MAX_SEND_BATCH = 7,
+};
+
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
@@ -343,19 +366,16 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_list;
-	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;
 	struct xdr_stream	rl_stream;
 	struct xdr_buf		rl_hdrbuf;
-	struct ib_send_wr	rl_send_wr;
-	struct ib_sge		rl_send_sge[RPCRDMA_MAX_SEND_SGES];
+	struct rpcrdma_sendctx	*rl_sendctx;
 	struct rpcrdma_regbuf	*rl_rdmabuf;	/* xprt header */
 	struct rpcrdma_regbuf	*rl_sendbuf;	/* rq_snd_buf */
 	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
 
-	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
 	bool			rl_backchannel;
 
@@ -402,6 +422,11 @@ struct rpcrdma_buffer {
 	struct list_head	rb_mws;
 	struct list_head	rb_all;
 
+	unsigned long		rb_sc_head;
+	unsigned long		rb_sc_tail;
+	unsigned long		rb_sc_last;
+	struct rpcrdma_sendctx	**rb_sc_ctxs;
+
 	spinlock_t		rb_lock;	/* protect buf lists */
 	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
@@ -456,6 +481,7 @@ struct rpcrdma_stats {
 	unsigned long		mrs_recovered;
 	unsigned long		mrs_orphaned;
 	unsigned long		mrs_allocated;
+	unsigned long		empty_sendctx_q;
 
 	/* accessed when receiving a reply */
 	unsigned long long	total_rdma_reply;
@@ -557,6 +583,8 @@ struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
 void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
 
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
@@ -617,7 +645,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 			      struct rpcrdma_req *req, u32 hdrlen,
 			      struct xdr_buf *xdr,
 			      enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
+void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
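
As a companion to the rpcrdma_ep_post() hunk above, the following user-space sketch models the signal-once-every-N policy. It is not the kernel code: ep_model and post_send_model are hypothetical names, and the batch arithmetic mirrors rpcrdma_ep_create() in the diff (the smaller of RPCRDMA_MAX_SEND_BATCH and max_requests / 4).

/* User-space sketch of the Send-signaling policy added to
 * rpcrdma_ep_post(): only one Send in every send_batch posts carries
 * IB_SEND_SIGNALED, so a single completion retires a whole batch of
 * sendctxs. ep_model and post_send_model are hypothetical names.
 */
#include <stdbool.h>
#include <stdio.h>

#define MAX_SEND_BATCH 7		/* mirrors RPCRDMA_MAX_SEND_BATCH */

struct ep_model {
	unsigned int send_count;	/* models rep_send_count */
	unsigned int send_batch;	/* models rep_send_batch */
};

/* Returns true when this Send should request a completion. */
static bool post_send_model(struct ep_model *ep)
{
	if (!ep->send_count) {
		ep->send_count = ep->send_batch;
		return true;		/* would set IB_SEND_SIGNALED */
	}
	--ep->send_count;
	return false;			/* posted unsignaled */
}

int main(void)
{
	unsigned int max_requests = 32;	/* models cdata->max_requests */
	struct ep_model ep;
	int i;

	/* Mirrors rpcrdma_ep_create(): batch = min(MAX_SEND_BATCH, max_requests / 4) */
	ep.send_batch = max_requests / 4 < MAX_SEND_BATCH ?
			max_requests / 4 : MAX_SEND_BATCH;
	ep.send_count = ep.send_batch;

	for (i = 0; i < 16; i++)
		printf("send %2d: %s\n", i,
		       post_send_model(&ep) ? "signaled" : "unsignaled");
	return 0;
}

With a batch of 7, seven unsignaled Sends are followed by one signaled Send; that single completion invokes rpcrdma_sendctx_put_locked(), which walks the queue from the old tail to the signaled sendctx and so retires the SGEs of the intervening unsignaled Sends as well.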