aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTom Tucker <tom@opengridcomputing.com>2007-12-12 17:13:25 -0500
committerJ. Bruce Fields <bfields@citi.umich.edu>2008-02-01 16:42:14 -0500
commitc06b540a54ad01d2fda8cfb5d8823b9b3d8d1cb2 (patch)
treee983370ad07af62682d59084d5ae8fe1ba9d3684
parentd5b31be6823320d81570e0199acd60d3a3f75d85 (diff)
rdma: SVCRDMA sendto
This file implements the RDMA transport sendto function. A RPC reply on an RDMA transport consists of some number of RDMA_WRITE requests followed by an RDMA_SEND request. The sendto function parses the ONCRPC RDMA reply header to determine how to send the reply back to the client. The send queue is sized so as to be able to send complete replies for requests in most cases. In the event that there are not enough SQ WR slots to reply, e.g. big data, the send will block the NFSD thread. The I/O callback functions in svc_rdma_transport.c that reap WR completions wake any waiters blocked on the SQ. In general, the goal is not to block NFSD threads and the has_wspace method stall requests when the SQ is nearly full. Signed-off-by: Tom Tucker <tom@opengridcomputing.com> Acked-by: Neil Brown <neilb@suse.de> Signed-off-by: J. Bruce Fields <bfields@citi.umich.edu>
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c520
1 files changed, 520 insertions, 0 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
new file mode 100644
index 000000000000..3e321949e1dc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -0,0 +1,520 @@
1/*
2 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 *
39 * Author: Tom Tucker <tom@opengridcomputing.com>
40 */
41
42#include <linux/sunrpc/debug.h>
43#include <linux/sunrpc/rpc_rdma.h>
44#include <linux/spinlock.h>
45#include <asm/unaligned.h>
46#include <rdma/ib_verbs.h>
47#include <rdma/rdma_cm.h>
48#include <linux/sunrpc/svc_rdma.h>
49
50#define RPCDBG_FACILITY RPCDBG_SVCXPRT
51
52/* Encode an XDR as an array of IB SGE
53 *
54 * Assumptions:
55 * - head[0] is physically contiguous.
56 * - tail[0] is physically contiguous.
57 * - pages[] is not physically or virtually contigous and consists of
58 * PAGE_SIZE elements.
59 *
60 * Output:
61 * SGE[0] reserved for RCPRDMA header
62 * SGE[1] data from xdr->head[]
63 * SGE[2..sge_count-2] data from xdr->pages[]
64 * SGE[sge_count-1] data from xdr->tail.
65 *
66 */
67static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
68 struct xdr_buf *xdr,
69 struct ib_sge *sge,
70 int *sge_count)
71{
72 /* Max we need is the length of the XDR / pagesize + one for
73 * head + one for tail + one for RPCRDMA header
74 */
75 int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
76 int sge_no;
77 u32 byte_count = xdr->len;
78 u32 sge_bytes;
79 u32 page_bytes;
80 int page_off;
81 int page_no;
82
83 /* Skip the first sge, this is for the RPCRDMA header */
84 sge_no = 1;
85
86 /* Head SGE */
87 sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
88 xdr->head[0].iov_base,
89 xdr->head[0].iov_len,
90 DMA_TO_DEVICE);
91 sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
92 byte_count -= sge_bytes;
93 sge[sge_no].length = sge_bytes;
94 sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
95 sge_no++;
96
97 /* pages SGE */
98 page_no = 0;
99 page_bytes = xdr->page_len;
100 page_off = xdr->page_base;
101 while (byte_count && page_bytes) {
102 sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
103 sge[sge_no].addr =
104 ib_dma_map_page(xprt->sc_cm_id->device,
105 xdr->pages[page_no], page_off,
106 sge_bytes, DMA_TO_DEVICE);
107 sge_bytes = min(sge_bytes, page_bytes);
108 byte_count -= sge_bytes;
109 page_bytes -= sge_bytes;
110 sge[sge_no].length = sge_bytes;
111 sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
112
113 sge_no++;
114 page_no++;
115 page_off = 0; /* reset for next time through loop */
116 }
117
118 /* Tail SGE */
119 if (byte_count && xdr->tail[0].iov_len) {
120 sge[sge_no].addr =
121 ib_dma_map_single(xprt->sc_cm_id->device,
122 xdr->tail[0].iov_base,
123 xdr->tail[0].iov_len,
124 DMA_TO_DEVICE);
125 sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
126 byte_count -= sge_bytes;
127 sge[sge_no].length = sge_bytes;
128 sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
129 sge_no++;
130 }
131
132 BUG_ON(sge_no > sge_max);
133 BUG_ON(byte_count != 0);
134
135 *sge_count = sge_no;
136 return sge;
137}
138
139
140/* Assumptions:
141 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
142 */
143static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
144 u32 rmr, u64 to,
145 u32 xdr_off, int write_len,
146 struct ib_sge *xdr_sge, int sge_count)
147{
148 struct svc_rdma_op_ctxt *tmp_sge_ctxt;
149 struct ib_send_wr write_wr;
150 struct ib_sge *sge;
151 int xdr_sge_no;
152 int sge_no;
153 int sge_bytes;
154 int sge_off;
155 int bc;
156 struct svc_rdma_op_ctxt *ctxt;
157 int ret = 0;
158
159 BUG_ON(sge_count >= 32);
160 dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
161 "write_len=%d, xdr_sge=%p, sge_count=%d\n",
162 rmr, to, xdr_off, write_len, xdr_sge, sge_count);
163
164 ctxt = svc_rdma_get_context(xprt);
165 ctxt->count = 0;
166 tmp_sge_ctxt = svc_rdma_get_context(xprt);
167 sge = tmp_sge_ctxt->sge;
168
169 /* Find the SGE associated with xdr_off */
170 for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < sge_count;
171 xdr_sge_no++) {
172 if (xdr_sge[xdr_sge_no].length > bc)
173 break;
174 bc -= xdr_sge[xdr_sge_no].length;
175 }
176
177 sge_off = bc;
178 bc = write_len;
179 sge_no = 0;
180
181 /* Copy the remaining SGE */
182 while (bc != 0 && xdr_sge_no < sge_count) {
183 sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
184 sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
185 sge_bytes = min((size_t)bc,
186 (size_t)(xdr_sge[xdr_sge_no].length-sge_off));
187 sge[sge_no].length = sge_bytes;
188
189 sge_off = 0;
190 sge_no++;
191 xdr_sge_no++;
192 bc -= sge_bytes;
193 }
194
195 BUG_ON(bc != 0);
196 BUG_ON(xdr_sge_no > sge_count);
197
198 /* Prepare WRITE WR */
199 memset(&write_wr, 0, sizeof write_wr);
200 ctxt->wr_op = IB_WR_RDMA_WRITE;
201 write_wr.wr_id = (unsigned long)ctxt;
202 write_wr.sg_list = &sge[0];
203 write_wr.num_sge = sge_no;
204 write_wr.opcode = IB_WR_RDMA_WRITE;
205 write_wr.send_flags = IB_SEND_SIGNALED;
206 write_wr.wr.rdma.rkey = rmr;
207 write_wr.wr.rdma.remote_addr = to;
208
209 /* Post It */
210 atomic_inc(&rdma_stat_write);
211 if (svc_rdma_send(xprt, &write_wr)) {
212 svc_rdma_put_context(ctxt, 1);
213 /* Fatal error, close transport */
214 ret = -EIO;
215 }
216 svc_rdma_put_context(tmp_sge_ctxt, 0);
217 return ret;
218}
219
220static int send_write_chunks(struct svcxprt_rdma *xprt,
221 struct rpcrdma_msg *rdma_argp,
222 struct rpcrdma_msg *rdma_resp,
223 struct svc_rqst *rqstp,
224 struct ib_sge *sge,
225 int sge_count)
226{
227 u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
228 int write_len;
229 int max_write;
230 u32 xdr_off;
231 int chunk_off;
232 int chunk_no;
233 struct rpcrdma_write_array *arg_ary;
234 struct rpcrdma_write_array *res_ary;
235 int ret;
236
237 arg_ary = svc_rdma_get_write_array(rdma_argp);
238 if (!arg_ary)
239 return 0;
240 res_ary = (struct rpcrdma_write_array *)
241 &rdma_resp->rm_body.rm_chunks[1];
242
243 max_write = xprt->sc_max_sge * PAGE_SIZE;
244
245 /* Write chunks start at the pagelist */
246 for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
247 xfer_len && chunk_no < arg_ary->wc_nchunks;
248 chunk_no++) {
249 struct rpcrdma_segment *arg_ch;
250 u64 rs_offset;
251
252 arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
253 write_len = min(xfer_len, arg_ch->rs_length);
254
255 /* Prepare the response chunk given the length actually
256 * written */
257 rs_offset = get_unaligned(&(arg_ch->rs_offset));
258 svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
259 arg_ch->rs_handle,
260 rs_offset,
261 write_len);
262 chunk_off = 0;
263 while (write_len) {
264 int this_write;
265 this_write = min(write_len, max_write);
266 ret = send_write(xprt, rqstp,
267 arg_ch->rs_handle,
268 rs_offset + chunk_off,
269 xdr_off,
270 this_write,
271 sge,
272 sge_count);
273 if (ret) {
274 dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
275 ret);
276 return -EIO;
277 }
278 chunk_off += this_write;
279 xdr_off += this_write;
280 xfer_len -= this_write;
281 write_len -= this_write;
282 }
283 }
284 /* Update the req with the number of chunks actually used */
285 svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
286
287 return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
288}
289
290static int send_reply_chunks(struct svcxprt_rdma *xprt,
291 struct rpcrdma_msg *rdma_argp,
292 struct rpcrdma_msg *rdma_resp,
293 struct svc_rqst *rqstp,
294 struct ib_sge *sge,
295 int sge_count)
296{
297 u32 xfer_len = rqstp->rq_res.len;
298 int write_len;
299 int max_write;
300 u32 xdr_off;
301 int chunk_no;
302 int chunk_off;
303 struct rpcrdma_segment *ch;
304 struct rpcrdma_write_array *arg_ary;
305 struct rpcrdma_write_array *res_ary;
306 int ret;
307
308 arg_ary = svc_rdma_get_reply_array(rdma_argp);
309 if (!arg_ary)
310 return 0;
311 /* XXX: need to fix when reply lists occur with read-list and or
312 * write-list */
313 res_ary = (struct rpcrdma_write_array *)
314 &rdma_resp->rm_body.rm_chunks[2];
315
316 max_write = xprt->sc_max_sge * PAGE_SIZE;
317
318 /* xdr offset starts at RPC message */
319 for (xdr_off = 0, chunk_no = 0;
320 xfer_len && chunk_no < arg_ary->wc_nchunks;
321 chunk_no++) {
322 u64 rs_offset;
323 ch = &arg_ary->wc_array[chunk_no].wc_target;
324 write_len = min(xfer_len, ch->rs_length);
325
326
327 /* Prepare the reply chunk given the length actually
328 * written */
329 rs_offset = get_unaligned(&(ch->rs_offset));
330 svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
331 ch->rs_handle, rs_offset,
332 write_len);
333 chunk_off = 0;
334 while (write_len) {
335 int this_write;
336
337 this_write = min(write_len, max_write);
338 ret = send_write(xprt, rqstp,
339 ch->rs_handle,
340 rs_offset + chunk_off,
341 xdr_off,
342 this_write,
343 sge,
344 sge_count);
345 if (ret) {
346 dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
347 ret);
348 return -EIO;
349 }
350 chunk_off += this_write;
351 xdr_off += this_write;
352 xfer_len -= this_write;
353 write_len -= this_write;
354 }
355 }
356 /* Update the req with the number of chunks actually used */
357 svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
358
359 return rqstp->rq_res.len;
360}
361
362/* This function prepares the portion of the RPCRDMA message to be
363 * sent in the RDMA_SEND. This function is called after data sent via
364 * RDMA has already been transmitted. There are three cases:
365 * - The RPCRDMA header, RPC header, and payload are all sent in a
366 * single RDMA_SEND. This is the "inline" case.
367 * - The RPCRDMA header and some portion of the RPC header and data
368 * are sent via this RDMA_SEND and another portion of the data is
369 * sent via RDMA.
370 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
371 * header and data are all transmitted via RDMA.
372 * In all three cases, this function prepares the RPCRDMA header in
373 * sge[0], the 'type' parameter indicates the type to place in the
374 * RPCRDMA header, and the 'byte_count' field indicates how much of
375 * the XDR to include in this RDMA_SEND.
376 */
377static int send_reply(struct svcxprt_rdma *rdma,
378 struct svc_rqst *rqstp,
379 struct page *page,
380 struct rpcrdma_msg *rdma_resp,
381 struct svc_rdma_op_ctxt *ctxt,
382 int sge_count,
383 int byte_count)
384{
385 struct ib_send_wr send_wr;
386 int sge_no;
387 int sge_bytes;
388 int page_no;
389 int ret;
390
391 /* Prepare the context */
392 ctxt->pages[0] = page;
393 ctxt->count = 1;
394
395 /* Prepare the SGE for the RPCRDMA Header */
396 ctxt->sge[0].addr =
397 ib_dma_map_page(rdma->sc_cm_id->device,
398 page, 0, PAGE_SIZE, DMA_TO_DEVICE);
399 ctxt->direction = DMA_TO_DEVICE;
400 ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
401 ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
402
403 /* Determine how many of our SGE are to be transmitted */
404 for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
405 sge_bytes = min((size_t)ctxt->sge[sge_no].length,
406 (size_t)byte_count);
407 byte_count -= sge_bytes;
408 }
409 BUG_ON(byte_count != 0);
410
411 /* Save all respages in the ctxt and remove them from the
412 * respages array. They are our pages until the I/O
413 * completes.
414 */
415 for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
416 ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
417 ctxt->count++;
418 rqstp->rq_respages[page_no] = NULL;
419 }
420
421 BUG_ON(sge_no > rdma->sc_max_sge);
422 memset(&send_wr, 0, sizeof send_wr);
423 ctxt->wr_op = IB_WR_SEND;
424 send_wr.wr_id = (unsigned long)ctxt;
425 send_wr.sg_list = ctxt->sge;
426 send_wr.num_sge = sge_no;
427 send_wr.opcode = IB_WR_SEND;
428 send_wr.send_flags = IB_SEND_SIGNALED;
429
430 ret = svc_rdma_send(rdma, &send_wr);
431 if (ret)
432 svc_rdma_put_context(ctxt, 1);
433
434 return ret;
435}
436
437void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
438{
439}
440
441/*
442 * Return the start of an xdr buffer.
443 */
444static void *xdr_start(struct xdr_buf *xdr)
445{
446 return xdr->head[0].iov_base -
447 (xdr->len -
448 xdr->page_len -
449 xdr->tail[0].iov_len -
450 xdr->head[0].iov_len);
451}
452
453int svc_rdma_sendto(struct svc_rqst *rqstp)
454{
455 struct svc_xprt *xprt = rqstp->rq_xprt;
456 struct svcxprt_rdma *rdma =
457 container_of(xprt, struct svcxprt_rdma, sc_xprt);
458 struct rpcrdma_msg *rdma_argp;
459 struct rpcrdma_msg *rdma_resp;
460 struct rpcrdma_write_array *reply_ary;
461 enum rpcrdma_proc reply_type;
462 int ret;
463 int inline_bytes;
464 struct ib_sge *sge;
465 int sge_count = 0;
466 struct page *res_page;
467 struct svc_rdma_op_ctxt *ctxt;
468
469 dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
470
471 /* Get the RDMA request header. */
472 rdma_argp = xdr_start(&rqstp->rq_arg);
473
474 /* Build an SGE for the XDR */
475 ctxt = svc_rdma_get_context(rdma);
476 ctxt->direction = DMA_TO_DEVICE;
477 sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
478
479 inline_bytes = rqstp->rq_res.len;
480
481 /* Create the RDMA response header */
482 res_page = svc_rdma_get_page();
483 rdma_resp = page_address(res_page);
484 reply_ary = svc_rdma_get_reply_array(rdma_argp);
485 if (reply_ary)
486 reply_type = RDMA_NOMSG;
487 else
488 reply_type = RDMA_MSG;
489 svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
490 rdma_resp, reply_type);
491
492 /* Send any write-chunk data and build resp write-list */
493 ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
494 rqstp, sge, sge_count);
495 if (ret < 0) {
496 printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
497 ret);
498 goto error;
499 }
500 inline_bytes -= ret;
501
502 /* Send any reply-list data and update resp reply-list */
503 ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
504 rqstp, sge, sge_count);
505 if (ret < 0) {
506 printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
507 ret);
508 goto error;
509 }
510 inline_bytes -= ret;
511
512 ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
513 inline_bytes);
514 dprintk("svcrdma: send_reply returns %d\n", ret);
515 return ret;
516 error:
517 svc_rdma_put_context(ctxt, 0);
518 put_page(res_page);
519 return ret;
520}