aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
authorTom Tucker <tom@opengridcomputing.com>2007-12-12 17:13:23 -0500
committerJ. Bruce Fields <bfields@citi.umich.edu>2008-02-01 16:42:14 -0500
commitd5b31be6823320d81570e0199acd60d3a3f75d85 (patch)
tree7b8bee749a3c1b637c6d8e88ee73c802cb3698cd /net/sunrpc/xprtrdma
parent377f9b2f4529e0ac702fd7b91e216afd0adc959e (diff)
rdma: SVCRDMA recvfrom
This file implements the RDMA transport recvfrom function. The function dequeues work reqeust completion contexts from an I/O list that it shares with the I/O tasklet in svc_rdma_transport.c. For ONCRPC RDMA, an RPC may not be complete when it is received. Instead, the RDMA header that precedes the RPC message informs the transport where to get the RPC data from on the client and where to place it in the RPC message before it is delivered to the server. The svc_rdma_recvfrom function therefore, parses this RDMA header and issues any necessary RDMA operations to fetch the remainder of the RPC from the client. Special handling is required when the request involves an RDMA_READ. In this case, recvfrom submits the RDMA_READ requests to the underlying transport driver and then returns 0. When the transport completes the last RDMA_READ for the request, it enqueues it on a read completion queue and enqueues the transport. The recvfrom code favors this queue over the regular DTO queue when satisfying reads. Signed-off-by: Tom Tucker <tom@opengridcomputing.com> Acked-by: Neil Brown <neilb@suse.de> Signed-off-by: J. Bruce Fields <bfields@citi.umich.edu>
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c586
1 files changed, 586 insertions, 0 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
new file mode 100644
index 000000000000..ab54a736486e
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -0,0 +1,586 @@
1/*
2 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 *
39 * Author: Tom Tucker <tom@opengridcomputing.com>
40 */
41
42#include <linux/sunrpc/debug.h>
43#include <linux/sunrpc/rpc_rdma.h>
44#include <linux/spinlock.h>
45#include <asm/unaligned.h>
46#include <rdma/ib_verbs.h>
47#include <rdma/rdma_cm.h>
48#include <linux/sunrpc/svc_rdma.h>
49
50#define RPCDBG_FACILITY RPCDBG_SVCXPRT
51
52/*
53 * Replace the pages in the rq_argpages array with the pages from the SGE in
54 * the RDMA_RECV completion. The SGL should contain full pages up until the
55 * last one.
56 */
57static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
58 struct svc_rdma_op_ctxt *ctxt,
59 u32 byte_count)
60{
61 struct page *page;
62 u32 bc;
63 int sge_no;
64
65 /* Swap the page in the SGE with the page in argpages */
66 page = ctxt->pages[0];
67 put_page(rqstp->rq_pages[0]);
68 rqstp->rq_pages[0] = page;
69
70 /* Set up the XDR head */
71 rqstp->rq_arg.head[0].iov_base = page_address(page);
72 rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
73 rqstp->rq_arg.len = byte_count;
74 rqstp->rq_arg.buflen = byte_count;
75
76 /* Compute bytes past head in the SGL */
77 bc = byte_count - rqstp->rq_arg.head[0].iov_len;
78
79 /* If data remains, store it in the pagelist */
80 rqstp->rq_arg.page_len = bc;
81 rqstp->rq_arg.page_base = 0;
82 rqstp->rq_arg.pages = &rqstp->rq_pages[1];
83 sge_no = 1;
84 while (bc && sge_no < ctxt->count) {
85 page = ctxt->pages[sge_no];
86 put_page(rqstp->rq_pages[sge_no]);
87 rqstp->rq_pages[sge_no] = page;
88 bc -= min(bc, ctxt->sge[sge_no].length);
89 rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
90 sge_no++;
91 }
92 rqstp->rq_respages = &rqstp->rq_pages[sge_no];
93
94 /* We should never run out of SGE because the limit is defined to
95 * support the max allowed RPC data length
96 */
97 BUG_ON(bc && (sge_no == ctxt->count));
98 BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
99 != byte_count);
100 BUG_ON(rqstp->rq_arg.len != byte_count);
101
102 /* If not all pages were used from the SGL, free the remaining ones */
103 bc = sge_no;
104 while (sge_no < ctxt->count) {
105 page = ctxt->pages[sge_no++];
106 put_page(page);
107 }
108 ctxt->count = bc;
109
110 /* Set up tail */
111 rqstp->rq_arg.tail[0].iov_base = NULL;
112 rqstp->rq_arg.tail[0].iov_len = 0;
113}
114
115struct chunk_sge {
116 int start; /* sge no for this chunk */
117 int count; /* sge count for this chunk */
118};
119
120/* Encode a read-chunk-list as an array of IB SGE
121 *
122 * Assumptions:
123 * - chunk[0]->position points to pages[0] at an offset of 0
124 * - pages[] is not physically or virtually contigous and consists of
125 * PAGE_SIZE elements.
126 *
127 * Output:
128 * - sge array pointing into pages[] array.
129 * - chunk_sge array specifying sge index and count for each
130 * chunk in the read list
131 *
132 */
133static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
134 struct svc_rqst *rqstp,
135 struct svc_rdma_op_ctxt *head,
136 struct rpcrdma_msg *rmsgp,
137 struct ib_sge *sge,
138 struct chunk_sge *ch_sge_ary,
139 int ch_count,
140 int byte_count)
141{
142 int sge_no;
143 int sge_bytes;
144 int page_off;
145 int page_no;
146 int ch_bytes;
147 int ch_no;
148 struct rpcrdma_read_chunk *ch;
149
150 sge_no = 0;
151 page_no = 0;
152 page_off = 0;
153 ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
154 ch_no = 0;
155 ch_bytes = ch->rc_target.rs_length;
156 head->arg.head[0] = rqstp->rq_arg.head[0];
157 head->arg.tail[0] = rqstp->rq_arg.tail[0];
158 head->arg.pages = &head->pages[head->count];
159 head->sge[0].length = head->count; /* save count of hdr pages */
160 head->arg.page_base = 0;
161 head->arg.page_len = ch_bytes;
162 head->arg.len = rqstp->rq_arg.len + ch_bytes;
163 head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
164 head->count++;
165 ch_sge_ary[0].start = 0;
166 while (byte_count) {
167 sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
168 sge[sge_no].addr =
169 ib_dma_map_page(xprt->sc_cm_id->device,
170 rqstp->rq_arg.pages[page_no],
171 page_off, sge_bytes,
172 DMA_FROM_DEVICE);
173 sge[sge_no].length = sge_bytes;
174 sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
175 /*
176 * Don't bump head->count here because the same page
177 * may be used by multiple SGE.
178 */
179 head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
180 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
181
182 byte_count -= sge_bytes;
183 ch_bytes -= sge_bytes;
184 sge_no++;
185 /*
186 * If all bytes for this chunk have been mapped to an
187 * SGE, move to the next SGE
188 */
189 if (ch_bytes == 0) {
190 ch_sge_ary[ch_no].count =
191 sge_no - ch_sge_ary[ch_no].start;
192 ch_no++;
193 ch++;
194 ch_sge_ary[ch_no].start = sge_no;
195 ch_bytes = ch->rc_target.rs_length;
196 /* If bytes remaining account for next chunk */
197 if (byte_count) {
198 head->arg.page_len += ch_bytes;
199 head->arg.len += ch_bytes;
200 head->arg.buflen += ch_bytes;
201 }
202 }
203 /*
204 * If this SGE consumed all of the page, move to the
205 * next page
206 */
207 if ((sge_bytes + page_off) == PAGE_SIZE) {
208 page_no++;
209 page_off = 0;
210 /*
211 * If there are still bytes left to map, bump
212 * the page count
213 */
214 if (byte_count)
215 head->count++;
216 } else
217 page_off += sge_bytes;
218 }
219 BUG_ON(byte_count != 0);
220 return sge_no;
221}
222
223static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
224 struct ib_sge *sge,
225 u64 *sgl_offset,
226 int count)
227{
228 int i;
229
230 ctxt->count = count;
231 for (i = 0; i < count; i++) {
232 ctxt->sge[i].addr = sge[i].addr;
233 ctxt->sge[i].length = sge[i].length;
234 *sgl_offset = *sgl_offset + sge[i].length;
235 }
236}
237
238static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
239{
240#ifdef RDMA_TRANSPORT_IWARP
241 if ((RDMA_TRANSPORT_IWARP ==
242 rdma_node_get_transport(xprt->sc_cm_id->
243 device->node_type))
244 && sge_count > 1)
245 return 1;
246 else
247#endif
248 return min_t(int, sge_count, xprt->sc_max_sge);
249}
250
251/*
252 * Use RDMA_READ to read data from the advertised client buffer into the
253 * XDR stream starting at rq_arg.head[0].iov_base.
254 * Each chunk in the array
255 * contains the following fields:
256 * discrim - '1', This isn't used for data placement
257 * position - The xdr stream offset (the same for every chunk)
258 * handle - RMR for client memory region
259 * length - data transfer length
260 * offset - 64 bit tagged offset in remote memory region
261 *
262 * On our side, we need to read into a pagelist. The first page immediately
263 * follows the RPC header.
264 *
265 * This function returns 1 to indicate success. The data is not yet in
266 * the pagelist and therefore the RPC request must be deferred. The
267 * I/O completion will enqueue the transport again and
268 * svc_rdma_recvfrom will complete the request.
269 *
270 * NOTE: The ctxt must not be touched after the last WR has been posted
271 * because the I/O completion processing may occur on another
272 * processor and free / modify the context. Ne touche pas!
273 */
274static int rdma_read_xdr(struct svcxprt_rdma *xprt,
275 struct rpcrdma_msg *rmsgp,
276 struct svc_rqst *rqstp,
277 struct svc_rdma_op_ctxt *hdr_ctxt)
278{
279 struct ib_send_wr read_wr;
280 int err = 0;
281 int ch_no;
282 struct ib_sge *sge;
283 int ch_count;
284 int byte_count;
285 int sge_count;
286 u64 sgl_offset;
287 struct rpcrdma_read_chunk *ch;
288 struct svc_rdma_op_ctxt *ctxt = NULL;
289 struct svc_rdma_op_ctxt *head;
290 struct svc_rdma_op_ctxt *tmp_sge_ctxt;
291 struct svc_rdma_op_ctxt *tmp_ch_ctxt;
292 struct chunk_sge *ch_sge_ary;
293
294 /* If no read list is present, return 0 */
295 ch = svc_rdma_get_read_chunk(rmsgp);
296 if (!ch)
297 return 0;
298
299 /* Allocate temporary contexts to keep SGE */
300 BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
301 tmp_sge_ctxt = svc_rdma_get_context(xprt);
302 sge = tmp_sge_ctxt->sge;
303 tmp_ch_ctxt = svc_rdma_get_context(xprt);
304 ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
305
306 svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
307 sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
308 sge, ch_sge_ary,
309 ch_count, byte_count);
310 head = svc_rdma_get_context(xprt);
311 sgl_offset = 0;
312 ch_no = 0;
313
314 for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
315 ch->rc_discrim != 0; ch++, ch_no++) {
316next_sge:
317 if (!ctxt)
318 ctxt = head;
319 else {
320 ctxt->next = svc_rdma_get_context(xprt);
321 ctxt = ctxt->next;
322 }
323 ctxt->next = NULL;
324 ctxt->direction = DMA_FROM_DEVICE;
325 clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
326 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
327 if ((ch+1)->rc_discrim == 0) {
328 /*
329 * Checked in sq_cq_reap to see if we need to
330 * be enqueued
331 */
332 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
333 ctxt->next = hdr_ctxt;
334 hdr_ctxt->next = head;
335 }
336
337 /* Prepare READ WR */
338 memset(&read_wr, 0, sizeof read_wr);
339 ctxt->wr_op = IB_WR_RDMA_READ;
340 read_wr.wr_id = (unsigned long)ctxt;
341 read_wr.opcode = IB_WR_RDMA_READ;
342 read_wr.send_flags = IB_SEND_SIGNALED;
343 read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
344 read_wr.wr.rdma.remote_addr =
345 get_unaligned(&(ch->rc_target.rs_offset)) +
346 sgl_offset;
347 read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
348 read_wr.num_sge =
349 rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
350 rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
351 &sgl_offset,
352 read_wr.num_sge);
353
354 /* Post the read */
355 err = svc_rdma_send(xprt, &read_wr);
356 if (err) {
357 printk(KERN_ERR "svcrdma: Error posting send = %d\n",
358 err);
359 /*
360 * Break the circular list so free knows when
361 * to stop if the error happened to occur on
362 * the last read
363 */
364 ctxt->next = NULL;
365 goto out;
366 }
367 atomic_inc(&rdma_stat_read);
368
369 if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
370 ch_sge_ary[ch_no].count -= read_wr.num_sge;
371 ch_sge_ary[ch_no].start += read_wr.num_sge;
372 goto next_sge;
373 }
374 sgl_offset = 0;
375 err = 0;
376 }
377
378 out:
379 svc_rdma_put_context(tmp_sge_ctxt, 0);
380 svc_rdma_put_context(tmp_ch_ctxt, 0);
381
382 /* Detach arg pages. svc_recv will replenish them */
383 for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
384 rqstp->rq_pages[ch_no] = NULL;
385
386 /*
387 * Detach res pages. svc_release must see a resused count of
388 * zero or it will attempt to put them.
389 */
390 while (rqstp->rq_resused)
391 rqstp->rq_respages[--rqstp->rq_resused] = NULL;
392
393 if (err) {
394 printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
395 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
396 /* Free the linked list of read contexts */
397 while (head != NULL) {
398 ctxt = head->next;
399 svc_rdma_put_context(head, 1);
400 head = ctxt;
401 }
402 return 0;
403 }
404
405 return 1;
406}
407
408static int rdma_read_complete(struct svc_rqst *rqstp,
409 struct svc_rdma_op_ctxt *data)
410{
411 struct svc_rdma_op_ctxt *head = data->next;
412 int page_no;
413 int ret;
414
415 BUG_ON(!head);
416
417 /* Copy RPC pages */
418 for (page_no = 0; page_no < head->count; page_no++) {
419 put_page(rqstp->rq_pages[page_no]);
420 rqstp->rq_pages[page_no] = head->pages[page_no];
421 }
422 /* Point rq_arg.pages past header */
423 rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
424 rqstp->rq_arg.page_len = head->arg.page_len;
425 rqstp->rq_arg.page_base = head->arg.page_base;
426
427 /* rq_respages starts after the last arg page */
428 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
429 rqstp->rq_resused = 0;
430
431 /* Rebuild rq_arg head and tail. */
432 rqstp->rq_arg.head[0] = head->arg.head[0];
433 rqstp->rq_arg.tail[0] = head->arg.tail[0];
434 rqstp->rq_arg.len = head->arg.len;
435 rqstp->rq_arg.buflen = head->arg.buflen;
436
437 /* XXX: What should this be? */
438 rqstp->rq_prot = IPPROTO_MAX;
439
440 /*
441 * Free the contexts we used to build the RDMA_READ. We have
442 * to be careful here because the context list uses the same
443 * next pointer used to chain the contexts associated with the
444 * RDMA_READ
445 */
446 data->next = NULL; /* terminate circular list */
447 do {
448 data = head->next;
449 svc_rdma_put_context(head, 0);
450 head = data;
451 } while (head != NULL);
452
453 ret = rqstp->rq_arg.head[0].iov_len
454 + rqstp->rq_arg.page_len
455 + rqstp->rq_arg.tail[0].iov_len;
456 dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
457 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
458 ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
459 rqstp->rq_arg.head[0].iov_len);
460
461 /* Indicate that we've consumed an RQ credit */
462 rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
463 svc_xprt_received(rqstp->rq_xprt);
464 return ret;
465}
466
467/*
468 * Set up the rqstp thread context to point to the RQ buffer. If
469 * necessary, pull additional data from the client with an RDMA_READ
470 * request.
471 */
472int svc_rdma_recvfrom(struct svc_rqst *rqstp)
473{
474 struct svc_xprt *xprt = rqstp->rq_xprt;
475 struct svcxprt_rdma *rdma_xprt =
476 container_of(xprt, struct svcxprt_rdma, sc_xprt);
477 struct svc_rdma_op_ctxt *ctxt = NULL;
478 struct rpcrdma_msg *rmsgp;
479 int ret = 0;
480 int len;
481
482 dprintk("svcrdma: rqstp=%p\n", rqstp);
483
484 /*
485 * The rq_xprt_ctxt indicates if we've consumed an RQ credit
486 * or not. It is used in the rdma xpo_release_rqst function to
487 * determine whether or not to return an RQ WQE to the RQ.
488 */
489 rqstp->rq_xprt_ctxt = NULL;
490
491 spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
492 if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
493 ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
494 struct svc_rdma_op_ctxt,
495 dto_q);
496 list_del_init(&ctxt->dto_q);
497 }
498 spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
499 if (ctxt)
500 return rdma_read_complete(rqstp, ctxt);
501
502 spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
503 if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
504 ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
505 struct svc_rdma_op_ctxt,
506 dto_q);
507 list_del_init(&ctxt->dto_q);
508 } else {
509 atomic_inc(&rdma_stat_rq_starve);
510 clear_bit(XPT_DATA, &xprt->xpt_flags);
511 ctxt = NULL;
512 }
513 spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
514 if (!ctxt) {
515 /* This is the EAGAIN path. The svc_recv routine will
516 * return -EAGAIN, the nfsd thread will go to call into
517 * svc_recv again and we shouldn't be on the active
518 * transport list
519 */
520 if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
521 goto close_out;
522
523 BUG_ON(ret);
524 goto out;
525 }
526 dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
527 ctxt, rdma_xprt, rqstp, ctxt->wc_status);
528 BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
529 atomic_inc(&rdma_stat_recv);
530
531 /* Build up the XDR from the receive buffers. */
532 rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
533
534 /* Decode the RDMA header. */
535 len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
536 rqstp->rq_xprt_hlen = len;
537
538 /* If the request is invalid, reply with an error */
539 if (len < 0) {
540 if (len == -ENOSYS)
541 (void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
542 goto close_out;
543 }
544
545 /* Read read-list data. If we would need to wait, defer
546 * it. Not that in this case, we don't return the RQ credit
547 * until after the read completes.
548 */
549 if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
550 svc_xprt_received(xprt);
551 return 0;
552 }
553
554 /* Indicate we've consumed an RQ credit */
555 rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
556
557 ret = rqstp->rq_arg.head[0].iov_len
558 + rqstp->rq_arg.page_len
559 + rqstp->rq_arg.tail[0].iov_len;
560 svc_rdma_put_context(ctxt, 0);
561 out:
562 dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
563 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
564 ret, rqstp->rq_arg.len,
565 rqstp->rq_arg.head[0].iov_base,
566 rqstp->rq_arg.head[0].iov_len);
567 rqstp->rq_prot = IPPROTO_MAX;
568 svc_xprt_copy_addrs(rqstp, xprt);
569 svc_xprt_received(xprt);
570 return ret;
571
572 close_out:
573 if (ctxt) {
574 svc_rdma_put_context(ctxt, 1);
575 /* Indicate we've consumed an RQ credit */
576 rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
577 }
578 dprintk("svcrdma: transport %p is closing\n", xprt);
579 /*
580 * Set the close bit and enqueue it. svc_recv will see the
581 * close bit and call svc_xprt_delete
582 */
583 set_bit(XPT_CLOSE, &xprt->xpt_flags);
584 svc_xprt_received(xprt);
585 return 0;
586}