author	Chuck Lever <chuck.lever@oracle.com>	2016-05-02 14:41:30 -0400
committer	Anna Schumaker <Anna.Schumaker@Netapp.com>	2016-05-17 15:47:59 -0400
commit	94f58c58c0b4315542036ce7703adeeaf4764940 (patch)
tree	8d15f07a182217db012a23e455abf106c9d56fb7 /net
parent	88b18a120332cada6ff4adb9b5b7b6e4bbb653e5 (diff)
xprtrdma: Allow Read list and Reply chunk simultaneously
rpcrdma_marshal_req() makes a simplifying assumption: that NFS
operations with large Call messages have small Reply messages, and
vice versa. Therefore, with RPC-over-RDMA, only one chunk type is
ever needed for each Call/Reply pair: when one direction needs
chunks, the other direction always fits inline.

In fact, this assumption is asserted in the code:

	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
		dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
			__func__);
		return -EIO;
	}

But RPCSEC_GSS breaks this assumption. Because krb5i and krb5p
perform data transformation on RPC messages before they are
transmitted, direct data placement techniques cannot be used; RPC
messages must instead be sent via a Long call in both directions.
All such calls are sent with a Position Zero Read chunk, and all
such replies are handled with a Reply chunk. Thus the client must
provide every Call/Reply pair with both a Read list and a Reply
chunk.

Without any special security in effect, NFSv4 WRITEs may now also
use the Read list and provide a Reply chunk. The marshal_req logic
was preventing that, meaning an NFSv4 WRITE with a large payload
that included a GETATTR result larger than the inline threshold
would fail.

The code that encodes each chunk list is now completely contained
in its own function. There is some code duplication, but the
trade-off is that the overall logic should be clearer.

Note that all three chunk lists now share the rl_segments array.
Some additional per-req accounting is necessary to track this
usage. For the same reasons that the above simplifying assumption
has held true for so long, I don't expect more array elements to be
needed at this time.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Reviewed-by: Sagi Grimberg <sagi@grimberg.me>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
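To make the on-the-wire layout concrete, here is a standalone user-space sketch (illustration only; the struct, helper names, and the example rkey/offset values are invented, and this is not the kernel implementation) of the XDR word sequence a Call header carries when it combines a Position Zero Read chunk with a Reply chunk, the pairing that krb5i/krb5p and large NFSv4 WRITEs require: a Read list of (position, handle, length, offset) items terminated by a zero discriminator, an absent Write list, and a Reply chunk encoded as a counted array of handle/length/offset segments.

/* Hypothetical illustration only -- not the kernel code.
 *
 *   Read list:   1 - P H L OO - 0     (list of read segments)
 *   Write list:  0                    (not present)
 *   Reply chunk: 1 - N - H L OO ...   (counted array)
 */
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

struct seg {			/* one RDMA segment: HLOO */
	uint32_t handle;	/* rkey */
	uint32_t length;
	uint64_t offset;
};

static uint32_t *encode_u32(uint32_t *p, uint32_t v)
{
	*p++ = htonl(v);
	return p;
}

static uint32_t *encode_u64(uint32_t *p, uint64_t v)
{
	p = encode_u32(p, (uint32_t)(v >> 32));
	return encode_u32(p, (uint32_t)v);
}

static uint32_t *encode_segment(uint32_t *p, const struct seg *s)
{
	p = encode_u32(p, s->handle);
	p = encode_u32(p, s->length);
	return encode_u64(p, s->offset);
}

int main(void)
{
	uint32_t hdr[64], *p = hdr;
	struct seg read_seg  = { 0x1234, 8192, 0x10000 };	/* invented values */
	struct seg reply_seg = { 0x5678, 4096, 0x20000 };

	/* Read list: one read segment at Position Zero */
	p = encode_u32(p, 1);			/* item present */
	p = encode_u32(p, 0);			/* position 0 */
	p = encode_segment(p, &read_seg);
	p = encode_u32(p, 0);			/* end of Read list */

	/* Write list: not present */
	p = encode_u32(p, 0);

	/* Reply chunk: counted array with one segment */
	p = encode_u32(p, 1);			/* chunk present */
	p = encode_u32(p, 1);			/* segment count */
	p = encode_segment(p, &reply_seg);

	printf("encoded %zu XDR words\n", (size_t)(p - hdr));
	return 0;
}

In the patch itself, rpcrdma_encode_read_list(), rpcrdma_encode_write_list(), and rpcrdma_encode_reply_chunk() emit this same word sequence directly into the registered header buffer, sharing one rl_segments array and advancing rl_nextseg as each list consumes segments.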
Diffstat (limited to 'net')
-rw-r--r--	net/sunrpc/xprtrdma/rpc_rdma.c	327
-rw-r--r--	net/sunrpc/xprtrdma/xprt_rdma.h	5
2 files changed, 272 insertions, 60 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c7c9bbbf758c..e80f43d58903 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -62,17 +62,17 @@ enum rpcrdma_chunktype {
 };
 
 static const char transfertypes[][12] = {
-	"pure inline",	/* no chunks */
-	" read chunk",	/* some argument via rdma read */
-	"*read chunk",	/* entire request via rdma read */
-	"write chunk",	/* some result via rdma write */
+	"inline",	/* no chunks */
+	"read list",	/* some argument via rdma read */
+	"*read list",	/* entire request via rdma read */
+	"write list",	/* some result via rdma write */
 	"reply chunk"	/* entire reply via rdma write */
 };
 
 /* Returns size of largest RPC-over-RDMA header in a Call message
  *
- * The client marshals only one chunk list per Call message.
- * The largest list is the Read list.
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
  */
 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
 {
@@ -85,6 +85,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
 	maxsegs += 2;	/* segment for head and tail buffers */
 	size = maxsegs * sizeof(struct rpcrdma_read_chunk);
 
+	/* Minimal Read chunk size */
+	size += sizeof(__be32);	/* segment count */
+	size += sizeof(struct rpcrdma_segment);
+	size += sizeof(__be32);	/* list discriminator */
+
 	dprintk("RPC: %s: max call header size = %u\n",
 		__func__, size);
 	return size;
@@ -431,6 +436,209 @@ out:
 	return n;
 }
 
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+	*iptr++ = cpu_to_be32(seg->mr_rkey);
+	*iptr++ = cpu_to_be32(seg->mr_len);
+	return xdr_encode_hyper(iptr, seg->mr_base);
+}
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
+			 __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	unsigned int pos;
+	int n, nsegs;
+
+	if (rtype == rpcrdma_noch) {
+		*iptr++ = xdr_zero;	/* item not present */
+		return iptr;
+	}
+
+	pos = rqst->rq_snd_buf.head[0].iov_len;
+	if (rtype == rpcrdma_areadch)
+		pos = 0;
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		*iptr++ = xdr_one;	/* item present */
+
+		/* All read segments in this chunk
+		 * have the same "position".
+		 */
+		*iptr++ = cpu_to_be32(pos);
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: read segment pos %u "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__, pos,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.read_chunk_count++;
+		req->rl_nchunks++;
+		seg += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Finish Read list */
+	*iptr++ = xdr_zero;	/* Next item not present */
+	return iptr;
+}
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			  struct rpc_rqst *rqst, __be32 *iptr,
+			  enum rpcrdma_chunktype wtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	int n, nsegs, nchunks;
+	__be32 *segcount;
+
+	if (wtype != rpcrdma_writech) {
+		*iptr++ = xdr_zero;	/* no Write list present */
+		return iptr;
+	}
+
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+				     rqst->rq_rcv_buf.head[0].iov_len,
+				     wtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	*iptr++ = xdr_one;	/* Write list present */
+	segcount = iptr++;	/* save location of segment count */
+
+	nchunks = 0;
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: write segment "
+			"%d@0x016%llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.write_chunk_count++;
+		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		req->rl_nchunks++;
+		nchunks++;
+		seg += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Update count of segments in this Write chunk */
+	*segcount = cpu_to_be32(nchunks);
+
+	/* Finish Write list */
+	*iptr++ = xdr_zero;	/* Next item not present */
+	return iptr;
+}
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
+			   __be32 *iptr, enum rpcrdma_chunktype wtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	int n, nsegs, nchunks;
+	__be32 *segcount;
+
+	if (wtype != rpcrdma_replych) {
+		*iptr++ = xdr_zero;	/* no Reply chunk present */
+		return iptr;
+	}
+
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	*iptr++ = xdr_one;	/* Reply chunk present */
+	segcount = iptr++;	/* save location of segment count */
+
+	nchunks = 0;
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: reply segment "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.reply_chunk_count++;
+		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		req->rl_nchunks++;
+		nchunks++;
+		seg += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Update count of segments in the Reply chunk */
+	*segcount = cpu_to_be32(nchunks);
+
+	return iptr;
+}
+
 /*
  * Copy write data inline.
  * This function is used for "small" requests. Data which is passed
@@ -508,24 +716,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	char *base;
-	size_t rpclen;
-	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
+	unsigned int pos;
+	ssize_t hdrlen;
+	size_t rpclen;
+	__be32 *iptr;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
 		return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-	/*
-	 * rpclen gets amount of data in first buffer, which is the
-	 * pre-registered buffer.
-	 */
-	base = rqst->rq_svec[0].iov_base;
-	rpclen = rqst->rq_svec[0].iov_len;
-
 	headerp = rdmab_to_msg(req->rl_rdmabuf);
 	/* don't byte-swap XID, it's already done in request */
 	headerp->rm_xid = rqst->rq_xid;
@@ -565,8 +767,12 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 */
 	if (rpcrdma_args_inline(r_xprt, rqst)) {
 		rtype = rpcrdma_noch;
+		rpcrdma_inline_pullup(rqst);
+		rpclen = rqst->rq_svec[0].iov_len;
 	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 		rtype = rpcrdma_readch;
+		rpclen = rqst->rq_svec[0].iov_len;
+		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
 		headerp->rm_type = htonl(RDMA_NOMSG);
@@ -574,52 +780,49 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rpclen = 0;
 	}
 
-	/* The following simplification is not true forever */
-	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-		wtype = rpcrdma_noch;
-	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
-		dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
-			__func__);
-		return -EIO;
-	}
-
-	hdrlen = RPCRDMA_HDRLEN_MIN;
-
-	/*
-	 * Pull up any extra send data into the preregistered buffer.
-	 * When padding is in use and applies to the transfer, insert
-	 * it and change the message type.
+	/* This implementation supports the following combinations
+	 * of chunk lists in one RPC-over-RDMA Call message:
+	 *
+	 *   - Read list
+	 *   - Write list
+	 *   - Reply chunk
+	 *   - Read list + Reply chunk
+	 *
+	 * It might not yet support the following combinations:
+	 *
+	 *   - Read list + Write list
+	 *
+	 * It does not support the following combinations:
+	 *
+	 *   - Write list + Reply chunk
+	 *   - Read list + Write list + Reply chunk
+	 *
+	 * This implementation supports only a single chunk in each
+	 * Read or Write list. Thus for example the client cannot
+	 * send a Call message with a Position Zero Read chunk and a
+	 * regular Read chunk at the same time.
 	 */
-	if (rtype == rpcrdma_noch) {
-
-		rpcrdma_inline_pullup(rqst);
-
-		headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-		headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-		headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-		/* new length after pullup */
-		rpclen = rqst->rq_svec[0].iov_len;
-	} else if (rtype == rpcrdma_readch)
-		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
-	if (rtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
-					       headerp, rtype);
-		wtype = rtype;	/* simplify dprintk */
-
-	} else if (wtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
-					       headerp, wtype);
-	}
-	if (hdrlen < 0)
-		return hdrlen;
+	req->rl_nchunks = 0;
+	req->rl_nextseg = req->rl_segments;
+	iptr = headerp->rm_body.rm_chunks;
+	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
 	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
 		goto out_overflow;
 
-	dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
-		" headerp 0x%p base 0x%p lkey 0x%x\n",
-		__func__, transfertypes[wtype], hdrlen, rpclen,
-		headerp, base, rdmab_lkey(req->rl_rdmabuf));
+	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+		rqst->rq_task->tk_pid, __func__,
+		transfertypes[rtype], transfertypes[wtype],
+		hdrlen, rpclen);
 
 	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
 	req->rl_send_iov[0].length = hdrlen;
@@ -637,12 +840,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	return 0;
 
 out_overflow:
-	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s\n",
-	       hdrlen, rpclen, transfertypes[wtype]);
+	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+	       hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
 	/* Terminate this RPC. Chunks registered above will be
 	 * released by xprt_release -> xprt_rmda_free .
 	 */
 	return -EIO;
+
+out_unmap:
+	for (pos = 0; req->rl_nchunks--;)
+		pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+						      &req->rl_segments[pos]);
+	return PTR_ERR(iptr);
 }
 
 /*
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 4349e03069c7..61999b694a15 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -184,7 +184,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  */
 
 #define RPCRDMA_MAX_DATA_SEGS	((1 * 1024 * 1024) / PAGE_SIZE)
-#define RPCRDMA_MAX_SEGS	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+
+/* data segments + head/tail for Call + head/tail for Reply */
+#define RPCRDMA_MAX_SEGS	(RPCRDMA_MAX_DATA_SEGS + 4)
 
 struct rpcrdma_buffer;
 
@@ -298,6 +300,7 @@ struct rpcrdma_req {
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+	struct rpcrdma_mr_seg	*rl_nextseg;
 
 	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;