path: root/net/sunrpc
author     Chuck Lever <chuck.lever@oracle.com>        2015-01-21 11:04:08 -0500
committer  Anna Schumaker <Anna.Schumaker@Netapp.com>  2015-01-30 10:47:49 -0500
commit     0ca77dc372110cbed4dbac5e867ffdc60ebccf6a (patch)
tree       3e8da0915b3de723bdb4ee86c4702e2900d41a6c /net/sunrpc
parent     9128c3e794a77917a86dd5490ca2c5233a8c6fde (diff)
xprtrdma: Allocate RPC send buffer separately from struct rpcrdma_req
Because internal memory registration is an expensive and synchronous operation, xprtrdma pre-registers send and receive buffers at mount time, and then re-uses them for each RPC.

A "hardway" allocation is a memory allocation and registration that replaces a send buffer during the processing of an RPC. Hardway must be done if the RPC send buffer is too small to accommodate an RPC's call and reply headers.

For xprtrdma, each RPC send buffer is currently part of struct rpcrdma_req so that xprt_rdma_free(), which is passed nothing but the address of an RPC send buffer, can find its matching struct rpcrdma_req and rpcrdma_rep quickly via container_of / offsetof.

That means that hardway currently has to replace a whole struct rpcrdma_req when it replaces an RPC send buffer. This is often a fairly hefty chunk of contiguous memory due to the size of the rl_segments array and the fact that both the send and receive buffers are part of struct rpcrdma_req.

Some obscure re-use of fields in rpcrdma_req is done so that xprt_rdma_free() can detect replaced rpcrdma_req structs, and restore the original.

This commit breaks apart the RPC send buffer and struct rpcrdma_req so that increasing the size of the rl_segments array does not change the alignment of each RPC send buffer. (Increasing rl_segments is needed to bump up the maximum r/wsize for NFS/RDMA.)

This change opens up some interesting possibilities for improving the design of xprt_rdma_allocate(), which is now the one place where RPC send buffers are allocated or re-allocated, and they are now always left in place by xprt_rdma_free(). A large re-allocation that includes both the rl_segments array and the RPC send buffer is no longer needed, send buffer re-allocation becomes quite rare, and good send buffer alignment is guaranteed no matter what the size of the rl_segments array is.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
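To make the container_of / offsetof lookup described above concrete, here is a minimal userspace sketch. The struct and function names old_req, new_req, regbuf, old_req_from_buffer and new_req_from_buffer are simplified stand-ins invented for illustration; only the field names rl_size, rl_xdr_buf, rl_sendbuf, rg_base, rg_size and rg_owner come from the patch itself. It contrasts the old scheme, where the send buffer is the tail of the request so the buffer address alone locates the request, with the new scheme, where a separately allocated buffer carries a backpointer to its owner.

#include <stddef.h>

/* Userspace stand-in for the kernel's container_of(); the real structs
 * carry many more fields (rl_segments[], ib_sge arrays, and so on).
 */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Old layout: the send buffer is the tail of the request itself, so the
 * buffer address alone identifies the request -- but replacing the send
 * buffer (hardway) means replacing the whole request.
 */
struct old_req {
	size_t	rl_size;
	char	rl_xdr_buf[];		/* RPC send buffer */
};

static struct old_req *old_req_from_buffer(void *buffer)
{
	return container_of(buffer, struct old_req, rl_xdr_buf[0]);
}

/* New layout: the send buffer is a separately allocated regbuf carrying
 * a backpointer to its owner, so the request can grow (larger
 * rl_segments) without moving the send buffer, and hardway replaces
 * only the regbuf.
 */
struct new_req;

struct regbuf {
	struct new_req	*rg_owner;	/* set when the regbuf is allocated */
	size_t		rg_size;
	char		rg_base[];	/* RPC send buffer */
};

struct new_req {
	struct regbuf	*rl_sendbuf;
};

static struct new_req *new_req_from_buffer(void *buffer)
{
	struct regbuf *rb = container_of(buffer, struct regbuf, rg_base[0]);

	return rb->rg_owner;
}

In the patch below, xprt_rdma_free() performs exactly the second lookup: container_of() on rg_base[0] to reach the regbuf, then rg_owner to reach the owning rpcrdma_req.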
Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c   |   6
-rw-r--r--  net/sunrpc/xprtrdma/transport.c  | 146
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c      |  16
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h  |  14
4 files changed, 78 insertions, 104 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index f2eda155299a..8a6bdbd3e936 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -541,9 +541,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	req->rl_send_iov[0].length = hdrlen;
 	req->rl_send_iov[0].lkey = req->rl_iov.lkey;
 
-	req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
 	req->rl_send_iov[1].length = rpclen;
-	req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
 
 	req->rl_niovs = 2;
 
@@ -556,7 +556,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 
 		req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
 		req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
-		req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+		req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
 
 		req->rl_niovs = 4;
 	}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 808b3c52427a..a9d566227e7e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -449,77 +449,72 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 /*
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
  */
 static void *
 xprt_rdma_allocate(struct rpc_task *task, size_t size)
 {
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpcrdma_req *req, *nreq;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	size_t min_size;
+	gfp_t flags = task->tk_flags & RPC_TASK_SWAPPER ?
+				GFP_ATOMIC : GFP_NOFS;
 
-	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 	if (req == NULL)
 		return NULL;
 
-	if (size > req->rl_size) {
-		dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
-			"prog %d vers %d proc %d\n",
-			__func__, size, req->rl_size,
-			task->tk_client->cl_prog, task->tk_client->cl_vers,
-			task->tk_msg.rpc_proc->p_proc);
-		/*
-		 * Outgoing length shortage. Our inline write max must have
-		 * been configured to perform direct i/o.
-		 *
-		 * This is therefore a large metadata operation, and the
-		 * allocate call was made on the maximum possible message,
-		 * e.g. containing long filename(s) or symlink data. In
-		 * fact, while these metadata operations *might* carry
-		 * large outgoing payloads, they rarely *do*. However, we
-		 * have to commit to the request here, so reallocate and
-		 * register it now. The data path will never require this
-		 * reallocation.
-		 *
-		 * If the allocation or registration fails, the RPC framework
-		 * will (doggedly) retry.
-		 */
-		if (task->tk_flags & RPC_TASK_SWAPPER)
-			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
-		else
-			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
-		if (nreq == NULL)
-			goto outfail;
-
-		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
-				nreq->rl_base, size + sizeof(struct rpcrdma_req)
-				- offsetof(struct rpcrdma_req, rl_base),
-				&nreq->rl_handle, &nreq->rl_iov)) {
-			kfree(nreq);
-			goto outfail;
-		}
-		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
-		nreq->rl_size = size;
-		nreq->rl_niovs = 0;
-		nreq->rl_nchunks = 0;
-		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
-		nreq->rl_reply = req->rl_reply;
-		memcpy(nreq->rl_segments,
-			req->rl_segments, sizeof nreq->rl_segments);
-		/* flag the swap with an unused field */
-		nreq->rl_iov.length = 0;
-		req->rl_reply = NULL;
-		req = nreq;
-	}
+	if (req->rl_sendbuf == NULL)
+		goto out_sendbuf;
+	if (size > req->rl_sendbuf->rg_size)
+		goto out_sendbuf;
+
+out:
 	dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
-	return req->rl_xdr_buf;
+	return req->rl_sendbuf->rg_base;
 
-outfail:
+out_sendbuf:
+	/* XDR encoding and RPC/RDMA marshaling of this request has not
+	 * yet occurred. Thus a lower bound is needed to prevent buffer
+	 * overrun during marshaling.
+	 *
+	 * RPC/RDMA marshaling may choose to send payload bearing ops
+	 * inline, if the result is smaller than the inline threshold.
+	 * The value of the "size" argument accounts for header
+	 * requirements but not for the payload in these cases.
+	 *
+	 * Likewise, allocate enough space to receive a reply up to the
+	 * size of the inline threshold.
+	 *
+	 * It's unlikely that both the send header and the received
+	 * reply will be large, but slush is provided here to allow
+	 * flexibility when marshaling.
+	 */
+	min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+	min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+	if (size < min_size)
+		size = min_size;
+
+	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+
+	r_xprt->rx_stats.hardway_register_count += size;
+	rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+	req->rl_sendbuf = rb;
+	goto out;
+
+out_fail:
 	rpcrdma_buffer_put(req);
-	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+	r_xprt->rx_stats.failed_marshal_count++;
 	return NULL;
 }
 
@@ -531,47 +526,24 @@ xprt_rdma_free(void *buffer)
 {
 	struct rpcrdma_req *req;
 	struct rpcrdma_xprt *r_xprt;
-	struct rpcrdma_rep *rep;
+	struct rpcrdma_regbuf *rb;
 	int i;
 
 	if (buffer == NULL)
 		return;
 
-	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
-	if (req->rl_iov.length == 0) {	/* see allocate above */
-		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
-				      struct rpcrdma_xprt, rx_buf);
-	} else
-		r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-	rep = req->rl_reply;
+	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+	req = rb->rg_owner;
+	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
 
-	dprintk("RPC: %s: called on 0x%p%s\n",
-		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+	dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	/*
-	 * Finish the deregistration. The process is considered
-	 * complete when the rr_func vector becomes NULL - this
-	 * was put in place during rpcrdma_reply_handler() - the wait
-	 * call below will not block if the dereg is "done". If
-	 * interrupted, our framework will clean up.
-	 */
 	for (i = 0; req->rl_nchunks;) {
 		--req->rl_nchunks;
 		i += rpcrdma_deregister_external(
 						&req->rl_segments[i], r_xprt);
 	}
 
-	if (req->rl_iov.length == 0) {	/* see allocate above */
-		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
-		oreq->rl_reply = req->rl_reply;
-		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
-						   req->rl_handle,
-						   &req->rl_iov);
-		kfree(req);
-		req = oreq;
-	}
-
-	/* Put back request+reply buffers */
 	rpcrdma_buffer_put(req);
 }
 
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index cdd6aacc9168..40894403db81 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1079,25 +1079,22 @@ static struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	size_t wlen = 1 << fls(cdata->inline_wsize +
-			       sizeof(struct rpcrdma_req));
+	size_t wlen = cdata->inline_wsize;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_req *req;
 	int rc;
 
 	rc = -ENOMEM;
-	req = kmalloc(wlen, GFP_KERNEL);
+	req = kmalloc(sizeof(*req) + wlen, GFP_KERNEL);
 	if (req == NULL)
 		goto out;
-	memset(req, 0, sizeof(struct rpcrdma_req));
+	memset(req, 0, sizeof(*req));
 
-	rc = rpcrdma_register_internal(ia, req->rl_base, wlen -
-				       offsetof(struct rpcrdma_req, rl_base),
+	rc = rpcrdma_register_internal(ia, req->rl_base, wlen,
 				       &req->rl_handle, &req->rl_iov);
 	if (rc)
 		goto out_free;
 
-	req->rl_size = wlen - sizeof(struct rpcrdma_req);
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 
@@ -1121,7 +1118,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 	rep = kmalloc(rlen, GFP_KERNEL);
 	if (rep == NULL)
 		goto out;
-	memset(rep, 0, sizeof(struct rpcrdma_rep));
+	memset(rep, 0, sizeof(*rep));
 
 	rc = rpcrdma_register_internal(ia, rep->rr_base, rlen -
 				       offsetof(struct rpcrdma_rep, rr_base),
@@ -1335,6 +1332,7 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 	if (!req)
 		return;
 
+	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 	rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
 	kfree(req);
 }
@@ -1729,8 +1727,6 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
 	unsigned long flags;
 
-	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
-		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
 	spin_lock_irqsave(&buffers->rb_lock, flags);
 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 36c37c60f1fe..aa82f8d1c5b4 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -262,7 +262,6 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
 };
 
 struct rpcrdma_req {
-	size_t		rl_size;	/* actual length of buffer */
 	unsigned int	rl_niovs;	/* 0, 2 or 4 */
 	unsigned int	rl_nchunks;	/* non-zero if chunks */
 	unsigned int	rl_connect_cookie;	/* retry detection */
@@ -271,13 +270,20 @@ struct rpcrdma_req {
 	struct rpcrdma_rep	*rl_reply;	/* holder for reply buffer */
 	struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];	/* chunk segments */
 	struct ib_sge	rl_send_iov[4];	/* for active requests */
+	struct rpcrdma_regbuf *rl_sendbuf;
 	struct ib_sge	rl_iov;		/* for posting */
 	struct ib_mr	*rl_handle;	/* handle for mem in rl_iov */
 	char		rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
-	__u32		rl_xdr_buf[0];	/* start of returned rpc rq_buffer */
 };
-#define rpcr_to_rdmar(r) \
-	container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
+
+static inline struct rpcrdma_req *
+rpcr_to_rdmar(struct rpc_rqst *rqst)
+{
+	struct rpcrdma_regbuf *rb = container_of(rqst->rq_buffer,
+						 struct rpcrdma_regbuf,
+						 rg_base[0]);
+	return rb->rg_owner;
+}
 
 /*
  * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for