path: root/net/sunrpc/xprtrdma/rpc_rdma.c
author    Chuck Lever <chuck.lever@oracle.com>  2016-09-15 10:57:24 -0400
committer Anna Schumaker <Anna.Schumaker@Netapp.com>  2016-09-19 13:08:38 -0400
commit    655fec6987be05964e70c2e2efcbb253710e282f (patch)
tree      f35806548be49c1c1a1b9955af53b4657f449e9c  /net/sunrpc/xprtrdma/rpc_rdma.c
parent    c8b920bb49939a5c6cf1d2d819300f318ea050d2 (diff)
xprtrdma: Use gathered Send for large inline messages
An RPC Call message that is sent inline but that has a data payload
(i.e., one or more items in rq_snd_buf's page list) must be "pulled up":

- call_allocate has to reserve enough RPC Call buffer space to
  accommodate the data payload

- call_transmit has to memcpy the rq_snd_buf's page list and tail
  into its head iovec before it is sent

As the inline threshold is increased beyond its current 1KB default,
however, this means data payloads of more than a few KB are copied by
the host CPU. For example, if the inline threshold is increased just
to 4KB, then NFS WRITE requests up to 4KB would involve a memcpy of
the NFS WRITE's payload data into the RPC Call buffer. This is an
undesirable amount of participation by the host CPU.

The inline threshold may be much larger than 4KB in the future, after
negotiation with a peer server.

Instead of copying the components of rq_snd_buf into its head iovec,
construct a gather list of these components, and send them all in
place. The same approach is already used in the Linux server's
RPC-over-RDMA reply path.

This mechanism also eliminates the need for rpcrdma_tail_pullup,
which is used to manage the XDR pad and trailing inline content when
a Read list is present.

This requires that the pages in rq_snd_buf's page list be DMA-mapped
during marshaling, and unmapped when a data-bearing RPC is completed.
This is slightly less efficient for very small I/O payloads, but
significantly more efficient as data payload size and inline threshold
increase past a kilobyte.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
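To make the gathered-Send idea concrete, the sketch below shows the general shape of the technique: rather than memcpy'ing rq_snd_buf's page list into one pre-registered buffer, each component gets its own SGE and a single Send work request carries them all in place. This is an illustrative sketch only, not code from this patch; the helper name build_gathered_send(), the fixed 8-entry SGE array, and the simplified error and completion handling are assumptions made for brevity.

/* Illustrative sketch only (hypothetical helper, simplified error
 * handling): gather the RPC-over-RDMA header and each payload page
 * into one Send WR instead of copying them into a single buffer.
 */
#include <rdma/ib_verbs.h>
#include <linux/sunrpc/xdr.h>

static int build_gathered_send(struct ib_qp *qp, struct ib_device *dev,
			       u32 lkey, u64 hdr_addr, u32 hdr_len,
			       struct xdr_buf *xdr)
{
	struct ib_sge sges[8];		/* assumed cap on Send SGEs */
	struct ib_send_wr wr = { 0 }, *bad_wr;
	unsigned int n = 0, base, remaining;
	struct page **ppages;

	/* SGE 0: the already DMA-mapped transport header */
	sges[n].addr = hdr_addr;
	sges[n].length = hdr_len;
	sges[n].lkey = lkey;
	n++;

	/* SGE 1..N: map each payload page in place, no memcpy */
	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	base = xdr->page_base & ~PAGE_MASK;
	remaining = xdr->page_len;
	while (remaining && n < ARRAY_SIZE(sges)) {
		u32 len = min_t(u32, PAGE_SIZE - base, remaining);

		sges[n].addr = ib_dma_map_page(dev, *ppages, base, len,
					       DMA_TO_DEVICE);
		if (ib_dma_mapping_error(dev, sges[n].addr))
			return -EIO;
		sges[n].length = len;
		sges[n].lkey = lkey;
		n++;
		ppages++;
		remaining -= len;
		base = 0;
	}
	if (remaining)
		return -EIO;	/* payload needs more SGEs than allowed */

	wr.opcode = IB_WR_SEND;
	wr.sg_list = sges;
	wr.num_sge = n;
	wr.send_flags = IB_SEND_SIGNALED;
	return ib_post_send(qp, &wr, &bad_wr);
}

The patch itself is more careful than this sketch: it reuses the already-mapped header and head buffers, DMA-maps the tail iovec separately, and records how many SGEs were mapped so that rpcrdma_unmap_sges() can unmap them when the Send completes.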
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--	net/sunrpc/xprtrdma/rpc_rdma.c	301
1 file changed, 170 insertions(+), 131 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 31a434d2f143..63bf0119f949 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -53,14 +53,6 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-enum rpcrdma_chunktype {
-	rpcrdma_noch = 0,
-	rpcrdma_readch,
-	rpcrdma_areadch,
-	rpcrdma_writech,
-	rpcrdma_replych
-};
-
 static const char transfertypes[][12] = {
 	"inline",	/* no chunks */
 	"read list",	/* some argument via rdma read */
@@ -157,42 +149,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
-static int
-rpcrdma_tail_pullup(struct xdr_buf *buf)
-{
-	size_t tlen = buf->tail[0].iov_len;
-	size_t skip = tlen & 3;
-
-	/* Do not include the tail if it is only an XDR pad */
-	if (tlen < 4)
-		return 0;
-
-	/* xdr_write_pages() adds a pad at the beginning of the tail
-	 * if the content in "buf->pages" is unaligned. Force the
-	 * tail's actual content to land at the next XDR position
-	 * after the head instead.
-	 */
-	if (skip) {
-		unsigned char *src, *dst;
-		unsigned int count;
-
-		src = buf->tail[0].iov_base;
-		dst = buf->head[0].iov_base;
-		dst += buf->head[0].iov_len;
-
-		src += skip;
-		tlen -= skip;
-
-		dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
-			__func__, skip, dst, src, tlen);
-
-		for (count = tlen; count; count--)
-			*dst++ = *src++;
-	}
-
-	return tlen;
-}
-
 /* Split "vec" on page boundaries into segments. FMR registers pages,
  * not a byte range. Other modes coalesce these segments into a single
  * MR when they can.
@@ -503,74 +459,184 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 	return iptr;
 }
 
-/*
- * Copy write data inline.
- * This function is used for "small" requests. Data which is passed
- * to RPC via iovecs (or page list) is copied directly into the
- * pre-registered memory buffer for this request. For small amounts
- * of data, this is efficient. The cutoff value is tunable.
+/* Prepare the RPC-over-RDMA header SGE.
  */
-static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
+static bool
+rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+			u32 len)
 {
-	int i, npages, curlen;
-	int copy_len;
-	unsigned char *srcp, *destp;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int page_base;
-	struct page **ppages;
+	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
+	struct ib_sge *sge = &req->rl_send_sge[0];
+
+	if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
+		if (!__rpcrdma_dma_map_regbuf(ia, rb))
+			return false;
+		sge->addr = rdmab_addr(rb);
+		sge->lkey = rdmab_lkey(rb);
+	}
+	sge->length = len;
 
-	destp = rqst->rq_svec[0].iov_base;
-	curlen = rqst->rq_svec[0].iov_len;
-	destp += curlen;
+	ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
+				      sge->length, DMA_TO_DEVICE);
+	req->rl_send_wr.num_sge++;
+	return true;
+}
 
-	dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
-		__func__, destp, rqst->rq_slen, curlen);
+/* Prepare the Send SGEs. The head and tail iovec, and each entry
+ * in the page list, gets its own SGE.
+ */
+static bool
+rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+{
+	unsigned int sge_no, page_base, len, remaining;
+	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+	struct ib_device *device = ia->ri_device;
+	struct ib_sge *sge = req->rl_send_sge;
+	u32 lkey = ia->ri_pd->local_dma_lkey;
+	struct page *page, **ppages;
+
+	/* The head iovec is straightforward, as it is already
+	 * DMA-mapped. Sync the content that has changed.
+	 */
+	if (!rpcrdma_dma_map_regbuf(ia, rb))
+		return false;
+	sge_no = 1;
+	sge[sge_no].addr = rdmab_addr(rb);
+	sge[sge_no].length = xdr->head[0].iov_len;
+	sge[sge_no].lkey = rdmab_lkey(rb);
+	ib_dma_sync_single_for_device(device, sge[sge_no].addr,
+				      sge[sge_no].length, DMA_TO_DEVICE);
+
+	/* If there is a Read chunk, the page list is being handled
+	 * via explicit RDMA, and thus is skipped here. However, the
+	 * tail iovec may include an XDR pad for the page list, as
+	 * well as additional content, and may not reside in the
+	 * same page as the head iovec.
+	 */
+	if (rtype == rpcrdma_readch) {
+		len = xdr->tail[0].iov_len;
 
-	copy_len = rqst->rq_snd_buf.page_len;
+		/* Do not include the tail if it is only an XDR pad */
+		if (len < 4)
+			goto out;
 
-	if (rqst->rq_snd_buf.tail[0].iov_len) {
-		curlen = rqst->rq_snd_buf.tail[0].iov_len;
-		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
-			memmove(destp + copy_len,
-				rqst->rq_snd_buf.tail[0].iov_base, curlen);
-			r_xprt->rx_stats.pullup_copy_count += curlen;
+		page = virt_to_page(xdr->tail[0].iov_base);
+		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+
+		/* If the content in the page list is an odd length,
+		 * xdr_write_pages() has added a pad at the beginning
+		 * of the tail iovec. Force the tail's non-pad content
+		 * to land at the next XDR position in the Send message.
+		 */
+		page_base += len & 3;
+		len -= len & 3;
+		goto map_tail;
+	}
+
+	/* If there is a page list present, temporarily DMA map
+	 * and prepare an SGE for each page to be sent.
+	 */
+	if (xdr->page_len) {
+		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+		page_base = xdr->page_base & ~PAGE_MASK;
+		remaining = xdr->page_len;
+		while (remaining) {
+			sge_no++;
+			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
+				goto out_mapping_overflow;
+
+			len = min_t(u32, PAGE_SIZE - page_base, remaining);
+			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
+							   page_base, len,
+							   DMA_TO_DEVICE);
+			if (ib_dma_mapping_error(device, sge[sge_no].addr))
+				goto out_mapping_err;
+			sge[sge_no].length = len;
+			sge[sge_no].lkey = lkey;
+
+			req->rl_mapped_sges++;
+			ppages++;
+			remaining -= len;
+			page_base = 0;
 		}
-		dprintk("RPC: %s: tail destp 0x%p len %d\n",
-			__func__, destp + copy_len, curlen);
-		rqst->rq_svec[0].iov_len += curlen;
 	}
-	r_xprt->rx_stats.pullup_copy_count += copy_len;
 
-	page_base = rqst->rq_snd_buf.page_base;
-	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
-	page_base &= ~PAGE_MASK;
-	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
-	for (i = 0; copy_len && i < npages; i++) {
-		curlen = PAGE_SIZE - page_base;
-		if (curlen > copy_len)
-			curlen = copy_len;
-		dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
-			__func__, i, destp, copy_len, curlen);
-		srcp = kmap_atomic(ppages[i]);
-		memcpy(destp, srcp+page_base, curlen);
-		kunmap_atomic(srcp);
-		rqst->rq_svec[0].iov_len += curlen;
-		destp += curlen;
-		copy_len -= curlen;
-		page_base = 0;
+	/* The tail iovec is not always constructed in the same
+	 * page where the head iovec resides (see, for example,
+	 * gss_wrap_req_priv). To neatly accommodate that case,
+	 * DMA map it separately.
+	 */
+	if (xdr->tail[0].iov_len) {
+		page = virt_to_page(xdr->tail[0].iov_base);
+		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+		len = xdr->tail[0].iov_len;
+
+map_tail:
+		sge_no++;
+		sge[sge_no].addr = ib_dma_map_page(device, page,
+						   page_base, len,
+						   DMA_TO_DEVICE);
+		if (ib_dma_mapping_error(device, sge[sge_no].addr))
+			goto out_mapping_err;
+		sge[sge_no].length = len;
+		sge[sge_no].lkey = lkey;
+		req->rl_mapped_sges++;
 	}
-	/* header now contains entire send message */
+
+out:
+	req->rl_send_wr.num_sge = sge_no + 1;
+	return true;
+
+out_mapping_overflow:
+	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
+	return false;
+
+out_mapping_err:
+	pr_err("rpcrdma: Send mapping error\n");
+	return false;
+}
+
+bool
+rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+			  u32 hdrlen, struct xdr_buf *xdr,
+			  enum rpcrdma_chunktype rtype)
+{
+	req->rl_send_wr.num_sge = 0;
+	req->rl_mapped_sges = 0;
+
+	if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
+		goto out_map;
+
+	if (rtype != rpcrdma_areadch)
+		if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
+			goto out_map;
+
+	return true;
+
+out_map:
+	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+	return false;
+}
+
+void
+rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+	struct ib_device *device = ia->ri_device;
+	struct ib_sge *sge;
+	int count;
+
+	sge = &req->rl_send_sge[2];
+	for (count = req->rl_mapped_sges; count--; sge++)
+		ib_dma_unmap_page(device, sge->addr, sge->length,
+				  DMA_TO_DEVICE);
+	req->rl_mapped_sges = 0;
 }
 
 /*
  * Marshal a request: the primary job of this routine is to choose
  * the transfer modes. See comments below.
  *
- * Prepares up to two IOVs per Call message:
- *
- *  [0] -- RPC RDMA header
- *  [1] -- the RPC header/data
- *
  * Returns zero on success, otherwise a negative errno.
  */
 
@@ -638,12 +704,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 */
 	if (rpcrdma_args_inline(r_xprt, rqst)) {
 		rtype = rpcrdma_noch;
-		rpcrdma_inline_pullup(rqst);
-		rpclen = rqst->rq_svec[0].iov_len;
+		rpclen = rqst->rq_snd_buf.len;
 	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 		rtype = rpcrdma_readch;
-		rpclen = rqst->rq_svec[0].iov_len;
-		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
+		rpclen = rqst->rq_snd_buf.head[0].iov_len +
+			 rqst->rq_snd_buf.tail[0].iov_len;
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
 		headerp->rm_type = htonl(RDMA_NOMSG);
@@ -685,47 +750,21 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		goto out_unmap;
 	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
-	if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
-		goto out_overflow;
-
 	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
 		rqst->rq_task->tk_pid, __func__,
 		transfertypes[rtype], transfertypes[wtype],
 		hdrlen, rpclen);
 
-	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
-		goto out_map;
-	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
-	req->rl_send_iov[0].length = hdrlen;
-	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
-	req->rl_send_wr.num_sge = 1;
-	if (rtype == rpcrdma_areadch)
-		return 0;
-
-	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
-		goto out_map;
-	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
-	req->rl_send_iov[1].length = rpclen;
-	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
-	req->rl_send_wr.num_sge = 2;
-
+	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+				       &rqst->rq_snd_buf, rtype)) {
+		iptr = ERR_PTR(-EIO);
+		goto out_unmap;
+	}
 	return 0;
 
-out_overflow:
-	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
-		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
-	iptr = ERR_PTR(-EIO);
-
 out_unmap:
 	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
 	return PTR_ERR(iptr);
-
-out_map:
-	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
-	iptr = ERR_PTR(-EIO);
-	goto out_unmap;
 }
 
 /*