aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
diff options
context:
space:
mode:
authorTom Tucker <tom@opengridcomputing.com>2008-08-12 16:12:10 -0400
committerTom Tucker <tom@opengridcomputing.com>2008-10-06 15:46:01 -0400
commit146b6df6a537939570c5772ebd7db826fdbd5d82 (patch)
tree1acf55f3244719e543fb5f6eed6de11c5dd3110e /net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
parent5b180a9a64ca2217a658bd515ef910eafefc5e5a (diff)
svcrdma: Modify the RPC recv path to use FRMR when available
RPCRDMA requests that specify a read-list are fetched with RDMA_READ. Using an FRMR to map the data sink improves NFSRDMA security on transports that place the RDMA_READ data sink LKEY on the wire because the valid lifetime of the MR is only the duration of the RDMA_READ. The LKEY is invalidated when the last RDMA_READ WR completes. Mapping the data sink also allows for very large amounts to data to be fetched with a single WR, so if the client is also using FRMR, the entire RPC read-list can be fetched with a single WR. Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_recvfrom.c')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c187
1 files changed, 166 insertions, 21 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 74de31a0661..a4756576d68 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -116,7 +116,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
116 * 116 *
117 * Assumptions: 117 * Assumptions:
118 * - chunk[0]->position points to pages[0] at an offset of 0 118 * - chunk[0]->position points to pages[0] at an offset of 0
119 * - pages[] is not physically or virtually contigous and consists of 119 * - pages[] is not physically or virtually contiguous and consists of
120 * PAGE_SIZE elements. 120 * PAGE_SIZE elements.
121 * 121 *
122 * Output: 122 * Output:
@@ -125,7 +125,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
125 * chunk in the read list 125 * chunk in the read list
126 * 126 *
127 */ 127 */
128static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt, 128static int map_read_chunks(struct svcxprt_rdma *xprt,
129 struct svc_rqst *rqstp, 129 struct svc_rqst *rqstp,
130 struct svc_rdma_op_ctxt *head, 130 struct svc_rdma_op_ctxt *head,
131 struct rpcrdma_msg *rmsgp, 131 struct rpcrdma_msg *rmsgp,
@@ -211,26 +211,128 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
211 return sge_no; 211 return sge_no;
212} 212}
213 213
214static void rdma_set_ctxt_sge(struct svcxprt_rdma *xprt, 214/* Map a read-chunk-list to an XDR and fast register the page-list.
215 struct svc_rdma_op_ctxt *ctxt, 215 *
216 struct kvec *vec, 216 * Assumptions:
217 u64 *sgl_offset, 217 * - chunk[0] position points to pages[0] at an offset of 0
218 int count) 218 * - pages[] will be made physically contiguous by creating a one-off memory
219 * region using the fastreg verb.
220 * - byte_count is # of bytes in read-chunk-list
221 * - ch_count is # of chunks in read-chunk-list
222 *
223 * Output:
224 * - sge array pointing into pages[] array.
225 * - chunk_sge array specifying sge index and count for each
226 * chunk in the read list
227 */
228static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
229 struct svc_rqst *rqstp,
230 struct svc_rdma_op_ctxt *head,
231 struct rpcrdma_msg *rmsgp,
232 struct svc_rdma_req_map *rpl_map,
233 struct svc_rdma_req_map *chl_map,
234 int ch_count,
235 int byte_count)
236{
237 int page_no;
238 int ch_no;
239 u32 offset;
240 struct rpcrdma_read_chunk *ch;
241 struct svc_rdma_fastreg_mr *frmr;
242 int ret = 0;
243
244 frmr = svc_rdma_get_frmr(xprt);
245 if (IS_ERR(frmr))
246 return -ENOMEM;
247
248 head->frmr = frmr;
249 head->arg.head[0] = rqstp->rq_arg.head[0];
250 head->arg.tail[0] = rqstp->rq_arg.tail[0];
251 head->arg.pages = &head->pages[head->count];
252 head->hdr_count = head->count; /* save count of hdr pages */
253 head->arg.page_base = 0;
254 head->arg.page_len = byte_count;
255 head->arg.len = rqstp->rq_arg.len + byte_count;
256 head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
257
258 /* Fast register the page list */
259 frmr->kva = page_address(rqstp->rq_arg.pages[0]);
260 frmr->direction = DMA_FROM_DEVICE;
261 frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
262 frmr->map_len = byte_count;
263 frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
264 for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
265 frmr->page_list->page_list[page_no] =
266 ib_dma_map_single(xprt->sc_cm_id->device,
267 page_address(rqstp->rq_arg.pages[page_no]),
268 PAGE_SIZE, DMA_TO_DEVICE);
269 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
270 frmr->page_list->page_list[page_no]))
271 goto fatal_err;
272 atomic_inc(&xprt->sc_dma_used);
273 head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
274 }
275 head->count += page_no;
276
277 /* rq_respages points one past arg pages */
278 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
279
280 /* Create the reply and chunk maps */
281 offset = 0;
282 ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
283 for (ch_no = 0; ch_no < ch_count; ch_no++) {
284 rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
285 rpl_map->sge[ch_no].iov_len = ch->rc_target.rs_length;
286 chl_map->ch[ch_no].count = 1;
287 chl_map->ch[ch_no].start = ch_no;
288 offset += ch->rc_target.rs_length;
289 ch++;
290 }
291
292 ret = svc_rdma_fastreg(xprt, frmr);
293 if (ret)
294 goto fatal_err;
295
296 return ch_no;
297
298 fatal_err:
299 printk("svcrdma: error fast registering xdr for xprt %p", xprt);
300 svc_rdma_put_frmr(xprt, frmr);
301 return -EIO;
302}
303
304static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
305 struct svc_rdma_op_ctxt *ctxt,
306 struct svc_rdma_fastreg_mr *frmr,
307 struct kvec *vec,
308 u64 *sgl_offset,
309 int count)
219{ 310{
220 int i; 311 int i;
221 312
222 ctxt->count = count; 313 ctxt->count = count;
223 ctxt->direction = DMA_FROM_DEVICE; 314 ctxt->direction = DMA_FROM_DEVICE;
224 for (i = 0; i < count; i++) { 315 for (i = 0; i < count; i++) {
225 atomic_inc(&xprt->sc_dma_used); 316 ctxt->sge[i].length = 0; /* in case map fails */
226 ctxt->sge[i].addr = 317 if (!frmr) {
227 ib_dma_map_single(xprt->sc_cm_id->device, 318 ctxt->sge[i].addr =
228 vec[i].iov_base, vec[i].iov_len, 319 ib_dma_map_single(xprt->sc_cm_id->device,
229 DMA_FROM_DEVICE); 320 vec[i].iov_base,
321 vec[i].iov_len,
322 DMA_FROM_DEVICE);
323 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
324 ctxt->sge[i].addr))
325 return -EINVAL;
326 ctxt->sge[i].lkey = xprt->sc_dma_lkey;
327 atomic_inc(&xprt->sc_dma_used);
328 } else {
329 ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
330 ctxt->sge[i].lkey = frmr->mr->lkey;
331 }
230 ctxt->sge[i].length = vec[i].iov_len; 332 ctxt->sge[i].length = vec[i].iov_len;
231 ctxt->sge[i].lkey = xprt->sc_phys_mr->lkey;
232 *sgl_offset = *sgl_offset + vec[i].iov_len; 333 *sgl_offset = *sgl_offset + vec[i].iov_len;
233 } 334 }
335 return 0;
234} 336}
235 337
236static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) 338static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
@@ -278,6 +380,7 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
278 struct svc_rdma_op_ctxt *hdr_ctxt) 380 struct svc_rdma_op_ctxt *hdr_ctxt)
279{ 381{
280 struct ib_send_wr read_wr; 382 struct ib_send_wr read_wr;
383 struct ib_send_wr inv_wr;
281 int err = 0; 384 int err = 0;
282 int ch_no; 385 int ch_no;
283 int ch_count; 386 int ch_count;
@@ -301,9 +404,20 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
301 svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count); 404 svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
302 if (ch_count > RPCSVC_MAXPAGES) 405 if (ch_count > RPCSVC_MAXPAGES)
303 return -EINVAL; 406 return -EINVAL;
304 sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp, 407
305 rpl_map, chl_map, 408 if (!xprt->sc_frmr_pg_list_len)
306 ch_count, byte_count); 409 sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
410 rpl_map, chl_map, ch_count,
411 byte_count);
412 else
413 sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
414 rpl_map, chl_map, ch_count,
415 byte_count);
416 if (sge_count < 0) {
417 err = -EIO;
418 goto out;
419 }
420
307 sgl_offset = 0; 421 sgl_offset = 0;
308 ch_no = 0; 422 ch_no = 0;
309 423
@@ -312,13 +426,16 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
312next_sge: 426next_sge:
313 ctxt = svc_rdma_get_context(xprt); 427 ctxt = svc_rdma_get_context(xprt);
314 ctxt->direction = DMA_FROM_DEVICE; 428 ctxt->direction = DMA_FROM_DEVICE;
429 ctxt->frmr = hdr_ctxt->frmr;
430 ctxt->read_hdr = NULL;
315 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 431 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
432 clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
316 433
317 /* Prepare READ WR */ 434 /* Prepare READ WR */
318 memset(&read_wr, 0, sizeof read_wr); 435 memset(&read_wr, 0, sizeof read_wr);
319 ctxt->wr_op = IB_WR_RDMA_READ;
320 read_wr.wr_id = (unsigned long)ctxt; 436 read_wr.wr_id = (unsigned long)ctxt;
321 read_wr.opcode = IB_WR_RDMA_READ; 437 read_wr.opcode = IB_WR_RDMA_READ;
438 ctxt->wr_op = read_wr.opcode;
322 read_wr.send_flags = IB_SEND_SIGNALED; 439 read_wr.send_flags = IB_SEND_SIGNALED;
323 read_wr.wr.rdma.rkey = ch->rc_target.rs_handle; 440 read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
324 read_wr.wr.rdma.remote_addr = 441 read_wr.wr.rdma.remote_addr =
@@ -327,10 +444,15 @@ next_sge:
327 read_wr.sg_list = ctxt->sge; 444 read_wr.sg_list = ctxt->sge;
328 read_wr.num_sge = 445 read_wr.num_sge =
329 rdma_read_max_sge(xprt, chl_map->ch[ch_no].count); 446 rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
330 rdma_set_ctxt_sge(xprt, ctxt, 447 err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
331 &rpl_map->sge[chl_map->ch[ch_no].start], 448 &rpl_map->sge[chl_map->ch[ch_no].start],
332 &sgl_offset, 449 &sgl_offset,
333 read_wr.num_sge); 450 read_wr.num_sge);
451 if (err) {
452 svc_rdma_unmap_dma(ctxt);
453 svc_rdma_put_context(ctxt, 0);
454 goto out;
455 }
334 if (((ch+1)->rc_discrim == 0) && 456 if (((ch+1)->rc_discrim == 0) &&
335 (read_wr.num_sge == chl_map->ch[ch_no].count)) { 457 (read_wr.num_sge == chl_map->ch[ch_no].count)) {
336 /* 458 /*
@@ -339,6 +461,29 @@ next_sge:
339 * the client and the RPC needs to be enqueued. 461 * the client and the RPC needs to be enqueued.
340 */ 462 */
341 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 463 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
464 if (hdr_ctxt->frmr) {
465 set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
466 /*
467 * Invalidate the local MR used to map the data
468 * sink.
469 */
470 if (xprt->sc_dev_caps &
471 SVCRDMA_DEVCAP_READ_W_INV) {
472 read_wr.opcode =
473 IB_WR_RDMA_READ_WITH_INV;
474 ctxt->wr_op = read_wr.opcode;
475 read_wr.ex.invalidate_rkey =
476 ctxt->frmr->mr->lkey;
477 } else {
478 /* Prepare INVALIDATE WR */
479 memset(&inv_wr, 0, sizeof inv_wr);
480 inv_wr.opcode = IB_WR_LOCAL_INV;
481 inv_wr.send_flags = IB_SEND_SIGNALED;
482 inv_wr.ex.invalidate_rkey =
483 hdr_ctxt->frmr->mr->lkey;
484 read_wr.next = &inv_wr;
485 }
486 }
342 ctxt->read_hdr = hdr_ctxt; 487 ctxt->read_hdr = hdr_ctxt;
343 } 488 }
344 /* Post the read */ 489 /* Post the read */