aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/Makefile1
-rw-r--r--net/sunrpc/auth_gss/gss_krb5_wrap.c6
-rw-r--r--net/sunrpc/clnt.c52
-rw-r--r--net/sunrpc/rpc_pipe.c8
-rw-r--r--net/sunrpc/rpcb_clnt.c151
-rw-r--r--net/sunrpc/sched.c2
-rw-r--r--net/sunrpc/socklib.c3
-rw-r--r--net/sunrpc/sunrpc_syms.c2
-rw-r--r--net/sunrpc/timer.c4
-rw-r--r--net/sunrpc/xprt.c116
-rw-r--r--net/sunrpc/xprtrdma/Makefile3
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c868
-rw-r--r--net/sunrpc/xprtrdma/transport.c800
-rw-r--r--net/sunrpc/xprtrdma/verbs.c1626
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h330
-rw-r--r--net/sunrpc/xprtsock.c567
16 files changed, 4340 insertions, 199 deletions
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 8ebfc4db7f51..5c69a725e530 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -5,6 +5,7 @@
5 5
6obj-$(CONFIG_SUNRPC) += sunrpc.o 6obj-$(CONFIG_SUNRPC) += sunrpc.o
7obj-$(CONFIG_SUNRPC_GSS) += auth_gss/ 7obj-$(CONFIG_SUNRPC_GSS) += auth_gss/
8obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma/
8 9
9sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ 10sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
10 auth.o auth_null.o auth_unix.o \ 11 auth.o auth_null.o auth_unix.o \
diff --git a/net/sunrpc/auth_gss/gss_krb5_wrap.c b/net/sunrpc/auth_gss/gss_krb5_wrap.c
index 42b3220bed39..8bd074df27d3 100644
--- a/net/sunrpc/auth_gss/gss_krb5_wrap.c
+++ b/net/sunrpc/auth_gss/gss_krb5_wrap.c
@@ -42,7 +42,7 @@ gss_krb5_remove_padding(struct xdr_buf *buf, int blocksize)
42{ 42{
43 u8 *ptr; 43 u8 *ptr;
44 u8 pad; 44 u8 pad;
45 int len = buf->len; 45 size_t len = buf->len;
46 46
47 if (len <= buf->head[0].iov_len) { 47 if (len <= buf->head[0].iov_len) {
48 pad = *(u8 *)(buf->head[0].iov_base + len - 1); 48 pad = *(u8 *)(buf->head[0].iov_base + len - 1);
@@ -53,9 +53,9 @@ gss_krb5_remove_padding(struct xdr_buf *buf, int blocksize)
53 } else 53 } else
54 len -= buf->head[0].iov_len; 54 len -= buf->head[0].iov_len;
55 if (len <= buf->page_len) { 55 if (len <= buf->page_len) {
56 int last = (buf->page_base + len - 1) 56 unsigned int last = (buf->page_base + len - 1)
57 >>PAGE_CACHE_SHIFT; 57 >>PAGE_CACHE_SHIFT;
58 int offset = (buf->page_base + len - 1) 58 unsigned int offset = (buf->page_base + len - 1)
59 & (PAGE_CACHE_SIZE - 1); 59 & (PAGE_CACHE_SIZE - 1);
60 ptr = kmap_atomic(buf->pages[last], KM_USER0); 60 ptr = kmap_atomic(buf->pages[last], KM_USER0);
61 pad = *(ptr + offset); 61 pad = *(ptr + offset);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 52429b1ffcc1..76be83ee4b04 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -127,7 +127,14 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
127 struct rpc_clnt *clnt = NULL; 127 struct rpc_clnt *clnt = NULL;
128 struct rpc_auth *auth; 128 struct rpc_auth *auth;
129 int err; 129 int err;
130 int len; 130 size_t len;
131
132 /* sanity check the name before trying to print it */
133 err = -EINVAL;
134 len = strlen(servname);
135 if (len > RPC_MAXNETNAMELEN)
136 goto out_no_rpciod;
137 len++;
131 138
132 dprintk("RPC: creating %s client for %s (xprt %p)\n", 139 dprintk("RPC: creating %s client for %s (xprt %p)\n",
133 program->name, servname, xprt); 140 program->name, servname, xprt);
@@ -148,7 +155,6 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
148 clnt->cl_parent = clnt; 155 clnt->cl_parent = clnt;
149 156
150 clnt->cl_server = clnt->cl_inline_name; 157 clnt->cl_server = clnt->cl_inline_name;
151 len = strlen(servname) + 1;
152 if (len > sizeof(clnt->cl_inline_name)) { 158 if (len > sizeof(clnt->cl_inline_name)) {
153 char *buf = kmalloc(len, GFP_KERNEL); 159 char *buf = kmalloc(len, GFP_KERNEL);
154 if (buf != 0) 160 if (buf != 0)
@@ -234,8 +240,8 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
234{ 240{
235 struct rpc_xprt *xprt; 241 struct rpc_xprt *xprt;
236 struct rpc_clnt *clnt; 242 struct rpc_clnt *clnt;
237 struct rpc_xprtsock_create xprtargs = { 243 struct xprt_create xprtargs = {
238 .proto = args->protocol, 244 .ident = args->protocol,
239 .srcaddr = args->saddress, 245 .srcaddr = args->saddress,
240 .dstaddr = args->address, 246 .dstaddr = args->address,
241 .addrlen = args->addrsize, 247 .addrlen = args->addrsize,
@@ -253,7 +259,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
253 */ 259 */
254 if (args->servername == NULL) { 260 if (args->servername == NULL) {
255 struct sockaddr_in *addr = 261 struct sockaddr_in *addr =
256 (struct sockaddr_in *) &args->address; 262 (struct sockaddr_in *) args->address;
257 snprintf(servername, sizeof(servername), NIPQUAD_FMT, 263 snprintf(servername, sizeof(servername), NIPQUAD_FMT,
258 NIPQUAD(addr->sin_addr.s_addr)); 264 NIPQUAD(addr->sin_addr.s_addr));
259 args->servername = servername; 265 args->servername = servername;
@@ -269,9 +275,6 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
269 if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT) 275 if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
270 xprt->resvport = 0; 276 xprt->resvport = 0;
271 277
272 dprintk("RPC: creating %s client for %s (xprt %p)\n",
273 args->program->name, args->servername, xprt);
274
275 clnt = rpc_new_client(xprt, args->servername, args->program, 278 clnt = rpc_new_client(xprt, args->servername, args->program,
276 args->version, args->authflavor); 279 args->version, args->authflavor);
277 if (IS_ERR(clnt)) 280 if (IS_ERR(clnt))
@@ -439,7 +442,7 @@ rpc_release_client(struct rpc_clnt *clnt)
439 */ 442 */
440struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old, 443struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
441 struct rpc_program *program, 444 struct rpc_program *program,
442 int vers) 445 u32 vers)
443{ 446{
444 struct rpc_clnt *clnt; 447 struct rpc_clnt *clnt;
445 struct rpc_version *version; 448 struct rpc_version *version;
@@ -843,8 +846,7 @@ call_allocate(struct rpc_task *task)
843 dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid); 846 dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
844 847
845 if (RPC_IS_ASYNC(task) || !signalled()) { 848 if (RPC_IS_ASYNC(task) || !signalled()) {
846 xprt_release(task); 849 task->tk_action = call_allocate;
847 task->tk_action = call_reserve;
848 rpc_delay(task, HZ>>4); 850 rpc_delay(task, HZ>>4);
849 return; 851 return;
850 } 852 }
@@ -871,6 +873,7 @@ rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
871 buf->head[0].iov_len = len; 873 buf->head[0].iov_len = len;
872 buf->tail[0].iov_len = 0; 874 buf->tail[0].iov_len = 0;
873 buf->page_len = 0; 875 buf->page_len = 0;
876 buf->flags = 0;
874 buf->len = 0; 877 buf->len = 0;
875 buf->buflen = len; 878 buf->buflen = len;
876} 879}
@@ -937,7 +940,7 @@ call_bind(struct rpc_task *task)
937static void 940static void
938call_bind_status(struct rpc_task *task) 941call_bind_status(struct rpc_task *task)
939{ 942{
940 int status = -EACCES; 943 int status = -EIO;
941 944
942 if (task->tk_status >= 0) { 945 if (task->tk_status >= 0) {
943 dprint_status(task); 946 dprint_status(task);
@@ -947,9 +950,20 @@ call_bind_status(struct rpc_task *task)
947 } 950 }
948 951
949 switch (task->tk_status) { 952 switch (task->tk_status) {
953 case -EAGAIN:
954 dprintk("RPC: %5u rpcbind waiting for another request "
955 "to finish\n", task->tk_pid);
956 /* avoid busy-waiting here -- could be a network outage. */
957 rpc_delay(task, 5*HZ);
958 goto retry_timeout;
950 case -EACCES: 959 case -EACCES:
951 dprintk("RPC: %5u remote rpcbind: RPC program/version " 960 dprintk("RPC: %5u remote rpcbind: RPC program/version "
952 "unavailable\n", task->tk_pid); 961 "unavailable\n", task->tk_pid);
962 /* fail immediately if this is an RPC ping */
963 if (task->tk_msg.rpc_proc->p_proc == 0) {
964 status = -EOPNOTSUPP;
965 break;
966 }
953 rpc_delay(task, 3*HZ); 967 rpc_delay(task, 3*HZ);
954 goto retry_timeout; 968 goto retry_timeout;
955 case -ETIMEDOUT: 969 case -ETIMEDOUT:
@@ -957,6 +971,7 @@ call_bind_status(struct rpc_task *task)
957 task->tk_pid); 971 task->tk_pid);
958 goto retry_timeout; 972 goto retry_timeout;
959 case -EPFNOSUPPORT: 973 case -EPFNOSUPPORT:
974 /* server doesn't support any rpcbind version we know of */
960 dprintk("RPC: %5u remote rpcbind service unavailable\n", 975 dprintk("RPC: %5u remote rpcbind service unavailable\n",
961 task->tk_pid); 976 task->tk_pid);
962 break; 977 break;
@@ -969,7 +984,6 @@ call_bind_status(struct rpc_task *task)
969 default: 984 default:
970 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n", 985 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
971 task->tk_pid, -task->tk_status); 986 task->tk_pid, -task->tk_status);
972 status = -EIO;
973 } 987 }
974 988
975 rpc_exit(task, status); 989 rpc_exit(task, status);
@@ -1257,7 +1271,6 @@ call_refresh(struct rpc_task *task)
1257{ 1271{
1258 dprint_status(task); 1272 dprint_status(task);
1259 1273
1260 xprt_release(task); /* Must do to obtain new XID */
1261 task->tk_action = call_refreshresult; 1274 task->tk_action = call_refreshresult;
1262 task->tk_status = 0; 1275 task->tk_status = 0;
1263 task->tk_client->cl_stats->rpcauthrefresh++; 1276 task->tk_client->cl_stats->rpcauthrefresh++;
@@ -1375,6 +1388,8 @@ call_verify(struct rpc_task *task)
1375 dprintk("RPC: %5u %s: retry stale creds\n", 1388 dprintk("RPC: %5u %s: retry stale creds\n",
1376 task->tk_pid, __FUNCTION__); 1389 task->tk_pid, __FUNCTION__);
1377 rpcauth_invalcred(task); 1390 rpcauth_invalcred(task);
1391 /* Ensure we obtain a new XID! */
1392 xprt_release(task);
1378 task->tk_action = call_refresh; 1393 task->tk_action = call_refresh;
1379 goto out_retry; 1394 goto out_retry;
1380 case RPC_AUTH_BADCRED: 1395 case RPC_AUTH_BADCRED:
@@ -1523,13 +1538,18 @@ void rpc_show_tasks(void)
1523 spin_lock(&clnt->cl_lock); 1538 spin_lock(&clnt->cl_lock);
1524 list_for_each_entry(t, &clnt->cl_tasks, tk_task) { 1539 list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
1525 const char *rpc_waitq = "none"; 1540 const char *rpc_waitq = "none";
1541 int proc;
1542
1543 if (t->tk_msg.rpc_proc)
1544 proc = t->tk_msg.rpc_proc->p_proc;
1545 else
1546 proc = -1;
1526 1547
1527 if (RPC_IS_QUEUED(t)) 1548 if (RPC_IS_QUEUED(t))
1528 rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq); 1549 rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
1529 1550
1530 printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n", 1551 printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
1531 t->tk_pid, 1552 t->tk_pid, proc,
1532 (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
1533 t->tk_flags, t->tk_status, 1553 t->tk_flags, t->tk_status,
1534 t->tk_client, 1554 t->tk_client,
1535 (t->tk_client ? t->tk_client->cl_prog : 0), 1555 (t->tk_client ? t->tk_client->cl_prog : 0),
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 669e12a4ed18..c8433e8865aa 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -14,7 +14,7 @@
14#include <linux/pagemap.h> 14#include <linux/pagemap.h>
15#include <linux/mount.h> 15#include <linux/mount.h>
16#include <linux/namei.h> 16#include <linux/namei.h>
17#include <linux/dnotify.h> 17#include <linux/fsnotify.h>
18#include <linux/kernel.h> 18#include <linux/kernel.h>
19 19
20#include <asm/ioctls.h> 20#include <asm/ioctls.h>
@@ -329,6 +329,7 @@ rpc_show_info(struct seq_file *m, void *v)
329 clnt->cl_prog, clnt->cl_vers); 329 clnt->cl_prog, clnt->cl_vers);
330 seq_printf(m, "address: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR)); 330 seq_printf(m, "address: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
331 seq_printf(m, "protocol: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_PROTO)); 331 seq_printf(m, "protocol: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_PROTO));
332 seq_printf(m, "port: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_PORT));
332 return 0; 333 return 0;
333} 334}
334 335
@@ -585,6 +586,7 @@ rpc_populate(struct dentry *parent,
585 if (S_ISDIR(mode)) 586 if (S_ISDIR(mode))
586 inc_nlink(dir); 587 inc_nlink(dir);
587 d_add(dentry, inode); 588 d_add(dentry, inode);
589 fsnotify_create(dir, dentry);
588 } 590 }
589 mutex_unlock(&dir->i_mutex); 591 mutex_unlock(&dir->i_mutex);
590 return 0; 592 return 0;
@@ -606,7 +608,7 @@ __rpc_mkdir(struct inode *dir, struct dentry *dentry)
606 inode->i_ino = iunique(dir->i_sb, 100); 608 inode->i_ino = iunique(dir->i_sb, 100);
607 d_instantiate(dentry, inode); 609 d_instantiate(dentry, inode);
608 inc_nlink(dir); 610 inc_nlink(dir);
609 inode_dir_notify(dir, DN_CREATE); 611 fsnotify_mkdir(dir, dentry);
610 return 0; 612 return 0;
611out_err: 613out_err:
612 printk(KERN_WARNING "%s: %s failed to allocate inode for dentry %s\n", 614 printk(KERN_WARNING "%s: %s failed to allocate inode for dentry %s\n",
@@ -748,7 +750,7 @@ rpc_mkpipe(struct dentry *parent, const char *name, void *private, struct rpc_pi
748 rpci->flags = flags; 750 rpci->flags = flags;
749 rpci->ops = ops; 751 rpci->ops = ops;
750 rpci->nkern_readwriters = 1; 752 rpci->nkern_readwriters = 1;
751 inode_dir_notify(dir, DN_CREATE); 753 fsnotify_create(dir, dentry);
752 dget(dentry); 754 dget(dentry);
753out: 755out:
754 mutex_unlock(&dir->i_mutex); 756 mutex_unlock(&dir->i_mutex);
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index d1740dbab991..a05493aedb68 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -16,11 +16,14 @@
16 16
17#include <linux/types.h> 17#include <linux/types.h>
18#include <linux/socket.h> 18#include <linux/socket.h>
19#include <linux/in.h>
20#include <linux/in6.h>
19#include <linux/kernel.h> 21#include <linux/kernel.h>
20#include <linux/errno.h> 22#include <linux/errno.h>
21 23
22#include <linux/sunrpc/clnt.h> 24#include <linux/sunrpc/clnt.h>
23#include <linux/sunrpc/sched.h> 25#include <linux/sunrpc/sched.h>
26#include <linux/sunrpc/xprtsock.h>
24 27
25#ifdef RPC_DEBUG 28#ifdef RPC_DEBUG
26# define RPCDBG_FACILITY RPCDBG_BIND 29# define RPCDBG_FACILITY RPCDBG_BIND
@@ -91,26 +94,6 @@ enum {
91#define RPCB_MAXADDRLEN (128u) 94#define RPCB_MAXADDRLEN (128u)
92 95
93/* 96/*
94 * r_netid
95 *
96 * Quoting RFC 3530, section 2.2:
97 *
98 * For TCP over IPv4 the value of r_netid is the string "tcp". For UDP
99 * over IPv4 the value of r_netid is the string "udp".
100 *
101 * ...
102 *
103 * For TCP over IPv6 the value of r_netid is the string "tcp6". For UDP
104 * over IPv6 the value of r_netid is the string "udp6".
105 */
106#define RPCB_NETID_UDP "\165\144\160" /* "udp" */
107#define RPCB_NETID_TCP "\164\143\160" /* "tcp" */
108#define RPCB_NETID_UDP6 "\165\144\160\066" /* "udp6" */
109#define RPCB_NETID_TCP6 "\164\143\160\066" /* "tcp6" */
110
111#define RPCB_MAXNETIDLEN (4u)
112
113/*
114 * r_owner 97 * r_owner
115 * 98 *
116 * The "owner" is allowed to unset a service in the rpcbind database. 99 * The "owner" is allowed to unset a service in the rpcbind database.
@@ -120,7 +103,7 @@ enum {
120#define RPCB_MAXOWNERLEN sizeof(RPCB_OWNER_STRING) 103#define RPCB_MAXOWNERLEN sizeof(RPCB_OWNER_STRING)
121 104
122static void rpcb_getport_done(struct rpc_task *, void *); 105static void rpcb_getport_done(struct rpc_task *, void *);
123extern struct rpc_program rpcb_program; 106static struct rpc_program rpcb_program;
124 107
125struct rpcbind_args { 108struct rpcbind_args {
126 struct rpc_xprt * r_xprt; 109 struct rpc_xprt * r_xprt;
@@ -137,10 +120,13 @@ struct rpcbind_args {
137static struct rpc_procinfo rpcb_procedures2[]; 120static struct rpc_procinfo rpcb_procedures2[];
138static struct rpc_procinfo rpcb_procedures3[]; 121static struct rpc_procinfo rpcb_procedures3[];
139 122
140static struct rpcb_info { 123struct rpcb_info {
141 int rpc_vers; 124 int rpc_vers;
142 struct rpc_procinfo * rpc_proc; 125 struct rpc_procinfo * rpc_proc;
143} rpcb_next_version[]; 126};
127
128static struct rpcb_info rpcb_next_version[];
129static struct rpcb_info rpcb_next_version6[];
144 130
145static void rpcb_getport_prepare(struct rpc_task *task, void *calldata) 131static void rpcb_getport_prepare(struct rpc_task *task, void *calldata)
146{ 132{
@@ -190,7 +176,17 @@ static struct rpc_clnt *rpcb_create(char *hostname, struct sockaddr *srvaddr,
190 RPC_CLNT_CREATE_INTR), 176 RPC_CLNT_CREATE_INTR),
191 }; 177 };
192 178
193 ((struct sockaddr_in *)srvaddr)->sin_port = htons(RPCBIND_PORT); 179 switch (srvaddr->sa_family) {
180 case AF_INET:
181 ((struct sockaddr_in *)srvaddr)->sin_port = htons(RPCBIND_PORT);
182 break;
183 case AF_INET6:
184 ((struct sockaddr_in6 *)srvaddr)->sin6_port = htons(RPCBIND_PORT);
185 break;
186 default:
187 return NULL;
188 }
189
194 if (!privileged) 190 if (!privileged)
195 args.flags |= RPC_CLNT_CREATE_NONPRIVPORT; 191 args.flags |= RPC_CLNT_CREATE_NONPRIVPORT;
196 return rpc_create(&args); 192 return rpc_create(&args);
@@ -234,7 +230,7 @@ int rpcb_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
234 prog, vers, prot, port); 230 prog, vers, prot, port);
235 231
236 rpcb_clnt = rpcb_create("localhost", (struct sockaddr *) &sin, 232 rpcb_clnt = rpcb_create("localhost", (struct sockaddr *) &sin,
237 IPPROTO_UDP, 2, 1); 233 XPRT_TRANSPORT_UDP, 2, 1);
238 if (IS_ERR(rpcb_clnt)) 234 if (IS_ERR(rpcb_clnt))
239 return PTR_ERR(rpcb_clnt); 235 return PTR_ERR(rpcb_clnt);
240 236
@@ -316,6 +312,7 @@ void rpcb_getport_async(struct rpc_task *task)
316 struct rpc_task *child; 312 struct rpc_task *child;
317 struct sockaddr addr; 313 struct sockaddr addr;
318 int status; 314 int status;
315 struct rpcb_info *info;
319 316
320 dprintk("RPC: %5u %s(%s, %u, %u, %d)\n", 317 dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
321 task->tk_pid, __FUNCTION__, 318 task->tk_pid, __FUNCTION__,
@@ -325,7 +322,7 @@ void rpcb_getport_async(struct rpc_task *task)
325 BUG_ON(clnt->cl_parent != clnt); 322 BUG_ON(clnt->cl_parent != clnt);
326 323
327 if (xprt_test_and_set_binding(xprt)) { 324 if (xprt_test_and_set_binding(xprt)) {
328 status = -EACCES; /* tell caller to check again */ 325 status = -EAGAIN; /* tell caller to check again */
329 dprintk("RPC: %5u %s: waiting for another binder\n", 326 dprintk("RPC: %5u %s: waiting for another binder\n",
330 task->tk_pid, __FUNCTION__); 327 task->tk_pid, __FUNCTION__);
331 goto bailout_nowake; 328 goto bailout_nowake;
@@ -343,18 +340,43 @@ void rpcb_getport_async(struct rpc_task *task)
343 goto bailout_nofree; 340 goto bailout_nofree;
344 } 341 }
345 342
346 if (rpcb_next_version[xprt->bind_index].rpc_proc == NULL) { 343 rpc_peeraddr(clnt, (void *)&addr, sizeof(addr));
344
345 /* Don't ever use rpcbind v2 for AF_INET6 requests */
346 switch (addr.sa_family) {
347 case AF_INET:
348 info = rpcb_next_version;
349 break;
350 case AF_INET6:
351 info = rpcb_next_version6;
352 break;
353 default:
354 status = -EAFNOSUPPORT;
355 dprintk("RPC: %5u %s: bad address family\n",
356 task->tk_pid, __FUNCTION__);
357 goto bailout_nofree;
358 }
359 if (info[xprt->bind_index].rpc_proc == NULL) {
347 xprt->bind_index = 0; 360 xprt->bind_index = 0;
348 status = -EACCES; /* tell caller to try again later */ 361 status = -EPFNOSUPPORT;
349 dprintk("RPC: %5u %s: no more getport versions available\n", 362 dprintk("RPC: %5u %s: no more getport versions available\n",
350 task->tk_pid, __FUNCTION__); 363 task->tk_pid, __FUNCTION__);
351 goto bailout_nofree; 364 goto bailout_nofree;
352 } 365 }
353 bind_version = rpcb_next_version[xprt->bind_index].rpc_vers; 366 bind_version = info[xprt->bind_index].rpc_vers;
354 367
355 dprintk("RPC: %5u %s: trying rpcbind version %u\n", 368 dprintk("RPC: %5u %s: trying rpcbind version %u\n",
356 task->tk_pid, __FUNCTION__, bind_version); 369 task->tk_pid, __FUNCTION__, bind_version);
357 370
371 rpcb_clnt = rpcb_create(clnt->cl_server, &addr, xprt->prot,
372 bind_version, 0);
373 if (IS_ERR(rpcb_clnt)) {
374 status = PTR_ERR(rpcb_clnt);
375 dprintk("RPC: %5u %s: rpcb_create failed, error %ld\n",
376 task->tk_pid, __FUNCTION__, PTR_ERR(rpcb_clnt));
377 goto bailout_nofree;
378 }
379
358 map = kzalloc(sizeof(struct rpcbind_args), GFP_ATOMIC); 380 map = kzalloc(sizeof(struct rpcbind_args), GFP_ATOMIC);
359 if (!map) { 381 if (!map) {
360 status = -ENOMEM; 382 status = -ENOMEM;
@@ -367,28 +389,19 @@ void rpcb_getport_async(struct rpc_task *task)
367 map->r_prot = xprt->prot; 389 map->r_prot = xprt->prot;
368 map->r_port = 0; 390 map->r_port = 0;
369 map->r_xprt = xprt_get(xprt); 391 map->r_xprt = xprt_get(xprt);
370 map->r_netid = (xprt->prot == IPPROTO_TCP) ? RPCB_NETID_TCP : 392 map->r_netid = rpc_peeraddr2str(clnt, RPC_DISPLAY_NETID);
371 RPCB_NETID_UDP; 393 memcpy(map->r_addr,
372 memcpy(&map->r_addr, rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR), 394 rpc_peeraddr2str(rpcb_clnt, RPC_DISPLAY_UNIVERSAL_ADDR),
373 sizeof(map->r_addr)); 395 sizeof(map->r_addr));
374 map->r_owner = RPCB_OWNER_STRING; /* ignored for GETADDR */ 396 map->r_owner = RPCB_OWNER_STRING; /* ignored for GETADDR */
375 397
376 rpc_peeraddr(clnt, (void *)&addr, sizeof(addr));
377 rpcb_clnt = rpcb_create(clnt->cl_server, &addr, xprt->prot, bind_version, 0);
378 if (IS_ERR(rpcb_clnt)) {
379 status = PTR_ERR(rpcb_clnt);
380 dprintk("RPC: %5u %s: rpcb_create failed, error %ld\n",
381 task->tk_pid, __FUNCTION__, PTR_ERR(rpcb_clnt));
382 goto bailout;
383 }
384
385 child = rpc_run_task(rpcb_clnt, RPC_TASK_ASYNC, &rpcb_getport_ops, map); 398 child = rpc_run_task(rpcb_clnt, RPC_TASK_ASYNC, &rpcb_getport_ops, map);
386 rpc_release_client(rpcb_clnt); 399 rpc_release_client(rpcb_clnt);
387 if (IS_ERR(child)) { 400 if (IS_ERR(child)) {
388 status = -EIO; 401 status = -EIO;
389 dprintk("RPC: %5u %s: rpc_run_task failed\n", 402 dprintk("RPC: %5u %s: rpc_run_task failed\n",
390 task->tk_pid, __FUNCTION__); 403 task->tk_pid, __FUNCTION__);
391 goto bailout_nofree; 404 goto bailout;
392 } 405 }
393 rpc_put_task(child); 406 rpc_put_task(child);
394 407
@@ -403,6 +416,7 @@ bailout_nofree:
403bailout_nowake: 416bailout_nowake:
404 task->tk_status = status; 417 task->tk_status = status;
405} 418}
419EXPORT_SYMBOL_GPL(rpcb_getport_async);
406 420
407/* 421/*
408 * Rpcbind child task calls this callback via tk_exit. 422 * Rpcbind child task calls this callback via tk_exit.
@@ -413,6 +427,10 @@ static void rpcb_getport_done(struct rpc_task *child, void *data)
413 struct rpc_xprt *xprt = map->r_xprt; 427 struct rpc_xprt *xprt = map->r_xprt;
414 int status = child->tk_status; 428 int status = child->tk_status;
415 429
430 /* Garbage reply: retry with a lesser rpcbind version */
431 if (status == -EIO)
432 status = -EPROTONOSUPPORT;
433
416 /* rpcbind server doesn't support this rpcbind protocol version */ 434 /* rpcbind server doesn't support this rpcbind protocol version */
417 if (status == -EPROTONOSUPPORT) 435 if (status == -EPROTONOSUPPORT)
418 xprt->bind_index++; 436 xprt->bind_index++;
@@ -490,16 +508,24 @@ static int rpcb_decode_getaddr(struct rpc_rqst *req, __be32 *p,
490 unsigned short *portp) 508 unsigned short *portp)
491{ 509{
492 char *addr; 510 char *addr;
493 int addr_len, c, i, f, first, val; 511 u32 addr_len;
512 int c, i, f, first, val;
494 513
495 *portp = 0; 514 *portp = 0;
496 addr_len = (unsigned int) ntohl(*p++); 515 addr_len = ntohl(*p++);
497 if (addr_len > RPCB_MAXADDRLEN) /* sanity */ 516
498 return -EINVAL; 517 /*
499 518 * Simple sanity check. The smallest possible universal
500 dprintk("RPC: rpcb_decode_getaddr returned string: '%s'\n", 519 * address is an IPv4 address string containing 11 bytes.
501 (char *) p); 520 */
502 521 if (addr_len < 11 || addr_len > RPCB_MAXADDRLEN)
522 goto out_err;
523
524 /*
525 * Start at the end and walk backwards until the first dot
526 * is encountered. When the second dot is found, we have
527 * both parts of the port number.
528 */
503 addr = (char *)p; 529 addr = (char *)p;
504 val = 0; 530 val = 0;
505 first = 1; 531 first = 1;
@@ -521,8 +547,19 @@ static int rpcb_decode_getaddr(struct rpc_rqst *req, __be32 *p,
521 } 547 }
522 } 548 }
523 549
550 /*
551 * Simple sanity check. If we never saw a dot in the reply,
552 * then this was probably just garbage.
553 */
554 if (first)
555 goto out_err;
556
524 dprintk("RPC: rpcb_decode_getaddr port=%u\n", *portp); 557 dprintk("RPC: rpcb_decode_getaddr port=%u\n", *portp);
525 return 0; 558 return 0;
559
560out_err:
561 dprintk("RPC: rpcbind server returned malformed reply\n");
562 return -EIO;
526} 563}
527 564
528#define RPCB_program_sz (1u) 565#define RPCB_program_sz (1u)
@@ -531,7 +568,7 @@ static int rpcb_decode_getaddr(struct rpc_rqst *req, __be32 *p,
531#define RPCB_port_sz (1u) 568#define RPCB_port_sz (1u)
532#define RPCB_boolean_sz (1u) 569#define RPCB_boolean_sz (1u)
533 570
534#define RPCB_netid_sz (1+XDR_QUADLEN(RPCB_MAXNETIDLEN)) 571#define RPCB_netid_sz (1+XDR_QUADLEN(RPCBIND_MAXNETIDLEN))
535#define RPCB_addr_sz (1+XDR_QUADLEN(RPCB_MAXADDRLEN)) 572#define RPCB_addr_sz (1+XDR_QUADLEN(RPCB_MAXADDRLEN))
536#define RPCB_ownerstring_sz (1+XDR_QUADLEN(RPCB_MAXOWNERLEN)) 573#define RPCB_ownerstring_sz (1+XDR_QUADLEN(RPCB_MAXOWNERLEN))
537 574
@@ -593,6 +630,14 @@ static struct rpcb_info rpcb_next_version[] = {
593 { 0, NULL }, 630 { 0, NULL },
594}; 631};
595 632
633static struct rpcb_info rpcb_next_version6[] = {
634#ifdef CONFIG_SUNRPC_BIND34
635 { 4, &rpcb_procedures4[RPCBPROC_GETVERSADDR] },
636 { 3, &rpcb_procedures3[RPCBPROC_GETADDR] },
637#endif
638 { 0, NULL },
639};
640
596static struct rpc_version rpcb_version2 = { 641static struct rpc_version rpcb_version2 = {
597 .number = 2, 642 .number = 2,
598 .nrprocs = RPCB_HIGHPROC_2, 643 .nrprocs = RPCB_HIGHPROC_2,
@@ -621,7 +666,7 @@ static struct rpc_version *rpcb_version[] = {
621 666
622static struct rpc_stat rpcb_stats; 667static struct rpc_stat rpcb_stats;
623 668
624struct rpc_program rpcb_program = { 669static struct rpc_program rpcb_program = {
625 .name = "rpcbind", 670 .name = "rpcbind",
626 .number = RPCBIND_PROGRAM, 671 .number = RPCBIND_PROGRAM,
627 .nrvers = ARRAY_SIZE(rpcb_version), 672 .nrvers = ARRAY_SIZE(rpcb_version),
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 954d7ec86c7e..3c773c53e12e 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -777,6 +777,7 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
777 task->tk_pid, size, buf); 777 task->tk_pid, size, buf);
778 return &buf->data; 778 return &buf->data;
779} 779}
780EXPORT_SYMBOL_GPL(rpc_malloc);
780 781
781/** 782/**
782 * rpc_free - free buffer allocated via rpc_malloc 783 * rpc_free - free buffer allocated via rpc_malloc
@@ -802,6 +803,7 @@ void rpc_free(void *buffer)
802 else 803 else
803 kfree(buf); 804 kfree(buf);
804} 805}
806EXPORT_SYMBOL_GPL(rpc_free);
805 807
806/* 808/*
807 * Creation and deletion of RPC task structures 809 * Creation and deletion of RPC task structures
diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c
index 1d377d1ab7f4..97ac45f034d6 100644
--- a/net/sunrpc/socklib.c
+++ b/net/sunrpc/socklib.c
@@ -34,6 +34,7 @@ size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len)
34 desc->offset += len; 34 desc->offset += len;
35 return len; 35 return len;
36} 36}
37EXPORT_SYMBOL_GPL(xdr_skb_read_bits);
37 38
38/** 39/**
39 * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer 40 * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer
@@ -137,6 +138,7 @@ copy_tail:
137out: 138out:
138 return copied; 139 return copied;
139} 140}
141EXPORT_SYMBOL_GPL(xdr_partial_copy_from_skb);
140 142
141/** 143/**
142 * csum_partial_copy_to_xdr - checksum and copy data 144 * csum_partial_copy_to_xdr - checksum and copy data
@@ -179,3 +181,4 @@ no_checksum:
179 return -1; 181 return -1;
180 return 0; 182 return 0;
181} 183}
184EXPORT_SYMBOL_GPL(csum_partial_copy_to_xdr);
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index 384c4ad5ab86..33d89e842c85 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -20,7 +20,7 @@
20#include <linux/sunrpc/auth.h> 20#include <linux/sunrpc/auth.h>
21#include <linux/workqueue.h> 21#include <linux/workqueue.h>
22#include <linux/sunrpc/rpc_pipe_fs.h> 22#include <linux/sunrpc/rpc_pipe_fs.h>
23 23#include <linux/sunrpc/xprtsock.h>
24 24
25/* RPC scheduler */ 25/* RPC scheduler */
26EXPORT_SYMBOL(rpc_execute); 26EXPORT_SYMBOL(rpc_execute);
diff --git a/net/sunrpc/timer.c b/net/sunrpc/timer.c
index 8142fdb8a930..31becbf09263 100644
--- a/net/sunrpc/timer.c
+++ b/net/sunrpc/timer.c
@@ -17,6 +17,7 @@
17 17
18#include <linux/types.h> 18#include <linux/types.h>
19#include <linux/unistd.h> 19#include <linux/unistd.h>
20#include <linux/module.h>
20 21
21#include <linux/sunrpc/clnt.h> 22#include <linux/sunrpc/clnt.h>
22 23
@@ -40,6 +41,7 @@ rpc_init_rtt(struct rpc_rtt *rt, unsigned long timeo)
40 rt->ntimeouts[i] = 0; 41 rt->ntimeouts[i] = 0;
41 } 42 }
42} 43}
44EXPORT_SYMBOL_GPL(rpc_init_rtt);
43 45
44/* 46/*
45 * NB: When computing the smoothed RTT and standard deviation, 47 * NB: When computing the smoothed RTT and standard deviation,
@@ -75,6 +77,7 @@ rpc_update_rtt(struct rpc_rtt *rt, unsigned timer, long m)
75 if (*sdrtt < RPC_RTO_MIN) 77 if (*sdrtt < RPC_RTO_MIN)
76 *sdrtt = RPC_RTO_MIN; 78 *sdrtt = RPC_RTO_MIN;
77} 79}
80EXPORT_SYMBOL_GPL(rpc_update_rtt);
78 81
79/* 82/*
80 * Estimate rto for an nfs rpc sent via. an unreliable datagram. 83 * Estimate rto for an nfs rpc sent via. an unreliable datagram.
@@ -103,3 +106,4 @@ rpc_calc_rto(struct rpc_rtt *rt, unsigned timer)
103 106
104 return res; 107 return res;
105} 108}
109EXPORT_SYMBOL_GPL(rpc_calc_rto);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index c8c2edccad7e..282a9a2ec90c 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -62,6 +62,9 @@ static inline void do_xprt_reserve(struct rpc_task *);
62static void xprt_connect_status(struct rpc_task *task); 62static void xprt_connect_status(struct rpc_task *task);
63static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 63static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
64 64
65static spinlock_t xprt_list_lock = SPIN_LOCK_UNLOCKED;
66static LIST_HEAD(xprt_list);
67
65/* 68/*
66 * The transport code maintains an estimate on the maximum number of out- 69 * The transport code maintains an estimate on the maximum number of out-
67 * standing RPC requests, using a smoothed version of the congestion 70 * standing RPC requests, using a smoothed version of the congestion
@@ -81,6 +84,78 @@ static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
81#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) 84#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
82 85
83/** 86/**
87 * xprt_register_transport - register a transport implementation
88 * @transport: transport to register
89 *
90 * If a transport implementation is loaded as a kernel module, it can
91 * call this interface to make itself known to the RPC client.
92 *
93 * Returns:
94 * 0: transport successfully registered
95 * -EEXIST: transport already registered
96 * -EINVAL: transport module being unloaded
97 */
98int xprt_register_transport(struct xprt_class *transport)
99{
100 struct xprt_class *t;
101 int result;
102
103 result = -EEXIST;
104 spin_lock(&xprt_list_lock);
105 list_for_each_entry(t, &xprt_list, list) {
106 /* don't register the same transport class twice */
107 if (t->ident == transport->ident)
108 goto out;
109 }
110
111 result = -EINVAL;
112 if (try_module_get(THIS_MODULE)) {
113 list_add_tail(&transport->list, &xprt_list);
114 printk(KERN_INFO "RPC: Registered %s transport module.\n",
115 transport->name);
116 result = 0;
117 }
118
119out:
120 spin_unlock(&xprt_list_lock);
121 return result;
122}
123EXPORT_SYMBOL_GPL(xprt_register_transport);
124
125/**
126 * xprt_unregister_transport - unregister a transport implementation
127 * transport: transport to unregister
128 *
129 * Returns:
130 * 0: transport successfully unregistered
131 * -ENOENT: transport never registered
132 */
133int xprt_unregister_transport(struct xprt_class *transport)
134{
135 struct xprt_class *t;
136 int result;
137
138 result = 0;
139 spin_lock(&xprt_list_lock);
140 list_for_each_entry(t, &xprt_list, list) {
141 if (t == transport) {
142 printk(KERN_INFO
143 "RPC: Unregistered %s transport module.\n",
144 transport->name);
145 list_del_init(&transport->list);
146 module_put(THIS_MODULE);
147 goto out;
148 }
149 }
150 result = -ENOENT;
151
152out:
153 spin_unlock(&xprt_list_lock);
154 return result;
155}
156EXPORT_SYMBOL_GPL(xprt_unregister_transport);
157
158/**
84 * xprt_reserve_xprt - serialize write access to transports 159 * xprt_reserve_xprt - serialize write access to transports
85 * @task: task that is requesting access to the transport 160 * @task: task that is requesting access to the transport
86 * 161 *
@@ -118,6 +193,7 @@ out_sleep:
118 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 193 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
119 return 0; 194 return 0;
120} 195}
196EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
121 197
122static void xprt_clear_locked(struct rpc_xprt *xprt) 198static void xprt_clear_locked(struct rpc_xprt *xprt)
123{ 199{
@@ -167,6 +243,7 @@ out_sleep:
167 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 243 rpc_sleep_on(&xprt->sending, task, NULL, NULL);
168 return 0; 244 return 0;
169} 245}
246EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
170 247
171static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 248static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
172{ 249{
@@ -246,6 +323,7 @@ void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
246 __xprt_lock_write_next(xprt); 323 __xprt_lock_write_next(xprt);
247 } 324 }
248} 325}
326EXPORT_SYMBOL_GPL(xprt_release_xprt);
249 327
250/** 328/**
251 * xprt_release_xprt_cong - allow other requests to use a transport 329 * xprt_release_xprt_cong - allow other requests to use a transport
@@ -262,6 +340,7 @@ void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
262 __xprt_lock_write_next_cong(xprt); 340 __xprt_lock_write_next_cong(xprt);
263 } 341 }
264} 342}
343EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
265 344
266static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 345static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
267{ 346{
@@ -314,6 +393,7 @@ void xprt_release_rqst_cong(struct rpc_task *task)
314{ 393{
315 __xprt_put_cong(task->tk_xprt, task->tk_rqstp); 394 __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
316} 395}
396EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
317 397
318/** 398/**
319 * xprt_adjust_cwnd - adjust transport congestion window 399 * xprt_adjust_cwnd - adjust transport congestion window
@@ -345,6 +425,7 @@ void xprt_adjust_cwnd(struct rpc_task *task, int result)
345 xprt->cwnd = cwnd; 425 xprt->cwnd = cwnd;
346 __xprt_put_cong(xprt, req); 426 __xprt_put_cong(xprt, req);
347} 427}
428EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
348 429
349/** 430/**
350 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 431 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
@@ -359,6 +440,7 @@ void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
359 else 440 else
360 rpc_wake_up(&xprt->pending); 441 rpc_wake_up(&xprt->pending);
361} 442}
443EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
362 444
363/** 445/**
364 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 446 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
@@ -373,6 +455,7 @@ void xprt_wait_for_buffer_space(struct rpc_task *task)
373 task->tk_timeout = req->rq_timeout; 455 task->tk_timeout = req->rq_timeout;
374 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 456 rpc_sleep_on(&xprt->pending, task, NULL, NULL);
375} 457}
458EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
376 459
377/** 460/**
378 * xprt_write_space - wake the task waiting for transport output buffer space 461 * xprt_write_space - wake the task waiting for transport output buffer space
@@ -393,6 +476,7 @@ void xprt_write_space(struct rpc_xprt *xprt)
393 } 476 }
394 spin_unlock_bh(&xprt->transport_lock); 477 spin_unlock_bh(&xprt->transport_lock);
395} 478}
479EXPORT_SYMBOL_GPL(xprt_write_space);
396 480
397/** 481/**
398 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 482 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
@@ -406,6 +490,7 @@ void xprt_set_retrans_timeout_def(struct rpc_task *task)
406{ 490{
407 task->tk_timeout = task->tk_rqstp->rq_timeout; 491 task->tk_timeout = task->tk_rqstp->rq_timeout;
408} 492}
493EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
409 494
410/* 495/*
411 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 496 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
@@ -425,6 +510,7 @@ void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
425 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 510 if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
426 task->tk_timeout = max_timeout; 511 task->tk_timeout = max_timeout;
427} 512}
513EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
428 514
429static void xprt_reset_majortimeo(struct rpc_rqst *req) 515static void xprt_reset_majortimeo(struct rpc_rqst *req)
430{ 516{
@@ -500,6 +586,7 @@ void xprt_disconnect(struct rpc_xprt *xprt)
500 xprt_wake_pending_tasks(xprt, -ENOTCONN); 586 xprt_wake_pending_tasks(xprt, -ENOTCONN);
501 spin_unlock_bh(&xprt->transport_lock); 587 spin_unlock_bh(&xprt->transport_lock);
502} 588}
589EXPORT_SYMBOL_GPL(xprt_disconnect);
503 590
504static void 591static void
505xprt_init_autodisconnect(unsigned long data) 592xprt_init_autodisconnect(unsigned long data)
@@ -610,6 +697,7 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
610 xprt->stat.bad_xids++; 697 xprt->stat.bad_xids++;
611 return NULL; 698 return NULL;
612} 699}
700EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
613 701
614/** 702/**
615 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply 703 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
@@ -629,6 +717,7 @@ void xprt_update_rtt(struct rpc_task *task)
629 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 717 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
630 } 718 }
631} 719}
720EXPORT_SYMBOL_GPL(xprt_update_rtt);
632 721
633/** 722/**
634 * xprt_complete_rqst - called when reply processing is complete 723 * xprt_complete_rqst - called when reply processing is complete
@@ -653,6 +742,7 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
653 req->rq_received = req->rq_private_buf.len = copied; 742 req->rq_received = req->rq_private_buf.len = copied;
654 rpc_wake_up_task(task); 743 rpc_wake_up_task(task);
655} 744}
745EXPORT_SYMBOL_GPL(xprt_complete_rqst);
656 746
657static void xprt_timer(struct rpc_task *task) 747static void xprt_timer(struct rpc_task *task)
658{ 748{
@@ -889,23 +979,25 @@ void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long i
889 * @args: rpc transport creation arguments 979 * @args: rpc transport creation arguments
890 * 980 *
891 */ 981 */
892struct rpc_xprt *xprt_create_transport(struct rpc_xprtsock_create *args) 982struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
893{ 983{
894 struct rpc_xprt *xprt; 984 struct rpc_xprt *xprt;
895 struct rpc_rqst *req; 985 struct rpc_rqst *req;
986 struct xprt_class *t;
896 987
897 switch (args->proto) { 988 spin_lock(&xprt_list_lock);
898 case IPPROTO_UDP: 989 list_for_each_entry(t, &xprt_list, list) {
899 xprt = xs_setup_udp(args); 990 if (t->ident == args->ident) {
900 break; 991 spin_unlock(&xprt_list_lock);
901 case IPPROTO_TCP: 992 goto found;
902 xprt = xs_setup_tcp(args); 993 }
903 break;
904 default:
905 printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
906 args->proto);
907 return ERR_PTR(-EIO);
908 } 994 }
995 spin_unlock(&xprt_list_lock);
996 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
997 return ERR_PTR(-EIO);
998
999found:
1000 xprt = t->setup(args);
909 if (IS_ERR(xprt)) { 1001 if (IS_ERR(xprt)) {
910 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1002 dprintk("RPC: xprt_create_transport: failed, %ld\n",
911 -PTR_ERR(xprt)); 1003 -PTR_ERR(xprt));
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
new file mode 100644
index 000000000000..264f0feeb513
--- /dev/null
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -0,0 +1,3 @@
1obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o
2
3xprtrdma-y := transport.o rpc_rdma.o verbs.o
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
new file mode 100644
index 000000000000..12db63580427
--- /dev/null
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -0,0 +1,868 @@
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * rpc_rdma.c
42 *
43 * This file contains the guts of the RPC RDMA protocol, and
44 * does marshaling/unmarshaling, etc. It is also where interfacing
45 * to the Linux RPC framework lives.
46 */
47
48#include "xprt_rdma.h"
49
50#include <linux/highmem.h>
51
52#ifdef RPC_DEBUG
53# define RPCDBG_FACILITY RPCDBG_TRANS
54#endif
55
56enum rpcrdma_chunktype {
57 rpcrdma_noch = 0,
58 rpcrdma_readch,
59 rpcrdma_areadch,
60 rpcrdma_writech,
61 rpcrdma_replych
62};
63
64#ifdef RPC_DEBUG
65static const char transfertypes[][12] = {
66 "pure inline", /* no chunks */
67 " read chunk", /* some argument via rdma read */
68 "*read chunk", /* entire request via rdma read */
69 "write chunk", /* some result via rdma write */
70 "reply chunk" /* entire reply via rdma write */
71};
72#endif
73
74/*
75 * Chunk assembly from upper layer xdr_buf.
76 *
77 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
78 * elements. Segments are then coalesced when registered, if possible
79 * within the selected memreg mode.
80 *
81 * Note, this routine is never called if the connection's memory
82 * registration strategy is 0 (bounce buffers).
83 */
84
85static int
86rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int pos,
87 enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
88{
89 int len, n = 0, p;
90
91 if (pos == 0 && xdrbuf->head[0].iov_len) {
92 seg[n].mr_page = NULL;
93 seg[n].mr_offset = xdrbuf->head[0].iov_base;
94 seg[n].mr_len = xdrbuf->head[0].iov_len;
95 pos += xdrbuf->head[0].iov_len;
96 ++n;
97 }
98
99 if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
100 if (n == nsegs)
101 return 0;
102 seg[n].mr_page = xdrbuf->pages[0];
103 seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
104 seg[n].mr_len = min_t(u32,
105 PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len);
106 len = xdrbuf->page_len - seg[n].mr_len;
107 pos += len;
108 ++n;
109 p = 1;
110 while (len > 0) {
111 if (n == nsegs)
112 return 0;
113 seg[n].mr_page = xdrbuf->pages[p];
114 seg[n].mr_offset = NULL;
115 seg[n].mr_len = min_t(u32, PAGE_SIZE, len);
116 len -= seg[n].mr_len;
117 ++n;
118 ++p;
119 }
120 }
121
122 if (pos < xdrbuf->len && xdrbuf->tail[0].iov_len) {
123 if (n == nsegs)
124 return 0;
125 seg[n].mr_page = NULL;
126 seg[n].mr_offset = xdrbuf->tail[0].iov_base;
127 seg[n].mr_len = xdrbuf->tail[0].iov_len;
128 pos += xdrbuf->tail[0].iov_len;
129 ++n;
130 }
131
132 if (pos < xdrbuf->len)
133 dprintk("RPC: %s: marshaled only %d of %d\n",
134 __func__, pos, xdrbuf->len);
135
136 return n;
137}
138
139/*
140 * Create read/write chunk lists, and reply chunks, for RDMA
141 *
142 * Assume check against THRESHOLD has been done, and chunks are required.
143 * Assume only encoding one list entry for read|write chunks. The NFSv3
144 * protocol is simple enough to allow this as it only has a single "bulk
145 * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
146 * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
147 *
148 * When used for a single reply chunk (which is a special write
149 * chunk used for the entire reply, rather than just the data), it
150 * is used primarily for READDIR and READLINK which would otherwise
151 * be severely size-limited by a small rdma inline read max. The server
152 * response will come back as an RDMA Write, followed by a message
153 * of type RDMA_NOMSG carrying the xid and length. As a result, reply
154 * chunks do not provide data alignment, however they do not require
155 * "fixup" (moving the response to the upper layer buffer) either.
156 *
157 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
158 *
159 * Read chunklist (a linked list):
160 * N elements, position P (same P for all chunks of same arg!):
161 * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
162 *
163 * Write chunklist (a list of (one) counted array):
164 * N elements:
165 * 1 - N - HLOO - HLOO - ... - HLOO - 0
166 *
167 * Reply chunk (a counted array):
168 * N elements:
169 * 1 - N - HLOO - HLOO - ... - HLOO
170 */
171
172static unsigned int
173rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
174 struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
175{
176 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
177 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
178 int nsegs, nchunks = 0;
179 int pos;
180 struct rpcrdma_mr_seg *seg = req->rl_segments;
181 struct rpcrdma_read_chunk *cur_rchunk = NULL;
182 struct rpcrdma_write_array *warray = NULL;
183 struct rpcrdma_write_chunk *cur_wchunk = NULL;
184 u32 *iptr = headerp->rm_body.rm_chunks;
185
186 if (type == rpcrdma_readch || type == rpcrdma_areadch) {
187 /* a read chunk - server will RDMA Read our memory */
188 cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
189 } else {
190 /* a write or reply chunk - server will RDMA Write our memory */
191 *iptr++ = xdr_zero; /* encode a NULL read chunk list */
192 if (type == rpcrdma_replych)
193 *iptr++ = xdr_zero; /* a NULL write chunk list */
194 warray = (struct rpcrdma_write_array *) iptr;
195 cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
196 }
197
198 if (type == rpcrdma_replych || type == rpcrdma_areadch)
199 pos = 0;
200 else
201 pos = target->head[0].iov_len;
202
203 nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
204 if (nsegs == 0)
205 return 0;
206
207 do {
208 /* bind/register the memory, then build chunk from result. */
209 int n = rpcrdma_register_external(seg, nsegs,
210 cur_wchunk != NULL, r_xprt);
211 if (n <= 0)
212 goto out;
213 if (cur_rchunk) { /* read */
214 cur_rchunk->rc_discrim = xdr_one;
215 /* all read chunks have the same "position" */
216 cur_rchunk->rc_position = htonl(pos);
217 cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
218 cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
219 xdr_encode_hyper(
220 (u32 *)&cur_rchunk->rc_target.rs_offset,
221 seg->mr_base);
222 dprintk("RPC: %s: read chunk "
223 "elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
224 seg->mr_len, seg->mr_base, seg->mr_rkey, pos,
225 n < nsegs ? "more" : "last");
226 cur_rchunk++;
227 r_xprt->rx_stats.read_chunk_count++;
228 } else { /* write/reply */
229 cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
230 cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
231 xdr_encode_hyper(
232 (u32 *)&cur_wchunk->wc_target.rs_offset,
233 seg->mr_base);
234 dprintk("RPC: %s: %s chunk "
235 "elem %d@0x%llx:0x%x (%s)\n", __func__,
236 (type == rpcrdma_replych) ? "reply" : "write",
237 seg->mr_len, seg->mr_base, seg->mr_rkey,
238 n < nsegs ? "more" : "last");
239 cur_wchunk++;
240 if (type == rpcrdma_replych)
241 r_xprt->rx_stats.reply_chunk_count++;
242 else
243 r_xprt->rx_stats.write_chunk_count++;
244 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
245 }
246 nchunks++;
247 seg += n;
248 nsegs -= n;
249 } while (nsegs);
250
251 /* success. all failures return above */
252 req->rl_nchunks = nchunks;
253
254 BUG_ON(nchunks == 0);
255
256 /*
257 * finish off header. If write, marshal discrim and nchunks.
258 */
259 if (cur_rchunk) {
260 iptr = (u32 *) cur_rchunk;
261 *iptr++ = xdr_zero; /* finish the read chunk list */
262 *iptr++ = xdr_zero; /* encode a NULL write chunk list */
263 *iptr++ = xdr_zero; /* encode a NULL reply chunk */
264 } else {
265 warray->wc_discrim = xdr_one;
266 warray->wc_nchunks = htonl(nchunks);
267 iptr = (u32 *) cur_wchunk;
268 if (type == rpcrdma_writech) {
269 *iptr++ = xdr_zero; /* finish the write chunk list */
270 *iptr++ = xdr_zero; /* encode a NULL reply chunk */
271 }
272 }
273
274 /*
275 * Return header size.
276 */
277 return (unsigned char *)iptr - (unsigned char *)headerp;
278
279out:
280 for (pos = 0; nchunks--;)
281 pos += rpcrdma_deregister_external(
282 &req->rl_segments[pos], r_xprt, NULL);
283 return 0;
284}
285
286/*
287 * Copy write data inline.
288 * This function is used for "small" requests. Data which is passed
289 * to RPC via iovecs (or page list) is copied directly into the
290 * pre-registered memory buffer for this request. For small amounts
291 * of data, this is efficient. The cutoff value is tunable.
292 */
293static int
294rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
295{
296 int i, npages, curlen;
297 int copy_len;
298 unsigned char *srcp, *destp;
299 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
300
301 destp = rqst->rq_svec[0].iov_base;
302 curlen = rqst->rq_svec[0].iov_len;
303 destp += curlen;
304 /*
305 * Do optional padding where it makes sense. Alignment of write
306 * payload can help the server, if our setting is accurate.
307 */
308 pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
309 if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
310 pad = 0; /* don't pad this request */
311
312 dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
313 __func__, pad, destp, rqst->rq_slen, curlen);
314
315 copy_len = rqst->rq_snd_buf.page_len;
316 r_xprt->rx_stats.pullup_copy_count += copy_len;
317 npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT;
318 for (i = 0; copy_len && i < npages; i++) {
319 if (i == 0)
320 curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
321 else
322 curlen = PAGE_SIZE;
323 if (curlen > copy_len)
324 curlen = copy_len;
325 dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
326 __func__, i, destp, copy_len, curlen);
327 srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
328 KM_SKB_SUNRPC_DATA);
329 if (i == 0)
330 memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
331 else
332 memcpy(destp, srcp, curlen);
333 kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
334 rqst->rq_svec[0].iov_len += curlen;
335 destp += curlen;
336 copy_len -= curlen;
337 }
338 if (rqst->rq_snd_buf.tail[0].iov_len) {
339 curlen = rqst->rq_snd_buf.tail[0].iov_len;
340 if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
341 memcpy(destp,
342 rqst->rq_snd_buf.tail[0].iov_base, curlen);
343 r_xprt->rx_stats.pullup_copy_count += curlen;
344 }
345 dprintk("RPC: %s: tail destp 0x%p len %d curlen %d\n",
346 __func__, destp, copy_len, curlen);
347 rqst->rq_svec[0].iov_len += curlen;
348 }
349 /* header now contains entire send message */
350 return pad;
351}
352
353/*
354 * Marshal a request: the primary job of this routine is to choose
355 * the transfer modes. See comments below.
356 *
357 * Uses multiple RDMA IOVs for a request:
358 * [0] -- RPC RDMA header, which uses memory from the *start* of the
359 * preregistered buffer that already holds the RPC data in
360 * its middle.
361 * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
362 * [2] -- optional padding.
363 * [3] -- if padded, header only in [1] and data here.
364 */
365
366int
367rpcrdma_marshal_req(struct rpc_rqst *rqst)
368{
369 struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
370 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
371 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
372 char *base;
373 size_t hdrlen, rpclen, padlen;
374 enum rpcrdma_chunktype rtype, wtype;
375 struct rpcrdma_msg *headerp;
376
377 /*
378 * rpclen gets amount of data in first buffer, which is the
379 * pre-registered buffer.
380 */
381 base = rqst->rq_svec[0].iov_base;
382 rpclen = rqst->rq_svec[0].iov_len;
383
384 /* build RDMA header in private area at front */
385 headerp = (struct rpcrdma_msg *) req->rl_base;
386 /* don't htonl XID, it's already done in request */
387 headerp->rm_xid = rqst->rq_xid;
388 headerp->rm_vers = xdr_one;
389 headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
390 headerp->rm_type = __constant_htonl(RDMA_MSG);
391
392 /*
393 * Chunks needed for results?
394 *
395 * o If the expected result is under the inline threshold, all ops
396 * return as inline (but see later).
397 * o Large non-read ops return as a single reply chunk.
398 * o Large read ops return data as write chunk(s), header as inline.
399 *
400 * Note: the NFS code sending down multiple result segments implies
401 * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
402 */
403
404 /*
405 * This code can handle read chunks, write chunks OR reply
406 * chunks -- only one type. If the request is too big to fit
407 * inline, then we will choose read chunks. If the request is
408 * a READ, then use write chunks to separate the file data
409 * into pages; otherwise use reply chunks.
410 */
411 if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
412 wtype = rpcrdma_noch;
413 else if (rqst->rq_rcv_buf.page_len == 0)
414 wtype = rpcrdma_replych;
415 else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
416 wtype = rpcrdma_writech;
417 else
418 wtype = rpcrdma_replych;
419
420 /*
421 * Chunks needed for arguments?
422 *
423 * o If the total request is under the inline threshold, all ops
424 * are sent as inline.
425 * o Large non-write ops are sent with the entire message as a
426 * single read chunk (protocol 0-position special case).
427 * o Large write ops transmit data as read chunk(s), header as
428 * inline.
429 *
430 * Note: the NFS code sending down multiple argument segments
431 * implies the op is a write.
432 * TBD check NFSv4 setacl
433 */
434 if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
435 rtype = rpcrdma_noch;
436 else if (rqst->rq_snd_buf.page_len == 0)
437 rtype = rpcrdma_areadch;
438 else
439 rtype = rpcrdma_readch;
440
441 /* The following simplification is not true forever */
442 if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
443 wtype = rpcrdma_noch;
444 BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
445
446 if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
447 (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
448 /* forced to "pure inline"? */
449 dprintk("RPC: %s: too much data (%d/%d) for inline\n",
450 __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
451 return -1;
452 }
453
454 hdrlen = 28; /*sizeof *headerp;*/
455 padlen = 0;
456
457 /*
458 * Pull up any extra send data into the preregistered buffer.
459 * When padding is in use and applies to the transfer, insert
460 * it and change the message type.
461 */
462 if (rtype == rpcrdma_noch) {
463
464 padlen = rpcrdma_inline_pullup(rqst,
465 RPCRDMA_INLINE_PAD_VALUE(rqst));
466
467 if (padlen) {
468 headerp->rm_type = __constant_htonl(RDMA_MSGP);
469 headerp->rm_body.rm_padded.rm_align =
470 htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
471 headerp->rm_body.rm_padded.rm_thresh =
472 __constant_htonl(RPCRDMA_INLINE_PAD_THRESH);
473 headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
474 headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
475 headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
476 hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
477 BUG_ON(wtype != rpcrdma_noch);
478
479 } else {
480 headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
481 headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
482 headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
483 /* new length after pullup */
484 rpclen = rqst->rq_svec[0].iov_len;
485 /*
486 * Currently we try to not actually use read inline.
487 * Reply chunks have the desirable property that
488 * they land, packed, directly in the target buffers
489 * without headers, so they require no fixup. The
490 * additional RDMA Write op sends the same amount
491 * of data, streams on-the-wire and adds no overhead
492 * on receive. Therefore, we request a reply chunk
493 * for non-writes wherever feasible and efficient.
494 */
495 if (wtype == rpcrdma_noch &&
496 r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
497 wtype = rpcrdma_replych;
498 }
499 }
500
501 /*
502 * Marshal chunks. This routine will return the header length
503 * consumed by marshaling.
504 */
505 if (rtype != rpcrdma_noch) {
506 hdrlen = rpcrdma_create_chunks(rqst,
507 &rqst->rq_snd_buf, headerp, rtype);
508 wtype = rtype; /* simplify dprintk */
509
510 } else if (wtype != rpcrdma_noch) {
511 hdrlen = rpcrdma_create_chunks(rqst,
512 &rqst->rq_rcv_buf, headerp, wtype);
513 }
514
515 if (hdrlen == 0)
516 return -1;
517
518 dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
519 " headerp 0x%p base 0x%p lkey 0x%x\n",
520 __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
521 headerp, base, req->rl_iov.lkey);
522
523 /*
524 * initialize send_iov's - normally only two: rdma chunk header and
525 * single preregistered RPC header buffer, but if padding is present,
526 * then use a preregistered (and zeroed) pad buffer between the RPC
527 * header and any write data. In all non-rdma cases, any following
528 * data has been copied into the RPC header buffer.
529 */
530 req->rl_send_iov[0].addr = req->rl_iov.addr;
531 req->rl_send_iov[0].length = hdrlen;
532 req->rl_send_iov[0].lkey = req->rl_iov.lkey;
533
534 req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
535 req->rl_send_iov[1].length = rpclen;
536 req->rl_send_iov[1].lkey = req->rl_iov.lkey;
537
538 req->rl_niovs = 2;
539
540 if (padlen) {
541 struct rpcrdma_ep *ep = &r_xprt->rx_ep;
542
543 req->rl_send_iov[2].addr = ep->rep_pad.addr;
544 req->rl_send_iov[2].length = padlen;
545 req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
546
547 req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
548 req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
549 req->rl_send_iov[3].lkey = req->rl_iov.lkey;
550
551 req->rl_niovs = 4;
552 }
553
554 return 0;
555}
556
557/*
558 * Chase down a received write or reply chunklist to get length
559 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
560 */
561static int
562rpcrdma_count_chunks(struct rpcrdma_rep *rep, int max, int wrchunk, u32 **iptrp)
563{
564 unsigned int i, total_len;
565 struct rpcrdma_write_chunk *cur_wchunk;
566
567 i = ntohl(**iptrp); /* get array count */
568 if (i > max)
569 return -1;
570 cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
571 total_len = 0;
572 while (i--) {
573 struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
574 ifdebug(FACILITY) {
575 u64 off;
576 xdr_decode_hyper((u32 *)&seg->rs_offset, &off);
577 dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
578 __func__,
579 ntohl(seg->rs_length),
580 off,
581 ntohl(seg->rs_handle));
582 }
583 total_len += ntohl(seg->rs_length);
584 ++cur_wchunk;
585 }
586 /* check and adjust for properly terminated write chunk */
587 if (wrchunk) {
588 u32 *w = (u32 *) cur_wchunk;
589 if (*w++ != xdr_zero)
590 return -1;
591 cur_wchunk = (struct rpcrdma_write_chunk *) w;
592 }
593 if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
594 return -1;
595
596 *iptrp = (u32 *) cur_wchunk;
597 return total_len;
598}
599
600/*
601 * Scatter inline received data back into provided iov's.
602 */
603static void
604rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
605{
606 int i, npages, curlen, olen;
607 char *destp;
608
609 curlen = rqst->rq_rcv_buf.head[0].iov_len;
610 if (curlen > copy_len) { /* write chunk header fixup */
611 curlen = copy_len;
612 rqst->rq_rcv_buf.head[0].iov_len = curlen;
613 }
614
615 dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
616 __func__, srcp, copy_len, curlen);
617
618 /* Shift pointer for first receive segment only */
619 rqst->rq_rcv_buf.head[0].iov_base = srcp;
620 srcp += curlen;
621 copy_len -= curlen;
622
623 olen = copy_len;
624 i = 0;
625 rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
626 if (copy_len && rqst->rq_rcv_buf.page_len) {
627 npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
628 rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
629 for (; i < npages; i++) {
630 if (i == 0)
631 curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
632 else
633 curlen = PAGE_SIZE;
634 if (curlen > copy_len)
635 curlen = copy_len;
636 dprintk("RPC: %s: page %d"
637 " srcp 0x%p len %d curlen %d\n",
638 __func__, i, srcp, copy_len, curlen);
639 destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
640 KM_SKB_SUNRPC_DATA);
641 if (i == 0)
642 memcpy(destp + rqst->rq_rcv_buf.page_base,
643 srcp, curlen);
644 else
645 memcpy(destp, srcp, curlen);
646 flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
647 kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
648 srcp += curlen;
649 copy_len -= curlen;
650 if (copy_len == 0)
651 break;
652 }
653 rqst->rq_rcv_buf.page_len = olen - copy_len;
654 } else
655 rqst->rq_rcv_buf.page_len = 0;
656
657 if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
658 curlen = copy_len;
659 if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
660 curlen = rqst->rq_rcv_buf.tail[0].iov_len;
661 if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
662 memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
663 dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
664 __func__, srcp, copy_len, curlen);
665 rqst->rq_rcv_buf.tail[0].iov_len = curlen;
666 copy_len -= curlen; ++i;
667 } else
668 rqst->rq_rcv_buf.tail[0].iov_len = 0;
669
670 if (copy_len)
671 dprintk("RPC: %s: %d bytes in"
672 " %d extra segments (%d lost)\n",
673 __func__, olen, i, copy_len);
674
675 /* TBD avoid a warning from call_decode() */
676 rqst->rq_private_buf = rqst->rq_rcv_buf;
677}
678
679/*
680 * This function is called when an async event is posted to
681 * the connection which changes the connection state. All it
682 * does at this point is mark the connection up/down, the rpc
683 * timers do the rest.
684 */
685void
686rpcrdma_conn_func(struct rpcrdma_ep *ep)
687{
688 struct rpc_xprt *xprt = ep->rep_xprt;
689
690 spin_lock_bh(&xprt->transport_lock);
691 if (ep->rep_connected > 0) {
692 if (!xprt_test_and_set_connected(xprt))
693 xprt_wake_pending_tasks(xprt, 0);
694 } else {
695 if (xprt_test_and_clear_connected(xprt))
696 xprt_wake_pending_tasks(xprt, ep->rep_connected);
697 }
698 spin_unlock_bh(&xprt->transport_lock);
699}
700
701/*
702 * This function is called when memory window unbind which we are waiting
703 * for completes. Just use rr_func (zeroed by upcall) to signal completion.
704 */
705static void
706rpcrdma_unbind_func(struct rpcrdma_rep *rep)
707{
708 wake_up(&rep->rr_unbind);
709}
710
711/*
712 * Called as a tasklet to do req/reply match and complete a request
713 * Errors must result in the RPC task either being awakened, or
714 * allowed to timeout, to discover the errors at that time.
715 */
716void
717rpcrdma_reply_handler(struct rpcrdma_rep *rep)
718{
719 struct rpcrdma_msg *headerp;
720 struct rpcrdma_req *req;
721 struct rpc_rqst *rqst;
722 struct rpc_xprt *xprt = rep->rr_xprt;
723 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
724 u32 *iptr;
725 int i, rdmalen, status;
726
727 /* Check status. If bad, signal disconnect and return rep to pool */
728 if (rep->rr_len == ~0U) {
729 rpcrdma_recv_buffer_put(rep);
730 if (r_xprt->rx_ep.rep_connected == 1) {
731 r_xprt->rx_ep.rep_connected = -EIO;
732 rpcrdma_conn_func(&r_xprt->rx_ep);
733 }
734 return;
735 }
736 if (rep->rr_len < 28) {
737 dprintk("RPC: %s: short/invalid reply\n", __func__);
738 goto repost;
739 }
740 headerp = (struct rpcrdma_msg *) rep->rr_base;
741 if (headerp->rm_vers != xdr_one) {
742 dprintk("RPC: %s: invalid version %d\n",
743 __func__, ntohl(headerp->rm_vers));
744 goto repost;
745 }
746
747 /* Get XID and try for a match. */
748 spin_lock(&xprt->transport_lock);
749 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
750 if (rqst == NULL) {
751 spin_unlock(&xprt->transport_lock);
752 dprintk("RPC: %s: reply 0x%p failed "
753 "to match any request xid 0x%08x len %d\n",
754 __func__, rep, headerp->rm_xid, rep->rr_len);
755repost:
756 r_xprt->rx_stats.bad_reply_count++;
757 rep->rr_func = rpcrdma_reply_handler;
758 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
759 rpcrdma_recv_buffer_put(rep);
760
761 return;
762 }
763
764 /* get request object */
765 req = rpcr_to_rdmar(rqst);
766
767 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
768 " RPC request 0x%p xid 0x%08x\n",
769 __func__, rep, req, rqst, headerp->rm_xid);
770
771 BUG_ON(!req || req->rl_reply);
772
773 /* from here on, the reply is no longer an orphan */
774 req->rl_reply = rep;
775
776 /* check for expected message types */
777 /* The order of some of these tests is important. */
778 switch (headerp->rm_type) {
779 case __constant_htonl(RDMA_MSG):
780 /* never expect read chunks */
781 /* never expect reply chunks (two ways to check) */
782 /* never expect write chunks without having offered RDMA */
783 if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
784 (headerp->rm_body.rm_chunks[1] == xdr_zero &&
785 headerp->rm_body.rm_chunks[2] != xdr_zero) ||
786 (headerp->rm_body.rm_chunks[1] != xdr_zero &&
787 req->rl_nchunks == 0))
788 goto badheader;
789 if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
790 /* count any expected write chunks in read reply */
791 /* start at write chunk array count */
792 iptr = &headerp->rm_body.rm_chunks[2];
793 rdmalen = rpcrdma_count_chunks(rep,
794 req->rl_nchunks, 1, &iptr);
795 /* check for validity, and no reply chunk after */
796 if (rdmalen < 0 || *iptr++ != xdr_zero)
797 goto badheader;
798 rep->rr_len -=
799 ((unsigned char *)iptr - (unsigned char *)headerp);
800 status = rep->rr_len + rdmalen;
801 r_xprt->rx_stats.total_rdma_reply += rdmalen;
802 } else {
803 /* else ordinary inline */
804 iptr = (u32 *)((unsigned char *)headerp + 28);
805 rep->rr_len -= 28; /*sizeof *headerp;*/
806 status = rep->rr_len;
807 }
808 /* Fix up the rpc results for upper layer */
809 rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
810 break;
811
812 case __constant_htonl(RDMA_NOMSG):
813 /* never expect read or write chunks, always reply chunks */
814 if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
815 headerp->rm_body.rm_chunks[1] != xdr_zero ||
816 headerp->rm_body.rm_chunks[2] != xdr_one ||
817 req->rl_nchunks == 0)
818 goto badheader;
819 iptr = (u32 *)((unsigned char *)headerp + 28);
820 rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
821 if (rdmalen < 0)
822 goto badheader;
823 r_xprt->rx_stats.total_rdma_reply += rdmalen;
824 /* Reply chunk buffer already is the reply vector - no fixup. */
825 status = rdmalen;
826 break;
827
828badheader:
829 default:
830 dprintk("%s: invalid rpcrdma reply header (type %d):"
831 " chunks[012] == %d %d %d"
832 " expected chunks <= %d\n",
833 __func__, ntohl(headerp->rm_type),
834 headerp->rm_body.rm_chunks[0],
835 headerp->rm_body.rm_chunks[1],
836 headerp->rm_body.rm_chunks[2],
837 req->rl_nchunks);
838 status = -EIO;
839 r_xprt->rx_stats.bad_reply_count++;
840 break;
841 }
842
843 /* If using mw bind, start the deregister process now. */
844 /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
845 if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
846 case RPCRDMA_MEMWINDOWS:
847 for (i = 0; req->rl_nchunks-- > 1;)
848 i += rpcrdma_deregister_external(
849 &req->rl_segments[i], r_xprt, NULL);
850 /* Optionally wait (not here) for unbinds to complete */
851 rep->rr_func = rpcrdma_unbind_func;
852 (void) rpcrdma_deregister_external(&req->rl_segments[i],
853 r_xprt, rep);
854 break;
855 case RPCRDMA_MEMWINDOWS_ASYNC:
856 for (i = 0; req->rl_nchunks--;)
857 i += rpcrdma_deregister_external(&req->rl_segments[i],
858 r_xprt, NULL);
859 break;
860 default:
861 break;
862 }
863
864 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
865 __func__, xprt, rqst, status);
866 xprt_complete_rqst(rqst->rq_task, status);
867 spin_unlock(&xprt->transport_lock);
868}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
new file mode 100644
index 000000000000..dc55cc974c90
--- /dev/null
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -0,0 +1,800 @@
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * transport.c
42 *
43 * This file contains the top-level implementation of an RPC RDMA
44 * transport.
45 *
46 * Naming convention: functions beginning with xprt_ are part of the
47 * transport switch. All others are RPC RDMA internal.
48 */
49
50#include <linux/module.h>
51#include <linux/init.h>
52#include <linux/seq_file.h>
53
54#include "xprt_rdma.h"
55
56#ifdef RPC_DEBUG
57# define RPCDBG_FACILITY RPCDBG_TRANS
58#endif
59
60MODULE_LICENSE("Dual BSD/GPL");
61
62MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
63MODULE_AUTHOR("Network Appliance, Inc.");
64
65/*
66 * tunables
67 */
68
69static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
70static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
71static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
72static unsigned int xprt_rdma_inline_write_padding;
73#if !RPCRDMA_PERSISTENT_REGISTRATION
74static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
75#else
76static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
77#endif
78
79#ifdef RPC_DEBUG
80
81static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
82static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
83static unsigned int zero;
84static unsigned int max_padding = PAGE_SIZE;
85static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
86static unsigned int max_memreg = RPCRDMA_LAST - 1;
87
88static struct ctl_table_header *sunrpc_table_header;
89
90static ctl_table xr_tunables_table[] = {
91 {
92 .ctl_name = CTL_SLOTTABLE_RDMA,
93 .procname = "rdma_slot_table_entries",
94 .data = &xprt_rdma_slot_table_entries,
95 .maxlen = sizeof(unsigned int),
96 .mode = 0644,
97 .proc_handler = &proc_dointvec_minmax,
98 .strategy = &sysctl_intvec,
99 .extra1 = &min_slot_table_size,
100 .extra2 = &max_slot_table_size
101 },
102 {
103 .ctl_name = CTL_RDMA_MAXINLINEREAD,
104 .procname = "rdma_max_inline_read",
105 .data = &xprt_rdma_max_inline_read,
106 .maxlen = sizeof(unsigned int),
107 .mode = 0644,
108 .proc_handler = &proc_dointvec,
109 .strategy = &sysctl_intvec,
110 },
111 {
112 .ctl_name = CTL_RDMA_MAXINLINEWRITE,
113 .procname = "rdma_max_inline_write",
114 .data = &xprt_rdma_max_inline_write,
115 .maxlen = sizeof(unsigned int),
116 .mode = 0644,
117 .proc_handler = &proc_dointvec,
118 .strategy = &sysctl_intvec,
119 },
120 {
121 .ctl_name = CTL_RDMA_WRITEPADDING,
122 .procname = "rdma_inline_write_padding",
123 .data = &xprt_rdma_inline_write_padding,
124 .maxlen = sizeof(unsigned int),
125 .mode = 0644,
126 .proc_handler = &proc_dointvec_minmax,
127 .strategy = &sysctl_intvec,
128 .extra1 = &zero,
129 .extra2 = &max_padding,
130 },
131 {
132 .ctl_name = CTL_RDMA_MEMREG,
133 .procname = "rdma_memreg_strategy",
134 .data = &xprt_rdma_memreg_strategy,
135 .maxlen = sizeof(unsigned int),
136 .mode = 0644,
137 .proc_handler = &proc_dointvec_minmax,
138 .strategy = &sysctl_intvec,
139 .extra1 = &min_memreg,
140 .extra2 = &max_memreg,
141 },
142 {
143 .ctl_name = 0,
144 },
145};
146
147static ctl_table sunrpc_table[] = {
148 {
149 .ctl_name = CTL_SUNRPC,
150 .procname = "sunrpc",
151 .mode = 0555,
152 .child = xr_tunables_table
153 },
154 {
155 .ctl_name = 0,
156 },
157};
158
159#endif
160
161static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
162
163static void
164xprt_rdma_format_addresses(struct rpc_xprt *xprt)
165{
166 struct sockaddr_in *addr = (struct sockaddr_in *)
167 &rpcx_to_rdmad(xprt).addr;
168 char *buf;
169
170 buf = kzalloc(20, GFP_KERNEL);
171 if (buf)
172 snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
173 xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
174
175 buf = kzalloc(8, GFP_KERNEL);
176 if (buf)
177 snprintf(buf, 8, "%u", ntohs(addr->sin_port));
178 xprt->address_strings[RPC_DISPLAY_PORT] = buf;
179
180 xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
181
182 buf = kzalloc(48, GFP_KERNEL);
183 if (buf)
184 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
185 NIPQUAD(addr->sin_addr.s_addr),
186 ntohs(addr->sin_port), "rdma");
187 xprt->address_strings[RPC_DISPLAY_ALL] = buf;
188
189 buf = kzalloc(10, GFP_KERNEL);
190 if (buf)
191 snprintf(buf, 10, "%02x%02x%02x%02x",
192 NIPQUAD(addr->sin_addr.s_addr));
193 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
194
195 buf = kzalloc(8, GFP_KERNEL);
196 if (buf)
197 snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
198 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
199
200 buf = kzalloc(30, GFP_KERNEL);
201 if (buf)
202 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
203 NIPQUAD(addr->sin_addr.s_addr),
204 ntohs(addr->sin_port) >> 8,
205 ntohs(addr->sin_port) & 0xff);
206 xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
207
208 /* netid */
209 xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
210}
211
212static void
213xprt_rdma_free_addresses(struct rpc_xprt *xprt)
214{
215 kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
216 kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
217 kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
218 kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
219 kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
220 kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
221}
222
223static void
224xprt_rdma_connect_worker(struct work_struct *work)
225{
226 struct rpcrdma_xprt *r_xprt =
227 container_of(work, struct rpcrdma_xprt, rdma_connect.work);
228 struct rpc_xprt *xprt = &r_xprt->xprt;
229 int rc = 0;
230
231 if (!xprt->shutdown) {
232 xprt_clear_connected(xprt);
233
234 dprintk("RPC: %s: %sconnect\n", __func__,
235 r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
236 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
237 if (rc)
238 goto out;
239 }
240 goto out_clear;
241
242out:
243 xprt_wake_pending_tasks(xprt, rc);
244
245out_clear:
246 dprintk("RPC: %s: exit\n", __func__);
247 xprt_clear_connecting(xprt);
248}
249
250/*
251 * xprt_rdma_destroy
252 *
253 * Destroy the xprt.
254 * Free all memory associated with the object, including its own.
255 * NOTE: none of the *destroy methods free memory for their top-level
256 * objects, even though they may have allocated it (they do free
257 * private memory). It's up to the caller to handle it. In this
258 * case (RDMA transport), all structure memory is inlined with the
259 * struct rpcrdma_xprt.
260 */
261static void
262xprt_rdma_destroy(struct rpc_xprt *xprt)
263{
264 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
265 int rc;
266
267 dprintk("RPC: %s: called\n", __func__);
268
269 cancel_delayed_work(&r_xprt->rdma_connect);
270 flush_scheduled_work();
271
272 xprt_clear_connected(xprt);
273
274 rpcrdma_buffer_destroy(&r_xprt->rx_buf);
275 rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
276 if (rc)
277 dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n",
278 __func__, rc);
279 rpcrdma_ia_close(&r_xprt->rx_ia);
280
281 xprt_rdma_free_addresses(xprt);
282
283 kfree(xprt->slot);
284 xprt->slot = NULL;
285 kfree(xprt);
286
287 dprintk("RPC: %s: returning\n", __func__);
288
289 module_put(THIS_MODULE);
290}
291
292/**
293 * xprt_setup_rdma - Set up transport to use RDMA
294 *
295 * @args: rpc transport arguments
296 */
297static struct rpc_xprt *
298xprt_setup_rdma(struct xprt_create *args)
299{
300 struct rpcrdma_create_data_internal cdata;
301 struct rpc_xprt *xprt;
302 struct rpcrdma_xprt *new_xprt;
303 struct rpcrdma_ep *new_ep;
304 struct sockaddr_in *sin;
305 int rc;
306
307 if (args->addrlen > sizeof(xprt->addr)) {
308 dprintk("RPC: %s: address too large\n", __func__);
309 return ERR_PTR(-EBADF);
310 }
311
312 xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
313 if (xprt == NULL) {
314 dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
315 __func__);
316 return ERR_PTR(-ENOMEM);
317 }
318
319 xprt->max_reqs = xprt_rdma_slot_table_entries;
320 xprt->slot = kcalloc(xprt->max_reqs,
321 sizeof(struct rpc_rqst), GFP_KERNEL);
322 if (xprt->slot == NULL) {
323 kfree(xprt);
324 dprintk("RPC: %s: couldn't allocate %d slots\n",
325 __func__, xprt->max_reqs);
326 return ERR_PTR(-ENOMEM);
327 }
328
329 /* 60 second timeout, no retries */
330 xprt_set_timeout(&xprt->timeout, 0, 60UL * HZ);
331 xprt->bind_timeout = (60U * HZ);
332 xprt->connect_timeout = (60U * HZ);
333 xprt->reestablish_timeout = (5U * HZ);
334 xprt->idle_timeout = (5U * 60 * HZ);
335
336 xprt->resvport = 0; /* privileged port not needed */
337 xprt->tsh_size = 0; /* RPC-RDMA handles framing */
338 xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
339 xprt->ops = &xprt_rdma_procs;
340
341 /*
342 * Set up RDMA-specific connect data.
343 */
344
345 /* Put server RDMA address in local cdata */
346 memcpy(&cdata.addr, args->dstaddr, args->addrlen);
347
348 /* Ensure xprt->addr holds valid server TCP (not RDMA)
349 * address, for any side protocols which peek at it */
350 xprt->prot = IPPROTO_TCP;
351 xprt->addrlen = args->addrlen;
352 memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
353
354 sin = (struct sockaddr_in *)&cdata.addr;
355 if (ntohs(sin->sin_port) != 0)
356 xprt_set_bound(xprt);
357
358 dprintk("RPC: %s: %u.%u.%u.%u:%u\n", __func__,
359 NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
360
361 /* Set max requests */
362 cdata.max_requests = xprt->max_reqs;
363
364 /* Set some length limits */
365 cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
366 cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
367
368 cdata.inline_wsize = xprt_rdma_max_inline_write;
369 if (cdata.inline_wsize > cdata.wsize)
370 cdata.inline_wsize = cdata.wsize;
371
372 cdata.inline_rsize = xprt_rdma_max_inline_read;
373 if (cdata.inline_rsize > cdata.rsize)
374 cdata.inline_rsize = cdata.rsize;
375
376 cdata.padding = xprt_rdma_inline_write_padding;
377
378 /*
379 * Create new transport instance, which includes initialized
380 * o ia
381 * o endpoint
382 * o buffers
383 */
384
385 new_xprt = rpcx_to_rdmax(xprt);
386
387 rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
388 xprt_rdma_memreg_strategy);
389 if (rc)
390 goto out1;
391
392 /*
393 * initialize and create ep
394 */
395 new_xprt->rx_data = cdata;
396 new_ep = &new_xprt->rx_ep;
397 new_ep->rep_remote_addr = cdata.addr;
398
399 rc = rpcrdma_ep_create(&new_xprt->rx_ep,
400 &new_xprt->rx_ia, &new_xprt->rx_data);
401 if (rc)
402 goto out2;
403
404 /*
405 * Allocate pre-registered send and receive buffers for headers and
406 * any inline data. Also specify any padding which will be provided
407 * from a preregistered zero buffer.
408 */
409 rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
410 &new_xprt->rx_data);
411 if (rc)
412 goto out3;
413
414 /*
415 * Register a callback for connection events. This is necessary because
416 * connection loss notification is async. We also catch connection loss
417 * when reaping receives.
418 */
419 INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
420 new_ep->rep_func = rpcrdma_conn_func;
421 new_ep->rep_xprt = xprt;
422
423 xprt_rdma_format_addresses(xprt);
424
425 if (!try_module_get(THIS_MODULE))
426 goto out4;
427
428 return xprt;
429
430out4:
431 xprt_rdma_free_addresses(xprt);
432 rc = -EINVAL;
433out3:
434 (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
435out2:
436 rpcrdma_ia_close(&new_xprt->rx_ia);
437out1:
438 kfree(xprt->slot);
439 kfree(xprt);
440 return ERR_PTR(rc);
441}
442
443/*
444 * Close a connection, during shutdown or timeout/reconnect
445 */
446static void
447xprt_rdma_close(struct rpc_xprt *xprt)
448{
449 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
450
451 dprintk("RPC: %s: closing\n", __func__);
452 xprt_disconnect(xprt);
453 (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
454}
455
456static void
457xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
458{
459 struct sockaddr_in *sap;
460
461 sap = (struct sockaddr_in *)&xprt->addr;
462 sap->sin_port = htons(port);
463 sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
464 sap->sin_port = htons(port);
465 dprintk("RPC: %s: %u\n", __func__, port);
466}
467
468static void
469xprt_rdma_connect(struct rpc_task *task)
470{
471 struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
472 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
473
474 if (!xprt_test_and_set_connecting(xprt)) {
475 if (r_xprt->rx_ep.rep_connected != 0) {
476 /* Reconnect */
477 schedule_delayed_work(&r_xprt->rdma_connect,
478 xprt->reestablish_timeout);
479 } else {
480 schedule_delayed_work(&r_xprt->rdma_connect, 0);
481 if (!RPC_IS_ASYNC(task))
482 flush_scheduled_work();
483 }
484 }
485}
486
487static int
488xprt_rdma_reserve_xprt(struct rpc_task *task)
489{
490 struct rpc_xprt *xprt = task->tk_xprt;
491 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
492 int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
493
494 /* == RPC_CWNDSCALE @ init, but *after* setup */
495 if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
496 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
497 dprintk("RPC: %s: cwndscale %lu\n", __func__,
498 r_xprt->rx_buf.rb_cwndscale);
499 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
500 }
501 xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
502 return xprt_reserve_xprt_cong(task);
503}
504
505/*
506 * The RDMA allocate/free functions need the task structure as a place
507 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
508 * sequence. For this reason, the recv buffers are attached to send
509 * buffers for portions of the RPC. Note that the RPC layer allocates
510 * both send and receive buffers in the same call. We may register
511 * the receive buffer portion when using reply chunks.
512 */
513static void *
514xprt_rdma_allocate(struct rpc_task *task, size_t size)
515{
516 struct rpc_xprt *xprt = task->tk_xprt;
517 struct rpcrdma_req *req, *nreq;
518
519 req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
520 BUG_ON(NULL == req);
521
522 if (size > req->rl_size) {
523 dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
524 "prog %d vers %d proc %d\n",
525 __func__, size, req->rl_size,
526 task->tk_client->cl_prog, task->tk_client->cl_vers,
527 task->tk_msg.rpc_proc->p_proc);
528 /*
529 * Outgoing length shortage. Our inline write max must have
530 * been configured to perform direct i/o.
531 *
532 * This is therefore a large metadata operation, and the
533 * allocate call was made on the maximum possible message,
534 * e.g. containing long filename(s) or symlink data. In
535 * fact, while these metadata operations *might* carry
536 * large outgoing payloads, they rarely *do*. However, we
537 * have to commit to the request here, so reallocate and
538 * register it now. The data path will never require this
539 * reallocation.
540 *
541 * If the allocation or registration fails, the RPC framework
542 * will (doggedly) retry.
543 */
544 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
545 RPCRDMA_BOUNCEBUFFERS) {
546 /* forced to "pure inline" */
547 dprintk("RPC: %s: too much data (%zd) for inline "
548 "(r/w max %d/%d)\n", __func__, size,
549 rpcx_to_rdmad(xprt).inline_rsize,
550 rpcx_to_rdmad(xprt).inline_wsize);
551 size = req->rl_size;
552 rpc_exit(task, -EIO); /* fail the operation */
553 rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
554 goto out;
555 }
556 if (task->tk_flags & RPC_TASK_SWAPPER)
557 nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
558 else
559 nreq = kmalloc(sizeof *req + size, GFP_NOFS);
560 if (nreq == NULL)
561 goto outfail;
562
563 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
564 nreq->rl_base, size + sizeof(struct rpcrdma_req)
565 - offsetof(struct rpcrdma_req, rl_base),
566 &nreq->rl_handle, &nreq->rl_iov)) {
567 kfree(nreq);
568 goto outfail;
569 }
570 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
571 nreq->rl_size = size;
572 nreq->rl_niovs = 0;
573 nreq->rl_nchunks = 0;
574 nreq->rl_buffer = (struct rpcrdma_buffer *)req;
575 nreq->rl_reply = req->rl_reply;
576 memcpy(nreq->rl_segments,
577 req->rl_segments, sizeof nreq->rl_segments);
578 /* flag the swap with an unused field */
579 nreq->rl_iov.length = 0;
580 req->rl_reply = NULL;
581 req = nreq;
582 }
583 dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
584out:
585 return req->rl_xdr_buf;
586
587outfail:
588 rpcrdma_buffer_put(req);
589 rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
590 return NULL;
591}
592
593/*
594 * This function returns all RDMA resources to the pool.
595 */
596static void
597xprt_rdma_free(void *buffer)
598{
599 struct rpcrdma_req *req;
600 struct rpcrdma_xprt *r_xprt;
601 struct rpcrdma_rep *rep;
602 int i;
603
604 if (buffer == NULL)
605 return;
606
607 req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
608 r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
609 rep = req->rl_reply;
610
611 dprintk("RPC: %s: called on 0x%p%s\n",
612 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
613
614 /*
615 * Finish the deregistration. When using mw bind, this was
616 * begun in rpcrdma_reply_handler(). In all other modes, we
617 * do it here, in thread context. The process is considered
618 * complete when the rr_func vector becomes NULL - this
619 * was put in place during rpcrdma_reply_handler() - the wait
620 * call below will not block if the dereg is "done". If
621 * interrupted, our framework will clean up.
622 */
623 for (i = 0; req->rl_nchunks;) {
624 --req->rl_nchunks;
625 i += rpcrdma_deregister_external(
626 &req->rl_segments[i], r_xprt, NULL);
627 }
628
629 if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
630 rep->rr_func = NULL; /* abandon the callback */
631 req->rl_reply = NULL;
632 }
633
634 if (req->rl_iov.length == 0) { /* see allocate above */
635 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
636 oreq->rl_reply = req->rl_reply;
637 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
638 req->rl_handle,
639 &req->rl_iov);
640 kfree(req);
641 req = oreq;
642 }
643
644 /* Put back request+reply buffers */
645 rpcrdma_buffer_put(req);
646}
647
648/*
649 * send_request invokes the meat of RPC RDMA. It must do the following:
650 * 1. Marshal the RPC request into an RPC RDMA request, which means
651 * putting a header in front of data, and creating IOVs for RDMA
652 * from those in the request.
653 * 2. In marshaling, detect opportunities for RDMA, and use them.
654 * 3. Post a recv message to set up asynch completion, then send
655 * the request (rpcrdma_ep_post).
656 * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP).
657 */
658
659static int
660xprt_rdma_send_request(struct rpc_task *task)
661{
662 struct rpc_rqst *rqst = task->tk_rqstp;
663 struct rpc_xprt *xprt = task->tk_xprt;
664 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
665 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
666
667 /* marshal the send itself */
668 if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
669 r_xprt->rx_stats.failed_marshal_count++;
670 dprintk("RPC: %s: rpcrdma_marshal_req failed\n",
671 __func__);
672 return -EIO;
673 }
674
675 if (req->rl_reply == NULL) /* e.g. reconnection */
676 rpcrdma_recv_buffer_get(req);
677
678 if (req->rl_reply) {
679 req->rl_reply->rr_func = rpcrdma_reply_handler;
680 /* this need only be done once, but... */
681 req->rl_reply->rr_xprt = xprt;
682 }
683
684 if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
685 xprt_disconnect(xprt);
686 return -ENOTCONN; /* implies disconnect */
687 }
688
689 rqst->rq_bytes_sent = 0;
690 return 0;
691}
692
693static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
694{
695 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
696 long idle_time = 0;
697
698 if (xprt_connected(xprt))
699 idle_time = (long)(jiffies - xprt->last_used) / HZ;
700
701 seq_printf(seq,
702 "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
703 "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
704
705 0, /* need a local port? */
706 xprt->stat.bind_count,
707 xprt->stat.connect_count,
708 xprt->stat.connect_time,
709 idle_time,
710 xprt->stat.sends,
711 xprt->stat.recvs,
712 xprt->stat.bad_xids,
713 xprt->stat.req_u,
714 xprt->stat.bklog_u,
715
716 r_xprt->rx_stats.read_chunk_count,
717 r_xprt->rx_stats.write_chunk_count,
718 r_xprt->rx_stats.reply_chunk_count,
719 r_xprt->rx_stats.total_rdma_request,
720 r_xprt->rx_stats.total_rdma_reply,
721 r_xprt->rx_stats.pullup_copy_count,
722 r_xprt->rx_stats.fixup_copy_count,
723 r_xprt->rx_stats.hardway_register_count,
724 r_xprt->rx_stats.failed_marshal_count,
725 r_xprt->rx_stats.bad_reply_count);
726}
727
728/*
729 * Plumbing for rpc transport switch and kernel module
730 */
731
732static struct rpc_xprt_ops xprt_rdma_procs = {
733 .reserve_xprt = xprt_rdma_reserve_xprt,
734 .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
735 .release_request = xprt_release_rqst_cong, /* ditto */
736 .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
737 .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
738 .set_port = xprt_rdma_set_port,
739 .connect = xprt_rdma_connect,
740 .buf_alloc = xprt_rdma_allocate,
741 .buf_free = xprt_rdma_free,
742 .send_request = xprt_rdma_send_request,
743 .close = xprt_rdma_close,
744 .destroy = xprt_rdma_destroy,
745 .print_stats = xprt_rdma_print_stats
746};
747
748static struct xprt_class xprt_rdma = {
749 .list = LIST_HEAD_INIT(xprt_rdma.list),
750 .name = "rdma",
751 .owner = THIS_MODULE,
752 .ident = XPRT_TRANSPORT_RDMA,
753 .setup = xprt_setup_rdma,
754};
755
756static void __exit xprt_rdma_cleanup(void)
757{
758 int rc;
759
760 dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
761#ifdef RPC_DEBUG
762 if (sunrpc_table_header) {
763 unregister_sysctl_table(sunrpc_table_header);
764 sunrpc_table_header = NULL;
765 }
766#endif
767 rc = xprt_unregister_transport(&xprt_rdma);
768 if (rc)
769 dprintk("RPC: %s: xprt_unregister returned %i\n",
770 __func__, rc);
771}
772
773static int __init xprt_rdma_init(void)
774{
775 int rc;
776
777 rc = xprt_register_transport(&xprt_rdma);
778
779 if (rc)
780 return rc;
781
782 dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
783
784 dprintk(KERN_INFO "Defaults:\n");
785 dprintk(KERN_INFO "\tSlots %d\n"
786 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
787 xprt_rdma_slot_table_entries,
788 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
789 dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
790 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
791
792#ifdef RPC_DEBUG
793 if (!sunrpc_table_header)
794 sunrpc_table_header = register_sysctl_table(sunrpc_table);
795#endif
796 return 0;
797}
798
799module_init(xprt_rdma_init);
800module_exit(xprt_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
new file mode 100644
index 000000000000..9ec8ca4f6028
--- /dev/null
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -0,0 +1,1626 @@
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * verbs.c
42 *
43 * Encapsulates the major functions managing:
44 * o adapters
45 * o endpoints
46 * o connections
47 * o buffer memory
48 */
49
50#include <linux/pci.h> /* for Tavor hack below */
51
52#include "xprt_rdma.h"
53
54/*
55 * Globals/Macros
56 */
57
58#ifdef RPC_DEBUG
59# define RPCDBG_FACILITY RPCDBG_TRANS
60#endif
61
62/*
63 * internal functions
64 */
65
66/*
67 * handle replies in tasklet context, using a single, global list
68 * rdma tasklet function -- just turn around and call the func
69 * for all replies on the list
70 */
71
72static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73static LIST_HEAD(rpcrdma_tasklets_g);
74
75static void
76rpcrdma_run_tasklet(unsigned long data)
77{
78 struct rpcrdma_rep *rep;
79 void (*func)(struct rpcrdma_rep *);
80 unsigned long flags;
81
82 data = data;
83 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 while (!list_empty(&rpcrdma_tasklets_g)) {
85 rep = list_entry(rpcrdma_tasklets_g.next,
86 struct rpcrdma_rep, rr_list);
87 list_del(&rep->rr_list);
88 func = rep->rr_func;
89 rep->rr_func = NULL;
90 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92 if (func)
93 func(rep);
94 else
95 rpcrdma_recv_buffer_put(rep);
96
97 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98 }
99 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100}
101
102static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104static inline void
105rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106{
107 unsigned long flags;
108
109 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 tasklet_schedule(&rpcrdma_tasklet_g);
113}
114
115static void
116rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117{
118 struct rpcrdma_ep *ep = context;
119
120 dprintk("RPC: %s: QP error %X on device %s ep %p\n",
121 __func__, event->event, event->device->name, context);
122 if (ep->rep_connected == 1) {
123 ep->rep_connected = -EIO;
124 ep->rep_func(ep);
125 wake_up_all(&ep->rep_connect_wait);
126 }
127}
128
129static void
130rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131{
132 struct rpcrdma_ep *ep = context;
133
134 dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
135 __func__, event->event, event->device->name, context);
136 if (ep->rep_connected == 1) {
137 ep->rep_connected = -EIO;
138 ep->rep_func(ep);
139 wake_up_all(&ep->rep_connect_wait);
140 }
141}
142
143static inline
144void rpcrdma_event_process(struct ib_wc *wc)
145{
146 struct rpcrdma_rep *rep =
147 (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149 dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
150 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152 if (!rep) /* send or bind completion that we don't care about */
153 return;
154
155 if (IB_WC_SUCCESS != wc->status) {
156 dprintk("RPC: %s: %s WC status %X, connection lost\n",
157 __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 wc->status);
159 rep->rr_len = ~0U;
160 rpcrdma_schedule_tasklet(rep);
161 return;
162 }
163
164 switch (wc->opcode) {
165 case IB_WC_RECV:
166 rep->rr_len = wc->byte_len;
167 ib_dma_sync_single_for_cpu(
168 rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170 /* Keep (only) the most recent credits, after check validity */
171 if (rep->rr_len >= 16) {
172 struct rpcrdma_msg *p =
173 (struct rpcrdma_msg *) rep->rr_base;
174 unsigned int credits = ntohl(p->rm_credit);
175 if (credits == 0) {
176 dprintk("RPC: %s: server"
177 " dropped credits to 0!\n", __func__);
178 /* don't deadlock */
179 credits = 1;
180 } else if (credits > rep->rr_buffer->rb_max_requests) {
181 dprintk("RPC: %s: server"
182 " over-crediting: %d (%d)\n",
183 __func__, credits,
184 rep->rr_buffer->rb_max_requests);
185 credits = rep->rr_buffer->rb_max_requests;
186 }
187 atomic_set(&rep->rr_buffer->rb_credits, credits);
188 }
189 /* fall through */
190 case IB_WC_BIND_MW:
191 rpcrdma_schedule_tasklet(rep);
192 break;
193 default:
194 dprintk("RPC: %s: unexpected WC event %X\n",
195 __func__, wc->opcode);
196 break;
197 }
198}
199
200static inline int
201rpcrdma_cq_poll(struct ib_cq *cq)
202{
203 struct ib_wc wc;
204 int rc;
205
206 for (;;) {
207 rc = ib_poll_cq(cq, 1, &wc);
208 if (rc < 0) {
209 dprintk("RPC: %s: ib_poll_cq failed %i\n",
210 __func__, rc);
211 return rc;
212 }
213 if (rc == 0)
214 break;
215
216 rpcrdma_event_process(&wc);
217 }
218
219 return 0;
220}
221
222/*
223 * rpcrdma_cq_event_upcall
224 *
225 * This upcall handles recv, send, bind and unbind events.
226 * It is reentrant but processes single events in order to maintain
227 * ordering of receives to keep server credits.
228 *
229 * It is the responsibility of the scheduled tasklet to return
230 * recv buffers to the pool. NOTE: this affects synchronization of
231 * connection shutdown. That is, the structures required for
232 * the completion of the reply handler must remain intact until
233 * all memory has been reclaimed.
234 *
235 * Note that send events are suppressed and do not result in an upcall.
236 */
237static void
238rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239{
240 int rc;
241
242 rc = rpcrdma_cq_poll(cq);
243 if (rc)
244 return;
245
246 rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 if (rc) {
248 dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
249 __func__, rc);
250 return;
251 }
252
253 rpcrdma_cq_poll(cq);
254}
255
256#ifdef RPC_DEBUG
257static const char * const conn[] = {
258 "address resolved",
259 "address error",
260 "route resolved",
261 "route error",
262 "connect request",
263 "connect response",
264 "connect error",
265 "unreachable",
266 "rejected",
267 "established",
268 "disconnected",
269 "device removal"
270};
271#endif
272
273static int
274rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275{
276 struct rpcrdma_xprt *xprt = id->context;
277 struct rpcrdma_ia *ia = &xprt->rx_ia;
278 struct rpcrdma_ep *ep = &xprt->rx_ep;
279 struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280 struct ib_qp_attr attr;
281 struct ib_qp_init_attr iattr;
282 int connstate = 0;
283
284 switch (event->event) {
285 case RDMA_CM_EVENT_ADDR_RESOLVED:
286 case RDMA_CM_EVENT_ROUTE_RESOLVED:
287 complete(&ia->ri_done);
288 break;
289 case RDMA_CM_EVENT_ADDR_ERROR:
290 ia->ri_async_rc = -EHOSTUNREACH;
291 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
292 __func__, ep);
293 complete(&ia->ri_done);
294 break;
295 case RDMA_CM_EVENT_ROUTE_ERROR:
296 ia->ri_async_rc = -ENETUNREACH;
297 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
298 __func__, ep);
299 complete(&ia->ri_done);
300 break;
301 case RDMA_CM_EVENT_ESTABLISHED:
302 connstate = 1;
303 ib_query_qp(ia->ri_id->qp, &attr,
304 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305 &iattr);
306 dprintk("RPC: %s: %d responder resources"
307 " (%d initiator)\n",
308 __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309 goto connected;
310 case RDMA_CM_EVENT_CONNECT_ERROR:
311 connstate = -ENOTCONN;
312 goto connected;
313 case RDMA_CM_EVENT_UNREACHABLE:
314 connstate = -ENETDOWN;
315 goto connected;
316 case RDMA_CM_EVENT_REJECTED:
317 connstate = -ECONNREFUSED;
318 goto connected;
319 case RDMA_CM_EVENT_DISCONNECTED:
320 connstate = -ECONNABORTED;
321 goto connected;
322 case RDMA_CM_EVENT_DEVICE_REMOVAL:
323 connstate = -ENODEV;
324connected:
325 dprintk("RPC: %s: %s: %u.%u.%u.%u:%u"
326 " (ep 0x%p event 0x%x)\n",
327 __func__,
328 (event->event <= 11) ? conn[event->event] :
329 "unknown connection error",
330 NIPQUAD(addr->sin_addr.s_addr),
331 ntohs(addr->sin_port),
332 ep, event->event);
333 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334 dprintk("RPC: %s: %sconnected\n",
335 __func__, connstate > 0 ? "" : "dis");
336 ep->rep_connected = connstate;
337 ep->rep_func(ep);
338 wake_up_all(&ep->rep_connect_wait);
339 break;
340 default:
341 ia->ri_async_rc = -EINVAL;
342 dprintk("RPC: %s: unexpected CM event %X\n",
343 __func__, event->event);
344 complete(&ia->ri_done);
345 break;
346 }
347
348 return 0;
349}
350
351static struct rdma_cm_id *
352rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353 struct rpcrdma_ia *ia, struct sockaddr *addr)
354{
355 struct rdma_cm_id *id;
356 int rc;
357
358 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359 if (IS_ERR(id)) {
360 rc = PTR_ERR(id);
361 dprintk("RPC: %s: rdma_create_id() failed %i\n",
362 __func__, rc);
363 return id;
364 }
365
366 ia->ri_async_rc = 0;
367 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368 if (rc) {
369 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
370 __func__, rc);
371 goto out;
372 }
373 wait_for_completion(&ia->ri_done);
374 rc = ia->ri_async_rc;
375 if (rc)
376 goto out;
377
378 ia->ri_async_rc = 0;
379 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380 if (rc) {
381 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
382 __func__, rc);
383 goto out;
384 }
385 wait_for_completion(&ia->ri_done);
386 rc = ia->ri_async_rc;
387 if (rc)
388 goto out;
389
390 return id;
391
392out:
393 rdma_destroy_id(id);
394 return ERR_PTR(rc);
395}
396
397/*
398 * Drain any cq, prior to teardown.
399 */
400static void
401rpcrdma_clean_cq(struct ib_cq *cq)
402{
403 struct ib_wc wc;
404 int count = 0;
405
406 while (1 == ib_poll_cq(cq, 1, &wc))
407 ++count;
408
409 if (count)
410 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
411 __func__, count, wc.opcode);
412}
413
414/*
415 * Exported functions.
416 */
417
418/*
419 * Open and initialize an Interface Adapter.
420 * o initializes fields of struct rpcrdma_ia, including
421 * interface and provider attributes and protection zone.
422 */
423int
424rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425{
426 int rc;
427 struct rpcrdma_ia *ia = &xprt->rx_ia;
428
429 init_completion(&ia->ri_done);
430
431 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
432 if (IS_ERR(ia->ri_id)) {
433 rc = PTR_ERR(ia->ri_id);
434 goto out1;
435 }
436
437 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
438 if (IS_ERR(ia->ri_pd)) {
439 rc = PTR_ERR(ia->ri_pd);
440 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
441 __func__, rc);
442 goto out2;
443 }
444
445 /*
446 * Optionally obtain an underlying physical identity mapping in
447 * order to do a memory window-based bind. This base registration
448 * is protected from remote access - that is enabled only by binding
449 * for the specific bytes targeted during each RPC operation, and
450 * revoked after the corresponding completion similar to a storage
451 * adapter.
452 */
453 if (memreg > RPCRDMA_REGISTER) {
454 int mem_priv = IB_ACCESS_LOCAL_WRITE;
455 switch (memreg) {
456#if RPCRDMA_PERSISTENT_REGISTRATION
457 case RPCRDMA_ALLPHYSICAL:
458 mem_priv |= IB_ACCESS_REMOTE_WRITE;
459 mem_priv |= IB_ACCESS_REMOTE_READ;
460 break;
461#endif
462 case RPCRDMA_MEMWINDOWS_ASYNC:
463 case RPCRDMA_MEMWINDOWS:
464 mem_priv |= IB_ACCESS_MW_BIND;
465 break;
466 default:
467 break;
468 }
469 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
470 if (IS_ERR(ia->ri_bind_mem)) {
471 printk(KERN_ALERT "%s: ib_get_dma_mr for "
472 "phys register failed with %lX\n\t"
473 "Will continue with degraded performance\n",
474 __func__, PTR_ERR(ia->ri_bind_mem));
475 memreg = RPCRDMA_REGISTER;
476 ia->ri_bind_mem = NULL;
477 }
478 }
479
480 /* Else will do memory reg/dereg for each chunk */
481 ia->ri_memreg_strategy = memreg;
482
483 return 0;
484out2:
485 rdma_destroy_id(ia->ri_id);
486out1:
487 return rc;
488}
489
490/*
491 * Clean up/close an IA.
492 * o if event handles and PD have been initialized, free them.
493 * o close the IA
494 */
495void
496rpcrdma_ia_close(struct rpcrdma_ia *ia)
497{
498 int rc;
499
500 dprintk("RPC: %s: entering\n", __func__);
501 if (ia->ri_bind_mem != NULL) {
502 rc = ib_dereg_mr(ia->ri_bind_mem);
503 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
504 __func__, rc);
505 }
506 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
507 rdma_destroy_qp(ia->ri_id);
508 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
509 rc = ib_dealloc_pd(ia->ri_pd);
510 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
511 __func__, rc);
512 }
513 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
514 rdma_destroy_id(ia->ri_id);
515}
516
517/*
518 * Create unconnected endpoint.
519 */
520int
521rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
522 struct rpcrdma_create_data_internal *cdata)
523{
524 struct ib_device_attr devattr;
525 int rc;
526
527 rc = ib_query_device(ia->ri_id->device, &devattr);
528 if (rc) {
529 dprintk("RPC: %s: ib_query_device failed %d\n",
530 __func__, rc);
531 return rc;
532 }
533
534 /* check provider's send/recv wr limits */
535 if (cdata->max_requests > devattr.max_qp_wr)
536 cdata->max_requests = devattr.max_qp_wr;
537
538 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
539 ep->rep_attr.qp_context = ep;
540 /* send_cq and recv_cq initialized below */
541 ep->rep_attr.srq = NULL;
542 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
543 switch (ia->ri_memreg_strategy) {
544 case RPCRDMA_MEMWINDOWS_ASYNC:
545 case RPCRDMA_MEMWINDOWS:
546 /* Add room for mw_binds+unbinds - overkill! */
547 ep->rep_attr.cap.max_send_wr++;
548 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
549 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
550 return -EINVAL;
551 break;
552 default:
553 break;
554 }
555 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
556 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
557 ep->rep_attr.cap.max_recv_sge = 1;
558 ep->rep_attr.cap.max_inline_data = 0;
559 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
560 ep->rep_attr.qp_type = IB_QPT_RC;
561 ep->rep_attr.port_num = ~0;
562
563 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
564 "iovs: send %d recv %d\n",
565 __func__,
566 ep->rep_attr.cap.max_send_wr,
567 ep->rep_attr.cap.max_recv_wr,
568 ep->rep_attr.cap.max_send_sge,
569 ep->rep_attr.cap.max_recv_sge);
570
571 /* set trigger for requesting send completion */
572 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
573 switch (ia->ri_memreg_strategy) {
574 case RPCRDMA_MEMWINDOWS_ASYNC:
575 case RPCRDMA_MEMWINDOWS:
576 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
577 break;
578 default:
579 break;
580 }
581 if (ep->rep_cqinit <= 2)
582 ep->rep_cqinit = 0;
583 INIT_CQCOUNT(ep);
584 ep->rep_ia = ia;
585 init_waitqueue_head(&ep->rep_connect_wait);
586
587 /*
588 * Create a single cq for receive dto and mw_bind (only ever
589 * care about unbind, really). Send completions are suppressed.
590 * Use single threaded tasklet upcalls to maintain ordering.
591 */
592 ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
593 rpcrdma_cq_async_error_upcall, NULL,
594 ep->rep_attr.cap.max_recv_wr +
595 ep->rep_attr.cap.max_send_wr + 1, 0);
596 if (IS_ERR(ep->rep_cq)) {
597 rc = PTR_ERR(ep->rep_cq);
598 dprintk("RPC: %s: ib_create_cq failed: %i\n",
599 __func__, rc);
600 goto out1;
601 }
602
603 rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
604 if (rc) {
605 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
606 __func__, rc);
607 goto out2;
608 }
609
610 ep->rep_attr.send_cq = ep->rep_cq;
611 ep->rep_attr.recv_cq = ep->rep_cq;
612
613 /* Initialize cma parameters */
614
615 /* RPC/RDMA does not use private data */
616 ep->rep_remote_cma.private_data = NULL;
617 ep->rep_remote_cma.private_data_len = 0;
618
619 /* Client offers RDMA Read but does not initiate */
620 switch (ia->ri_memreg_strategy) {
621 case RPCRDMA_BOUNCEBUFFERS:
622 ep->rep_remote_cma.responder_resources = 0;
623 break;
624 case RPCRDMA_MTHCAFMR:
625 case RPCRDMA_REGISTER:
626 ep->rep_remote_cma.responder_resources = cdata->max_requests *
627 (RPCRDMA_MAX_DATA_SEGS / 8);
628 break;
629 case RPCRDMA_MEMWINDOWS:
630 case RPCRDMA_MEMWINDOWS_ASYNC:
631#if RPCRDMA_PERSISTENT_REGISTRATION
632 case RPCRDMA_ALLPHYSICAL:
633#endif
634 ep->rep_remote_cma.responder_resources = cdata->max_requests *
635 (RPCRDMA_MAX_DATA_SEGS / 2);
636 break;
637 default:
638 break;
639 }
640 if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
641 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
642 ep->rep_remote_cma.initiator_depth = 0;
643
644 ep->rep_remote_cma.retry_count = 7;
645 ep->rep_remote_cma.flow_control = 0;
646 ep->rep_remote_cma.rnr_retry_count = 0;
647
648 return 0;
649
650out2:
651 if (ib_destroy_cq(ep->rep_cq))
652 ;
653out1:
654 return rc;
655}
656
657/*
658 * rpcrdma_ep_destroy
659 *
660 * Disconnect and destroy endpoint. After this, the only
661 * valid operations on the ep are to free it (if dynamically
662 * allocated) or re-create it.
663 *
664 * The caller's error handling must be sure to not leak the endpoint
665 * if this function fails.
666 */
667int
668rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
669{
670 int rc;
671
672 dprintk("RPC: %s: entering, connected is %d\n",
673 __func__, ep->rep_connected);
674
675 if (ia->ri_id->qp) {
676 rc = rpcrdma_ep_disconnect(ep, ia);
677 if (rc)
678 dprintk("RPC: %s: rpcrdma_ep_disconnect"
679 " returned %i\n", __func__, rc);
680 }
681
682 ep->rep_func = NULL;
683
684 /* padding - could be done in rpcrdma_buffer_destroy... */
685 if (ep->rep_pad_mr) {
686 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
687 ep->rep_pad_mr = NULL;
688 }
689
690 if (ia->ri_id->qp) {
691 rdma_destroy_qp(ia->ri_id);
692 ia->ri_id->qp = NULL;
693 }
694
695 rpcrdma_clean_cq(ep->rep_cq);
696 rc = ib_destroy_cq(ep->rep_cq);
697 if (rc)
698 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
699 __func__, rc);
700
701 return rc;
702}
703
704/*
705 * Connect unconnected endpoint.
706 */
707int
708rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
709{
710 struct rdma_cm_id *id;
711 int rc = 0;
712 int retry_count = 0;
713 int reconnect = (ep->rep_connected != 0);
714
715 if (reconnect) {
716 struct rpcrdma_xprt *xprt;
717retry:
718 rc = rpcrdma_ep_disconnect(ep, ia);
719 if (rc && rc != -ENOTCONN)
720 dprintk("RPC: %s: rpcrdma_ep_disconnect"
721 " status %i\n", __func__, rc);
722 rpcrdma_clean_cq(ep->rep_cq);
723
724 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
725 id = rpcrdma_create_id(xprt, ia,
726 (struct sockaddr *)&xprt->rx_data.addr);
727 if (IS_ERR(id)) {
728 rc = PTR_ERR(id);
729 goto out;
730 }
731 /* TEMP TEMP TEMP - fail if new device:
732 * Deregister/remarshal *all* requests!
733 * Close and recreate adapter, pd, etc!
734 * Re-determine all attributes still sane!
735 * More stuff I haven't thought of!
736 * Rrrgh!
737 */
738 if (ia->ri_id->device != id->device) {
739 printk("RPC: %s: can't reconnect on "
740 "different device!\n", __func__);
741 rdma_destroy_id(id);
742 rc = -ENETDOWN;
743 goto out;
744 }
745 /* END TEMP */
746 rdma_destroy_id(ia->ri_id);
747 ia->ri_id = id;
748 }
749
750 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
751 if (rc) {
752 dprintk("RPC: %s: rdma_create_qp failed %i\n",
753 __func__, rc);
754 goto out;
755 }
756
757/* XXX Tavor device performs badly with 2K MTU! */
758if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
759 struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
760 if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
761 (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
762 pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
763 struct ib_qp_attr attr = {
764 .path_mtu = IB_MTU_1024
765 };
766 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
767 }
768}
769
770 /* Theoretically a client initiator_depth > 0 is not needed,
771 * but many peers fail to complete the connection unless they
772 * == responder_resources! */
773 if (ep->rep_remote_cma.initiator_depth !=
774 ep->rep_remote_cma.responder_resources)
775 ep->rep_remote_cma.initiator_depth =
776 ep->rep_remote_cma.responder_resources;
777
778 ep->rep_connected = 0;
779
780 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
781 if (rc) {
782 dprintk("RPC: %s: rdma_connect() failed with %i\n",
783 __func__, rc);
784 goto out;
785 }
786
787 if (reconnect)
788 return 0;
789
790 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
791
792 /*
793 * Check state. A non-peer reject indicates no listener
794 * (ECONNREFUSED), which may be a transient state. All
795 * others indicate a transport condition which has already
796 * undergone a best-effort.
797 */
798 if (ep->rep_connected == -ECONNREFUSED
799 && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
800 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
801 goto retry;
802 }
803 if (ep->rep_connected <= 0) {
804 /* Sometimes, the only way to reliably connect to remote
805 * CMs is to use same nonzero values for ORD and IRD. */
806 ep->rep_remote_cma.initiator_depth =
807 ep->rep_remote_cma.responder_resources;
808 if (ep->rep_remote_cma.initiator_depth == 0)
809 ++ep->rep_remote_cma.initiator_depth;
810 if (ep->rep_remote_cma.responder_resources == 0)
811 ++ep->rep_remote_cma.responder_resources;
812 if (retry_count++ == 0)
813 goto retry;
814 rc = ep->rep_connected;
815 } else {
816 dprintk("RPC: %s: connected\n", __func__);
817 }
818
819out:
820 if (rc)
821 ep->rep_connected = rc;
822 return rc;
823}
824
825/*
826 * rpcrdma_ep_disconnect
827 *
828 * This is separate from destroy to facilitate the ability
829 * to reconnect without recreating the endpoint.
830 *
831 * This call is not reentrant, and must not be made in parallel
832 * on the same endpoint.
833 */
834int
835rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
836{
837 int rc;
838
839 rpcrdma_clean_cq(ep->rep_cq);
840 rc = rdma_disconnect(ia->ri_id);
841 if (!rc) {
842 /* returns without wait if not connected */
843 wait_event_interruptible(ep->rep_connect_wait,
844 ep->rep_connected != 1);
845 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
846 (ep->rep_connected == 1) ? "still " : "dis");
847 } else {
848 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
849 ep->rep_connected = rc;
850 }
851 return rc;
852}
853
854/*
855 * Initialize buffer memory
856 */
857int
858rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
859 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
860{
861 char *p;
862 size_t len;
863 int i, rc;
864
865 buf->rb_max_requests = cdata->max_requests;
866 spin_lock_init(&buf->rb_lock);
867 atomic_set(&buf->rb_credits, 1);
868
869 /* Need to allocate:
870 * 1. arrays for send and recv pointers
871 * 2. arrays of struct rpcrdma_req to fill in pointers
872 * 3. array of struct rpcrdma_rep for replies
873 * 4. padding, if any
874 * 5. mw's, if any
875 * Send/recv buffers in req/rep need to be registered
876 */
877
878 len = buf->rb_max_requests *
879 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
880 len += cdata->padding;
881 switch (ia->ri_memreg_strategy) {
882 case RPCRDMA_MTHCAFMR:
883 /* TBD we are perhaps overallocating here */
884 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
885 sizeof(struct rpcrdma_mw);
886 break;
887 case RPCRDMA_MEMWINDOWS_ASYNC:
888 case RPCRDMA_MEMWINDOWS:
889 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
890 sizeof(struct rpcrdma_mw);
891 break;
892 default:
893 break;
894 }
895
896 /* allocate 1, 4 and 5 in one shot */
897 p = kzalloc(len, GFP_KERNEL);
898 if (p == NULL) {
899 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
900 __func__, len);
901 rc = -ENOMEM;
902 goto out;
903 }
904 buf->rb_pool = p; /* for freeing it later */
905
906 buf->rb_send_bufs = (struct rpcrdma_req **) p;
907 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
908 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
909 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
910
911 /*
912 * Register the zeroed pad buffer, if any.
913 */
914 if (cdata->padding) {
915 rc = rpcrdma_register_internal(ia, p, cdata->padding,
916 &ep->rep_pad_mr, &ep->rep_pad);
917 if (rc)
918 goto out;
919 }
920 p += cdata->padding;
921
922 /*
923 * Allocate the fmr's, or mw's for mw_bind chunk registration.
924 * We "cycle" the mw's in order to minimize rkey reuse,
925 * and also reduce unbind-to-bind collision.
926 */
927 INIT_LIST_HEAD(&buf->rb_mws);
928 switch (ia->ri_memreg_strategy) {
929 case RPCRDMA_MTHCAFMR:
930 {
931 struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
932 struct ib_fmr_attr fa = {
933 RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
934 };
935 /* TBD we are perhaps overallocating here */
936 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
937 r->r.fmr = ib_alloc_fmr(ia->ri_pd,
938 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
939 &fa);
940 if (IS_ERR(r->r.fmr)) {
941 rc = PTR_ERR(r->r.fmr);
942 dprintk("RPC: %s: ib_alloc_fmr"
943 " failed %i\n", __func__, rc);
944 goto out;
945 }
946 list_add(&r->mw_list, &buf->rb_mws);
947 ++r;
948 }
949 }
950 break;
951 case RPCRDMA_MEMWINDOWS_ASYNC:
952 case RPCRDMA_MEMWINDOWS:
953 {
954 struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
955 /* Allocate one extra request's worth, for full cycling */
956 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
957 r->r.mw = ib_alloc_mw(ia->ri_pd);
958 if (IS_ERR(r->r.mw)) {
959 rc = PTR_ERR(r->r.mw);
960 dprintk("RPC: %s: ib_alloc_mw"
961 " failed %i\n", __func__, rc);
962 goto out;
963 }
964 list_add(&r->mw_list, &buf->rb_mws);
965 ++r;
966 }
967 }
968 break;
969 default:
970 break;
971 }
972
973 /*
974 * Allocate/init the request/reply buffers. Doing this
975 * using kmalloc for now -- one for each buf.
976 */
977 for (i = 0; i < buf->rb_max_requests; i++) {
978 struct rpcrdma_req *req;
979 struct rpcrdma_rep *rep;
980
981 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
982 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
983 /* Typical ~2400b, so rounding up saves work later */
984 if (len < 4096)
985 len = 4096;
986 req = kmalloc(len, GFP_KERNEL);
987 if (req == NULL) {
988 dprintk("RPC: %s: request buffer %d alloc"
989 " failed\n", __func__, i);
990 rc = -ENOMEM;
991 goto out;
992 }
993 memset(req, 0, sizeof(struct rpcrdma_req));
994 buf->rb_send_bufs[i] = req;
995 buf->rb_send_bufs[i]->rl_buffer = buf;
996
997 rc = rpcrdma_register_internal(ia, req->rl_base,
998 len - offsetof(struct rpcrdma_req, rl_base),
999 &buf->rb_send_bufs[i]->rl_handle,
1000 &buf->rb_send_bufs[i]->rl_iov);
1001 if (rc)
1002 goto out;
1003
1004 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1005
1006 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1007 rep = kmalloc(len, GFP_KERNEL);
1008 if (rep == NULL) {
1009 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1010 __func__, i);
1011 rc = -ENOMEM;
1012 goto out;
1013 }
1014 memset(rep, 0, sizeof(struct rpcrdma_rep));
1015 buf->rb_recv_bufs[i] = rep;
1016 buf->rb_recv_bufs[i]->rr_buffer = buf;
1017 init_waitqueue_head(&rep->rr_unbind);
1018
1019 rc = rpcrdma_register_internal(ia, rep->rr_base,
1020 len - offsetof(struct rpcrdma_rep, rr_base),
1021 &buf->rb_recv_bufs[i]->rr_handle,
1022 &buf->rb_recv_bufs[i]->rr_iov);
1023 if (rc)
1024 goto out;
1025
1026 }
1027 dprintk("RPC: %s: max_requests %d\n",
1028 __func__, buf->rb_max_requests);
1029 /* done */
1030 return 0;
1031out:
1032 rpcrdma_buffer_destroy(buf);
1033 return rc;
1034}
1035
1036/*
1037 * Unregister and destroy buffer memory. Need to deal with
1038 * partial initialization, so it's callable from failed create.
1039 * Must be called before destroying endpoint, as registrations
1040 * reference it.
1041 */
1042void
1043rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1044{
1045 int rc, i;
1046 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1047
1048 /* clean up in reverse order from create
1049 * 1. recv mr memory (mr free, then kfree)
1050 * 1a. bind mw memory
1051 * 2. send mr memory (mr free, then kfree)
1052 * 3. padding (if any) [moved to rpcrdma_ep_destroy]
1053 * 4. arrays
1054 */
1055 dprintk("RPC: %s: entering\n", __func__);
1056
1057 for (i = 0; i < buf->rb_max_requests; i++) {
1058 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1059 rpcrdma_deregister_internal(ia,
1060 buf->rb_recv_bufs[i]->rr_handle,
1061 &buf->rb_recv_bufs[i]->rr_iov);
1062 kfree(buf->rb_recv_bufs[i]);
1063 }
1064 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1065 while (!list_empty(&buf->rb_mws)) {
1066 struct rpcrdma_mw *r;
1067 r = list_entry(buf->rb_mws.next,
1068 struct rpcrdma_mw, mw_list);
1069 list_del(&r->mw_list);
1070 switch (ia->ri_memreg_strategy) {
1071 case RPCRDMA_MTHCAFMR:
1072 rc = ib_dealloc_fmr(r->r.fmr);
1073 if (rc)
1074 dprintk("RPC: %s:"
1075 " ib_dealloc_fmr"
1076 " failed %i\n",
1077 __func__, rc);
1078 break;
1079 case RPCRDMA_MEMWINDOWS_ASYNC:
1080 case RPCRDMA_MEMWINDOWS:
1081 rc = ib_dealloc_mw(r->r.mw);
1082 if (rc)
1083 dprintk("RPC: %s:"
1084 " ib_dealloc_mw"
1085 " failed %i\n",
1086 __func__, rc);
1087 break;
1088 default:
1089 break;
1090 }
1091 }
1092 rpcrdma_deregister_internal(ia,
1093 buf->rb_send_bufs[i]->rl_handle,
1094 &buf->rb_send_bufs[i]->rl_iov);
1095 kfree(buf->rb_send_bufs[i]);
1096 }
1097 }
1098
1099 kfree(buf->rb_pool);
1100}
1101
1102/*
1103 * Get a set of request/reply buffers.
1104 *
1105 * Reply buffer (if needed) is attached to send buffer upon return.
1106 * Rule:
1107 * rb_send_index and rb_recv_index MUST always be pointing to the
1108 * *next* available buffer (non-NULL). They are incremented after
1109 * removing buffers, and decremented *before* returning them.
1110 */
1111struct rpcrdma_req *
1112rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1113{
1114 struct rpcrdma_req *req;
1115 unsigned long flags;
1116
1117 spin_lock_irqsave(&buffers->rb_lock, flags);
1118 if (buffers->rb_send_index == buffers->rb_max_requests) {
1119 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1120 dprintk("RPC: %s: out of request buffers\n", __func__);
1121 return ((struct rpcrdma_req *)NULL);
1122 }
1123
1124 req = buffers->rb_send_bufs[buffers->rb_send_index];
1125 if (buffers->rb_send_index < buffers->rb_recv_index) {
1126 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1127 __func__,
1128 buffers->rb_recv_index - buffers->rb_send_index);
1129 req->rl_reply = NULL;
1130 } else {
1131 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1132 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1133 }
1134 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1135 if (!list_empty(&buffers->rb_mws)) {
1136 int i = RPCRDMA_MAX_SEGS - 1;
1137 do {
1138 struct rpcrdma_mw *r;
1139 r = list_entry(buffers->rb_mws.next,
1140 struct rpcrdma_mw, mw_list);
1141 list_del(&r->mw_list);
1142 req->rl_segments[i].mr_chunk.rl_mw = r;
1143 } while (--i >= 0);
1144 }
1145 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1146 return req;
1147}
1148
1149/*
1150 * Put request/reply buffers back into pool.
1151 * Pre-decrement counter/array index.
1152 */
1153void
1154rpcrdma_buffer_put(struct rpcrdma_req *req)
1155{
1156 struct rpcrdma_buffer *buffers = req->rl_buffer;
1157 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1158 int i;
1159 unsigned long flags;
1160
1161 BUG_ON(req->rl_nchunks != 0);
1162 spin_lock_irqsave(&buffers->rb_lock, flags);
1163 buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1164 req->rl_niovs = 0;
1165 if (req->rl_reply) {
1166 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1167 init_waitqueue_head(&req->rl_reply->rr_unbind);
1168 req->rl_reply->rr_func = NULL;
1169 req->rl_reply = NULL;
1170 }
1171 switch (ia->ri_memreg_strategy) {
1172 case RPCRDMA_MTHCAFMR:
1173 case RPCRDMA_MEMWINDOWS_ASYNC:
1174 case RPCRDMA_MEMWINDOWS:
1175 /*
1176 * Cycle mw's back in reverse order, and "spin" them.
1177 * This delays and scrambles reuse as much as possible.
1178 */
1179 i = 1;
1180 do {
1181 struct rpcrdma_mw **mw;
1182 mw = &req->rl_segments[i].mr_chunk.rl_mw;
1183 list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1184 *mw = NULL;
1185 } while (++i < RPCRDMA_MAX_SEGS);
1186 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1187 &buffers->rb_mws);
1188 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1189 break;
1190 default:
1191 break;
1192 }
1193 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1194}
1195
1196/*
1197 * Recover reply buffers from pool.
1198 * This happens when recovering from error conditions.
1199 * Post-increment counter/array index.
1200 */
1201void
1202rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1203{
1204 struct rpcrdma_buffer *buffers = req->rl_buffer;
1205 unsigned long flags;
1206
1207 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1208 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1209 spin_lock_irqsave(&buffers->rb_lock, flags);
1210 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1211 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1212 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1213 }
1214 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1215}
1216
1217/*
1218 * Put reply buffers back into pool when not attached to
1219 * request. This happens in error conditions, and when
1220 * aborting unbinds. Pre-decrement counter/array index.
1221 */
1222void
1223rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1224{
1225 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1226 unsigned long flags;
1227
1228 rep->rr_func = NULL;
1229 spin_lock_irqsave(&buffers->rb_lock, flags);
1230 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1231 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1232}
1233
1234/*
1235 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1236 */
1237
1238int
1239rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1240 struct ib_mr **mrp, struct ib_sge *iov)
1241{
1242 struct ib_phys_buf ipb;
1243 struct ib_mr *mr;
1244 int rc;
1245
1246 /*
1247 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1248 */
1249 iov->addr = ib_dma_map_single(ia->ri_id->device,
1250 va, len, DMA_BIDIRECTIONAL);
1251 iov->length = len;
1252
1253 if (ia->ri_bind_mem != NULL) {
1254 *mrp = NULL;
1255 iov->lkey = ia->ri_bind_mem->lkey;
1256 return 0;
1257 }
1258
1259 ipb.addr = iov->addr;
1260 ipb.size = iov->length;
1261 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1262 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1263
1264 dprintk("RPC: %s: phys convert: 0x%llx "
1265 "registered 0x%llx length %d\n",
1266 __func__, ipb.addr, iov->addr, len);
1267
1268 if (IS_ERR(mr)) {
1269 *mrp = NULL;
1270 rc = PTR_ERR(mr);
1271 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1272 } else {
1273 *mrp = mr;
1274 iov->lkey = mr->lkey;
1275 rc = 0;
1276 }
1277
1278 return rc;
1279}
1280
1281int
1282rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1283 struct ib_mr *mr, struct ib_sge *iov)
1284{
1285 int rc;
1286
1287 ib_dma_unmap_single(ia->ri_id->device,
1288 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1289
1290 if (NULL == mr)
1291 return 0;
1292
1293 rc = ib_dereg_mr(mr);
1294 if (rc)
1295 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1296 return rc;
1297}
1298
1299/*
1300 * Wrappers for chunk registration, shared by read/write chunk code.
1301 */
1302
1303static void
1304rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1305{
1306 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1307 seg->mr_dmalen = seg->mr_len;
1308 if (seg->mr_page)
1309 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1310 seg->mr_page, offset_in_page(seg->mr_offset),
1311 seg->mr_dmalen, seg->mr_dir);
1312 else
1313 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1314 seg->mr_offset,
1315 seg->mr_dmalen, seg->mr_dir);
1316}
1317
1318static void
1319rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1320{
1321 if (seg->mr_page)
1322 ib_dma_unmap_page(ia->ri_id->device,
1323 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1324 else
1325 ib_dma_unmap_single(ia->ri_id->device,
1326 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1327}
1328
1329int
1330rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1331 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1332{
1333 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1334 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1335 IB_ACCESS_REMOTE_READ);
1336 struct rpcrdma_mr_seg *seg1 = seg;
1337 int i;
1338 int rc = 0;
1339
1340 switch (ia->ri_memreg_strategy) {
1341
1342#if RPCRDMA_PERSISTENT_REGISTRATION
1343 case RPCRDMA_ALLPHYSICAL:
1344 rpcrdma_map_one(ia, seg, writing);
1345 seg->mr_rkey = ia->ri_bind_mem->rkey;
1346 seg->mr_base = seg->mr_dma;
1347 seg->mr_nsegs = 1;
1348 nsegs = 1;
1349 break;
1350#endif
1351
1352 /* Registration using fast memory registration */
1353 case RPCRDMA_MTHCAFMR:
1354 {
1355 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1356 int len, pageoff = offset_in_page(seg->mr_offset);
1357 seg1->mr_offset -= pageoff; /* start of page */
1358 seg1->mr_len += pageoff;
1359 len = -pageoff;
1360 if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1361 nsegs = RPCRDMA_MAX_DATA_SEGS;
1362 for (i = 0; i < nsegs;) {
1363 rpcrdma_map_one(ia, seg, writing);
1364 physaddrs[i] = seg->mr_dma;
1365 len += seg->mr_len;
1366 ++seg;
1367 ++i;
1368 /* Check for holes */
1369 if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1370 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1371 break;
1372 }
1373 nsegs = i;
1374 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1375 physaddrs, nsegs, seg1->mr_dma);
1376 if (rc) {
1377 dprintk("RPC: %s: failed ib_map_phys_fmr "
1378 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1379 len, (unsigned long long)seg1->mr_dma,
1380 pageoff, nsegs, rc);
1381 while (nsegs--)
1382 rpcrdma_unmap_one(ia, --seg);
1383 } else {
1384 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1385 seg1->mr_base = seg1->mr_dma + pageoff;
1386 seg1->mr_nsegs = nsegs;
1387 seg1->mr_len = len;
1388 }
1389 }
1390 break;
1391
1392 /* Registration using memory windows */
1393 case RPCRDMA_MEMWINDOWS_ASYNC:
1394 case RPCRDMA_MEMWINDOWS:
1395 {
1396 struct ib_mw_bind param;
1397 rpcrdma_map_one(ia, seg, writing);
1398 param.mr = ia->ri_bind_mem;
1399 param.wr_id = 0ULL; /* no send cookie */
1400 param.addr = seg->mr_dma;
1401 param.length = seg->mr_len;
1402 param.send_flags = 0;
1403 param.mw_access_flags = mem_priv;
1404
1405 DECR_CQCOUNT(&r_xprt->rx_ep);
1406 rc = ib_bind_mw(ia->ri_id->qp,
1407 seg->mr_chunk.rl_mw->r.mw, &param);
1408 if (rc) {
1409 dprintk("RPC: %s: failed ib_bind_mw "
1410 "%u@0x%llx status %i\n",
1411 __func__, seg->mr_len,
1412 (unsigned long long)seg->mr_dma, rc);
1413 rpcrdma_unmap_one(ia, seg);
1414 } else {
1415 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1416 seg->mr_base = param.addr;
1417 seg->mr_nsegs = 1;
1418 nsegs = 1;
1419 }
1420 }
1421 break;
1422
1423 /* Default registration each time */
1424 default:
1425 {
1426 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1427 int len = 0;
1428 if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1429 nsegs = RPCRDMA_MAX_DATA_SEGS;
1430 for (i = 0; i < nsegs;) {
1431 rpcrdma_map_one(ia, seg, writing);
1432 ipb[i].addr = seg->mr_dma;
1433 ipb[i].size = seg->mr_len;
1434 len += seg->mr_len;
1435 ++seg;
1436 ++i;
1437 /* Check for holes */
1438 if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1439 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1440 break;
1441 }
1442 nsegs = i;
1443 seg1->mr_base = seg1->mr_dma;
1444 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1445 ipb, nsegs, mem_priv, &seg1->mr_base);
1446 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1447 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1448 dprintk("RPC: %s: failed ib_reg_phys_mr "
1449 "%u@0x%llx (%d)... status %i\n",
1450 __func__, len,
1451 (unsigned long long)seg1->mr_dma, nsegs, rc);
1452 while (nsegs--)
1453 rpcrdma_unmap_one(ia, --seg);
1454 } else {
1455 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1456 seg1->mr_nsegs = nsegs;
1457 seg1->mr_len = len;
1458 }
1459 }
1460 break;
1461 }
1462 if (rc)
1463 return -1;
1464
1465 return nsegs;
1466}
1467
1468int
1469rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1470 struct rpcrdma_xprt *r_xprt, void *r)
1471{
1472 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1473 struct rpcrdma_mr_seg *seg1 = seg;
1474 int nsegs = seg->mr_nsegs, rc;
1475
1476 switch (ia->ri_memreg_strategy) {
1477
1478#if RPCRDMA_PERSISTENT_REGISTRATION
1479 case RPCRDMA_ALLPHYSICAL:
1480 BUG_ON(nsegs != 1);
1481 rpcrdma_unmap_one(ia, seg);
1482 rc = 0;
1483 break;
1484#endif
1485
1486 case RPCRDMA_MTHCAFMR:
1487 {
1488 LIST_HEAD(l);
1489 list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
1490 rc = ib_unmap_fmr(&l);
1491 while (seg1->mr_nsegs--)
1492 rpcrdma_unmap_one(ia, seg++);
1493 }
1494 if (rc)
1495 dprintk("RPC: %s: failed ib_unmap_fmr,"
1496 " status %i\n", __func__, rc);
1497 break;
1498
1499 case RPCRDMA_MEMWINDOWS_ASYNC:
1500 case RPCRDMA_MEMWINDOWS:
1501 {
1502 struct ib_mw_bind param;
1503 BUG_ON(nsegs != 1);
1504 param.mr = ia->ri_bind_mem;
1505 param.addr = 0ULL; /* unbind */
1506 param.length = 0;
1507 param.mw_access_flags = 0;
1508 if (r) {
1509 param.wr_id = (u64) (unsigned long) r;
1510 param.send_flags = IB_SEND_SIGNALED;
1511 INIT_CQCOUNT(&r_xprt->rx_ep);
1512 } else {
1513 param.wr_id = 0ULL;
1514 param.send_flags = 0;
1515 DECR_CQCOUNT(&r_xprt->rx_ep);
1516 }
1517 rc = ib_bind_mw(ia->ri_id->qp,
1518 seg->mr_chunk.rl_mw->r.mw, &param);
1519 rpcrdma_unmap_one(ia, seg);
1520 }
1521 if (rc)
1522 dprintk("RPC: %s: failed ib_(un)bind_mw,"
1523 " status %i\n", __func__, rc);
1524 else
1525 r = NULL; /* will upcall on completion */
1526 break;
1527
1528 default:
1529 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1530 seg1->mr_chunk.rl_mr = NULL;
1531 while (seg1->mr_nsegs--)
1532 rpcrdma_unmap_one(ia, seg++);
1533 if (rc)
1534 dprintk("RPC: %s: failed ib_dereg_mr,"
1535 " status %i\n", __func__, rc);
1536 break;
1537 }
1538 if (r) {
1539 struct rpcrdma_rep *rep = r;
1540 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1541 rep->rr_func = NULL;
1542 func(rep); /* dereg done, callback now */
1543 }
1544 return nsegs;
1545}
1546
1547/*
1548 * Prepost any receive buffer, then post send.
1549 *
1550 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1551 */
1552int
1553rpcrdma_ep_post(struct rpcrdma_ia *ia,
1554 struct rpcrdma_ep *ep,
1555 struct rpcrdma_req *req)
1556{
1557 struct ib_send_wr send_wr, *send_wr_fail;
1558 struct rpcrdma_rep *rep = req->rl_reply;
1559 int rc;
1560
1561 if (rep) {
1562 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1563 if (rc)
1564 goto out;
1565 req->rl_reply = NULL;
1566 }
1567
1568 send_wr.next = NULL;
1569 send_wr.wr_id = 0ULL; /* no send cookie */
1570 send_wr.sg_list = req->rl_send_iov;
1571 send_wr.num_sge = req->rl_niovs;
1572 send_wr.opcode = IB_WR_SEND;
1573 send_wr.imm_data = 0;
1574 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1575 ib_dma_sync_single_for_device(ia->ri_id->device,
1576 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1577 DMA_TO_DEVICE);
1578 ib_dma_sync_single_for_device(ia->ri_id->device,
1579 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1580 DMA_TO_DEVICE);
1581 ib_dma_sync_single_for_device(ia->ri_id->device,
1582 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1583 DMA_TO_DEVICE);
1584
1585 if (DECR_CQCOUNT(ep) > 0)
1586 send_wr.send_flags = 0;
1587 else { /* Provider must take a send completion every now and then */
1588 INIT_CQCOUNT(ep);
1589 send_wr.send_flags = IB_SEND_SIGNALED;
1590 }
1591
1592 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1593 if (rc)
1594 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1595 rc);
1596out:
1597 return rc;
1598}
1599
1600/*
1601 * (Re)post a receive buffer.
1602 */
1603int
1604rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1605 struct rpcrdma_ep *ep,
1606 struct rpcrdma_rep *rep)
1607{
1608 struct ib_recv_wr recv_wr, *recv_wr_fail;
1609 int rc;
1610
1611 recv_wr.next = NULL;
1612 recv_wr.wr_id = (u64) (unsigned long) rep;
1613 recv_wr.sg_list = &rep->rr_iov;
1614 recv_wr.num_sge = 1;
1615
1616 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1617 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1618
1619 DECR_CQCOUNT(ep);
1620 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1621
1622 if (rc)
1623 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1624 rc);
1625 return rc;
1626}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
new file mode 100644
index 000000000000..2427822f8bd4
--- /dev/null
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -0,0 +1,330 @@
1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
16 *
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
21 *
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40#ifndef _LINUX_SUNRPC_XPRT_RDMA_H
41#define _LINUX_SUNRPC_XPRT_RDMA_H
42
43#include <linux/wait.h> /* wait_queue_head_t, etc */
44#include <linux/spinlock.h> /* spinlock_t, etc */
45#include <asm/atomic.h> /* atomic_t, etc */
46
47#include <rdma/rdma_cm.h> /* RDMA connection api */
48#include <rdma/ib_verbs.h> /* RDMA verbs api */
49
50#include <linux/sunrpc/clnt.h> /* rpc_xprt */
51#include <linux/sunrpc/rpc_rdma.h> /* RPC/RDMA protocol */
52#include <linux/sunrpc/xprtrdma.h> /* xprt parameters */
53
54/*
55 * Interface Adapter -- one per transport instance
56 */
57struct rpcrdma_ia {
58 struct rdma_cm_id *ri_id;
59 struct ib_pd *ri_pd;
60 struct ib_mr *ri_bind_mem;
61 struct completion ri_done;
62 int ri_async_rc;
63 enum rpcrdma_memreg ri_memreg_strategy;
64};
65
66/*
67 * RDMA Endpoint -- one per transport instance
68 */
69
70struct rpcrdma_ep {
71 atomic_t rep_cqcount;
72 int rep_cqinit;
73 int rep_connected;
74 struct rpcrdma_ia *rep_ia;
75 struct ib_cq *rep_cq;
76 struct ib_qp_init_attr rep_attr;
77 wait_queue_head_t rep_connect_wait;
78 struct ib_sge rep_pad; /* holds zeroed pad */
79 struct ib_mr *rep_pad_mr; /* holds zeroed pad */
80 void (*rep_func)(struct rpcrdma_ep *);
81 struct rpc_xprt *rep_xprt; /* for rep_func */
82 struct rdma_conn_param rep_remote_cma;
83 struct sockaddr_storage rep_remote_addr;
84};
85
86#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
87#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
88
89/*
90 * struct rpcrdma_rep -- this structure encapsulates state required to recv
91 * and complete a reply, asychronously. It needs several pieces of
92 * state:
93 * o recv buffer (posted to provider)
94 * o ib_sge (also donated to provider)
95 * o status of reply (length, success or not)
96 * o bookkeeping state to get run by tasklet (list, etc)
97 *
98 * These are allocated during initialization, per-transport instance;
99 * however, the tasklet execution list itself is global, as it should
100 * always be pretty short.
101 *
102 * N of these are associated with a transport instance, and stored in
103 * struct rpcrdma_buffer. N is the max number of outstanding requests.
104 */
105
106/* temporary static scatter/gather max */
107#define RPCRDMA_MAX_DATA_SEGS (8) /* max scatter/gather */
108#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
109#define MAX_RPCRDMAHDR (\
110 /* max supported RPC/RDMA header */ \
111 sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \
112 (sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32))
113
114struct rpcrdma_buffer;
115
116struct rpcrdma_rep {
117 unsigned int rr_len; /* actual received reply length */
118 struct rpcrdma_buffer *rr_buffer; /* home base for this structure */
119 struct rpc_xprt *rr_xprt; /* needed for request/reply matching */
120 void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
121 struct list_head rr_list; /* tasklet list */
122 wait_queue_head_t rr_unbind; /* optional unbind wait */
123 struct ib_sge rr_iov; /* for posting */
124 struct ib_mr *rr_handle; /* handle for mem in rr_iov */
125 char rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
126};
127
128/*
129 * struct rpcrdma_req -- structure central to the request/reply sequence.
130 *
131 * N of these are associated with a transport instance, and stored in
132 * struct rpcrdma_buffer. N is the max number of outstanding requests.
133 *
134 * It includes pre-registered buffer memory for send AND recv.
135 * The recv buffer, however, is not owned by this structure, and
136 * is "donated" to the hardware when a recv is posted. When a
137 * reply is handled, the recv buffer used is given back to the
138 * struct rpcrdma_req associated with the request.
139 *
140 * In addition to the basic memory, this structure includes an array
141 * of iovs for send operations. The reason is that the iovs passed to
142 * ib_post_{send,recv} must not be modified until the work request
143 * completes.
144 *
145 * NOTES:
146 * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
147 * marshal. The number needed varies depending on the iov lists that
148 * are passed to us, the memory registration mode we are in, and if
149 * physical addressing is used, the layout.
150 */
151
152struct rpcrdma_mr_seg { /* chunk descriptors */
153 union { /* chunk memory handles */
154 struct ib_mr *rl_mr; /* if registered directly */
155 struct rpcrdma_mw { /* if registered from region */
156 union {
157 struct ib_mw *mw;
158 struct ib_fmr *fmr;
159 } r;
160 struct list_head mw_list;
161 } *rl_mw;
162 } mr_chunk;
163 u64 mr_base; /* registration result */
164 u32 mr_rkey; /* registration result */
165 u32 mr_len; /* length of chunk or segment */
166 int mr_nsegs; /* number of segments in chunk or 0 */
167 enum dma_data_direction mr_dir; /* segment mapping direction */
168 dma_addr_t mr_dma; /* segment mapping address */
169 size_t mr_dmalen; /* segment mapping length */
170 struct page *mr_page; /* owning page, if any */
171 char *mr_offset; /* kva if no page, else offset */
172};
173
174struct rpcrdma_req {
175 size_t rl_size; /* actual length of buffer */
176 unsigned int rl_niovs; /* 0, 2 or 4 */
177 unsigned int rl_nchunks; /* non-zero if chunks */
178 struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
179 struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
180 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
181 struct ib_sge rl_send_iov[4]; /* for active requests */
182 struct ib_sge rl_iov; /* for posting */
183 struct ib_mr *rl_handle; /* handle for mem in rl_iov */
184 char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
185 __u32 rl_xdr_buf[0]; /* start of returned rpc rq_buffer */
186};
187#define rpcr_to_rdmar(r) \
188 container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
189
190/*
191 * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for
192 * inline requests/replies, and client/server credits.
193 *
194 * One of these is associated with a transport instance
195 */
196struct rpcrdma_buffer {
197 spinlock_t rb_lock; /* protects indexes */
198 atomic_t rb_credits; /* most recent server credits */
199 unsigned long rb_cwndscale; /* cached framework rpc_cwndscale */
200 int rb_max_requests;/* client max requests */
201 struct list_head rb_mws; /* optional memory windows/fmrs */
202 int rb_send_index;
203 struct rpcrdma_req **rb_send_bufs;
204 int rb_recv_index;
205 struct rpcrdma_rep **rb_recv_bufs;
206 char *rb_pool;
207};
208#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
209
210/*
211 * Internal structure for transport instance creation. This
212 * exists primarily for modularity.
213 *
214 * This data should be set with mount options
215 */
216struct rpcrdma_create_data_internal {
217 struct sockaddr_storage addr; /* RDMA server address */
218 unsigned int max_requests; /* max requests (slots) in flight */
219 unsigned int rsize; /* mount rsize - max read hdr+data */
220 unsigned int wsize; /* mount wsize - max write hdr+data */
221 unsigned int inline_rsize; /* max non-rdma read data payload */
222 unsigned int inline_wsize; /* max non-rdma write data payload */
223 unsigned int padding; /* non-rdma write header padding */
224};
225
226#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
227 (rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_rsize)
228
229#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
230 (rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_wsize)
231
232#define RPCRDMA_INLINE_PAD_VALUE(rq)\
233 rpcx_to_rdmad(rq->rq_task->tk_xprt).padding
234
235/*
236 * Statistics for RPCRDMA
237 */
238struct rpcrdma_stats {
239 unsigned long read_chunk_count;
240 unsigned long write_chunk_count;
241 unsigned long reply_chunk_count;
242
243 unsigned long long total_rdma_request;
244 unsigned long long total_rdma_reply;
245
246 unsigned long long pullup_copy_count;
247 unsigned long long fixup_copy_count;
248 unsigned long hardway_register_count;
249 unsigned long failed_marshal_count;
250 unsigned long bad_reply_count;
251};
252
253/*
254 * RPCRDMA transport -- encapsulates the structures above for
255 * integration with RPC.
256 *
257 * The contained structures are embedded, not pointers,
258 * for convenience. This structure need not be visible externally.
259 *
260 * It is allocated and initialized during mount, and released
261 * during unmount.
262 */
263struct rpcrdma_xprt {
264 struct rpc_xprt xprt;
265 struct rpcrdma_ia rx_ia;
266 struct rpcrdma_ep rx_ep;
267 struct rpcrdma_buffer rx_buf;
268 struct rpcrdma_create_data_internal rx_data;
269 struct delayed_work rdma_connect;
270 struct rpcrdma_stats rx_stats;
271};
272
273#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
274#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
275
276/*
277 * Interface Adapter calls - xprtrdma/verbs.c
278 */
279int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
280void rpcrdma_ia_close(struct rpcrdma_ia *);
281
282/*
283 * Endpoint calls - xprtrdma/verbs.c
284 */
285int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
286 struct rpcrdma_create_data_internal *);
287int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
288int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
289int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
290
291int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
292 struct rpcrdma_req *);
293int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
294 struct rpcrdma_rep *);
295
296/*
297 * Buffer calls - xprtrdma/verbs.c
298 */
299int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *,
300 struct rpcrdma_ia *,
301 struct rpcrdma_create_data_internal *);
302void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
303
304struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
305void rpcrdma_buffer_put(struct rpcrdma_req *);
306void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
307void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
308
309int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int,
310 struct ib_mr **, struct ib_sge *);
311int rpcrdma_deregister_internal(struct rpcrdma_ia *,
312 struct ib_mr *, struct ib_sge *);
313
314int rpcrdma_register_external(struct rpcrdma_mr_seg *,
315 int, int, struct rpcrdma_xprt *);
316int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
317 struct rpcrdma_xprt *, void *);
318
319/*
320 * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
321 */
322void rpcrdma_conn_func(struct rpcrdma_ep *);
323void rpcrdma_reply_handler(struct rpcrdma_rep *);
324
325/*
326 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
327 */
328int rpcrdma_marshal_req(struct rpc_rqst *);
329
330#endif /* _LINUX_SUNRPC_XPRT_RDMA_H */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 282efd447a61..02298f529dad 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -13,10 +13,14 @@
13 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 13 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14 * 14 *
15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com> 15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16 *
17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18 * <gilles.quillard@bull.net>
16 */ 19 */
17 20
18#include <linux/types.h> 21#include <linux/types.h>
19#include <linux/slab.h> 22#include <linux/slab.h>
23#include <linux/module.h>
20#include <linux/capability.h> 24#include <linux/capability.h>
21#include <linux/pagemap.h> 25#include <linux/pagemap.h>
22#include <linux/errno.h> 26#include <linux/errno.h>
@@ -28,6 +32,7 @@
28#include <linux/tcp.h> 32#include <linux/tcp.h>
29#include <linux/sunrpc/clnt.h> 33#include <linux/sunrpc/clnt.h>
30#include <linux/sunrpc/sched.h> 34#include <linux/sunrpc/sched.h>
35#include <linux/sunrpc/xprtsock.h>
31#include <linux/file.h> 36#include <linux/file.h>
32 37
33#include <net/sock.h> 38#include <net/sock.h>
@@ -260,14 +265,29 @@ struct sock_xprt {
260#define TCP_RCV_COPY_XID (1UL << 2) 265#define TCP_RCV_COPY_XID (1UL << 2)
261#define TCP_RCV_COPY_DATA (1UL << 3) 266#define TCP_RCV_COPY_DATA (1UL << 3)
262 267
263static void xs_format_peer_addresses(struct rpc_xprt *xprt) 268static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
269{
270 return (struct sockaddr *) &xprt->addr;
271}
272
273static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
264{ 274{
265 struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr; 275 return (struct sockaddr_in *) &xprt->addr;
276}
277
278static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
279{
280 return (struct sockaddr_in6 *) &xprt->addr;
281}
282
283static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt)
284{
285 struct sockaddr_in *addr = xs_addr_in(xprt);
266 char *buf; 286 char *buf;
267 287
268 buf = kzalloc(20, GFP_KERNEL); 288 buf = kzalloc(20, GFP_KERNEL);
269 if (buf) { 289 if (buf) {
270 snprintf(buf, 20, "%u.%u.%u.%u", 290 snprintf(buf, 20, NIPQUAD_FMT,
271 NIPQUAD(addr->sin_addr.s_addr)); 291 NIPQUAD(addr->sin_addr.s_addr));
272 } 292 }
273 xprt->address_strings[RPC_DISPLAY_ADDR] = buf; 293 xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
@@ -279,26 +299,123 @@ static void xs_format_peer_addresses(struct rpc_xprt *xprt)
279 } 299 }
280 xprt->address_strings[RPC_DISPLAY_PORT] = buf; 300 xprt->address_strings[RPC_DISPLAY_PORT] = buf;
281 301
282 if (xprt->prot == IPPROTO_UDP) 302 buf = kzalloc(8, GFP_KERNEL);
283 xprt->address_strings[RPC_DISPLAY_PROTO] = "udp"; 303 if (buf) {
284 else 304 if (xprt->prot == IPPROTO_UDP)
285 xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp"; 305 snprintf(buf, 8, "udp");
306 else
307 snprintf(buf, 8, "tcp");
308 }
309 xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
286 310
287 buf = kzalloc(48, GFP_KERNEL); 311 buf = kzalloc(48, GFP_KERNEL);
288 if (buf) { 312 if (buf) {
289 snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s", 313 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
290 NIPQUAD(addr->sin_addr.s_addr), 314 NIPQUAD(addr->sin_addr.s_addr),
291 ntohs(addr->sin_port), 315 ntohs(addr->sin_port),
292 xprt->prot == IPPROTO_UDP ? "udp" : "tcp"); 316 xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
293 } 317 }
294 xprt->address_strings[RPC_DISPLAY_ALL] = buf; 318 xprt->address_strings[RPC_DISPLAY_ALL] = buf;
319
320 buf = kzalloc(10, GFP_KERNEL);
321 if (buf) {
322 snprintf(buf, 10, "%02x%02x%02x%02x",
323 NIPQUAD(addr->sin_addr.s_addr));
324 }
325 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
326
327 buf = kzalloc(8, GFP_KERNEL);
328 if (buf) {
329 snprintf(buf, 8, "%4hx",
330 ntohs(addr->sin_port));
331 }
332 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
333
334 buf = kzalloc(30, GFP_KERNEL);
335 if (buf) {
336 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
337 NIPQUAD(addr->sin_addr.s_addr),
338 ntohs(addr->sin_port) >> 8,
339 ntohs(addr->sin_port) & 0xff);
340 }
341 xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
342
343 xprt->address_strings[RPC_DISPLAY_NETID] =
344 kstrdup(xprt->prot == IPPROTO_UDP ?
345 RPCBIND_NETID_UDP : RPCBIND_NETID_TCP, GFP_KERNEL);
346}
347
348static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt)
349{
350 struct sockaddr_in6 *addr = xs_addr_in6(xprt);
351 char *buf;
352
353 buf = kzalloc(40, GFP_KERNEL);
354 if (buf) {
355 snprintf(buf, 40, NIP6_FMT,
356 NIP6(addr->sin6_addr));
357 }
358 xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
359
360 buf = kzalloc(8, GFP_KERNEL);
361 if (buf) {
362 snprintf(buf, 8, "%u",
363 ntohs(addr->sin6_port));
364 }
365 xprt->address_strings[RPC_DISPLAY_PORT] = buf;
366
367 buf = kzalloc(8, GFP_KERNEL);
368 if (buf) {
369 if (xprt->prot == IPPROTO_UDP)
370 snprintf(buf, 8, "udp");
371 else
372 snprintf(buf, 8, "tcp");
373 }
374 xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
375
376 buf = kzalloc(64, GFP_KERNEL);
377 if (buf) {
378 snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
379 NIP6(addr->sin6_addr),
380 ntohs(addr->sin6_port),
381 xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
382 }
383 xprt->address_strings[RPC_DISPLAY_ALL] = buf;
384
385 buf = kzalloc(36, GFP_KERNEL);
386 if (buf) {
387 snprintf(buf, 36, NIP6_SEQFMT,
388 NIP6(addr->sin6_addr));
389 }
390 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
391
392 buf = kzalloc(8, GFP_KERNEL);
393 if (buf) {
394 snprintf(buf, 8, "%4hx",
395 ntohs(addr->sin6_port));
396 }
397 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
398
399 buf = kzalloc(50, GFP_KERNEL);
400 if (buf) {
401 snprintf(buf, 50, NIP6_FMT".%u.%u",
402 NIP6(addr->sin6_addr),
403 ntohs(addr->sin6_port) >> 8,
404 ntohs(addr->sin6_port) & 0xff);
405 }
406 xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
407
408 xprt->address_strings[RPC_DISPLAY_NETID] =
409 kstrdup(xprt->prot == IPPROTO_UDP ?
410 RPCBIND_NETID_UDP6 : RPCBIND_NETID_TCP6, GFP_KERNEL);
295} 411}
296 412
297static void xs_free_peer_addresses(struct rpc_xprt *xprt) 413static void xs_free_peer_addresses(struct rpc_xprt *xprt)
298{ 414{
299 kfree(xprt->address_strings[RPC_DISPLAY_ADDR]); 415 int i;
300 kfree(xprt->address_strings[RPC_DISPLAY_PORT]); 416
301 kfree(xprt->address_strings[RPC_DISPLAY_ALL]); 417 for (i = 0; i < RPC_DISPLAY_MAX; i++)
418 kfree(xprt->address_strings[i]);
302} 419}
303 420
304#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 421#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
@@ -463,19 +580,20 @@ static int xs_udp_send_request(struct rpc_task *task)
463 580
464 req->rq_xtime = jiffies; 581 req->rq_xtime = jiffies;
465 status = xs_sendpages(transport->sock, 582 status = xs_sendpages(transport->sock,
466 (struct sockaddr *) &xprt->addr, 583 xs_addr(xprt),
467 xprt->addrlen, xdr, 584 xprt->addrlen, xdr,
468 req->rq_bytes_sent); 585 req->rq_bytes_sent);
469 586
470 dprintk("RPC: xs_udp_send_request(%u) = %d\n", 587 dprintk("RPC: xs_udp_send_request(%u) = %d\n",
471 xdr->len - req->rq_bytes_sent, status); 588 xdr->len - req->rq_bytes_sent, status);
472 589
473 if (likely(status >= (int) req->rq_slen)) 590 if (status >= 0) {
474 return 0; 591 task->tk_bytes_sent += status;
475 592 if (status >= req->rq_slen)
476 /* Still some bytes left; set up for a retry later. */ 593 return 0;
477 if (status > 0) 594 /* Still some bytes left; set up for a retry later. */
478 status = -EAGAIN; 595 status = -EAGAIN;
596 }
479 597
480 switch (status) { 598 switch (status) {
481 case -ENETUNREACH: 599 case -ENETUNREACH:
@@ -523,7 +641,8 @@ static int xs_tcp_send_request(struct rpc_task *task)
523 struct rpc_xprt *xprt = req->rq_xprt; 641 struct rpc_xprt *xprt = req->rq_xprt;
524 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 642 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
525 struct xdr_buf *xdr = &req->rq_snd_buf; 643 struct xdr_buf *xdr = &req->rq_snd_buf;
526 int status, retry = 0; 644 int status;
645 unsigned int retry = 0;
527 646
528 xs_encode_tcp_record_marker(&req->rq_snd_buf); 647 xs_encode_tcp_record_marker(&req->rq_snd_buf);
529 648
@@ -661,6 +780,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
661 xs_free_peer_addresses(xprt); 780 xs_free_peer_addresses(xprt);
662 kfree(xprt->slot); 781 kfree(xprt->slot);
663 kfree(xprt); 782 kfree(xprt);
783 module_put(THIS_MODULE);
664} 784}
665 785
666static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) 786static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
@@ -1139,14 +1259,23 @@ static unsigned short xs_get_random_port(void)
1139 */ 1259 */
1140static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) 1260static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1141{ 1261{
1142 struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr; 1262 struct sockaddr *addr = xs_addr(xprt);
1143 1263
1144 dprintk("RPC: setting port for xprt %p to %u\n", xprt, port); 1264 dprintk("RPC: setting port for xprt %p to %u\n", xprt, port);
1145 1265
1146 sap->sin_port = htons(port); 1266 switch (addr->sa_family) {
1267 case AF_INET:
1268 ((struct sockaddr_in *)addr)->sin_port = htons(port);
1269 break;
1270 case AF_INET6:
1271 ((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1272 break;
1273 default:
1274 BUG();
1275 }
1147} 1276}
1148 1277
1149static int xs_bind(struct sock_xprt *transport, struct socket *sock) 1278static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1150{ 1279{
1151 struct sockaddr_in myaddr = { 1280 struct sockaddr_in myaddr = {
1152 .sin_family = AF_INET, 1281 .sin_family = AF_INET,
@@ -1174,8 +1303,42 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1174 else 1303 else
1175 port--; 1304 port--;
1176 } while (err == -EADDRINUSE && port != transport->port); 1305 } while (err == -EADDRINUSE && port != transport->port);
1177 dprintk("RPC: xs_bind "NIPQUAD_FMT":%u: %s (%d)\n", 1306 dprintk("RPC: %s "NIPQUAD_FMT":%u: %s (%d)\n",
1178 NIPQUAD(myaddr.sin_addr), port, err ? "failed" : "ok", err); 1307 __FUNCTION__, NIPQUAD(myaddr.sin_addr),
1308 port, err ? "failed" : "ok", err);
1309 return err;
1310}
1311
1312static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1313{
1314 struct sockaddr_in6 myaddr = {
1315 .sin6_family = AF_INET6,
1316 };
1317 struct sockaddr_in6 *sa;
1318 int err;
1319 unsigned short port = transport->port;
1320
1321 if (!transport->xprt.resvport)
1322 port = 0;
1323 sa = (struct sockaddr_in6 *)&transport->addr;
1324 myaddr.sin6_addr = sa->sin6_addr;
1325 do {
1326 myaddr.sin6_port = htons(port);
1327 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1328 sizeof(myaddr));
1329 if (!transport->xprt.resvport)
1330 break;
1331 if (err == 0) {
1332 transport->port = port;
1333 break;
1334 }
1335 if (port <= xprt_min_resvport)
1336 port = xprt_max_resvport;
1337 else
1338 port--;
1339 } while (err == -EADDRINUSE && port != transport->port);
1340 dprintk("RPC: xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
1341 NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
1179 return err; 1342 return err;
1180} 1343}
1181 1344
@@ -1183,38 +1346,69 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1183static struct lock_class_key xs_key[2]; 1346static struct lock_class_key xs_key[2];
1184static struct lock_class_key xs_slock_key[2]; 1347static struct lock_class_key xs_slock_key[2];
1185 1348
1186static inline void xs_reclassify_socket(struct socket *sock) 1349static inline void xs_reclassify_socket4(struct socket *sock)
1187{ 1350{
1188 struct sock *sk = sock->sk; 1351 struct sock *sk = sock->sk;
1352
1189 BUG_ON(sock_owned_by_user(sk)); 1353 BUG_ON(sock_owned_by_user(sk));
1190 switch (sk->sk_family) { 1354 sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
1191 case AF_INET: 1355 &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
1192 sock_lock_init_class_and_name(sk, "slock-AF_INET-NFS", 1356}
1193 &xs_slock_key[0], "sk_lock-AF_INET-NFS", &xs_key[0]);
1194 break;
1195 1357
1196 case AF_INET6: 1358static inline void xs_reclassify_socket6(struct socket *sock)
1197 sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFS", 1359{
1198 &xs_slock_key[1], "sk_lock-AF_INET6-NFS", &xs_key[1]); 1360 struct sock *sk = sock->sk;
1199 break;
1200 1361
1201 default: 1362 BUG_ON(sock_owned_by_user(sk));
1202 BUG(); 1363 sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
1203 } 1364 &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
1204} 1365}
1205#else 1366#else
1206static inline void xs_reclassify_socket(struct socket *sock) 1367static inline void xs_reclassify_socket4(struct socket *sock)
1368{
1369}
1370
1371static inline void xs_reclassify_socket6(struct socket *sock)
1207{ 1372{
1208} 1373}
1209#endif 1374#endif
1210 1375
1376static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1377{
1378 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1379
1380 if (!transport->inet) {
1381 struct sock *sk = sock->sk;
1382
1383 write_lock_bh(&sk->sk_callback_lock);
1384
1385 sk->sk_user_data = xprt;
1386 transport->old_data_ready = sk->sk_data_ready;
1387 transport->old_state_change = sk->sk_state_change;
1388 transport->old_write_space = sk->sk_write_space;
1389 sk->sk_data_ready = xs_udp_data_ready;
1390 sk->sk_write_space = xs_udp_write_space;
1391 sk->sk_no_check = UDP_CSUM_NORCV;
1392 sk->sk_allocation = GFP_ATOMIC;
1393
1394 xprt_set_connected(xprt);
1395
1396 /* Reset to new socket */
1397 transport->sock = sock;
1398 transport->inet = sk;
1399
1400 write_unlock_bh(&sk->sk_callback_lock);
1401 }
1402 xs_udp_do_set_buffer_size(xprt);
1403}
1404
1211/** 1405/**
1212 * xs_udp_connect_worker - set up a UDP socket 1406 * xs_udp_connect_worker4 - set up a UDP socket
1213 * @work: RPC transport to connect 1407 * @work: RPC transport to connect
1214 * 1408 *
1215 * Invoked by a work queue tasklet. 1409 * Invoked by a work queue tasklet.
1216 */ 1410 */
1217static void xs_udp_connect_worker(struct work_struct *work) 1411static void xs_udp_connect_worker4(struct work_struct *work)
1218{ 1412{
1219 struct sock_xprt *transport = 1413 struct sock_xprt *transport =
1220 container_of(work, struct sock_xprt, connect_worker.work); 1414 container_of(work, struct sock_xprt, connect_worker.work);
@@ -1232,9 +1426,9 @@ static void xs_udp_connect_worker(struct work_struct *work)
1232 dprintk("RPC: can't create UDP transport socket (%d).\n", -err); 1426 dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
1233 goto out; 1427 goto out;
1234 } 1428 }
1235 xs_reclassify_socket(sock); 1429 xs_reclassify_socket4(sock);
1236 1430
1237 if (xs_bind(transport, sock)) { 1431 if (xs_bind4(transport, sock)) {
1238 sock_release(sock); 1432 sock_release(sock);
1239 goto out; 1433 goto out;
1240 } 1434 }
@@ -1242,29 +1436,48 @@ static void xs_udp_connect_worker(struct work_struct *work)
1242 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1436 dprintk("RPC: worker connecting xprt %p to address: %s\n",
1243 xprt, xprt->address_strings[RPC_DISPLAY_ALL]); 1437 xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1244 1438
1245 if (!transport->inet) { 1439 xs_udp_finish_connecting(xprt, sock);
1246 struct sock *sk = sock->sk; 1440 status = 0;
1441out:
1442 xprt_wake_pending_tasks(xprt, status);
1443 xprt_clear_connecting(xprt);
1444}
1247 1445
1248 write_lock_bh(&sk->sk_callback_lock); 1446/**
1447 * xs_udp_connect_worker6 - set up a UDP socket
1448 * @work: RPC transport to connect
1449 *
1450 * Invoked by a work queue tasklet.
1451 */
1452static void xs_udp_connect_worker6(struct work_struct *work)
1453{
1454 struct sock_xprt *transport =
1455 container_of(work, struct sock_xprt, connect_worker.work);
1456 struct rpc_xprt *xprt = &transport->xprt;
1457 struct socket *sock = transport->sock;
1458 int err, status = -EIO;
1249 1459
1250 sk->sk_user_data = xprt; 1460 if (xprt->shutdown || !xprt_bound(xprt))
1251 transport->old_data_ready = sk->sk_data_ready; 1461 goto out;
1252 transport->old_state_change = sk->sk_state_change;
1253 transport->old_write_space = sk->sk_write_space;
1254 sk->sk_data_ready = xs_udp_data_ready;
1255 sk->sk_write_space = xs_udp_write_space;
1256 sk->sk_no_check = UDP_CSUM_NORCV;
1257 sk->sk_allocation = GFP_ATOMIC;
1258 1462
1259 xprt_set_connected(xprt); 1463 /* Start by resetting any existing state */
1464 xs_close(xprt);
1260 1465
1261 /* Reset to new socket */ 1466 if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1262 transport->sock = sock; 1467 dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
1263 transport->inet = sk; 1468 goto out;
1469 }
1470 xs_reclassify_socket6(sock);
1264 1471
1265 write_unlock_bh(&sk->sk_callback_lock); 1472 if (xs_bind6(transport, sock) < 0) {
1473 sock_release(sock);
1474 goto out;
1266 } 1475 }
1267 xs_udp_do_set_buffer_size(xprt); 1476
1477 dprintk("RPC: worker connecting xprt %p to address: %s\n",
1478 xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1479
1480 xs_udp_finish_connecting(xprt, sock);
1268 status = 0; 1481 status = 0;
1269out: 1482out:
1270 xprt_wake_pending_tasks(xprt, status); 1483 xprt_wake_pending_tasks(xprt, status);
@@ -1295,13 +1508,52 @@ static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
1295 result); 1508 result);
1296} 1509}
1297 1510
1511static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1512{
1513 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1514
1515 if (!transport->inet) {
1516 struct sock *sk = sock->sk;
1517
1518 write_lock_bh(&sk->sk_callback_lock);
1519
1520 sk->sk_user_data = xprt;
1521 transport->old_data_ready = sk->sk_data_ready;
1522 transport->old_state_change = sk->sk_state_change;
1523 transport->old_write_space = sk->sk_write_space;
1524 sk->sk_data_ready = xs_tcp_data_ready;
1525 sk->sk_state_change = xs_tcp_state_change;
1526 sk->sk_write_space = xs_tcp_write_space;
1527 sk->sk_allocation = GFP_ATOMIC;
1528
1529 /* socket options */
1530 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1531 sock_reset_flag(sk, SOCK_LINGER);
1532 tcp_sk(sk)->linger2 = 0;
1533 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1534
1535 xprt_clear_connected(xprt);
1536
1537 /* Reset to new socket */
1538 transport->sock = sock;
1539 transport->inet = sk;
1540
1541 write_unlock_bh(&sk->sk_callback_lock);
1542 }
1543
1544 /* Tell the socket layer to start connecting... */
1545 xprt->stat.connect_count++;
1546 xprt->stat.connect_start = jiffies;
1547 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1548}
1549
1298/** 1550/**
1299 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint 1551 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1300 * @work: RPC transport to connect 1552 * @work: RPC transport to connect
1301 * 1553 *
1302 * Invoked by a work queue tasklet. 1554 * Invoked by a work queue tasklet.
1303 */ 1555 */
1304static void xs_tcp_connect_worker(struct work_struct *work) 1556static void xs_tcp_connect_worker4(struct work_struct *work)
1305{ 1557{
1306 struct sock_xprt *transport = 1558 struct sock_xprt *transport =
1307 container_of(work, struct sock_xprt, connect_worker.work); 1559 container_of(work, struct sock_xprt, connect_worker.work);
@@ -1315,13 +1567,12 @@ static void xs_tcp_connect_worker(struct work_struct *work)
1315 if (!sock) { 1567 if (!sock) {
1316 /* start from scratch */ 1568 /* start from scratch */
1317 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) { 1569 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
1318 dprintk("RPC: can't create TCP transport " 1570 dprintk("RPC: can't create TCP transport socket (%d).\n", -err);
1319 "socket (%d).\n", -err);
1320 goto out; 1571 goto out;
1321 } 1572 }
1322 xs_reclassify_socket(sock); 1573 xs_reclassify_socket4(sock);
1323 1574
1324 if (xs_bind(transport, sock)) { 1575 if (xs_bind4(transport, sock) < 0) {
1325 sock_release(sock); 1576 sock_release(sock);
1326 goto out; 1577 goto out;
1327 } 1578 }
@@ -1332,43 +1583,70 @@ static void xs_tcp_connect_worker(struct work_struct *work)
1332 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1583 dprintk("RPC: worker connecting xprt %p to address: %s\n",
1333 xprt, xprt->address_strings[RPC_DISPLAY_ALL]); 1584 xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1334 1585
1335 if (!transport->inet) { 1586 status = xs_tcp_finish_connecting(xprt, sock);
1336 struct sock *sk = sock->sk; 1587 dprintk("RPC: %p connect status %d connected %d sock state %d\n",
1337 1588 xprt, -status, xprt_connected(xprt),
1338 write_lock_bh(&sk->sk_callback_lock); 1589 sock->sk->sk_state);
1590 if (status < 0) {
1591 switch (status) {
1592 case -EINPROGRESS:
1593 case -EALREADY:
1594 goto out_clear;
1595 case -ECONNREFUSED:
1596 case -ECONNRESET:
1597 /* retry with existing socket, after a delay */
1598 break;
1599 default:
1600 /* get rid of existing socket, and retry */
1601 xs_close(xprt);
1602 break;
1603 }
1604 }
1605out:
1606 xprt_wake_pending_tasks(xprt, status);
1607out_clear:
1608 xprt_clear_connecting(xprt);
1609}
1339 1610
1340 sk->sk_user_data = xprt; 1611/**
1341 transport->old_data_ready = sk->sk_data_ready; 1612 * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1342 transport->old_state_change = sk->sk_state_change; 1613 * @work: RPC transport to connect
1343 transport->old_write_space = sk->sk_write_space; 1614 *
1344 sk->sk_data_ready = xs_tcp_data_ready; 1615 * Invoked by a work queue tasklet.
1345 sk->sk_state_change = xs_tcp_state_change; 1616 */
1346 sk->sk_write_space = xs_tcp_write_space; 1617static void xs_tcp_connect_worker6(struct work_struct *work)
1347 sk->sk_allocation = GFP_ATOMIC; 1618{
1619 struct sock_xprt *transport =
1620 container_of(work, struct sock_xprt, connect_worker.work);
1621 struct rpc_xprt *xprt = &transport->xprt;
1622 struct socket *sock = transport->sock;
1623 int err, status = -EIO;
1348 1624
1349 /* socket options */ 1625 if (xprt->shutdown || !xprt_bound(xprt))
1350 sk->sk_userlocks |= SOCK_BINDPORT_LOCK; 1626 goto out;
1351 sock_reset_flag(sk, SOCK_LINGER);
1352 tcp_sk(sk)->linger2 = 0;
1353 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1354 1627
1355 xprt_clear_connected(xprt); 1628 if (!sock) {
1629 /* start from scratch */
1630 if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
1631 dprintk("RPC: can't create TCP transport socket (%d).\n", -err);
1632 goto out;
1633 }
1634 xs_reclassify_socket6(sock);
1356 1635
1357 /* Reset to new socket */ 1636 if (xs_bind6(transport, sock) < 0) {
1358 transport->sock = sock; 1637 sock_release(sock);
1359 transport->inet = sk; 1638 goto out;
1639 }
1640 } else
1641 /* "close" the socket, preserving the local port */
1642 xs_tcp_reuse_connection(xprt);
1360 1643
1361 write_unlock_bh(&sk->sk_callback_lock); 1644 dprintk("RPC: worker connecting xprt %p to address: %s\n",
1362 } 1645 xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1363 1646
1364 /* Tell the socket layer to start connecting... */ 1647 status = xs_tcp_finish_connecting(xprt, sock);
1365 xprt->stat.connect_count++;
1366 xprt->stat.connect_start = jiffies;
1367 status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
1368 xprt->addrlen, O_NONBLOCK);
1369 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 1648 dprintk("RPC: %p connect status %d connected %d sock state %d\n",
1370 xprt, -status, xprt_connected(xprt), 1649 xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
1371 sock->sk->sk_state);
1372 if (status < 0) { 1650 if (status < 0) {
1373 switch (status) { 1651 switch (status) {
1374 case -EINPROGRESS: 1652 case -EINPROGRESS:
@@ -1508,7 +1786,8 @@ static struct rpc_xprt_ops xs_tcp_ops = {
1508 .print_stats = xs_tcp_print_stats, 1786 .print_stats = xs_tcp_print_stats,
1509}; 1787};
1510 1788
1511static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned int slot_table_size) 1789static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
1790 unsigned int slot_table_size)
1512{ 1791{
1513 struct rpc_xprt *xprt; 1792 struct rpc_xprt *xprt;
1514 struct sock_xprt *new; 1793 struct sock_xprt *new;
@@ -1549,8 +1828,9 @@ static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned
1549 * @args: rpc transport creation arguments 1828 * @args: rpc transport creation arguments
1550 * 1829 *
1551 */ 1830 */
1552struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args) 1831struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
1553{ 1832{
1833 struct sockaddr *addr = args->dstaddr;
1554 struct rpc_xprt *xprt; 1834 struct rpc_xprt *xprt;
1555 struct sock_xprt *transport; 1835 struct sock_xprt *transport;
1556 1836
@@ -1559,15 +1839,11 @@ struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
1559 return xprt; 1839 return xprt;
1560 transport = container_of(xprt, struct sock_xprt, xprt); 1840 transport = container_of(xprt, struct sock_xprt, xprt);
1561 1841
1562 if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
1563 xprt_set_bound(xprt);
1564
1565 xprt->prot = IPPROTO_UDP; 1842 xprt->prot = IPPROTO_UDP;
1566 xprt->tsh_size = 0; 1843 xprt->tsh_size = 0;
1567 /* XXX: header size can vary due to auth type, IPv6, etc. */ 1844 /* XXX: header size can vary due to auth type, IPv6, etc. */
1568 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 1845 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1569 1846
1570 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker);
1571 xprt->bind_timeout = XS_BIND_TO; 1847 xprt->bind_timeout = XS_BIND_TO;
1572 xprt->connect_timeout = XS_UDP_CONN_TO; 1848 xprt->connect_timeout = XS_UDP_CONN_TO;
1573 xprt->reestablish_timeout = XS_UDP_REEST_TO; 1849 xprt->reestablish_timeout = XS_UDP_REEST_TO;
@@ -1580,11 +1856,37 @@ struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
1580 else 1856 else
1581 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ); 1857 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1582 1858
1583 xs_format_peer_addresses(xprt); 1859 switch (addr->sa_family) {
1860 case AF_INET:
1861 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
1862 xprt_set_bound(xprt);
1863
1864 INIT_DELAYED_WORK(&transport->connect_worker,
1865 xs_udp_connect_worker4);
1866 xs_format_ipv4_peer_addresses(xprt);
1867 break;
1868 case AF_INET6:
1869 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
1870 xprt_set_bound(xprt);
1871
1872 INIT_DELAYED_WORK(&transport->connect_worker,
1873 xs_udp_connect_worker6);
1874 xs_format_ipv6_peer_addresses(xprt);
1875 break;
1876 default:
1877 kfree(xprt);
1878 return ERR_PTR(-EAFNOSUPPORT);
1879 }
1880
1584 dprintk("RPC: set up transport to address %s\n", 1881 dprintk("RPC: set up transport to address %s\n",
1585 xprt->address_strings[RPC_DISPLAY_ALL]); 1882 xprt->address_strings[RPC_DISPLAY_ALL]);
1586 1883
1587 return xprt; 1884 if (try_module_get(THIS_MODULE))
1885 return xprt;
1886
1887 kfree(xprt->slot);
1888 kfree(xprt);
1889 return ERR_PTR(-EINVAL);
1588} 1890}
1589 1891
1590/** 1892/**
@@ -1592,8 +1894,9 @@ struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
1592 * @args: rpc transport creation arguments 1894 * @args: rpc transport creation arguments
1593 * 1895 *
1594 */ 1896 */
1595struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args) 1897struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
1596{ 1898{
1899 struct sockaddr *addr = args->dstaddr;
1597 struct rpc_xprt *xprt; 1900 struct rpc_xprt *xprt;
1598 struct sock_xprt *transport; 1901 struct sock_xprt *transport;
1599 1902
@@ -1602,14 +1905,10 @@ struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
1602 return xprt; 1905 return xprt;
1603 transport = container_of(xprt, struct sock_xprt, xprt); 1906 transport = container_of(xprt, struct sock_xprt, xprt);
1604 1907
1605 if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
1606 xprt_set_bound(xprt);
1607
1608 xprt->prot = IPPROTO_TCP; 1908 xprt->prot = IPPROTO_TCP;
1609 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); 1909 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1610 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 1910 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1611 1911
1612 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker);
1613 xprt->bind_timeout = XS_BIND_TO; 1912 xprt->bind_timeout = XS_BIND_TO;
1614 xprt->connect_timeout = XS_TCP_CONN_TO; 1913 xprt->connect_timeout = XS_TCP_CONN_TO;
1615 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 1914 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
@@ -1622,15 +1921,55 @@ struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
1622 else 1921 else
1623 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ); 1922 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1624 1923
1625 xs_format_peer_addresses(xprt); 1924 switch (addr->sa_family) {
1925 case AF_INET:
1926 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
1927 xprt_set_bound(xprt);
1928
1929 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
1930 xs_format_ipv4_peer_addresses(xprt);
1931 break;
1932 case AF_INET6:
1933 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
1934 xprt_set_bound(xprt);
1935
1936 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
1937 xs_format_ipv6_peer_addresses(xprt);
1938 break;
1939 default:
1940 kfree(xprt);
1941 return ERR_PTR(-EAFNOSUPPORT);
1942 }
1943
1626 dprintk("RPC: set up transport to address %s\n", 1944 dprintk("RPC: set up transport to address %s\n",
1627 xprt->address_strings[RPC_DISPLAY_ALL]); 1945 xprt->address_strings[RPC_DISPLAY_ALL]);
1628 1946
1629 return xprt; 1947 if (try_module_get(THIS_MODULE))
1948 return xprt;
1949
1950 kfree(xprt->slot);
1951 kfree(xprt);
1952 return ERR_PTR(-EINVAL);
1630} 1953}
1631 1954
1955static struct xprt_class xs_udp_transport = {
1956 .list = LIST_HEAD_INIT(xs_udp_transport.list),
1957 .name = "udp",
1958 .owner = THIS_MODULE,
1959 .ident = IPPROTO_UDP,
1960 .setup = xs_setup_udp,
1961};
1962
1963static struct xprt_class xs_tcp_transport = {
1964 .list = LIST_HEAD_INIT(xs_tcp_transport.list),
1965 .name = "tcp",
1966 .owner = THIS_MODULE,
1967 .ident = IPPROTO_TCP,
1968 .setup = xs_setup_tcp,
1969};
1970
1632/** 1971/**
1633 * init_socket_xprt - set up xprtsock's sysctls 1972 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
1634 * 1973 *
1635 */ 1974 */
1636int init_socket_xprt(void) 1975int init_socket_xprt(void)
@@ -1640,11 +1979,14 @@ int init_socket_xprt(void)
1640 sunrpc_table_header = register_sysctl_table(sunrpc_table); 1979 sunrpc_table_header = register_sysctl_table(sunrpc_table);
1641#endif 1980#endif
1642 1981
1982 xprt_register_transport(&xs_udp_transport);
1983 xprt_register_transport(&xs_tcp_transport);
1984
1643 return 0; 1985 return 0;
1644} 1986}
1645 1987
1646/** 1988/**
1647 * cleanup_socket_xprt - remove xprtsock's sysctls 1989 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
1648 * 1990 *
1649 */ 1991 */
1650void cleanup_socket_xprt(void) 1992void cleanup_socket_xprt(void)
@@ -1655,4 +1997,7 @@ void cleanup_socket_xprt(void)
1655 sunrpc_table_header = NULL; 1997 sunrpc_table_header = NULL;
1656 } 1998 }
1657#endif 1999#endif
2000
2001 xprt_unregister_transport(&xs_udp_transport);
2002 xprt_unregister_transport(&xs_tcp_transport);
1658} 2003}