author		Chuck Lever <chuck.lever@oracle.com>	2017-04-09 13:06:25 -0400
committer	J. Bruce Fields <bfields@redhat.com>	2017-04-25 17:25:55 -0400
commit		9a6a180b7867ceceeeab88a6f011bac23174b939 (patch)
tree		1c0d07481795e45d8569acc48e0c969ef48fbb4f
parent		f13193f50b64e2e0c87706b838d6b9895626a892 (diff)
svcrdma: Use rdma_rw API in RPC reply path
The current svcrdma sendto code path posts one RDMA Write WR at a time.
Each of these Writes typically carries a small number of pages (for
instance, up to 30 pages for mlx4 devices). That means a 1MB NFS READ
reply requires 9 ib_post_send() calls for the Write WRs, and one for
the Send WR carrying the actual RPC Reply message.

Instead, use the new rdma_rw API. The details of Write WR chain
construction and memory registration are taken care of in the RDMA
core. svcrdma can focus on the details of the RPC-over-RDMA protocol.
This gives three main benefits:

1. All Write WRs for one RDMA segment are posted in a single chain.
   As few as one ib_post_send() for each Write chunk.

2. The Write path can now use FRWR to register the Write buffers.
   If the device's maximum page list depth is large, this means a
   single Write WR is needed for each RPC's Write chunk data.

3. The new code introduces support for RPCs that carry both a
   Write list and a Reply chunk. This combination can be used for
   an NFSv4 READ where the data payload is large, and thus is
   removed from the Payload Stream, but the Payload Stream is
   still larger than the inline threshold.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
-rw-r--r--	include/linux/sunrpc/svc_rdma.h	1
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_backchannel.c	6
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_sendto.c	696
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	2
4 files changed, 350 insertions, 355 deletions
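
The rdma_rw API referred to in the description above lives in include/rdma/rw.h. The sketch below is only an illustration of benefit 1: one rdma_rw context covers a whole chunk segment, and a single ib_post_send() (issued inside rdma_rw_ctx_post()) posts the entire Write WR chain, with FRWR registration chosen by the RDMA core when the device benefits from it. The helper name write_one_chunk() and its parameters are hypothetical; this is not the svcrdma helper code this patch calls, which was added in svc_rdma_rw.c by the parent commit.

#include <linux/scatterlist.h>
#include <rdma/ib_verbs.h>
#include <rdma/rw.h>

/* Illustrative only: post all RDMA Write WRs for one chunk segment
 * with a single ib_post_send(). The rdma_rw core decides whether to
 * use plain Write WRs or FRWR-registered buffers for this device.
 */
static int write_one_chunk(struct ib_qp *qp, u8 port_num,
			   struct scatterlist *sgl, u32 sg_cnt,
			   u64 remote_addr, u32 rkey,
			   struct ib_cqe *cqe)
{
	struct rdma_rw_ctx ctx;
	int ret;

	/* Map the local buffer and build the Write WR chain */
	ret = rdma_rw_ctx_init(&ctx, qp, port_num, sgl, sg_cnt, 0,
			       remote_addr, rkey, DMA_TO_DEVICE);
	if (ret < 0)
		return ret;

	/* One ib_post_send() for the whole chunk; completion hits cqe */
	ret = rdma_rw_ctx_post(&ctx, qp, port_num, cqe, NULL);
	if (ret < 0)
		rdma_rw_ctx_destroy(&ctx, qp, port_num, sgl, sg_cnt,
				    DMA_TO_DEVICE);
	return ret;
}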
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index ca08671fb7e2..599ee03ee3fb 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -212,7 +212,6 @@ extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
 extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
 				     struct rpcrdma_msg *,
 				     enum rpcrdma_errcode, __be32 *);
-extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
 extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
 extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
 					    __be32, __be64, u32);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 0305b33d482f..bf185b79c98f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -90,9 +90,9 @@ out_notfound:
  * Caller holds the connection's mutex and has already marshaled
  * the RPC/RDMA request.
  *
- * This is similar to svc_rdma_reply, but takes an rpc_rqst
- * instead, does not support chunks, and avoids blocking memory
- * allocation.
+ * This is similar to svc_rdma_send_reply_msg, but takes a struct
+ * rpc_rqst instead, does not support chunks, and avoids blocking
+ * memory allocation.
  *
  * XXX: There is still an opportunity to block in svc_rdma_send()
  * if there are no SQ entries to post the Send. This may occur if
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 2eb3df698e11..ce62b78e5bc9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2016 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
@@ -40,6 +41,63 @@
  * Author: Tom Tucker <tom@opengridcomputing.com>
  */
 
+/* Operation
+ *
+ * The main entry point is svc_rdma_sendto. This is called by the
+ * RPC server when an RPC Reply is ready to be transmitted to a client.
+ *
+ * The passed-in svc_rqst contains a struct xdr_buf which holds an
+ * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
+ * transport header, post all Write WRs needed for this Reply, then post
+ * a Send WR conveying the transport header and the RPC message itself to
+ * the client.
+ *
+ * svc_rdma_sendto must fully transmit the Reply before returning, as
+ * the svc_rqst will be recycled as soon as sendto returns. Remaining
+ * resources referred to by the svc_rqst are also recycled at that time.
+ * Therefore any resources that must remain longer must be detached
+ * from the svc_rqst and released later.
+ *
+ * Page Management
+ *
+ * The I/O that performs Reply transmission is asynchronous, and may
+ * complete well after sendto returns. Thus pages under I/O must be
+ * removed from the svc_rqst before sendto returns.
+ *
+ * The logic here depends on Send Queue and completion ordering. Since
+ * the Send WR is always posted last, it will always complete last. Thus
+ * when it completes, it is guaranteed that all previous Write WRs have
+ * also completed.
+ *
+ * Write WRs are constructed and posted. Each Write segment gets its own
+ * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
+ * DMA-unmap the pages under I/O for that Write segment. The Write
+ * completion handler does not release any pages.
+ *
+ * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * The ownership of all of the Reply's pages are transferred into that
+ * ctxt, the Send WR is posted, and sendto returns.
+ *
+ * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * Send completion handler finally releases the Reply's pages.
+ *
+ * This mechanism also assumes that completions on the transport's Send
+ * Completion Queue do not run in parallel. Otherwise a Write completion
+ * and Send completion running at the same time could release pages that
+ * are still DMA-mapped.
+ *
+ * Error Handling
+ *
+ * - If the Send WR is posted successfully, it will either complete
+ *   successfully, or get flushed. Either way, the Send completion
+ *   handler releases the Reply's pages.
+ * - If the Send WR cannot be posted, the forward path releases
+ *   the Reply's pages.
+ *
+ * This handles the case, without the use of page reference counting,
+ * where two different Write segments send portions of the same page.
+ */
+
 #include <linux/sunrpc/debug.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/spinlock.h>
@@ -55,6 +113,133 @@ static u32 xdr_padsize(u32 len)
 	return (len & 3) ? (4 - (len & 3)) : 0;
 }
 
+/* Returns length of transport header, in bytes.
+ */
+static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
+{
+	unsigned int nsegs;
+	__be32 *p;
+
+	p = rdma_resp;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p += rpcrdma_fixed_maxsz + 1;
+
+	/* Skip Write list. */
+	while (*p++ != xdr_zero) {
+		nsegs = be32_to_cpup(p++);
+		p += nsegs * rpcrdma_segment_maxsz;
+	}
+
+	/* Skip Reply chunk. */
+	if (*p++ != xdr_zero) {
+		nsegs = be32_to_cpup(p++);
+		p += nsegs * rpcrdma_segment_maxsz;
+	}
+
+	return (unsigned long)p - (unsigned long)rdma_resp;
+}
+
+/* One Write chunk is copied from Call transport header to Reply
+ * transport header. Each segment's length field is updated to
+ * reflect number of bytes consumed in the segment.
+ *
+ * Returns number of segments in this chunk.
+ */
+static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+					   unsigned int remaining)
+{
+	unsigned int i, nsegs;
+	u32 seg_len;
+
+	/* Write list discriminator */
+	*dst++ = *src++;
+
+	/* number of segments in this chunk */
+	nsegs = be32_to_cpup(src);
+	*dst++ = *src++;
+
+	for (i = nsegs; i; i--) {
+		/* segment's RDMA handle */
+		*dst++ = *src++;
+
+		/* bytes returned in this segment */
+		seg_len = be32_to_cpu(*src);
+		if (remaining >= seg_len) {
+			/* entire segment was consumed */
+			*dst = *src;
+			remaining -= seg_len;
+		} else {
+			/* segment only partly filled */
+			*dst = cpu_to_be32(remaining);
+			remaining = 0;
+		}
+		dst++; src++;
+
+		/* segment's RDMA offset */
+		*dst++ = *src++;
+		*dst++ = *src++;
+	}
+
+	return nsegs;
+}
+
+/* The client provided a Write list in the Call message. Fill in
+ * the segments in the first Write chunk in the Reply's transport
+ * header with the number of bytes consumed in each segment.
+ * Remaining chunks are returned unused.
+ *
+ * Assumptions:
+ *  - Client has provided only one Write chunk
+ */
+static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
+					   unsigned int consumed)
+{
+	unsigned int nsegs;
+	__be32 *p, *q;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	q = wr_ch;
+	while (*q != xdr_zero) {
+		nsegs = xdr_encode_write_chunk(p, q, consumed);
+		q += 2 + nsegs * rpcrdma_segment_maxsz;
+		p += 2 + nsegs * rpcrdma_segment_maxsz;
+		consumed = 0;
+	}
+
+	/* Terminate Write list */
+	*p++ = xdr_zero;
+
+	/* Reply chunk discriminator; may be replaced later */
+	*p = xdr_zero;
+}
+
+/* The client provided a Reply chunk in the Call message. Fill in
+ * the segments in the Reply chunk in the Reply message with the
+ * number of bytes consumed in each segment.
+ *
+ * Assumptions:
+ * - Reply can always fit in the provided Reply chunk
+ */
+static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
+					    unsigned int consumed)
+{
+	__be32 *p;
+
+	/* Find the Reply chunk in the Reply's xprt header.
+	 * RPC-over-RDMA V1 replies never have a Read list.
+	 */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	/* Skip past Write list */
+	while (*p++ != xdr_zero)
+		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+
+	xdr_encode_write_chunk(p, rp_ch, consumed);
+}
+
 int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 		     struct xdr_buf *xdr,
 		     struct svc_rdma_req_map *vec,
@@ -123,45 +308,14 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 	return 0;
 }
 
-static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
-			      struct xdr_buf *xdr,
-			      u32 xdr_off, size_t len, int dir)
-{
-	struct page *page;
-	dma_addr_t dma_addr;
-	if (xdr_off < xdr->head[0].iov_len) {
-		/* This offset is in the head */
-		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
-		page = virt_to_page(xdr->head[0].iov_base);
-	} else {
-		xdr_off -= xdr->head[0].iov_len;
-		if (xdr_off < xdr->page_len) {
-			/* This offset is in the page list */
-			xdr_off += xdr->page_base;
-			page = xdr->pages[xdr_off >> PAGE_SHIFT];
-			xdr_off &= ~PAGE_MASK;
-		} else {
-			/* This offset is in the tail */
-			xdr_off -= xdr->page_len;
-			xdr_off += (unsigned long)
-				xdr->tail[0].iov_base & ~PAGE_MASK;
-			page = virt_to_page(xdr->tail[0].iov_base);
-		}
-	}
-	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
-				   min_t(size_t, PAGE_SIZE, len), dir);
-	return dma_addr;
-}
-
 /* Parse the RPC Call's transport header.
  */
-static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
-				      struct rpcrdma_write_array **write,
-				      struct rpcrdma_write_array **reply)
+static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
+				      __be32 **write, __be32 **reply)
 {
 	__be32 *p;
 
-	p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];
+	p = rdma_argp + rpcrdma_fixed_maxsz;
 
 	/* Read list */
 	while (*p++ != xdr_zero)
@@ -169,7 +323,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
 	/* Write list */
 	if (*p != xdr_zero) {
-		*write = (struct rpcrdma_write_array *)p;
+		*write = p;
 		while (*p++ != xdr_zero)
 			p += 1 + be32_to_cpu(*p) * 4;
 	} else {
@@ -179,7 +333,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
 	/* Reply chunk */
 	if (*p != xdr_zero)
-		*reply = (struct rpcrdma_write_array *)p;
+		*reply = p;
 	else
 		*reply = NULL;
 }
@@ -210,6 +364,32 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
 	return be32_to_cpup(p);
 }
 
+/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
+ * is used during completion to DMA-unmap this memory, and
+ * it uses ib_dma_unmap_page() exclusively.
+ */
+static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
+				struct svc_rdma_op_ctxt *ctxt,
+				unsigned int sge_no,
+				unsigned char *base,
+				unsigned int len)
+{
+	unsigned long offset = (unsigned long)base & ~PAGE_MASK;
+	struct ib_device *dev = rdma->sc_cm_id->device;
+	dma_addr_t dma_addr;
+
+	dma_addr = ib_dma_map_page(dev, virt_to_page(base),
+				   offset, len, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(dev, dma_addr))
+		return -EIO;
+
+	ctxt->sge[sge_no].addr = dma_addr;
+	ctxt->sge[sge_no].length = len;
+	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+	svc_rdma_count_mappings(rdma, ctxt);
+	return 0;
+}
+
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
 				 struct svc_rdma_op_ctxt *ctxt,
 				 unsigned int sge_no,
@@ -253,222 +433,73 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
 	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
 }
 
-/* Assumptions:
- * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+/* Load the xdr_buf into the ctxt's sge array, and DMA map each
+ * element as it is added.
+ *
+ * Returns the number of sge elements loaded on success, or
+ * a negative errno on failure.
  */
-static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
-		      u32 rmr, u64 to,
-		      u32 xdr_off, int write_len,
-		      struct svc_rdma_req_map *vec)
+static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+				  struct svc_rdma_op_ctxt *ctxt,
+				  struct xdr_buf *xdr, __be32 *wr_lst)
 {
-	struct ib_rdma_wr write_wr;
-	struct ib_sge *sge;
-	int xdr_sge_no;
-	int sge_no;
-	int sge_bytes;
-	int sge_off;
-	int bc;
-	struct svc_rdma_op_ctxt *ctxt;
+	unsigned int len, sge_no, remaining, page_off;
+	struct page **ppages;
+	unsigned char *base;
+	u32 xdr_pad;
+	int ret;
 
-	if (vec->count > RPCSVC_MAXPAGES) {
-		pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
-		return -EIO;
-	}
+	sge_no = 1;
 
-	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
-		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
-		rmr, (unsigned long long)to, xdr_off,
-		write_len, vec->sge, vec->count);
+	ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
+				   xdr->head[0].iov_base,
+				   xdr->head[0].iov_len);
+	if (ret < 0)
+		return ret;
 
-	ctxt = svc_rdma_get_context(xprt);
-	ctxt->direction = DMA_TO_DEVICE;
-	sge = ctxt->sge;
-
-	/* Find the SGE associated with xdr_off */
-	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
-	     xdr_sge_no++) {
-		if (vec->sge[xdr_sge_no].iov_len > bc)
-			break;
-		bc -= vec->sge[xdr_sge_no].iov_len;
-	}
+	/* If a Write chunk is present, the xdr_buf's page list
+	 * is not included inline. However the Upper Layer may
+	 * have added XDR padding in the tail buffer, and that
+	 * should not be included inline.
+	 */
+	if (wr_lst) {
+		base = xdr->tail[0].iov_base;
+		len = xdr->tail[0].iov_len;
+		xdr_pad = xdr_padsize(xdr->page_len);
 
-	sge_off = bc;
-	bc = write_len;
-	sge_no = 0;
-
-	/* Copy the remaining SGE */
-	while (bc != 0) {
-		sge_bytes = min_t(size_t,
-			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
-		sge[sge_no].length = sge_bytes;
-		sge[sge_no].addr =
-			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
-				    sge_bytes, DMA_TO_DEVICE);
-		xdr_off += sge_bytes;
-		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-					 sge[sge_no].addr))
-			goto err;
-		svc_rdma_count_mappings(xprt, ctxt);
-		sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
-		ctxt->count++;
-		sge_off = 0;
-		sge_no++;
-		xdr_sge_no++;
-		if (xdr_sge_no > vec->count) {
-			pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
-			goto err;
+		if (len && xdr_pad) {
+			base += xdr_pad;
+			len -= xdr_pad;
 		}
-		bc -= sge_bytes;
-		if (sge_no == xprt->sc_max_sge)
-			break;
-	}
-
-	/* Prepare WRITE WR */
-	memset(&write_wr, 0, sizeof write_wr);
-	ctxt->cqe.done = svc_rdma_wc_write;
-	write_wr.wr.wr_cqe = &ctxt->cqe;
-	write_wr.wr.sg_list = &sge[0];
-	write_wr.wr.num_sge = sge_no;
-	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
-	write_wr.wr.send_flags = IB_SEND_SIGNALED;
-	write_wr.rkey = rmr;
-	write_wr.remote_addr = to;
-
-	/* Post It */
-	atomic_inc(&rdma_stat_write);
-	if (svc_rdma_send(xprt, &write_wr.wr))
-		goto err;
-	return write_len - bc;
- err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 0);
-	return -EIO;
-}
 
-noinline
-static int send_write_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_write_array *wr_ary,
-			     struct rpcrdma_msg *rdma_resp,
-			     struct svc_rqst *rqstp,
-			     struct svc_rdma_req_map *vec)
-{
-	u32 xfer_len = rqstp->rq_res.page_len;
-	int write_len;
-	u32 xdr_off;
-	int chunk_off;
-	int chunk_no;
-	int nchunks;
-	struct rpcrdma_write_array *res_ary;
-	int ret;
-
-	res_ary = (struct rpcrdma_write_array *)
-		&rdma_resp->rm_body.rm_chunks[1];
-
-	/* Write chunks start at the pagelist */
-	nchunks = be32_to_cpu(wr_ary->wc_nchunks);
-	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
-	     xfer_len && chunk_no < nchunks;
-	     chunk_no++) {
-		struct rpcrdma_segment *arg_ch;
-		u64 rs_offset;
-
-		arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
-		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
-
-		/* Prepare the response chunk given the length actually
-		 * written */
-		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
-		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-						arg_ch->rs_handle,
-						arg_ch->rs_offset,
-						write_len);
-		chunk_off = 0;
-		while (write_len) {
-			ret = send_write(xprt, rqstp,
-					 be32_to_cpu(arg_ch->rs_handle),
-					 rs_offset + chunk_off,
-					 xdr_off,
-					 write_len,
-					 vec);
-			if (ret <= 0)
-				goto out_err;
-			chunk_off += ret;
-			xdr_off += ret;
-			xfer_len -= ret;
-			write_len -= ret;
-		}
+		goto tail;
 	}
-	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
-	return rqstp->rq_res.page_len;
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	page_off = xdr->page_base & ~PAGE_MASK;
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - page_off, remaining);
 
-out_err:
-	pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
-	return -EIO;
-}
-
-noinline
-static int send_reply_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_write_array *rp_ary,
-			     struct rpcrdma_msg *rdma_resp,
-			     struct svc_rqst *rqstp,
-			     struct svc_rdma_req_map *vec)
-{
-	u32 xfer_len = rqstp->rq_res.len;
-	int write_len;
-	u32 xdr_off;
-	int chunk_no;
-	int chunk_off;
-	int nchunks;
-	struct rpcrdma_segment *ch;
-	struct rpcrdma_write_array *res_ary;
-	int ret;
+		ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
+					    *ppages++, page_off, len);
+		if (ret < 0)
+			return ret;
 
-	/* XXX: need to fix when reply lists occur with read-list and or
-	 * write-list */
-	res_ary = (struct rpcrdma_write_array *)
-		&rdma_resp->rm_body.rm_chunks[2];
-
-	/* xdr offset starts at RPC message */
-	nchunks = be32_to_cpu(rp_ary->wc_nchunks);
-	for (xdr_off = 0, chunk_no = 0;
-	     xfer_len && chunk_no < nchunks;
-	     chunk_no++) {
-		u64 rs_offset;
-		ch = &rp_ary->wc_array[chunk_no].wc_target;
-		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
-
-		/* Prepare the reply chunk given the length actually
-		 * written */
-		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
-		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-						ch->rs_handle, ch->rs_offset,
-						write_len);
-		chunk_off = 0;
-		while (write_len) {
-			ret = send_write(xprt, rqstp,
-					 be32_to_cpu(ch->rs_handle),
-					 rs_offset + chunk_off,
-					 xdr_off,
-					 write_len,
-					 vec);
-			if (ret <= 0)
-				goto out_err;
-			chunk_off += ret;
-			xdr_off += ret;
-			xfer_len -= ret;
-			write_len -= ret;
-		}
+		remaining -= len;
+		page_off = 0;
 	}
-	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
 
-	return rqstp->rq_res.len;
+	base = xdr->tail[0].iov_base;
+	len = xdr->tail[0].iov_len;
+tail:
+	if (len) {
+		ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
+		if (ret < 0)
+			return ret;
+	}
 
-out_err:
-	pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
-	return -EIO;
+	return sge_no - 1;
 }
 
 /* The svc_rqst and all resources it owns are released as soon as
@@ -525,90 +556,66 @@ int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
 	return svc_rdma_send(rdma, send_wr);
 }
 
-/* This function prepares the portion of the RPCRDMA message to be
- * sent in the RDMA_SEND. This function is called after data sent via
- * RDMA has already been transmitted. There are three cases:
- * - The RPCRDMA header, RPC header, and payload are all sent in a
- *   single RDMA_SEND. This is the "inline" case.
- * - The RPCRDMA header and some portion of the RPC header and data
- *   are sent via this RDMA_SEND and another portion of the data is
- *   sent via RDMA.
- * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
- *   header and data are all transmitted via RDMA.
- * In all three cases, this function prepares the RPCRDMA header in
- * sge[0], the 'type' parameter indicates the type to place in the
- * RPCRDMA header, and the 'byte_count' field indicates how much of
- * the XDR to include in this RDMA_SEND. NB: The offset of the payload
- * to send is zero in the XDR.
+/* Prepare the portion of the RPC Reply that will be transmitted
+ * via RDMA Send. The RPC-over-RDMA transport header is prepared
+ * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ *
+ * Depending on whether a Write list or Reply chunk is present,
+ * the server may send all, a portion of, or none of the xdr_buf.
+ * In the latter case, only the transport header (sge[0]) is
+ * transmitted.
+ *
+ * RDMA Send is the last step of transmitting an RPC reply. Pages
+ * involved in the earlier RDMA Writes are here transferred out
+ * of the rqstp and into the ctxt's page array. These pages are
+ * DMA unmapped by each Write completion, but the subsequent Send
+ * completion finally releases these pages.
+ *
+ * Assumptions:
+ * - The Reply's transport header will never be larger than a page.
  */
-static int send_reply(struct svcxprt_rdma *rdma,
-		      struct svc_rqst *rqstp,
-		      struct page *page,
-		      struct rpcrdma_msg *rdma_resp,
-		      struct svc_rdma_req_map *vec,
-		      int byte_count,
-		      u32 inv_rkey)
+static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
+				   __be32 *rdma_argp, __be32 *rdma_resp,
+				   struct svc_rqst *rqstp,
+				   __be32 *wr_lst, __be32 *rp_ch)
 {
 	struct svc_rdma_op_ctxt *ctxt;
-	u32 xdr_off;
-	int sge_no;
-	int sge_bytes;
-	int ret = -EIO;
+	u32 inv_rkey;
+	int ret;
+
+	dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
+		(rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
+		rqstp->rq_res.head[0].iov_len,
+		rqstp->rq_res.page_len,
+		rqstp->rq_res.tail[0].iov_len);
 
-	/* Prepare the context */
 	ctxt = svc_rdma_get_context(rdma);
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->pages[0] = page;
-	ctxt->count = 1;
 
-	/* Prepare the SGE for the RPCRDMA Header */
-	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length =
-	    svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
-	ctxt->sge[0].addr =
-	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
-			    ctxt->sge[0].length, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
+				     svc_rdma_reply_hdr_len(rdma_resp));
+	if (ret < 0)
 		goto err;
-	svc_rdma_count_mappings(rdma, ctxt);
-
-	ctxt->direction = DMA_TO_DEVICE;
 
-	/* Map the payload indicated by 'byte_count' */
-	xdr_off = 0;
-	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
-		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
-		byte_count -= sge_bytes;
-		ctxt->sge[sge_no].addr =
-			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
-				    sge_bytes, DMA_TO_DEVICE);
-		xdr_off += sge_bytes;
-		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
-					 ctxt->sge[sge_no].addr))
+	if (!rp_ch) {
+		ret = svc_rdma_map_reply_msg(rdma, ctxt,
+					     &rqstp->rq_res, wr_lst);
+		if (ret < 0)
 			goto err;
-		svc_rdma_count_mappings(rdma, ctxt);
-		ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-		ctxt->sge[sge_no].length = sge_bytes;
-	}
-	if (byte_count != 0) {
-		pr_err("svcrdma: Could not map %d bytes\n", byte_count);
-		goto err;
 	}
 
 	svc_rdma_save_io_pages(rqstp, ctxt);
 
-	if (sge_no > rdma->sc_max_sge) {
-		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
-		goto err;
-	}
-
-	ret = svc_rdma_post_send_wr(rdma, ctxt, sge_no, inv_rkey);
+	inv_rkey = 0;
+	if (rdma->sc_snd_w_inv)
+		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
 	if (ret)
 		goto err;
 
 	return 0;
 
- err:
+err:
+	pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
 	return ret;
@@ -618,41 +625,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
 {
 }
 
+/**
+ * svc_rdma_sendto - Transmit an RPC reply
+ * @rqstp: processed RPC request, reply XDR already in ::rq_res
+ *
+ * Any resources still associated with @rqstp are released upon return.
+ * If no reply message was possible, the connection is closed.
+ *
+ * Returns:
+ *	%0 if an RPC reply has been successfully posted,
+ *	%-ENOMEM if a resource shortage occurred (connection is lost),
+ *	%-ENOTCONN if posting failed (connection is lost).
+ */
 int svc_rdma_sendto(struct svc_rqst *rqstp)
 {
 	struct svc_xprt *xprt = rqstp->rq_xprt;
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
-	struct rpcrdma_msg *rdma_argp;
-	struct rpcrdma_msg *rdma_resp;
-	struct rpcrdma_write_array *wr_ary, *rp_ary;
-	int ret;
-	int inline_bytes;
+	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct page *res_page;
-	struct svc_rdma_req_map *vec;
-	u32 inv_rkey;
-	__be32 *p;
-
-	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+	int ret;
 
-	/* Get the RDMA request header. The receive logic always
-	 * places this at the start of page 0.
+	/* Find the call's chunk lists to decide how to send the reply.
+	 * Receive places the Call's xprt header at the start of page 0.
 	 */
 	rdma_argp = page_address(rqstp->rq_pages[0]);
-	svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);
+	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
 
-	inv_rkey = 0;
-	if (rdma->sc_snd_w_inv)
-		inv_rkey = svc_rdma_get_inv_rkey(&rdma_argp->rm_xid,
-						 (__be32 *)wr_ary,
-						 (__be32 *)rp_ary);
-
-	/* Build an req vec for the XDR */
-	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
-	if (ret)
-		goto err0;
-	inline_bytes = rqstp->rq_res.len;
+	dprintk("svcrdma: preparing response for XID 0x%08x\n",
+		be32_to_cpup(rdma_argp));
 
 	/* Create the RDMA response header. xprt->xpt_mutex,
 	 * acquired in svc_send(), serializes RPC replies. The
@@ -666,54 +668,46 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		goto err0;
 	rdma_resp = page_address(res_page);
 
-	p = &rdma_resp->rm_xid;
-	*p++ = rdma_argp->rm_xid;
-	*p++ = rdma_argp->rm_vers;
+	p = rdma_resp;
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ary ? rdma_nomsg : rdma_msg;
+	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
 
 	/* Start with empty chunks */
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p = xdr_zero;
 
-	/* Send any write-chunk data and build resp write-list */
-	if (wr_ary) {
-		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+	if (wr_lst) {
+		/* XXX: Presume the client sent only one Write chunk */
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
 		if (ret < 0)
 			goto err1;
-		inline_bytes -= ret + xdr_padsize(ret);
+		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
 	}
-
-	/* Send any reply-list data and update resp reply-list */
-	if (rp_ary) {
-		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+	if (rp_ch) {
+		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
 		if (ret < 0)
 			goto err1;
-		inline_bytes -= ret;
+		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
 	}
 
-	/* Post a fresh Receive buffer _before_ sending the reply */
 	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
 	if (ret)
 		goto err1;
-
-	ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
-			 inline_bytes, inv_rkey);
+	ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+				      wr_lst, rp_ch);
 	if (ret < 0)
 		goto err0;
-
-	svc_rdma_put_req_map(rdma, vec);
-	dprintk("svcrdma: send_reply returns %d\n", ret);
-	return ret;
+	return 0;
 
  err1:
 	put_page(res_page);
  err0:
-	svc_rdma_put_req_map(rdma, vec);
 	pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
 	       ret);
-	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 	return -ENOTCONN;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index b25c50992a95..237c377c1e06 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1053,6 +1053,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	memset(&qp_attr, 0, sizeof qp_attr);
 	qp_attr.event_handler = qp_event_handler;
 	qp_attr.qp_context = &newxprt->sc_xprt;
+	qp_attr.port_num = newxprt->sc_cm_id->port_num;
+	qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
 	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
 	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
 	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
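
The qp_attr.cap.max_rdma_ctxs value added above reserves Send Queue space for the rdma_rw contexts the reply path now consumes. Tying that to the Page Management notes added in svc_rdma_sendto.c, a Write completion is expected to tear down its rdma_rw context (which DMA-unmaps the pages) while leaving page release to the later Send completion. The sketch below illustrates that split with a hypothetical context structure; the struct, field, and function names here are illustrative, not the ones svcrdma defines.

#include <linux/scatterlist.h>
#include <rdma/ib_verbs.h>
#include <rdma/rw.h>

/* Hypothetical per-Write context; the real svcrdma structure differs. */
struct example_write_ctxt {
	struct rdma_rw_ctx	rw_ctx;
	struct ib_cqe		cqe;
	struct ib_qp		*qp;
	u8			port_num;
	struct scatterlist	*sgl;
	u32			sg_cnt;
};

/* Write completion: undo the DMA mapping only. The pages themselves
 * are released later, by the Send completion that follows all Writes
 * on the same Send Queue.
 */
static void example_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct example_write_ctxt *ctxt =
		container_of(wc->wr_cqe, struct example_write_ctxt, cqe);

	rdma_rw_ctx_destroy(&ctxt->rw_ctx, ctxt->qp, ctxt->port_num,
			    ctxt->sgl, ctxt->sg_cnt, DMA_TO_DEVICE);
	/* no put_page() here: the Send completion owns page release */
}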