aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/transport.c
diff options
context:
space:
mode:
author\"Talpey, Thomas\ <Thomas.Talpey@netapp.com>2007-09-10 13:50:12 -0400
committerTrond Myklebust <Trond.Myklebust@netapp.com>2007-10-09 17:18:03 -0400
commitf58851e6b0f148fb4b2a1c6f70beb2f125863c0f (patch)
tree816604d59b5de0ee19ca69b19a6f34548dbb82c5 /net/sunrpc/xprtrdma/transport.c
parent2cf7ff7a37cc149bd59c4f3bad432f686a4619c8 (diff)
RPCRDMA: rpc rdma transport switch
This implements the configuration and building of the core transport switch implementation of the rpcrdma transport. Stubs are provided for the rpcrdma protocol handling, and the infiniband/iwarp verbs interface. These are provided in following patches. Signed-off-by: Tom Talpey <talpey@netapp.com> Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net/sunrpc/xprtrdma/transport.c')
-rw-r--r--net/sunrpc/xprtrdma/transport.c800
1 files changed, 800 insertions, 0 deletions
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
new file mode 100644
index 000000000000..dc55cc974c90
--- /dev/null
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -0,0 +1,800 @@
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * transport.c
42 *
43 * This file contains the top-level implementation of an RPC RDMA
44 * transport.
45 *
46 * Naming convention: functions beginning with xprt_ are part of the
47 * transport switch. All others are RPC RDMA internal.
48 */
49
50#include <linux/module.h>
51#include <linux/init.h>
52#include <linux/seq_file.h>
53
54#include "xprt_rdma.h"
55
56#ifdef RPC_DEBUG
57# define RPCDBG_FACILITY RPCDBG_TRANS
58#endif
59
60MODULE_LICENSE("Dual BSD/GPL");
61
62MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
63MODULE_AUTHOR("Network Appliance, Inc.");
64
65/*
66 * tunables
67 */
68
69static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
70static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
71static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
72static unsigned int xprt_rdma_inline_write_padding;
73#if !RPCRDMA_PERSISTENT_REGISTRATION
74static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
75#else
76static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
77#endif
78
79#ifdef RPC_DEBUG
80
81static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
82static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
83static unsigned int zero;
84static unsigned int max_padding = PAGE_SIZE;
85static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
86static unsigned int max_memreg = RPCRDMA_LAST - 1;
87
88static struct ctl_table_header *sunrpc_table_header;
89
90static ctl_table xr_tunables_table[] = {
91 {
92 .ctl_name = CTL_SLOTTABLE_RDMA,
93 .procname = "rdma_slot_table_entries",
94 .data = &xprt_rdma_slot_table_entries,
95 .maxlen = sizeof(unsigned int),
96 .mode = 0644,
97 .proc_handler = &proc_dointvec_minmax,
98 .strategy = &sysctl_intvec,
99 .extra1 = &min_slot_table_size,
100 .extra2 = &max_slot_table_size
101 },
102 {
103 .ctl_name = CTL_RDMA_MAXINLINEREAD,
104 .procname = "rdma_max_inline_read",
105 .data = &xprt_rdma_max_inline_read,
106 .maxlen = sizeof(unsigned int),
107 .mode = 0644,
108 .proc_handler = &proc_dointvec,
109 .strategy = &sysctl_intvec,
110 },
111 {
112 .ctl_name = CTL_RDMA_MAXINLINEWRITE,
113 .procname = "rdma_max_inline_write",
114 .data = &xprt_rdma_max_inline_write,
115 .maxlen = sizeof(unsigned int),
116 .mode = 0644,
117 .proc_handler = &proc_dointvec,
118 .strategy = &sysctl_intvec,
119 },
120 {
121 .ctl_name = CTL_RDMA_WRITEPADDING,
122 .procname = "rdma_inline_write_padding",
123 .data = &xprt_rdma_inline_write_padding,
124 .maxlen = sizeof(unsigned int),
125 .mode = 0644,
126 .proc_handler = &proc_dointvec_minmax,
127 .strategy = &sysctl_intvec,
128 .extra1 = &zero,
129 .extra2 = &max_padding,
130 },
131 {
132 .ctl_name = CTL_RDMA_MEMREG,
133 .procname = "rdma_memreg_strategy",
134 .data = &xprt_rdma_memreg_strategy,
135 .maxlen = sizeof(unsigned int),
136 .mode = 0644,
137 .proc_handler = &proc_dointvec_minmax,
138 .strategy = &sysctl_intvec,
139 .extra1 = &min_memreg,
140 .extra2 = &max_memreg,
141 },
142 {
143 .ctl_name = 0,
144 },
145};
146
147static ctl_table sunrpc_table[] = {
148 {
149 .ctl_name = CTL_SUNRPC,
150 .procname = "sunrpc",
151 .mode = 0555,
152 .child = xr_tunables_table
153 },
154 {
155 .ctl_name = 0,
156 },
157};
158
159#endif
160
161static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
162
163static void
164xprt_rdma_format_addresses(struct rpc_xprt *xprt)
165{
166 struct sockaddr_in *addr = (struct sockaddr_in *)
167 &rpcx_to_rdmad(xprt).addr;
168 char *buf;
169
170 buf = kzalloc(20, GFP_KERNEL);
171 if (buf)
172 snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
173 xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
174
175 buf = kzalloc(8, GFP_KERNEL);
176 if (buf)
177 snprintf(buf, 8, "%u", ntohs(addr->sin_port));
178 xprt->address_strings[RPC_DISPLAY_PORT] = buf;
179
180 xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
181
182 buf = kzalloc(48, GFP_KERNEL);
183 if (buf)
184 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
185 NIPQUAD(addr->sin_addr.s_addr),
186 ntohs(addr->sin_port), "rdma");
187 xprt->address_strings[RPC_DISPLAY_ALL] = buf;
188
189 buf = kzalloc(10, GFP_KERNEL);
190 if (buf)
191 snprintf(buf, 10, "%02x%02x%02x%02x",
192 NIPQUAD(addr->sin_addr.s_addr));
193 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
194
195 buf = kzalloc(8, GFP_KERNEL);
196 if (buf)
197 snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
198 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
199
200 buf = kzalloc(30, GFP_KERNEL);
201 if (buf)
202 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
203 NIPQUAD(addr->sin_addr.s_addr),
204 ntohs(addr->sin_port) >> 8,
205 ntohs(addr->sin_port) & 0xff);
206 xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
207
208 /* netid */
209 xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
210}
211
212static void
213xprt_rdma_free_addresses(struct rpc_xprt *xprt)
214{
215 kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
216 kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
217 kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
218 kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
219 kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
220 kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
221}
222
223static void
224xprt_rdma_connect_worker(struct work_struct *work)
225{
226 struct rpcrdma_xprt *r_xprt =
227 container_of(work, struct rpcrdma_xprt, rdma_connect.work);
228 struct rpc_xprt *xprt = &r_xprt->xprt;
229 int rc = 0;
230
231 if (!xprt->shutdown) {
232 xprt_clear_connected(xprt);
233
234 dprintk("RPC: %s: %sconnect\n", __func__,
235 r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
236 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
237 if (rc)
238 goto out;
239 }
240 goto out_clear;
241
242out:
243 xprt_wake_pending_tasks(xprt, rc);
244
245out_clear:
246 dprintk("RPC: %s: exit\n", __func__);
247 xprt_clear_connecting(xprt);
248}
249
250/*
251 * xprt_rdma_destroy
252 *
253 * Destroy the xprt.
254 * Free all memory associated with the object, including its own.
255 * NOTE: none of the *destroy methods free memory for their top-level
256 * objects, even though they may have allocated it (they do free
257 * private memory). It's up to the caller to handle it. In this
258 * case (RDMA transport), all structure memory is inlined with the
259 * struct rpcrdma_xprt.
260 */
261static void
262xprt_rdma_destroy(struct rpc_xprt *xprt)
263{
264 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
265 int rc;
266
267 dprintk("RPC: %s: called\n", __func__);
268
269 cancel_delayed_work(&r_xprt->rdma_connect);
270 flush_scheduled_work();
271
272 xprt_clear_connected(xprt);
273
274 rpcrdma_buffer_destroy(&r_xprt->rx_buf);
275 rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
276 if (rc)
277 dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n",
278 __func__, rc);
279 rpcrdma_ia_close(&r_xprt->rx_ia);
280
281 xprt_rdma_free_addresses(xprt);
282
283 kfree(xprt->slot);
284 xprt->slot = NULL;
285 kfree(xprt);
286
287 dprintk("RPC: %s: returning\n", __func__);
288
289 module_put(THIS_MODULE);
290}
291
292/**
293 * xprt_setup_rdma - Set up transport to use RDMA
294 *
295 * @args: rpc transport arguments
296 */
297static struct rpc_xprt *
298xprt_setup_rdma(struct xprt_create *args)
299{
300 struct rpcrdma_create_data_internal cdata;
301 struct rpc_xprt *xprt;
302 struct rpcrdma_xprt *new_xprt;
303 struct rpcrdma_ep *new_ep;
304 struct sockaddr_in *sin;
305 int rc;
306
307 if (args->addrlen > sizeof(xprt->addr)) {
308 dprintk("RPC: %s: address too large\n", __func__);
309 return ERR_PTR(-EBADF);
310 }
311
312 xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
313 if (xprt == NULL) {
314 dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
315 __func__);
316 return ERR_PTR(-ENOMEM);
317 }
318
319 xprt->max_reqs = xprt_rdma_slot_table_entries;
320 xprt->slot = kcalloc(xprt->max_reqs,
321 sizeof(struct rpc_rqst), GFP_KERNEL);
322 if (xprt->slot == NULL) {
323 kfree(xprt);
324 dprintk("RPC: %s: couldn't allocate %d slots\n",
325 __func__, xprt->max_reqs);
326 return ERR_PTR(-ENOMEM);
327 }
328
329 /* 60 second timeout, no retries */
330 xprt_set_timeout(&xprt->timeout, 0, 60UL * HZ);
331 xprt->bind_timeout = (60U * HZ);
332 xprt->connect_timeout = (60U * HZ);
333 xprt->reestablish_timeout = (5U * HZ);
334 xprt->idle_timeout = (5U * 60 * HZ);
335
336 xprt->resvport = 0; /* privileged port not needed */
337 xprt->tsh_size = 0; /* RPC-RDMA handles framing */
338 xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
339 xprt->ops = &xprt_rdma_procs;
340
341 /*
342 * Set up RDMA-specific connect data.
343 */
344
345 /* Put server RDMA address in local cdata */
346 memcpy(&cdata.addr, args->dstaddr, args->addrlen);
347
348 /* Ensure xprt->addr holds valid server TCP (not RDMA)
349 * address, for any side protocols which peek at it */
350 xprt->prot = IPPROTO_TCP;
351 xprt->addrlen = args->addrlen;
352 memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
353
354 sin = (struct sockaddr_in *)&cdata.addr;
355 if (ntohs(sin->sin_port) != 0)
356 xprt_set_bound(xprt);
357
358 dprintk("RPC: %s: %u.%u.%u.%u:%u\n", __func__,
359 NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
360
361 /* Set max requests */
362 cdata.max_requests = xprt->max_reqs;
363
364 /* Set some length limits */
365 cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
366 cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
367
368 cdata.inline_wsize = xprt_rdma_max_inline_write;
369 if (cdata.inline_wsize > cdata.wsize)
370 cdata.inline_wsize = cdata.wsize;
371
372 cdata.inline_rsize = xprt_rdma_max_inline_read;
373 if (cdata.inline_rsize > cdata.rsize)
374 cdata.inline_rsize = cdata.rsize;
375
376 cdata.padding = xprt_rdma_inline_write_padding;
377
378 /*
379 * Create new transport instance, which includes initialized
380 * o ia
381 * o endpoint
382 * o buffers
383 */
384
385 new_xprt = rpcx_to_rdmax(xprt);
386
387 rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
388 xprt_rdma_memreg_strategy);
389 if (rc)
390 goto out1;
391
392 /*
393 * initialize and create ep
394 */
395 new_xprt->rx_data = cdata;
396 new_ep = &new_xprt->rx_ep;
397 new_ep->rep_remote_addr = cdata.addr;
398
399 rc = rpcrdma_ep_create(&new_xprt->rx_ep,
400 &new_xprt->rx_ia, &new_xprt->rx_data);
401 if (rc)
402 goto out2;
403
404 /*
405 * Allocate pre-registered send and receive buffers for headers and
406 * any inline data. Also specify any padding which will be provided
407 * from a preregistered zero buffer.
408 */
409 rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
410 &new_xprt->rx_data);
411 if (rc)
412 goto out3;
413
414 /*
415 * Register a callback for connection events. This is necessary because
416 * connection loss notification is async. We also catch connection loss
417 * when reaping receives.
418 */
419 INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
420 new_ep->rep_func = rpcrdma_conn_func;
421 new_ep->rep_xprt = xprt;
422
423 xprt_rdma_format_addresses(xprt);
424
425 if (!try_module_get(THIS_MODULE))
426 goto out4;
427
428 return xprt;
429
430out4:
431 xprt_rdma_free_addresses(xprt);
432 rc = -EINVAL;
433out3:
434 (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
435out2:
436 rpcrdma_ia_close(&new_xprt->rx_ia);
437out1:
438 kfree(xprt->slot);
439 kfree(xprt);
440 return ERR_PTR(rc);
441}
442
443/*
444 * Close a connection, during shutdown or timeout/reconnect
445 */
446static void
447xprt_rdma_close(struct rpc_xprt *xprt)
448{
449 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
450
451 dprintk("RPC: %s: closing\n", __func__);
452 xprt_disconnect(xprt);
453 (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
454}
455
456static void
457xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
458{
459 struct sockaddr_in *sap;
460
461 sap = (struct sockaddr_in *)&xprt->addr;
462 sap->sin_port = htons(port);
463 sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
464 sap->sin_port = htons(port);
465 dprintk("RPC: %s: %u\n", __func__, port);
466}
467
468static void
469xprt_rdma_connect(struct rpc_task *task)
470{
471 struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
472 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
473
474 if (!xprt_test_and_set_connecting(xprt)) {
475 if (r_xprt->rx_ep.rep_connected != 0) {
476 /* Reconnect */
477 schedule_delayed_work(&r_xprt->rdma_connect,
478 xprt->reestablish_timeout);
479 } else {
480 schedule_delayed_work(&r_xprt->rdma_connect, 0);
481 if (!RPC_IS_ASYNC(task))
482 flush_scheduled_work();
483 }
484 }
485}
486
487static int
488xprt_rdma_reserve_xprt(struct rpc_task *task)
489{
490 struct rpc_xprt *xprt = task->tk_xprt;
491 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
492 int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
493
494 /* == RPC_CWNDSCALE @ init, but *after* setup */
495 if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
496 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
497 dprintk("RPC: %s: cwndscale %lu\n", __func__,
498 r_xprt->rx_buf.rb_cwndscale);
499 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
500 }
501 xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
502 return xprt_reserve_xprt_cong(task);
503}
504
505/*
506 * The RDMA allocate/free functions need the task structure as a place
507 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
508 * sequence. For this reason, the recv buffers are attached to send
509 * buffers for portions of the RPC. Note that the RPC layer allocates
510 * both send and receive buffers in the same call. We may register
511 * the receive buffer portion when using reply chunks.
512 */
513static void *
514xprt_rdma_allocate(struct rpc_task *task, size_t size)
515{
516 struct rpc_xprt *xprt = task->tk_xprt;
517 struct rpcrdma_req *req, *nreq;
518
519 req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
520 BUG_ON(NULL == req);
521
522 if (size > req->rl_size) {
523 dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
524 "prog %d vers %d proc %d\n",
525 __func__, size, req->rl_size,
526 task->tk_client->cl_prog, task->tk_client->cl_vers,
527 task->tk_msg.rpc_proc->p_proc);
528 /*
529 * Outgoing length shortage. Our inline write max must have
530 * been configured to perform direct i/o.
531 *
532 * This is therefore a large metadata operation, and the
533 * allocate call was made on the maximum possible message,
534 * e.g. containing long filename(s) or symlink data. In
535 * fact, while these metadata operations *might* carry
536 * large outgoing payloads, they rarely *do*. However, we
537 * have to commit to the request here, so reallocate and
538 * register it now. The data path will never require this
539 * reallocation.
540 *
541 * If the allocation or registration fails, the RPC framework
542 * will (doggedly) retry.
543 */
544 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
545 RPCRDMA_BOUNCEBUFFERS) {
546 /* forced to "pure inline" */
547 dprintk("RPC: %s: too much data (%zd) for inline "
548 "(r/w max %d/%d)\n", __func__, size,
549 rpcx_to_rdmad(xprt).inline_rsize,
550 rpcx_to_rdmad(xprt).inline_wsize);
551 size = req->rl_size;
552 rpc_exit(task, -EIO); /* fail the operation */
553 rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
554 goto out;
555 }
556 if (task->tk_flags & RPC_TASK_SWAPPER)
557 nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
558 else
559 nreq = kmalloc(sizeof *req + size, GFP_NOFS);
560 if (nreq == NULL)
561 goto outfail;
562
563 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
564 nreq->rl_base, size + sizeof(struct rpcrdma_req)
565 - offsetof(struct rpcrdma_req, rl_base),
566 &nreq->rl_handle, &nreq->rl_iov)) {
567 kfree(nreq);
568 goto outfail;
569 }
570 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
571 nreq->rl_size = size;
572 nreq->rl_niovs = 0;
573 nreq->rl_nchunks = 0;
574 nreq->rl_buffer = (struct rpcrdma_buffer *)req;
575 nreq->rl_reply = req->rl_reply;
576 memcpy(nreq->rl_segments,
577 req->rl_segments, sizeof nreq->rl_segments);
578 /* flag the swap with an unused field */
579 nreq->rl_iov.length = 0;
580 req->rl_reply = NULL;
581 req = nreq;
582 }
583 dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
584out:
585 return req->rl_xdr_buf;
586
587outfail:
588 rpcrdma_buffer_put(req);
589 rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
590 return NULL;
591}
592
593/*
594 * This function returns all RDMA resources to the pool.
595 */
596static void
597xprt_rdma_free(void *buffer)
598{
599 struct rpcrdma_req *req;
600 struct rpcrdma_xprt *r_xprt;
601 struct rpcrdma_rep *rep;
602 int i;
603
604 if (buffer == NULL)
605 return;
606
607 req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
608 r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
609 rep = req->rl_reply;
610
611 dprintk("RPC: %s: called on 0x%p%s\n",
612 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
613
614 /*
615 * Finish the deregistration. When using mw bind, this was
616 * begun in rpcrdma_reply_handler(). In all other modes, we
617 * do it here, in thread context. The process is considered
618 * complete when the rr_func vector becomes NULL - this
619 * was put in place during rpcrdma_reply_handler() - the wait
620 * call below will not block if the dereg is "done". If
621 * interrupted, our framework will clean up.
622 */
623 for (i = 0; req->rl_nchunks;) {
624 --req->rl_nchunks;
625 i += rpcrdma_deregister_external(
626 &req->rl_segments[i], r_xprt, NULL);
627 }
628
629 if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
630 rep->rr_func = NULL; /* abandon the callback */
631 req->rl_reply = NULL;
632 }
633
634 if (req->rl_iov.length == 0) { /* see allocate above */
635 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
636 oreq->rl_reply = req->rl_reply;
637 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
638 req->rl_handle,
639 &req->rl_iov);
640 kfree(req);
641 req = oreq;
642 }
643
644 /* Put back request+reply buffers */
645 rpcrdma_buffer_put(req);
646}
647
648/*
649 * send_request invokes the meat of RPC RDMA. It must do the following:
650 * 1. Marshal the RPC request into an RPC RDMA request, which means
651 * putting a header in front of data, and creating IOVs for RDMA
652 * from those in the request.
653 * 2. In marshaling, detect opportunities for RDMA, and use them.
654 * 3. Post a recv message to set up asynch completion, then send
655 * the request (rpcrdma_ep_post).
656 * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP).
657 */
658
659static int
660xprt_rdma_send_request(struct rpc_task *task)
661{
662 struct rpc_rqst *rqst = task->tk_rqstp;
663 struct rpc_xprt *xprt = task->tk_xprt;
664 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
665 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
666
667 /* marshal the send itself */
668 if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
669 r_xprt->rx_stats.failed_marshal_count++;
670 dprintk("RPC: %s: rpcrdma_marshal_req failed\n",
671 __func__);
672 return -EIO;
673 }
674
675 if (req->rl_reply == NULL) /* e.g. reconnection */
676 rpcrdma_recv_buffer_get(req);
677
678 if (req->rl_reply) {
679 req->rl_reply->rr_func = rpcrdma_reply_handler;
680 /* this need only be done once, but... */
681 req->rl_reply->rr_xprt = xprt;
682 }
683
684 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
685 xprt_disconnect(xprt);
686 return -ENOTCONN; /* implies disconnect */
687 }
688
689 rqst->rq_bytes_sent = 0;
690 return 0;
691}
692
693static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
694{
695 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
696 long idle_time = 0;
697
698 if (xprt_connected(xprt))
699 idle_time = (long)(jiffies - xprt->last_used) / HZ;
700
701 seq_printf(seq,
702 "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
703 "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
704
705 0, /* need a local port? */
706 xprt->stat.bind_count,
707 xprt->stat.connect_count,
708 xprt->stat.connect_time,
709 idle_time,
710 xprt->stat.sends,
711 xprt->stat.recvs,
712 xprt->stat.bad_xids,
713 xprt->stat.req_u,
714 xprt->stat.bklog_u,
715
716 r_xprt->rx_stats.read_chunk_count,
717 r_xprt->rx_stats.write_chunk_count,
718 r_xprt->rx_stats.reply_chunk_count,
719 r_xprt->rx_stats.total_rdma_request,
720 r_xprt->rx_stats.total_rdma_reply,
721 r_xprt->rx_stats.pullup_copy_count,
722 r_xprt->rx_stats.fixup_copy_count,
723 r_xprt->rx_stats.hardway_register_count,
724 r_xprt->rx_stats.failed_marshal_count,
725 r_xprt->rx_stats.bad_reply_count);
726}
727
728/*
729 * Plumbing for rpc transport switch and kernel module
730 */
731
732static struct rpc_xprt_ops xprt_rdma_procs = {
733 .reserve_xprt = xprt_rdma_reserve_xprt,
734 .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
735 .release_request = xprt_release_rqst_cong, /* ditto */
736 .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
737 .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
738 .set_port = xprt_rdma_set_port,
739 .connect = xprt_rdma_connect,
740 .buf_alloc = xprt_rdma_allocate,
741 .buf_free = xprt_rdma_free,
742 .send_request = xprt_rdma_send_request,
743 .close = xprt_rdma_close,
744 .destroy = xprt_rdma_destroy,
745 .print_stats = xprt_rdma_print_stats
746};
747
748static struct xprt_class xprt_rdma = {
749 .list = LIST_HEAD_INIT(xprt_rdma.list),
750 .name = "rdma",
751 .owner = THIS_MODULE,
752 .ident = XPRT_TRANSPORT_RDMA,
753 .setup = xprt_setup_rdma,
754};
755
756static void __exit xprt_rdma_cleanup(void)
757{
758 int rc;
759
760 dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
761#ifdef RPC_DEBUG
762 if (sunrpc_table_header) {
763 unregister_sysctl_table(sunrpc_table_header);
764 sunrpc_table_header = NULL;
765 }
766#endif
767 rc = xprt_unregister_transport(&xprt_rdma);
768 if (rc)
769 dprintk("RPC: %s: xprt_unregister returned %i\n",
770 __func__, rc);
771}
772
773static int __init xprt_rdma_init(void)
774{
775 int rc;
776
777 rc = xprt_register_transport(&xprt_rdma);
778
779 if (rc)
780 return rc;
781
782 dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
783
784 dprintk(KERN_INFO "Defaults:\n");
785 dprintk(KERN_INFO "\tSlots %d\n"
786 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
787 xprt_rdma_slot_table_entries,
788 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
789 dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
790 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
791
792#ifdef RPC_DEBUG
793 if (!sunrpc_table_header)
794 sunrpc_table_header = register_sysctl_table(sunrpc_table);
795#endif
796 return 0;
797}
798
799module_init(xprt_rdma_init);
800module_exit(xprt_rdma_cleanup);