author    Linus Torvalds <torvalds@linux-foundation.org>  2015-11-09 21:11:22 -0500
committer Linus Torvalds <torvalds@linux-foundation.org>  2015-11-09 21:11:22 -0500
commit    e6604ecb70d4b1dbc0372c6518b51c25c4b135a1 (patch)
tree      2d12c51b84c3ba8472e59ddbe37da034e2c5251f /net
parent    9d74288ca79249af4b906215788b37d52263b58b (diff)
parent    941c3ff3102ccce440034d59cf9e4e9cc10b720d (diff)
Merge tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  New features:
   - RDMA client backchannel from Chuck
   - Support for NFSv4.2 file CLONE using the btrfs ioctl

  Bugfixes + cleanups:
   - Move socket data receive out of the bottom halves and into a
     workqueue
   - Refactor NFSv4 error handling so synchronous and asynchronous RPC
     handles errors identically.
   - Fix a panic when blocks or object layouts reads return a bad data
     length
   - Fix nfsroot so it can handle a 1024 byte long path.
   - Fix bad usage of page offset in bl_read_pagelist
   - Various NFSv4 callback cleanups+fixes
   - Fix GETATTR bitmap verification
   - Support hexadecimal number for sunrpc debug sysctl files"

* tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (53 commits)
  Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug
  nfs: Fix GETATTR bitmap verification
  nfs: Remove unused xdr page offsets in getacl/setacl arguments
  fs/nfs: remove unnecessary new_valid_dev check
  SUNRPC: fix variable type
  NFS: Enable client side NFSv4.1 backchannel to use other transports
  pNFS/flexfiles: Add support for FF_FLAGS_NO_IO_THRU_MDS
  pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors
  SUNRPC: Remove the TCP-only restriction in bc_svc_process()
  svcrdma: Add backward direction service for RPC/RDMA transport
  xprtrdma: Handle incoming backward direction RPC calls
  xprtrdma: Add support for sending backward direction RPC replies
  xprtrdma: Pre-allocate Work Requests for backchannel
  xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
  SUNRPC: Abstract backchannel operations
  xprtrdma: Saving IRQs no longer needed for rb_lock
  xprtrdma: Remove reply tasklet
  xprtrdma: Use workqueue to process RPC/RDMA replies
  xprtrdma: Replace send and receive arrays
  xprtrdma: Refactor reply handler error handling
  ...
Diffstat (limited to 'net')
-rw-r--r--  net/sunrpc/backchannel_rqst.c              24
-rw-r--r--  net/sunrpc/svc.c                            5
-rw-r--r--  net/sunrpc/sysctl.c                        23
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                1
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c         394
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c              7
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c            148
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c              6
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c   58
-rw-r--r--  net/sunrpc/xprtrdma/transport.c            18
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c               479
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h            54
-rw-r--r--  net/sunrpc/xprtsock.c                     260
13 files changed, 1083 insertions, 394 deletions
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d141133b..229956bf8457 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
138 */ 138 */
139int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 139int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
140{ 140{
141 if (!xprt->ops->bc_setup)
142 return 0;
143 return xprt->ops->bc_setup(xprt, min_reqs);
144}
145EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
146
147int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
148{
141 struct rpc_rqst *req; 149 struct rpc_rqst *req;
142 struct list_head tmp_list; 150 struct list_head tmp_list;
143 int i; 151 int i;
@@ -192,7 +200,6 @@ out_free:
192 dprintk("RPC: setup backchannel transport failed\n"); 200 dprintk("RPC: setup backchannel transport failed\n");
193 return -ENOMEM; 201 return -ENOMEM;
194} 202}
195EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
196 203
197/** 204/**
198 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 205 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
205 */ 212 */
206void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 213void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
207{ 214{
215 if (xprt->ops->bc_destroy)
216 xprt->ops->bc_destroy(xprt, max_reqs);
217}
218EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
219
220void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
221{
208 struct rpc_rqst *req = NULL, *tmp = NULL; 222 struct rpc_rqst *req = NULL, *tmp = NULL;
209 223
210 dprintk("RPC: destroy backchannel transport\n"); 224 dprintk("RPC: destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
227 dprintk("RPC: backchannel list empty= %s\n", 241 dprintk("RPC: backchannel list empty= %s\n",
228 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 242 list_empty(&xprt->bc_pa_list) ? "true" : "false");
229} 243}
230EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
231 244
232static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) 245static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
233{ 246{
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
264{ 277{
265 struct rpc_xprt *xprt = req->rq_xprt; 278 struct rpc_xprt *xprt = req->rq_xprt;
266 279
280 xprt->ops->bc_free_rqst(req);
281}
282
283void xprt_free_bc_rqst(struct rpc_rqst *req)
284{
285 struct rpc_xprt *xprt = req->rq_xprt;
286
267 dprintk("RPC: free backchannel req=%p\n", req); 287 dprintk("RPC: free backchannel req=%p\n", req);
268 288
269 req->rq_connect_cookie = xprt->connect_cookie - 1; 289 req->rq_connect_cookie = xprt->connect_cookie - 1;
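The hunks above turn the generic backchannel entry points into thin dispatchers through the transport's method table, so a non-socket transport (here RPC/RDMA) can plug in its own setup and teardown while the old socket-specific bodies live on as xprt_setup_bc()/xprt_destroy_bc()/xprt_free_bc_rqst(). A minimal, self-contained C sketch of that indirection follows; the type and function names are illustrative only, while the real hooks are the bc_setup, bc_destroy and bc_free_rqst members of struct rpc_xprt_ops wired up in the transport.c hunk further below.

/* Sketch (illustrative names, not kernel code) of the ops-table
 * dispatch introduced above: a transport without backchannel support
 * simply leaves the hooks NULL.
 */
struct transport;

struct bc_ops {
	int  (*bc_setup)(struct transport *t, unsigned int min_reqs);
	void (*bc_destroy)(struct transport *t, unsigned int max_reqs);
};

struct transport {
	const struct bc_ops *ops;
};

static int setup_backchannel(struct transport *t, unsigned int min_reqs)
{
	if (!t->ops->bc_setup)
		return 0;			/* no backchannel: not an error */
	return t->ops->bc_setup(t, min_reqs);	/* e.g. xprt_setup_bc() or xprt_rdma_bc_setup() */
}

static void destroy_backchannel(struct transport *t, unsigned int max_reqs)
{
	if (t->ops->bc_destroy)
		t->ops->bc_destroy(t, max_reqs);
}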
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579df14d8..bc5b7b5032ca 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1367 /* reset result send buffer "put" position */ 1367 /* reset result send buffer "put" position */
1368 resv->iov_len = 0; 1368 resv->iov_len = 0;
1369 1369
1370 if (rqstp->rq_prot != IPPROTO_TCP) {
1371 printk(KERN_ERR "No support for Non-TCP transports!\n");
1372 BUG();
1373 }
1374
1375 /* 1370 /*
1376 * Skip the next two words because they've already been 1371 * Skip the next two words because they've already been
1377 * processed in the transport 1372 * processed in the transport
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c
index 887f0183b4c6..c88d9bc06f5c 100644
--- a/net/sunrpc/sysctl.c
+++ b/net/sunrpc/sysctl.c
@@ -76,7 +76,7 @@ static int
76proc_dodebug(struct ctl_table *table, int write, 76proc_dodebug(struct ctl_table *table, int write,
77 void __user *buffer, size_t *lenp, loff_t *ppos) 77 void __user *buffer, size_t *lenp, loff_t *ppos)
78{ 78{
79 char tmpbuf[20], c, *s; 79 char tmpbuf[20], c, *s = NULL;
80 char __user *p; 80 char __user *p;
81 unsigned int value; 81 unsigned int value;
82 size_t left, len; 82 size_t left, len;
@@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write,
103 return -EFAULT; 103 return -EFAULT;
104 tmpbuf[left] = '\0'; 104 tmpbuf[left] = '\0';
105 105
106 for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--) 106 value = simple_strtol(tmpbuf, &s, 0);
107 value = 10 * value + (*s - '0'); 107 if (s) {
108 if (*s && !isspace(*s)) 108 left -= (s - tmpbuf);
109 return -EINVAL; 109 if (left && !isspace(*s))
110 while (left && isspace(*s)) 110 return -EINVAL;
111 left--, s++; 111 while (left && isspace(*s))
112 left--, s++;
113 } else
114 left = 0;
112 *(unsigned int *) table->data = value; 115 *(unsigned int *) table->data = value;
113 /* Display the RPC tasks on writing to rpc_debug */ 116 /* Display the RPC tasks on writing to rpc_debug */
114 if (strcmp(table->procname, "rpc_debug") == 0) 117 if (strcmp(table->procname, "rpc_debug") == 0)
115 rpc_show_tasks(&init_net); 118 rpc_show_tasks(&init_net);
116 } else { 119 } else {
117 if (!access_ok(VERIFY_WRITE, buffer, left)) 120 len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data);
118 return -EFAULT;
119 len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data);
120 if (len > left) 121 if (len > left)
121 len = left; 122 len = left;
122 if (__copy_to_user(buffer, tmpbuf, len)) 123 if (copy_to_user(buffer, tmpbuf, len))
123 return -EFAULT; 124 return -EFAULT;
124 if ((left -= len) > 0) { 125 if ((left -= len) > 0) {
125 if (put_user('\n', (char __user *)buffer + len)) 126 if (put_user('\n', (char __user *)buffer + len))
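The parsing change above swaps a hand-rolled decimal loop for simple_strtol() with base 0, so the sunrpc debug sysctls now accept hexadecimal ("0x...") and octal (leading "0") input as well as decimal, and report the stored mask back in hex. A small userspace sketch (plain C, not kernel code) of the same base-0 conversion semantics:

#include <stdio.h>
#include <stdlib.h>

int main(void)
{
	/* strtoul() with base 0 behaves like simple_strtol(..., 0) above:
	 * a "0x" prefix selects hex, a leading "0" octal, else decimal.
	 */
	const char *inputs[] = { "1024", "0x7fff", "010" };
	size_t i;

	for (i = 0; i < sizeof(inputs) / sizeof(inputs[0]); i++) {
		char *end;
		unsigned long v = strtoul(inputs[i], &end, 0);

		printf("%-8s -> 0x%04lx (stopped at \"%s\")\n",
		       inputs[i], v, end);
	}
	return 0;
}

In practice this means writing 0x7fff to /proc/sys/sunrpc/rpc_debug now sets the same mask as the old decimal form 32767.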
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de240bd..33f99d3004f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
5 svc_rdma.o svc_rdma_transport.o \ 5 svc_rdma.o svc_rdma_transport.o \
6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ 6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
7 module.o 7 module.o
8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 000000000000..2dcb44f69e53
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,394 @@
1/*
2 * Copyright (c) 2015 Oracle. All rights reserved.
3 *
4 * Support for backward direction RPCs on RPC/RDMA.
5 */
6
7#include <linux/module.h>
8#include <linux/sunrpc/xprt.h>
9#include <linux/sunrpc/svc.h>
10#include <linux/sunrpc/svc_xprt.h>
11
12#include "xprt_rdma.h"
13
14#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
15# define RPCDBG_FACILITY RPCDBG_TRANS
16#endif
17
18#define RPCRDMA_BACKCHANNEL_DEBUG
19
20static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
21 struct rpc_rqst *rqst)
22{
23 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
24 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
25
26 spin_lock(&buf->rb_reqslock);
27 list_del(&req->rl_all);
28 spin_unlock(&buf->rb_reqslock);
29
30 rpcrdma_destroy_req(&r_xprt->rx_ia, req);
31
32 kfree(rqst);
33}
34
35static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
36 struct rpc_rqst *rqst)
37{
38 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
39 struct rpcrdma_regbuf *rb;
40 struct rpcrdma_req *req;
41 struct xdr_buf *buf;
42 size_t size;
43
44 req = rpcrdma_create_req(r_xprt);
45 if (!req)
46 return -ENOMEM;
47 req->rl_backchannel = true;
48
49 size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
50 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
51 if (IS_ERR(rb))
52 goto out_fail;
53 req->rl_rdmabuf = rb;
54
55 size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
56 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
57 if (IS_ERR(rb))
58 goto out_fail;
59 rb->rg_owner = req;
60 req->rl_sendbuf = rb;
61 /* so that rpcr_to_rdmar works when receiving a request */
62 rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
63
64 buf = &rqst->rq_snd_buf;
65 buf->head[0].iov_base = rqst->rq_buffer;
66 buf->head[0].iov_len = 0;
67 buf->tail[0].iov_base = NULL;
68 buf->tail[0].iov_len = 0;
69 buf->page_len = 0;
70 buf->len = 0;
71 buf->buflen = size;
72
73 return 0;
74
75out_fail:
76 rpcrdma_bc_free_rqst(r_xprt, rqst);
77 return -ENOMEM;
78}
79
80/* Allocate and add receive buffers to the rpcrdma_buffer's
81 * existing list of rep's. These are released when the
82 * transport is destroyed.
83 */
84static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
85 unsigned int count)
86{
87 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
88 struct rpcrdma_rep *rep;
89 unsigned long flags;
90 int rc = 0;
91
92 while (count--) {
93 rep = rpcrdma_create_rep(r_xprt);
94 if (IS_ERR(rep)) {
95 pr_err("RPC: %s: reply buffer alloc failed\n",
96 __func__);
97 rc = PTR_ERR(rep);
98 break;
99 }
100
101 spin_lock_irqsave(&buffers->rb_lock, flags);
102 list_add(&rep->rr_list, &buffers->rb_recv_bufs);
103 spin_unlock_irqrestore(&buffers->rb_lock, flags);
104 }
105
106 return rc;
107}
108
109/**
110 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
111 * @xprt: transport associated with these backchannel resources
112 * @reqs: number of concurrent incoming requests to expect
113 *
114 * Returns 0 on success; otherwise a negative errno
115 */
116int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
117{
118 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
119 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
120 struct rpc_rqst *rqst;
121 unsigned int i;
122 int rc;
123
124 /* The backchannel reply path returns each rpc_rqst to the
125 * bc_pa_list _after_ the reply is sent. If the server is
126 * faster than the client, it can send another backward
127 * direction request before the rpc_rqst is returned to the
128 * list. The client rejects the request in this case.
129 *
130 * Twice as many rpc_rqsts are prepared to ensure there is
131 * always an rpc_rqst available as soon as a reply is sent.
132 */
133 if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
134 goto out_err;
135
136 for (i = 0; i < (reqs << 1); i++) {
137 rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
138 if (!rqst) {
139 pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
140 __func__);
141 goto out_free;
142 }
143
144 rqst->rq_xprt = &r_xprt->rx_xprt;
145 INIT_LIST_HEAD(&rqst->rq_list);
146 INIT_LIST_HEAD(&rqst->rq_bc_list);
147
148 if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
149 goto out_free;
150
151 spin_lock_bh(&xprt->bc_pa_lock);
152 list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
153 spin_unlock_bh(&xprt->bc_pa_lock);
154 }
155
156 rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
157 if (rc)
158 goto out_free;
159
160 rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
161 if (rc)
162 goto out_free;
163
164 buffer->rb_bc_srv_max_requests = reqs;
165 request_module("svcrdma");
166
167 return 0;
168
169out_free:
170 xprt_rdma_bc_destroy(xprt, reqs);
171
172out_err:
173 pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
174 return -ENOMEM;
175}
176
177/**
178 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
179 * @serv: server endpoint
180 * @net: network namespace
181 *
182 * The "xprt" is an implied argument: it supplies the name of the
183 * backchannel transport class.
184 *
185 * Returns zero on success, negative errno on failure
186 */
187int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
188{
189 int ret;
190
191 ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
192 if (ret < 0)
193 return ret;
194 return 0;
195}
196
197/**
198 * rpcrdma_bc_marshal_reply - Send backwards direction reply
199 * @rqst: buffer containing RPC reply data
200 *
201 * Returns zero on success.
202 */
203int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
204{
205 struct rpc_xprt *xprt = rqst->rq_xprt;
206 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
207 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
208 struct rpcrdma_msg *headerp;
209 size_t rpclen;
210
211 headerp = rdmab_to_msg(req->rl_rdmabuf);
212 headerp->rm_xid = rqst->rq_xid;
213 headerp->rm_vers = rpcrdma_version;
214 headerp->rm_credit =
215 cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
216 headerp->rm_type = rdma_msg;
217 headerp->rm_body.rm_chunks[0] = xdr_zero;
218 headerp->rm_body.rm_chunks[1] = xdr_zero;
219 headerp->rm_body.rm_chunks[2] = xdr_zero;
220
221 rpclen = rqst->rq_svec[0].iov_len;
222
223 pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
224 __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
225 pr_info("RPC: %s: RPC/RDMA: %*ph\n",
226 __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
227 pr_info("RPC: %s: RPC: %*ph\n",
228 __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
229
230 req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
231 req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
232 req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
233
234 req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
235 req->rl_send_iov[1].length = rpclen;
236 req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
237
238 req->rl_niovs = 2;
239 return 0;
240}
241
242/**
243 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
244 * @xprt: transport associated with these backchannel resources
245 * @reqs: number of incoming requests to destroy; ignored
246 */
247void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
248{
249 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
250 struct rpc_rqst *rqst, *tmp;
251
252 spin_lock_bh(&xprt->bc_pa_lock);
253 list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
254 list_del(&rqst->rq_bc_pa_list);
255 spin_unlock_bh(&xprt->bc_pa_lock);
256
257 rpcrdma_bc_free_rqst(r_xprt, rqst);
258
259 spin_lock_bh(&xprt->bc_pa_lock);
260 }
261 spin_unlock_bh(&xprt->bc_pa_lock);
262}
263
264/**
265 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
266 * @rqst: request to release
267 */
268void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
269{
270 struct rpc_xprt *xprt = rqst->rq_xprt;
271
272 smp_mb__before_atomic();
273 WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
274 clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
275 smp_mb__after_atomic();
276
277 spin_lock_bh(&xprt->bc_pa_lock);
278 list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
279 spin_unlock_bh(&xprt->bc_pa_lock);
280}
281
282/**
283 * rpcrdma_bc_receive_call - Handle a backward direction call
284 * @xprt: transport receiving the call
285 * @rep: receive buffer containing the call
286 *
287 * Called in the RPC reply handler, which runs in a tasklet.
288 * Be quick about it.
289 *
290 * Operational assumptions:
291 * o Backchannel credits are ignored, just as the NFS server
292 * forechannel currently does
293 * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
294 * No replay detection is done at the transport level
295 */
296void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
297 struct rpcrdma_rep *rep)
298{
299 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
300 struct rpcrdma_msg *headerp;
301 struct svc_serv *bc_serv;
302 struct rpcrdma_req *req;
303 struct rpc_rqst *rqst;
304 struct xdr_buf *buf;
305 size_t size;
306 __be32 *p;
307
308 headerp = rdmab_to_msg(rep->rr_rdmabuf);
309#ifdef RPCRDMA_BACKCHANNEL_DEBUG
310 pr_info("RPC: %s: callback XID %08x, length=%u\n",
311 __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
312 pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
313#endif
314
315 /* Sanity check:
316 * Need at least enough bytes for RPC/RDMA header, as code
317 * here references the header fields by array offset. Also,
318 * backward calls are always inline, so ensure there
319 * are some bytes beyond the RPC/RDMA header.
320 */
321 if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
322 goto out_short;
323 p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
324 size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
325
326 /* Grab a free bc rqst */
327 spin_lock(&xprt->bc_pa_lock);
328 if (list_empty(&xprt->bc_pa_list)) {
329 spin_unlock(&xprt->bc_pa_lock);
330 goto out_overflow;
331 }
332 rqst = list_first_entry(&xprt->bc_pa_list,
333 struct rpc_rqst, rq_bc_pa_list);
334 list_del(&rqst->rq_bc_pa_list);
335 spin_unlock(&xprt->bc_pa_lock);
336#ifdef RPCRDMA_BACKCHANNEL_DEBUG
337 pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
338#endif
339
340 /* Prepare rqst */
341 rqst->rq_reply_bytes_recvd = 0;
342 rqst->rq_bytes_sent = 0;
343 rqst->rq_xid = headerp->rm_xid;
344 set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
345
346 buf = &rqst->rq_rcv_buf;
347 memset(buf, 0, sizeof(*buf));
348 buf->head[0].iov_base = p;
349 buf->head[0].iov_len = size;
350 buf->len = size;
351
352 /* The receive buffer has to be hooked to the rpcrdma_req
353 * so that it can be reposted after the server is done
354 * parsing it but just before sending the backward
355 * direction reply.
356 */
357 req = rpcr_to_rdmar(rqst);
358#ifdef RPCRDMA_BACKCHANNEL_DEBUG
359 pr_info("RPC: %s: attaching rep %p to req %p\n",
360 __func__, rep, req);
361#endif
362 req->rl_reply = rep;
363
364 /* Defeat the retransmit detection logic in send_request */
365 req->rl_connect_cookie = 0;
366
367 /* Queue rqst for ULP's callback service */
368 bc_serv = xprt->bc_serv;
369 spin_lock(&bc_serv->sv_cb_lock);
370 list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
371 spin_unlock(&bc_serv->sv_cb_lock);
372
373 wake_up(&bc_serv->sv_cb_waitq);
374
375 r_xprt->rx_stats.bcall_count++;
376 return;
377
378out_overflow:
379 pr_warn("RPC/RDMA backchannel overflow\n");
380 xprt_disconnect_done(xprt);
381 /* This receive buffer gets reposted automatically
382 * when the connection is re-established.
383 */
384 return;
385
386out_short:
387 pr_warn("RPC/RDMA short backward direction call\n");
388
389 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
390 xprt_disconnect_done(xprt);
391 else
392 pr_warn("RPC: %s: reposting rep %p\n",
393 __func__, rep);
394}
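The "twice as many rpc_rqsts" comment in xprt_rdma_bc_setup() above encodes a simple invariant: a slot stays busy from the arrival of a backward call until its reply has been fully sent, so with at most reqs calls being processed plus reqs replies still in flight, 2 * reqs preallocated slots can never run dry. A toy model of that sizing rule (plain C, not kernel code; the real implementation keeps free entries on xprt->bc_pa_list under xprt->bc_pa_lock):

#include <stdbool.h>

struct bc_pool {
	unsigned int free_slots;	/* initialized to 2 * reqs */
};

static bool bc_call_arrived(struct bc_pool *p)
{
	if (!p->free_slots)
		return false;	/* overflow: reject, as rpcrdma_bc_receive_call() does */
	p->free_slots--;
	return true;
}

static void bc_reply_sent(struct bc_pool *p)
{
	p->free_slots++;	/* rqst returned to the pool */
}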
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a1434447b0d6..88cf9e7269c2 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -256,8 +256,11 @@ frwr_sendcompletion(struct ib_wc *wc)
256 256
257 /* WARNING: Only wr_id and status are reliable at this point */ 257 /* WARNING: Only wr_id and status are reliable at this point */
258 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; 258 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
259 pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", 259 if (wc->status == IB_WC_WR_FLUSH_ERR)
260 __func__, r, ib_wc_status_msg(wc->status), wc->status); 260 dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
261 else
262 pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
263 __func__, r, ib_wc_status_msg(wc->status), wc->status);
261 r->r.frmr.fr_state = FRMR_IS_STALE; 264 r->r.frmr.fr_state = FRMR_IS_STALE;
262} 265}
263 266
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd6577467..c10d9699441c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
441 enum rpcrdma_chunktype rtype, wtype; 441 enum rpcrdma_chunktype rtype, wtype;
442 struct rpcrdma_msg *headerp; 442 struct rpcrdma_msg *headerp;
443 443
444#if defined(CONFIG_SUNRPC_BACKCHANNEL)
445 if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
446 return rpcrdma_bc_marshal_reply(rqst);
447#endif
448
444 /* 449 /*
445 * rpclen gets amount of data in first buffer, which is the 450 * rpclen gets amount of data in first buffer, which is the
446 * pre-registered buffer. 451 * pre-registered buffer.
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
711 spin_unlock_bh(&xprt->transport_lock); 716 spin_unlock_bh(&xprt->transport_lock);
712} 717}
713 718
719#if defined(CONFIG_SUNRPC_BACKCHANNEL)
720/* By convention, backchannel calls arrive via rdma_msg type
721 * messages, and never populate the chunk lists. This makes
722 * the RPC/RDMA header small and fixed in size, so it is
723 * straightforward to check the RPC header's direction field.
724 */
725static bool
726rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
727{
728 __be32 *p = (__be32 *)headerp;
729
730 if (headerp->rm_type != rdma_msg)
731 return false;
732 if (headerp->rm_body.rm_chunks[0] != xdr_zero)
733 return false;
734 if (headerp->rm_body.rm_chunks[1] != xdr_zero)
735 return false;
736 if (headerp->rm_body.rm_chunks[2] != xdr_zero)
737 return false;
738
739 /* sanity */
740 if (p[7] != headerp->rm_xid)
741 return false;
742 /* call direction */
743 if (p[8] != cpu_to_be32(RPC_CALL))
744 return false;
745
746 return true;
747}
748#endif /* CONFIG_SUNRPC_BACKCHANNEL */
749
714/* 750/*
715 * This function is called when an async event is posted to 751 * This function is called when an async event is posted to
716 * the connection which changes the connection state. All it 752 * the connection which changes the connection state. All it
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
723 schedule_delayed_work(&ep->rep_connect_worker, 0); 759 schedule_delayed_work(&ep->rep_connect_worker, 0);
724} 760}
725 761
726/* 762/* Process received RPC/RDMA messages.
727 * Called as a tasklet to do req/reply match and complete a request 763 *
728 * Errors must result in the RPC task either being awakened, or 764 * Errors must result in the RPC task either being awakened, or
729 * allowed to timeout, to discover the errors at that time. 765 * allowed to timeout, to discover the errors at that time.
730 */ 766 */
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
741 unsigned long cwnd; 777 unsigned long cwnd;
742 u32 credits; 778 u32 credits;
743 779
744 /* Check status. If bad, signal disconnect and return rep to pool */ 780 dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
745 if (rep->rr_len == ~0U) { 781
746 rpcrdma_recv_buffer_put(rep); 782 if (rep->rr_len == RPCRDMA_BAD_LEN)
747 if (r_xprt->rx_ep.rep_connected == 1) { 783 goto out_badstatus;
748 r_xprt->rx_ep.rep_connected = -EIO; 784 if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
749 rpcrdma_conn_func(&r_xprt->rx_ep); 785 goto out_shortreply;
750 } 786
751 return;
752 }
753 if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
754 dprintk("RPC: %s: short/invalid reply\n", __func__);
755 goto repost;
756 }
757 headerp = rdmab_to_msg(rep->rr_rdmabuf); 787 headerp = rdmab_to_msg(rep->rr_rdmabuf);
758 if (headerp->rm_vers != rpcrdma_version) { 788 if (headerp->rm_vers != rpcrdma_version)
759 dprintk("RPC: %s: invalid version %d\n", 789 goto out_badversion;
760 __func__, be32_to_cpu(headerp->rm_vers)); 790#if defined(CONFIG_SUNRPC_BACKCHANNEL)
761 goto repost; 791 if (rpcrdma_is_bcall(headerp))
762 } 792 goto out_bcall;
793#endif
763 794
764 /* Get XID and try for a match. */ 795 /* Match incoming rpcrdma_rep to an rpcrdma_req to
765 spin_lock(&xprt->transport_lock); 796 * get context for handling any incoming chunks.
797 */
798 spin_lock_bh(&xprt->transport_lock);
766 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); 799 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
767 if (rqst == NULL) { 800 if (!rqst)
768 spin_unlock(&xprt->transport_lock); 801 goto out_nomatch;
769 dprintk("RPC: %s: reply 0x%p failed "
770 "to match any request xid 0x%08x len %d\n",
771 __func__, rep, be32_to_cpu(headerp->rm_xid),
772 rep->rr_len);
773repost:
774 r_xprt->rx_stats.bad_reply_count++;
775 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
776 rpcrdma_recv_buffer_put(rep);
777 802
778 return;
779 }
780
781 /* get request object */
782 req = rpcr_to_rdmar(rqst); 803 req = rpcr_to_rdmar(rqst);
783 if (req->rl_reply) { 804 if (req->rl_reply)
784 spin_unlock(&xprt->transport_lock); 805 goto out_duplicate;
785 dprintk("RPC: %s: duplicate reply 0x%p to RPC "
786 "request 0x%p: xid 0x%08x\n", __func__, rep, req,
787 be32_to_cpu(headerp->rm_xid));
788 goto repost;
789 }
790 806
791 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" 807 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
792 " RPC request 0x%p xid 0x%08x\n", 808 " RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +899,50 @@ badheader:
883 if (xprt->cwnd > cwnd) 899 if (xprt->cwnd > cwnd)
884 xprt_release_rqst_cong(rqst->rq_task); 900 xprt_release_rqst_cong(rqst->rq_task);
885 901
902 xprt_complete_rqst(rqst->rq_task, status);
903 spin_unlock_bh(&xprt->transport_lock);
886 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", 904 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
887 __func__, xprt, rqst, status); 905 __func__, xprt, rqst, status);
888 xprt_complete_rqst(rqst->rq_task, status); 906 return;
889 spin_unlock(&xprt->transport_lock); 907
908out_badstatus:
909 rpcrdma_recv_buffer_put(rep);
910 if (r_xprt->rx_ep.rep_connected == 1) {
911 r_xprt->rx_ep.rep_connected = -EIO;
912 rpcrdma_conn_func(&r_xprt->rx_ep);
913 }
914 return;
915
916#if defined(CONFIG_SUNRPC_BACKCHANNEL)
917out_bcall:
918 rpcrdma_bc_receive_call(r_xprt, rep);
919 return;
920#endif
921
922out_shortreply:
923 dprintk("RPC: %s: short/invalid reply\n", __func__);
924 goto repost;
925
926out_badversion:
927 dprintk("RPC: %s: invalid version %d\n",
928 __func__, be32_to_cpu(headerp->rm_vers));
929 goto repost;
930
931out_nomatch:
932 spin_unlock_bh(&xprt->transport_lock);
933 dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
934 __func__, be32_to_cpu(headerp->rm_xid),
935 rep->rr_len);
936 goto repost;
937
938out_duplicate:
939 spin_unlock_bh(&xprt->transport_lock);
940 dprintk("RPC: %s: "
941 "duplicate reply %p to RPC request %p: xid 0x%08x\n",
942 __func__, rep, req, be32_to_cpu(headerp->rm_xid));
943
944repost:
945 r_xprt->rx_stats.bad_reply_count++;
946 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
947 rpcrdma_recv_buffer_put(rep);
890} 948}
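The fixed header layout is what lets rpcrdma_is_bcall() above test plain word offsets: a chunk-less rdma_msg header occupies the first seven XDR words, so the embedded RPC header starts at word 7 (its XID) and word 8 (its direction). A userspace sketch of the same check, with hypothetical helper names and the protocol constants written out (rdma_msg = 0, RPC CALL direction = 0):

#include <stdbool.h>
#include <stdint.h>
#include <arpa/inet.h>		/* ntohl(): the wire words are big-endian */

#define WIRE_RDMA_MSG	0	/* rm_type for a chunk-bearing "rdma_msg" */
#define WIRE_RPC_CALL	0	/* msg_type for an RPC call */

/* words 0-6: rm_xid, rm_vers, rm_credit, rm_type and three empty chunk
 * lists; words 7-8: start of the embedded RPC header (xid, direction).
 */
static bool looks_like_backward_call(const uint32_t *w)
{
	if (ntohl(w[3]) != WIRE_RDMA_MSG)
		return false;
	if (w[4] || w[5] || w[6])
		return false;	/* read/write/reply chunk lists must be empty */
	if (w[7] != w[0])
		return false;	/* RPC XID must equal the RPC/RDMA XID */
	return ntohl(w[8]) == WIRE_RPC_CALL;
}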
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f023a5..1b7051bdbdc8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
239 unregister_sysctl_table(svcrdma_table_header); 239 unregister_sysctl_table(svcrdma_table_header);
240 svcrdma_table_header = NULL; 240 svcrdma_table_header = NULL;
241 } 241 }
242#if defined(CONFIG_SUNRPC_BACKCHANNEL)
243 svc_unreg_xprt_class(&svc_rdma_bc_class);
244#endif
242 svc_unreg_xprt_class(&svc_rdma_class); 245 svc_unreg_xprt_class(&svc_rdma_class);
243 kmem_cache_destroy(svc_rdma_map_cachep); 246 kmem_cache_destroy(svc_rdma_map_cachep);
244 kmem_cache_destroy(svc_rdma_ctxt_cachep); 247 kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
286 289
287 /* Register RDMA with the SVC transport switch */ 290 /* Register RDMA with the SVC transport switch */
288 svc_reg_xprt_class(&svc_rdma_class); 291 svc_reg_xprt_class(&svc_rdma_class);
292#if defined(CONFIG_SUNRPC_BACKCHANNEL)
293 svc_reg_xprt_class(&svc_rdma_bc_class);
294#endif
289 return 0; 295 return 0;
290 err1: 296 err1:
291 kmem_cache_destroy(svc_rdma_map_cachep); 297 kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a266e870d870..b348b4adef29 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
56 56
57#define RPCDBG_FACILITY RPCDBG_SVCXPRT 57#define RPCDBG_FACILITY RPCDBG_SVCXPRT
58 58
59static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
59static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 60static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
60 struct net *net, 61 struct net *net,
61 struct sockaddr *sa, int salen, 62 struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
95 .xcl_ident = XPRT_TRANSPORT_RDMA, 96 .xcl_ident = XPRT_TRANSPORT_RDMA,
96}; 97};
97 98
99#if defined(CONFIG_SUNRPC_BACKCHANNEL)
100static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
101 struct sockaddr *, int, int);
102static void svc_rdma_bc_detach(struct svc_xprt *);
103static void svc_rdma_bc_free(struct svc_xprt *);
104
105static struct svc_xprt_ops svc_rdma_bc_ops = {
106 .xpo_create = svc_rdma_bc_create,
107 .xpo_detach = svc_rdma_bc_detach,
108 .xpo_free = svc_rdma_bc_free,
109 .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
110 .xpo_secure_port = svc_rdma_secure_port,
111};
112
113struct svc_xprt_class svc_rdma_bc_class = {
114 .xcl_name = "rdma-bc",
115 .xcl_owner = THIS_MODULE,
116 .xcl_ops = &svc_rdma_bc_ops,
117 .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
118};
119
120static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
121 struct net *net,
122 struct sockaddr *sa, int salen,
123 int flags)
124{
125 struct svcxprt_rdma *cma_xprt;
126 struct svc_xprt *xprt;
127
128 cma_xprt = rdma_create_xprt(serv, 0);
129 if (!cma_xprt)
130 return ERR_PTR(-ENOMEM);
131 xprt = &cma_xprt->sc_xprt;
132
133 svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
134 serv->sv_bc_xprt = xprt;
135
136 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
137 return xprt;
138}
139
140static void svc_rdma_bc_detach(struct svc_xprt *xprt)
141{
142 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
143}
144
145static void svc_rdma_bc_free(struct svc_xprt *xprt)
146{
147 struct svcxprt_rdma *rdma =
148 container_of(xprt, struct svcxprt_rdma, sc_xprt);
149
150 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
151 if (xprt)
152 kfree(rdma);
153}
154#endif /* CONFIG_SUNRPC_BACKCHANNEL */
155
98struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) 156struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
99{ 157{
100 struct svc_rdma_op_ctxt *ctxt; 158 struct svc_rdma_op_ctxt *ctxt;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452bc580c..8c545f7d7525 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
676static int 676static int
677xprt_rdma_enable_swap(struct rpc_xprt *xprt) 677xprt_rdma_enable_swap(struct rpc_xprt *xprt)
678{ 678{
679 return -EINVAL; 679 return 0;
680} 680}
681 681
682static void 682static void
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
705 .print_stats = xprt_rdma_print_stats, 705 .print_stats = xprt_rdma_print_stats,
706 .enable_swap = xprt_rdma_enable_swap, 706 .enable_swap = xprt_rdma_enable_swap,
707 .disable_swap = xprt_rdma_disable_swap, 707 .disable_swap = xprt_rdma_disable_swap,
708 .inject_disconnect = xprt_rdma_inject_disconnect 708 .inject_disconnect = xprt_rdma_inject_disconnect,
709#if defined(CONFIG_SUNRPC_BACKCHANNEL)
710 .bc_setup = xprt_rdma_bc_setup,
711 .bc_up = xprt_rdma_bc_up,
712 .bc_free_rqst = xprt_rdma_bc_free_rqst,
713 .bc_destroy = xprt_rdma_bc_destroy,
714#endif
709}; 715};
710 716
711static struct xprt_class xprt_rdma = { 717static struct xprt_class xprt_rdma = {
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
732 dprintk("RPC: %s: xprt_unregister returned %i\n", 738 dprintk("RPC: %s: xprt_unregister returned %i\n",
733 __func__, rc); 739 __func__, rc);
734 740
741 rpcrdma_destroy_wq();
735 frwr_destroy_recovery_wq(); 742 frwr_destroy_recovery_wq();
736} 743}
737 744
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
743 if (rc) 750 if (rc)
744 return rc; 751 return rc;
745 752
753 rc = rpcrdma_alloc_wq();
754 if (rc) {
755 frwr_destroy_recovery_wq();
756 return rc;
757 }
758
746 rc = xprt_register_transport(&xprt_rdma); 759 rc = xprt_register_transport(&xprt_rdma);
747 if (rc) { 760 if (rc) {
761 rpcrdma_destroy_wq();
748 frwr_destroy_recovery_wq(); 762 frwr_destroy_recovery_wq();
749 return rc; 763 return rc;
750 } 764 }
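The module init hunk above slots rpcrdma_alloc_wq() between the existing FRWR recovery-workqueue setup and transport registration, with each later failure undoing everything created before it, and xprt_rdma_cleanup() tearing the pieces down in reverse order. A generic sketch of that staged-init ordering with stand-in step names (the real steps are frwr_create_recovery_wq(), rpcrdma_alloc_wq() and xprt_register_transport()); the hunk unwinds inline, while the goto form below is the equivalent common idiom:

/* stand-in steps; each *_create() returns 0 on success */
static int  step_a_create(void)   { return 0; }
static void step_a_destroy(void)  { }
static int  step_b_create(void)   { return 0; }
static void step_b_destroy(void)  { }
static int  step_c_register(void) { return 0; }

static int staged_init(void)
{
	int rc;

	rc = step_a_create();		/* e.g. FRWR recovery workqueue */
	if (rc)
		return rc;

	rc = step_b_create();		/* e.g. receive workqueue */
	if (rc)
		goto out_a;

	rc = step_c_register();		/* e.g. transport class */
	if (rc)
		goto out_b;

	return 0;

out_b:
	step_b_destroy();
out_a:
	step_a_destroy();
	return rc;
}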
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f63369bd01c5..eadd1655145a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,47 +68,33 @@
68 * internal functions 68 * internal functions
69 */ 69 */
70 70
71/* 71static struct workqueue_struct *rpcrdma_receive_wq;
72 * handle replies in tasklet context, using a single, global list
73 * rdma tasklet function -- just turn around and call the func
74 * for all replies on the list
75 */
76
77static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
78static LIST_HEAD(rpcrdma_tasklets_g);
79 72
80static void 73int
81rpcrdma_run_tasklet(unsigned long data) 74rpcrdma_alloc_wq(void)
82{ 75{
83 struct rpcrdma_rep *rep; 76 struct workqueue_struct *recv_wq;
84 unsigned long flags;
85
86 data = data;
87 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88 while (!list_empty(&rpcrdma_tasklets_g)) {
89 rep = list_entry(rpcrdma_tasklets_g.next,
90 struct rpcrdma_rep, rr_list);
91 list_del(&rep->rr_list);
92 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 77
94 rpcrdma_reply_handler(rep); 78 recv_wq = alloc_workqueue("xprtrdma_receive",
79 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
80 0);
81 if (!recv_wq)
82 return -ENOMEM;
95 83
96 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); 84 rpcrdma_receive_wq = recv_wq;
97 } 85 return 0;
98 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
99} 86}
100 87
101static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); 88void
102 89rpcrdma_destroy_wq(void)
103static void
104rpcrdma_schedule_tasklet(struct list_head *sched_list)
105{ 90{
106 unsigned long flags; 91 struct workqueue_struct *wq;
107 92
108 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); 93 if (rpcrdma_receive_wq) {
109 list_splice_tail(sched_list, &rpcrdma_tasklets_g); 94 wq = rpcrdma_receive_wq;
110 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); 95 rpcrdma_receive_wq = NULL;
111 tasklet_schedule(&rpcrdma_tasklet_g); 96 destroy_workqueue(wq);
97 }
112} 98}
113 99
114static void 100static void
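The hunk above replaces the global reply tasklet with a dedicated workqueue; the hunks that follow embed a work_struct in each rpcrdma_rep (rr_work) and queue it from the receive completion path. A condensed kernel-style sketch of that pattern with illustrative names (the real objects are rpcrdma_receive_wq, rep->rr_work and rpcrdma_receive_worker()):

#include <linux/errno.h>
#include <linux/workqueue.h>

struct reply {
	struct work_struct work;	/* mirrors rep->rr_work */
	/* ... received message state ... */
};

static struct workqueue_struct *reply_wq;

static void reply_worker(struct work_struct *work)
{
	struct reply *rep = container_of(work, struct reply, work);

	/* heavy reply processing now runs in process context */
	(void)rep;
}

static int reply_wq_init(void)
{
	reply_wq = alloc_workqueue("reply_wq",
				   WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 0);
	return reply_wq ? 0 : -ENOMEM;
}

/* INIT_WORK() is done once, when the reply buffer is created;
 * the completion handler then only needs queue_work().
 */
static void reply_create(struct reply *rep)
{
	INIT_WORK(&rep->work, reply_worker);
}

static void reply_completion(struct reply *rep)
{
	queue_work(reply_wq, &rep->work);
}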
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
158 } 144 }
159} 145}
160 146
161static int 147/* The common case is a single send completion is waiting. By
162rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 148 * passing two WC entries to ib_poll_cq, a return code of 1
149 * means there is exactly one WC waiting and no more. We don't
150 * have to invoke ib_poll_cq again to know that the CQ has been
151 * properly drained.
152 */
153static void
154rpcrdma_sendcq_poll(struct ib_cq *cq)
163{ 155{
164 struct ib_wc *wcs; 156 struct ib_wc *pos, wcs[2];
165 int budget, count, rc; 157 int count, rc;
166 158
167 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
168 do { 159 do {
169 wcs = ep->rep_send_wcs; 160 pos = wcs;
170 161
171 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 162 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
172 if (rc <= 0) 163 if (rc < 0)
173 return rc; 164 break;
174 165
175 count = rc; 166 count = rc;
176 while (count-- > 0) 167 while (count-- > 0)
177 rpcrdma_sendcq_process_wc(wcs++); 168 rpcrdma_sendcq_process_wc(pos++);
178 } while (rc == RPCRDMA_POLLSIZE && --budget); 169 } while (rc == ARRAY_SIZE(wcs));
179 return 0; 170 return;
180} 171}
181 172
182/* 173/* Handle provider send completion upcalls.
183 * Handle send, fast_reg_mr, and local_inv completions.
184 *
185 * Send events are typically suppressed and thus do not result
186 * in an upcall. Occasionally one is signaled, however. This
187 * prevents the provider's completion queue from wrapping and
188 * losing a completion.
189 */ 174 */
190static void 175static void
191rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) 176rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
192{ 177{
193 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; 178 do {
194 int rc; 179 rpcrdma_sendcq_poll(cq);
195 180 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
196 rc = rpcrdma_sendcq_poll(cq, ep); 181 IB_CQ_REPORT_MISSED_EVENTS) > 0);
197 if (rc) { 182}
198 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
199 __func__, rc);
200 return;
201 }
202 183
203 rc = ib_req_notify_cq(cq, 184static void
204 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); 185rpcrdma_receive_worker(struct work_struct *work)
205 if (rc == 0) 186{
206 return; 187 struct rpcrdma_rep *rep =
207 if (rc < 0) { 188 container_of(work, struct rpcrdma_rep, rr_work);
208 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
209 __func__, rc);
210 return;
211 }
212 189
213 rpcrdma_sendcq_poll(cq, ep); 190 rpcrdma_reply_handler(rep);
214} 191}
215 192
216static void 193static void
217rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) 194rpcrdma_recvcq_process_wc(struct ib_wc *wc)
218{ 195{
219 struct rpcrdma_rep *rep = 196 struct rpcrdma_rep *rep =
220 (struct rpcrdma_rep *)(unsigned long)wc->wr_id; 197 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
237 prefetch(rdmab_to_msg(rep->rr_rdmabuf)); 214 prefetch(rdmab_to_msg(rep->rr_rdmabuf));
238 215
239out_schedule: 216out_schedule:
240 list_add_tail(&rep->rr_list, sched_list); 217 queue_work(rpcrdma_receive_wq, &rep->rr_work);
241 return; 218 return;
219
242out_fail: 220out_fail:
243 if (wc->status != IB_WC_WR_FLUSH_ERR) 221 if (wc->status != IB_WC_WR_FLUSH_ERR)
244 pr_err("RPC: %s: rep %p: %s\n", 222 pr_err("RPC: %s: rep %p: %s\n",
245 __func__, rep, ib_wc_status_msg(wc->status)); 223 __func__, rep, ib_wc_status_msg(wc->status));
246 rep->rr_len = ~0U; 224 rep->rr_len = RPCRDMA_BAD_LEN;
247 goto out_schedule; 225 goto out_schedule;
248} 226}
249 227
250static int 228/* The wc array is on stack: automatic memory is always CPU-local.
251rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 229 *
230 * struct ib_wc is 64 bytes, making the poll array potentially
231 * large. But this is at the bottom of the call chain. Further
232 * substantial work is done in another thread.
233 */
234static void
235rpcrdma_recvcq_poll(struct ib_cq *cq)
252{ 236{
253 struct list_head sched_list; 237 struct ib_wc *pos, wcs[4];
254 struct ib_wc *wcs; 238 int count, rc;
255 int budget, count, rc;
256 239
257 INIT_LIST_HEAD(&sched_list);
258 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
259 do { 240 do {
260 wcs = ep->rep_recv_wcs; 241 pos = wcs;
261 242
262 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 243 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
263 if (rc <= 0) 244 if (rc < 0)
264 goto out_schedule; 245 break;
265 246
266 count = rc; 247 count = rc;
267 while (count-- > 0) 248 while (count-- > 0)
268 rpcrdma_recvcq_process_wc(wcs++, &sched_list); 249 rpcrdma_recvcq_process_wc(pos++);
269 } while (rc == RPCRDMA_POLLSIZE && --budget); 250 } while (rc == ARRAY_SIZE(wcs));
270 rc = 0;
271
272out_schedule:
273 rpcrdma_schedule_tasklet(&sched_list);
274 return rc;
275} 251}
276 252
277/* 253/* Handle provider receive completion upcalls.
278 * Handle receive completions.
279 *
280 * It is reentrant but processes single events in order to maintain
281 * ordering of receives to keep server credits.
282 *
283 * It is the responsibility of the scheduled tasklet to return
284 * recv buffers to the pool. NOTE: this affects synchronization of
285 * connection shutdown. That is, the structures required for
286 * the completion of the reply handler must remain intact until
287 * all memory has been reclaimed.
288 */ 254 */
289static void 255static void
290rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) 256rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
291{ 257{
292 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; 258 do {
293 int rc; 259 rpcrdma_recvcq_poll(cq);
294 260 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
295 rc = rpcrdma_recvcq_poll(cq, ep); 261 IB_CQ_REPORT_MISSED_EVENTS) > 0);
296 if (rc) {
297 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
298 __func__, rc);
299 return;
300 }
301
302 rc = ib_req_notify_cq(cq,
303 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
304 if (rc == 0)
305 return;
306 if (rc < 0) {
307 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
308 __func__, rc);
309 return;
310 }
311
312 rpcrdma_recvcq_poll(cq, ep);
313} 262}
314 263
315static void 264static void
316rpcrdma_flush_cqs(struct rpcrdma_ep *ep) 265rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
317{ 266{
318 struct ib_wc wc; 267 struct ib_wc wc;
319 LIST_HEAD(sched_list);
320 268
321 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) 269 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
322 rpcrdma_recvcq_process_wc(&wc, &sched_list); 270 rpcrdma_recvcq_process_wc(&wc);
323 if (!list_empty(&sched_list))
324 rpcrdma_schedule_tasklet(&sched_list);
325 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) 271 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
326 rpcrdma_sendcq_process_wc(&wc); 272 rpcrdma_sendcq_process_wc(&wc);
327} 273}
@@ -623,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
623 struct ib_device_attr *devattr = &ia->ri_devattr; 569 struct ib_device_attr *devattr = &ia->ri_devattr;
624 struct ib_cq *sendcq, *recvcq; 570 struct ib_cq *sendcq, *recvcq;
625 struct ib_cq_init_attr cq_attr = {}; 571 struct ib_cq_init_attr cq_attr = {};
572 unsigned int max_qp_wr;
626 int rc, err; 573 int rc, err;
627 574
628 if (devattr->max_sge < RPCRDMA_MAX_IOVS) { 575 if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -631,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
631 return -ENOMEM; 578 return -ENOMEM;
632 } 579 }
633 580
581 if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
582 dprintk("RPC: %s: insufficient wqe's available\n",
583 __func__);
584 return -ENOMEM;
585 }
586 max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
587
634 /* check provider's send/recv wr limits */ 588 /* check provider's send/recv wr limits */
635 if (cdata->max_requests > devattr->max_qp_wr) 589 if (cdata->max_requests > max_qp_wr)
636 cdata->max_requests = devattr->max_qp_wr; 590 cdata->max_requests = max_qp_wr;
637 591
638 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; 592 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
639 ep->rep_attr.qp_context = ep; 593 ep->rep_attr.qp_context = ep;
640 ep->rep_attr.srq = NULL; 594 ep->rep_attr.srq = NULL;
641 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 595 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
596 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
642 rc = ia->ri_ops->ro_open(ia, ep, cdata); 597 rc = ia->ri_ops->ro_open(ia, ep, cdata);
643 if (rc) 598 if (rc)
644 return rc; 599 return rc;
645 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 600 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
601 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
646 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; 602 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
647 ep->rep_attr.cap.max_recv_sge = 1; 603 ep->rep_attr.cap.max_recv_sge = 1;
648 ep->rep_attr.cap.max_inline_data = 0; 604 ep->rep_attr.cap.max_inline_data = 0;
@@ -670,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
670 626
671 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; 627 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
672 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, 628 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
673 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 629 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
674 if (IS_ERR(sendcq)) { 630 if (IS_ERR(sendcq)) {
675 rc = PTR_ERR(sendcq); 631 rc = PTR_ERR(sendcq);
676 dprintk("RPC: %s: failed to create send CQ: %i\n", 632 dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -687,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
687 643
688 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; 644 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
689 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, 645 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
690 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 646 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
691 if (IS_ERR(recvcq)) { 647 if (IS_ERR(recvcq)) {
692 rc = PTR_ERR(recvcq); 648 rc = PTR_ERR(recvcq);
693 dprintk("RPC: %s: failed to create recv CQ: %i\n", 649 dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -886,7 +842,21 @@ retry:
886 } 842 }
887 rc = ep->rep_connected; 843 rc = ep->rep_connected;
888 } else { 844 } else {
845 struct rpcrdma_xprt *r_xprt;
846 unsigned int extras;
847
889 dprintk("RPC: %s: connected\n", __func__); 848 dprintk("RPC: %s: connected\n", __func__);
849
850 r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
851 extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
852
853 if (extras) {
854 rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
855 if (rc)
856 pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
857 __func__, rc);
858 rc = 0;
859 }
890 } 860 }
891 861
892out: 862out:
@@ -923,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
923 } 893 }
924} 894}
925 895
926static struct rpcrdma_req * 896struct rpcrdma_req *
927rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 897rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
928{ 898{
899 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
929 struct rpcrdma_req *req; 900 struct rpcrdma_req *req;
930 901
931 req = kzalloc(sizeof(*req), GFP_KERNEL); 902 req = kzalloc(sizeof(*req), GFP_KERNEL);
932 if (req == NULL) 903 if (req == NULL)
933 return ERR_PTR(-ENOMEM); 904 return ERR_PTR(-ENOMEM);
934 905
906 INIT_LIST_HEAD(&req->rl_free);
907 spin_lock(&buffer->rb_reqslock);
908 list_add(&req->rl_all, &buffer->rb_allreqs);
909 spin_unlock(&buffer->rb_reqslock);
935 req->rl_buffer = &r_xprt->rx_buf; 910 req->rl_buffer = &r_xprt->rx_buf;
936 return req; 911 return req;
937} 912}
938 913
939static struct rpcrdma_rep * 914struct rpcrdma_rep *
940rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 915rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
941{ 916{
942 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 917 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -958,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
958 933
959 rep->rr_device = ia->ri_device; 934 rep->rr_device = ia->ri_device;
960 rep->rr_rxprt = r_xprt; 935 rep->rr_rxprt = r_xprt;
936 INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
961 return rep; 937 return rep;
962 938
963out_free: 939out_free:
@@ -971,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
971{ 947{
972 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 948 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
973 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 949 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
974 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
975 char *p;
976 size_t len;
977 int i, rc; 950 int i, rc;
978 951
979 buf->rb_max_requests = cdata->max_requests; 952 buf->rb_max_requests = r_xprt->rx_data.max_requests;
953 buf->rb_bc_srv_max_requests = 0;
980 spin_lock_init(&buf->rb_lock); 954 spin_lock_init(&buf->rb_lock);
981 955
982 /* Need to allocate:
983 * 1. arrays for send and recv pointers
984 * 2. arrays of struct rpcrdma_req to fill in pointers
985 * 3. array of struct rpcrdma_rep for replies
986 * Send/recv buffers in req/rep need to be registered
987 */
988 len = buf->rb_max_requests *
989 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
990
991 p = kzalloc(len, GFP_KERNEL);
992 if (p == NULL) {
993 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
994 __func__, len);
995 rc = -ENOMEM;
996 goto out;
997 }
998 buf->rb_pool = p; /* for freeing it later */
999
1000 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1001 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1002 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1003 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1004
1005 rc = ia->ri_ops->ro_init(r_xprt); 956 rc = ia->ri_ops->ro_init(r_xprt);
1006 if (rc) 957 if (rc)
1007 goto out; 958 goto out;
1008 959
960 INIT_LIST_HEAD(&buf->rb_send_bufs);
961 INIT_LIST_HEAD(&buf->rb_allreqs);
962 spin_lock_init(&buf->rb_reqslock);
1009 for (i = 0; i < buf->rb_max_requests; i++) { 963 for (i = 0; i < buf->rb_max_requests; i++) {
1010 struct rpcrdma_req *req; 964 struct rpcrdma_req *req;
1011 struct rpcrdma_rep *rep;
1012 965
1013 req = rpcrdma_create_req(r_xprt); 966 req = rpcrdma_create_req(r_xprt);
1014 if (IS_ERR(req)) { 967 if (IS_ERR(req)) {
@@ -1017,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1017 rc = PTR_ERR(req); 970 rc = PTR_ERR(req);
1018 goto out; 971 goto out;
1019 } 972 }
1020 buf->rb_send_bufs[i] = req; 973 req->rl_backchannel = false;
974 list_add(&req->rl_free, &buf->rb_send_bufs);
975 }
976
977 INIT_LIST_HEAD(&buf->rb_recv_bufs);
978 for (i = 0; i < buf->rb_max_requests + 2; i++) {
979 struct rpcrdma_rep *rep;
1021 980
1022 rep = rpcrdma_create_rep(r_xprt); 981 rep = rpcrdma_create_rep(r_xprt);
1023 if (IS_ERR(rep)) { 982 if (IS_ERR(rep)) {
@@ -1026,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1026 rc = PTR_ERR(rep); 985 rc = PTR_ERR(rep);
1027 goto out; 986 goto out;
1028 } 987 }
1029 buf->rb_recv_bufs[i] = rep; 988 list_add(&rep->rr_list, &buf->rb_recv_bufs);
1030 } 989 }
1031 990
1032 return 0; 991 return 0;
@@ -1035,22 +994,38 @@ out:
1035 return rc; 994 return rc;
1036} 995}
1037 996
997static struct rpcrdma_req *
998rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
999{
1000 struct rpcrdma_req *req;
1001
1002 req = list_first_entry(&buf->rb_send_bufs,
1003 struct rpcrdma_req, rl_free);
1004 list_del(&req->rl_free);
1005 return req;
1006}
1007
1008static struct rpcrdma_rep *
1009rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
1010{
1011 struct rpcrdma_rep *rep;
1012
1013 rep = list_first_entry(&buf->rb_recv_bufs,
1014 struct rpcrdma_rep, rr_list);
1015 list_del(&rep->rr_list);
1016 return rep;
1017}
1018
1038static void 1019static void
1039rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) 1020rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1040{ 1021{
1041 if (!rep)
1042 return;
1043
1044 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); 1022 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1045 kfree(rep); 1023 kfree(rep);
1046} 1024}
1047 1025
1048static void 1026void
1049rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) 1027rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1050{ 1028{
1051 if (!req)
1052 return;
1053
1054 rpcrdma_free_regbuf(ia, req->rl_sendbuf); 1029 rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1055 rpcrdma_free_regbuf(ia, req->rl_rdmabuf); 1030 rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1056 kfree(req); 1031 kfree(req);
@@ -1060,25 +1035,29 @@ void
1060rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) 1035rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1061{ 1036{
1062 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 1037 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1063 int i;
1064 1038
1065 /* clean up in reverse order from create 1039 while (!list_empty(&buf->rb_recv_bufs)) {
1066 * 1. recv mr memory (mr free, then kfree) 1040 struct rpcrdma_rep *rep;
1067 * 2. send mr memory (mr free, then kfree)
1068 * 3. MWs
1069 */
1070 dprintk("RPC: %s: entering\n", __func__);
1071 1041
1072 for (i = 0; i < buf->rb_max_requests; i++) { 1042 rep = rpcrdma_buffer_get_rep_locked(buf);
1073 if (buf->rb_recv_bufs) 1043 rpcrdma_destroy_rep(ia, rep);
1074 rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1075 if (buf->rb_send_bufs)
1076 rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1077 } 1044 }
1078 1045
1079 ia->ri_ops->ro_destroy(buf); 1046 spin_lock(&buf->rb_reqslock);
1047 while (!list_empty(&buf->rb_allreqs)) {
1048 struct rpcrdma_req *req;
1049
1050 req = list_first_entry(&buf->rb_allreqs,
1051 struct rpcrdma_req, rl_all);
1052 list_del(&req->rl_all);
1053
1054 spin_unlock(&buf->rb_reqslock);
1055 rpcrdma_destroy_req(ia, req);
1056 spin_lock(&buf->rb_reqslock);
1057 }
1058 spin_unlock(&buf->rb_reqslock);
1080 1059
1081 kfree(buf->rb_pool); 1060 ia->ri_ops->ro_destroy(buf);
1082} 1061}
1083 1062
1084struct rpcrdma_mw * 1063struct rpcrdma_mw *
@@ -1110,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1110 spin_unlock(&buf->rb_mwlock); 1089 spin_unlock(&buf->rb_mwlock);
1111} 1090}
1112 1091
1113static void
1114rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1115{
1116 buf->rb_send_bufs[--buf->rb_send_index] = req;
1117 req->rl_niovs = 0;
1118 if (req->rl_reply) {
1119 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1120 req->rl_reply = NULL;
1121 }
1122}
1123
1124/* 1092/*
1125 * Get a set of request/reply buffers. 1093 * Get a set of request/reply buffers.
1126 * 1094 *
1127 * Reply buffer (if needed) is attached to send buffer upon return. 1095 * Reply buffer (if available) is attached to send buffer upon return.
1128 * Rule:
1129 * rb_send_index and rb_recv_index MUST always be pointing to the
1130 * *next* available buffer (non-NULL). They are incremented after
1131 * removing buffers, and decremented *before* returning them.
1132 */ 1096 */
1133struct rpcrdma_req * 1097struct rpcrdma_req *
1134rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) 1098rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1135{ 1099{
1136 struct rpcrdma_req *req; 1100 struct rpcrdma_req *req;
1137 unsigned long flags;
1138
1139 spin_lock_irqsave(&buffers->rb_lock, flags);
1140 1101
1141 if (buffers->rb_send_index == buffers->rb_max_requests) { 1102 spin_lock(&buffers->rb_lock);
1142 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1103 if (list_empty(&buffers->rb_send_bufs))
1143 dprintk("RPC: %s: out of request buffers\n", __func__); 1104 goto out_reqbuf;
1144 return ((struct rpcrdma_req *)NULL); 1105 req = rpcrdma_buffer_get_req_locked(buffers);
1145 } 1106 if (list_empty(&buffers->rb_recv_bufs))
1146 1107 goto out_repbuf;
1147 req = buffers->rb_send_bufs[buffers->rb_send_index]; 1108 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1148 if (buffers->rb_send_index < buffers->rb_recv_index) { 1109 spin_unlock(&buffers->rb_lock);
1149 dprintk("RPC: %s: %d extra receives outstanding (ok)\n", 1110 return req;
1150 __func__,
1151 buffers->rb_recv_index - buffers->rb_send_index);
1152 req->rl_reply = NULL;
1153 } else {
1154 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1155 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1156 }
1157 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1158 1111
1159 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1112out_reqbuf:
1113 spin_unlock(&buffers->rb_lock);
1114 pr_warn("RPC: %s: out of request buffers\n", __func__);
1115 return NULL;
1116out_repbuf:
1117 spin_unlock(&buffers->rb_lock);
1118 pr_warn("RPC: %s: out of reply buffers\n", __func__);
1119 req->rl_reply = NULL;
1160 return req; 1120 return req;
1161} 1121}
1162 1122
@@ -1168,30 +1128,31 @@ void
1168rpcrdma_buffer_put(struct rpcrdma_req *req) 1128rpcrdma_buffer_put(struct rpcrdma_req *req)
1169{ 1129{
1170 struct rpcrdma_buffer *buffers = req->rl_buffer; 1130 struct rpcrdma_buffer *buffers = req->rl_buffer;
1171 unsigned long flags; 1131 struct rpcrdma_rep *rep = req->rl_reply;
1172 1132
1173 spin_lock_irqsave(&buffers->rb_lock, flags); 1133 req->rl_niovs = 0;
1174 rpcrdma_buffer_put_sendbuf(req, buffers); 1134 req->rl_reply = NULL;
1175 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1135
1136 spin_lock(&buffers->rb_lock);
1137 list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1138 if (rep)
1139 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1140 spin_unlock(&buffers->rb_lock);
1176} 1141}
1177 1142
1178/* 1143/*
1179 * Recover reply buffers from pool. 1144 * Recover reply buffers from pool.
1180 * This happens when recovering from error conditions. 1145 * This happens when recovering from disconnect.
1181 * Post-increment counter/array index.
1182 */ 1146 */
1183void 1147void
1184rpcrdma_recv_buffer_get(struct rpcrdma_req *req) 1148rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1185{ 1149{
1186 struct rpcrdma_buffer *buffers = req->rl_buffer; 1150 struct rpcrdma_buffer *buffers = req->rl_buffer;
1187 unsigned long flags;
1188 1151
1189 spin_lock_irqsave(&buffers->rb_lock, flags); 1152 spin_lock(&buffers->rb_lock);
1190 if (buffers->rb_recv_index < buffers->rb_max_requests) { 1153 if (!list_empty(&buffers->rb_recv_bufs))
1191 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; 1154 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1192 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; 1155 spin_unlock(&buffers->rb_lock);
1193 }
1194 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1195} 1156}
1196 1157
1197/* 1158/*
@@ -1202,11 +1163,10 @@ void
1202rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 1163rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1203{ 1164{
1204 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; 1165 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1205 unsigned long flags;
1206 1166
1207 spin_lock_irqsave(&buffers->rb_lock, flags); 1167 spin_lock(&buffers->rb_lock);
1208 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; 1168 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1209 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1169 spin_unlock(&buffers->rb_lock);
1210} 1170}
1211 1171
1212/* 1172/*
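Taken together, the get/put paths above now hand a caller one rpcrdma_req with an optional pre-attached rpcrdma_rep, and take both back in a single call. A hedged usage sketch; example_send_one() is a made-up name, and the real callers sit in xprtrdma/transport.c outside this excerpt:

/* Illustrative pairing of rpcrdma_buffer_get()/rpcrdma_buffer_put(). */
static int example_send_one(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_req *req;

        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
        if (!req)
                return -ENOMEM;         /* rb_send_bufs was empty */
        /* req->rl_reply may be NULL if rb_recv_bufs was empty. */

        /* ... marshal and post the RPC call here ... */

        rpcrdma_buffer_put(req);        /* requeues req and any attached rep */
        return 0;
}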
@@ -1363,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1363 return rc; 1323 return rc;
1364} 1324}
1365 1325
1326/**
1327 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1328 * @r_xprt: transport associated with these backchannel resources
1329 * @count: minimum number of incoming requests expected
1330 *
1331 * Returns zero if all requested buffers were posted, or a negative errno.
1332 */
1333int
1334rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1335{
1336 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1337 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1338 struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1339 struct rpcrdma_rep *rep;
1340 unsigned long flags;
1341 int rc;
1342
1343 while (count--) {
1344 spin_lock_irqsave(&buffers->rb_lock, flags);
1345 if (list_empty(&buffers->rb_recv_bufs))
1346 goto out_reqbuf;
1347 rep = rpcrdma_buffer_get_rep_locked(buffers);
1348 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1349
1350 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1351 if (rc)
1352 goto out_rc;
1353 }
1354
1355 return 0;
1356
1357out_reqbuf:
1358 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1359 pr_warn("%s: no extra receive buffers\n", __func__);
1360 return -ENOMEM;
1361
1362out_rc:
1363 rpcrdma_recv_buffer_put(rep);
1364 return rc;
1365}
1366
1366/* How many chunk list items fit within our inline buffers? 1367/* How many chunk list items fit within our inline buffers?
1367 */ 1368 */
1368unsigned int 1369unsigned int
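rpcrdma_ep_post_extra_recv() exists so the backchannel can keep Receive Work Requests posted for server-initiated calls. A hedged sketch of the expected consumer; the real rpcrdma_bc_post_recv() lives in xprtrdma/backchannel.c (declared in xprt_rdma.h below) and may differ in detail:

/* Sketch only: pre-post one extra Receive per expected backchannel call. */
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *r_xprt, unsigned int min_reqs)
{
        return rpcrdma_ep_post_extra_recv(r_xprt, min_reqs);
}

If rb_recv_bufs runs dry the function above fails with -ENOMEM rather than blocking, which is presumably why the backchannel setup path creates extra reps with rpcrdma_create_rep() before posting them.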
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c82abf44e39d..ac7f8d4f632a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
77 * RDMA Endpoint -- one per transport instance 77 * RDMA Endpoint -- one per transport instance
78 */ 78 */
79 79
80#define RPCRDMA_WC_BUDGET (128)
81#define RPCRDMA_POLLSIZE (16)
82
83struct rpcrdma_ep { 80struct rpcrdma_ep {
84 atomic_t rep_cqcount; 81 atomic_t rep_cqcount;
85 int rep_cqinit; 82 int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
89 struct rdma_conn_param rep_remote_cma; 86 struct rdma_conn_param rep_remote_cma;
90 struct sockaddr_storage rep_remote_addr; 87 struct sockaddr_storage rep_remote_addr;
91 struct delayed_work rep_connect_worker; 88 struct delayed_work rep_connect_worker;
92 struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
93 struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
94}; 89};
95 90
96/* 91/*
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
106 */ 101 */
107#define RPCRDMA_IGNORE_COMPLETION (0ULL) 102#define RPCRDMA_IGNORE_COMPLETION (0ULL)
108 103
104/* Pre-allocate extra Work Requests for handling backward receives
105 * and sends. This is a fixed value because the Work Queues are
106 * allocated when the forward channel is set up.
107 */
108#if defined(CONFIG_SUNRPC_BACKCHANNEL)
109#define RPCRDMA_BACKWARD_WRS (8)
110#else
111#define RPCRDMA_BACKWARD_WRS (0)
112#endif
113
109/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV 114/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
110 * 115 *
111 * The below structure appears at the front of a large region of kmalloc'd 116 * The below structure appears at the front of a large region of kmalloc'd
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
169 unsigned int rr_len; 174 unsigned int rr_len;
170 struct ib_device *rr_device; 175 struct ib_device *rr_device;
171 struct rpcrdma_xprt *rr_rxprt; 176 struct rpcrdma_xprt *rr_rxprt;
177 struct work_struct rr_work;
172 struct list_head rr_list; 178 struct list_head rr_list;
173 struct rpcrdma_regbuf *rr_rdmabuf; 179 struct rpcrdma_regbuf *rr_rdmabuf;
174}; 180};
175 181
182#define RPCRDMA_BAD_LEN (~0U)
183
176/* 184/*
177 * struct rpcrdma_mw - external memory region metadata 185 * struct rpcrdma_mw - external memory region metadata
178 * 186 *
@@ -256,6 +264,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
256#define RPCRDMA_MAX_IOVS (2) 264#define RPCRDMA_MAX_IOVS (2)
257 265
258struct rpcrdma_req { 266struct rpcrdma_req {
267 struct list_head rl_free;
259 unsigned int rl_niovs; 268 unsigned int rl_niovs;
260 unsigned int rl_nchunks; 269 unsigned int rl_nchunks;
261 unsigned int rl_connect_cookie; 270 unsigned int rl_connect_cookie;
@@ -265,6 +274,9 @@ struct rpcrdma_req {
265 struct rpcrdma_regbuf *rl_rdmabuf; 274 struct rpcrdma_regbuf *rl_rdmabuf;
266 struct rpcrdma_regbuf *rl_sendbuf; 275 struct rpcrdma_regbuf *rl_sendbuf;
267 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; 276 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
277
278 struct list_head rl_all;
279 bool rl_backchannel;
268}; 280};
269 281
270static inline struct rpcrdma_req * 282static inline struct rpcrdma_req *
@@ -289,12 +301,14 @@ struct rpcrdma_buffer {
289 struct list_head rb_all; 301 struct list_head rb_all;
290 char *rb_pool; 302 char *rb_pool;
291 303
292 spinlock_t rb_lock; /* protect buf arrays */ 304 spinlock_t rb_lock; /* protect buf lists */
305 struct list_head rb_send_bufs;
306 struct list_head rb_recv_bufs;
293 u32 rb_max_requests; 307 u32 rb_max_requests;
294 int rb_send_index; 308
295 int rb_recv_index; 309 u32 rb_bc_srv_max_requests;
296 struct rpcrdma_req **rb_send_bufs; 310 spinlock_t rb_reqslock; /* protect rb_allreqs */
297 struct rpcrdma_rep **rb_recv_bufs; 311 struct list_head rb_allreqs;
298}; 312};
299#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) 313#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
300 314
@@ -340,6 +354,7 @@ struct rpcrdma_stats {
340 unsigned long failed_marshal_count; 354 unsigned long failed_marshal_count;
341 unsigned long bad_reply_count; 355 unsigned long bad_reply_count;
342 unsigned long nomsg_call_count; 356 unsigned long nomsg_call_count;
357 unsigned long bcall_count;
343}; 358};
344 359
345/* 360/*
@@ -415,6 +430,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
415/* 430/*
416 * Buffer calls - xprtrdma/verbs.c 431 * Buffer calls - xprtrdma/verbs.c
417 */ 432 */
433struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
434struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
435void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
418int rpcrdma_buffer_create(struct rpcrdma_xprt *); 436int rpcrdma_buffer_create(struct rpcrdma_xprt *);
419void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); 437void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
420 438
@@ -431,10 +449,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
431 struct rpcrdma_regbuf *); 449 struct rpcrdma_regbuf *);
432 450
433unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); 451unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
452int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
434 453
435int frwr_alloc_recovery_wq(void); 454int frwr_alloc_recovery_wq(void);
436void frwr_destroy_recovery_wq(void); 455void frwr_destroy_recovery_wq(void);
437 456
457int rpcrdma_alloc_wq(void);
458void rpcrdma_destroy_wq(void);
459
438/* 460/*
439 * Wrappers for chunk registration, shared by read/write chunk code. 461 * Wrappers for chunk registration, shared by read/write chunk code.
440 */ 462 */
@@ -495,6 +517,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
495int xprt_rdma_init(void); 517int xprt_rdma_init(void);
496void xprt_rdma_cleanup(void); 518void xprt_rdma_cleanup(void);
497 519
520/* Backchannel calls - xprtrdma/backchannel.c
521 */
522#if defined(CONFIG_SUNRPC_BACKCHANNEL)
523int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
524int xprt_rdma_bc_up(struct svc_serv *, struct net *);
525int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
526void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
527int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
528void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
529void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
530#endif /* CONFIG_SUNRPC_BACKCHANNEL */
531
498/* Temporary NFS request map cache. Created in svc_rdma.c */ 532/* Temporary NFS request map cache. Created in svc_rdma.c */
499extern struct kmem_cache *svc_rdma_map_cachep; 533extern struct kmem_cache *svc_rdma_map_cachep;
500/* WR context cache. Created in svc_rdma.c */ 534/* WR context cache. Created in svc_rdma.c */
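These backchannel prototypes only become reachable once they are wired into the RDMA transport's rpc_xprt_ops. That hunk is in xprtrdma/transport.c and is not part of this excerpt; a hedged sketch of the expected wiring, mirroring the xs_tcp_ops additions visible further down in xprtsock.c (the struct name xprt_rdma_procs is assumed):

static struct rpc_xprt_ops xprt_rdma_procs = {
        /* ... existing forward-channel methods elided ... */
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
        .bc_setup               = xprt_rdma_bc_setup,
        .bc_up                  = xprt_rdma_bc_up,
        .bc_free_rqst           = xprt_rdma_bc_free_rqst,
        .bc_destroy             = xprt_rdma_bc_destroy,
#endif
};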
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1a85e0ed0b48..1d1a70498910 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
360 int flags = XS_SENDMSG_FLAGS; 360 int flags = XS_SENDMSG_FLAGS;
361 361
362 remainder -= len; 362 remainder -= len;
363 if (remainder != 0 || more) 363 if (more)
364 flags |= MSG_MORE; 364 flags |= MSG_MORE;
365 if (remainder != 0)
366 flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
365 err = do_sendpage(sock, *ppage, base, len, flags); 367 err = do_sendpage(sock, *ppage, base, len, flags);
366 if (remainder == 0 || err != len) 368 if (remainder == 0 || err != len)
367 break; 369 break;
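The hunk above splits the old combined test: MSG_MORE is still set whenever the caller signals more data, and MSG_SENDPAGE_NOTLAST (plus MSG_MORE) is set while pages remain in this xdr_buf, so the page-based send path gets the same not-the-end hint as the byte-based path. Restated as a stand-alone helper for clarity; xs_sendpage_flags() is a hypothetical name, not part of the patch:

static inline int xs_sendpage_flags(size_t remainder, int more)
{
        int flags = XS_SENDMSG_FLAGS;

        if (more)
                flags |= MSG_MORE;
        if (remainder != 0)
                flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
        return flags;
}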
@@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
823 825
824 kernel_sock_shutdown(sock, SHUT_RDWR); 826 kernel_sock_shutdown(sock, SHUT_RDWR);
825 827
828 mutex_lock(&transport->recv_mutex);
826 write_lock_bh(&sk->sk_callback_lock); 829 write_lock_bh(&sk->sk_callback_lock);
827 transport->inet = NULL; 830 transport->inet = NULL;
828 transport->sock = NULL; 831 transport->sock = NULL;
@@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
833 xprt_clear_connected(xprt); 836 xprt_clear_connected(xprt);
834 write_unlock_bh(&sk->sk_callback_lock); 837 write_unlock_bh(&sk->sk_callback_lock);
835 xs_sock_reset_connection_flags(xprt); 838 xs_sock_reset_connection_flags(xprt);
839 mutex_unlock(&transport->recv_mutex);
836 840
837 trace_rpc_socket_close(xprt, sock); 841 trace_rpc_socket_close(xprt, sock);
838 sock_release(sock); 842 sock_release(sock);
@@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
886 890
887 cancel_delayed_work_sync(&transport->connect_worker); 891 cancel_delayed_work_sync(&transport->connect_worker);
888 xs_close(xprt); 892 xs_close(xprt);
893 cancel_work_sync(&transport->recv_worker);
889 xs_xprt_free(xprt); 894 xs_xprt_free(xprt);
890 module_put(THIS_MODULE); 895 module_put(THIS_MODULE);
891} 896}
@@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
906} 911}
907 912
908/** 913/**
909 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets 914 * xs_local_data_read_skb
910 * @sk: socket with data to read 915 * @xprt: transport
916 * @sk: socket
917 * @skb: skbuff
911 * 918 *
912 * Currently this assumes we can read the whole reply in a single gulp. 919 * Currently this assumes we can read the whole reply in a single gulp.
913 */ 920 */
914static void xs_local_data_ready(struct sock *sk) 921static void xs_local_data_read_skb(struct rpc_xprt *xprt,
922 struct sock *sk,
923 struct sk_buff *skb)
915{ 924{
916 struct rpc_task *task; 925 struct rpc_task *task;
917 struct rpc_xprt *xprt;
918 struct rpc_rqst *rovr; 926 struct rpc_rqst *rovr;
919 struct sk_buff *skb; 927 int repsize, copied;
920 int err, repsize, copied;
921 u32 _xid; 928 u32 _xid;
922 __be32 *xp; 929 __be32 *xp;
923 930
924 read_lock_bh(&sk->sk_callback_lock);
925 dprintk("RPC: %s...\n", __func__);
926 xprt = xprt_from_sock(sk);
927 if (xprt == NULL)
928 goto out;
929
930 skb = skb_recv_datagram(sk, 0, 1, &err);
931 if (skb == NULL)
932 goto out;
933
934 repsize = skb->len - sizeof(rpc_fraghdr); 931 repsize = skb->len - sizeof(rpc_fraghdr);
935 if (repsize < 4) { 932 if (repsize < 4) {
936 dprintk("RPC: impossible RPC reply size %d\n", repsize); 933 dprintk("RPC: impossible RPC reply size %d\n", repsize);
937 goto dropit; 934 return;
938 } 935 }
939 936
940 /* Copy the XID from the skb... */ 937 /* Copy the XID from the skb... */
941 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); 938 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
942 if (xp == NULL) 939 if (xp == NULL)
943 goto dropit; 940 return;
944 941
945 /* Look up and lock the request corresponding to the given XID */ 942 /* Look up and lock the request corresponding to the given XID */
946 spin_lock(&xprt->transport_lock); 943 spin_lock_bh(&xprt->transport_lock);
947 rovr = xprt_lookup_rqst(xprt, *xp); 944 rovr = xprt_lookup_rqst(xprt, *xp);
948 if (!rovr) 945 if (!rovr)
949 goto out_unlock; 946 goto out_unlock;
@@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk)
961 xprt_complete_rqst(task, copied); 958 xprt_complete_rqst(task, copied);
962 959
963 out_unlock: 960 out_unlock:
964 spin_unlock(&xprt->transport_lock); 961 spin_unlock_bh(&xprt->transport_lock);
965 dropit: 962}
966 skb_free_datagram(sk, skb); 963
967 out: 964static void xs_local_data_receive(struct sock_xprt *transport)
968 read_unlock_bh(&sk->sk_callback_lock); 965{
966 struct sk_buff *skb;
967 struct sock *sk;
968 int err;
969
970 mutex_lock(&transport->recv_mutex);
971 sk = transport->inet;
972 if (sk == NULL)
973 goto out;
974 for (;;) {
975 skb = skb_recv_datagram(sk, 0, 1, &err);
976 if (skb == NULL)
977 break;
978 xs_local_data_read_skb(&transport->xprt, sk, skb);
979 skb_free_datagram(sk, skb);
980 }
981out:
982 mutex_unlock(&transport->recv_mutex);
983}
984
985static void xs_local_data_receive_workfn(struct work_struct *work)
986{
987 struct sock_xprt *transport =
988 container_of(work, struct sock_xprt, recv_worker);
989 xs_local_data_receive(transport);
969} 990}
970 991
971/** 992/**
972 * xs_udp_data_ready - "data ready" callback for UDP sockets 993 * xs_udp_data_read_skb - receive callback for UDP sockets
973 * @sk: socket with data to read 994 * @xprt: transport
995 * @sk: socket
996 * @skb: skbuff
974 * 997 *
975 */ 998 */
976static void xs_udp_data_ready(struct sock *sk) 999static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
1000 struct sock *sk,
1001 struct sk_buff *skb)
977{ 1002{
978 struct rpc_task *task; 1003 struct rpc_task *task;
979 struct rpc_xprt *xprt;
980 struct rpc_rqst *rovr; 1004 struct rpc_rqst *rovr;
981 struct sk_buff *skb; 1005 int repsize, copied;
982 int err, repsize, copied;
983 u32 _xid; 1006 u32 _xid;
984 __be32 *xp; 1007 __be32 *xp;
985 1008
986 read_lock_bh(&sk->sk_callback_lock);
987 dprintk("RPC: xs_udp_data_ready...\n");
988 if (!(xprt = xprt_from_sock(sk)))
989 goto out;
990
991 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
992 goto out;
993
994 repsize = skb->len - sizeof(struct udphdr); 1009 repsize = skb->len - sizeof(struct udphdr);
995 if (repsize < 4) { 1010 if (repsize < 4) {
996 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 1011 dprintk("RPC: impossible RPC reply size %d!\n", repsize);
997 goto dropit; 1012 return;
998 } 1013 }
999 1014
1000 /* Copy the XID from the skb... */ 1015 /* Copy the XID from the skb... */
1001 xp = skb_header_pointer(skb, sizeof(struct udphdr), 1016 xp = skb_header_pointer(skb, sizeof(struct udphdr),
1002 sizeof(_xid), &_xid); 1017 sizeof(_xid), &_xid);
1003 if (xp == NULL) 1018 if (xp == NULL)
1004 goto dropit; 1019 return;
1005 1020
1006 /* Look up and lock the request corresponding to the given XID */ 1021 /* Look up and lock the request corresponding to the given XID */
1007 spin_lock(&xprt->transport_lock); 1022 spin_lock_bh(&xprt->transport_lock);
1008 rovr = xprt_lookup_rqst(xprt, *xp); 1023 rovr = xprt_lookup_rqst(xprt, *xp);
1009 if (!rovr) 1024 if (!rovr)
1010 goto out_unlock; 1025 goto out_unlock;
@@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk)
1025 xprt_complete_rqst(task, copied); 1040 xprt_complete_rqst(task, copied);
1026 1041
1027 out_unlock: 1042 out_unlock:
1028 spin_unlock(&xprt->transport_lock); 1043 spin_unlock_bh(&xprt->transport_lock);
1029 dropit: 1044}
1030 skb_free_datagram(sk, skb); 1045
1031 out: 1046static void xs_udp_data_receive(struct sock_xprt *transport)
1047{
1048 struct sk_buff *skb;
1049 struct sock *sk;
1050 int err;
1051
1052 mutex_lock(&transport->recv_mutex);
1053 sk = transport->inet;
1054 if (sk == NULL)
1055 goto out;
1056 for (;;) {
1057 skb = skb_recv_datagram(sk, 0, 1, &err);
1058 if (skb == NULL)
1059 break;
1060 xs_udp_data_read_skb(&transport->xprt, sk, skb);
1061 skb_free_datagram(sk, skb);
1062 }
1063out:
1064 mutex_unlock(&transport->recv_mutex);
1065}
1066
1067static void xs_udp_data_receive_workfn(struct work_struct *work)
1068{
1069 struct sock_xprt *transport =
1070 container_of(work, struct sock_xprt, recv_worker);
1071 xs_udp_data_receive(transport);
1072}
1073
1074/**
1075 * xs_data_ready - "data ready" callback for UDP sockets
1076 * @sk: socket with data to read
1077 *
1078 */
1079static void xs_data_ready(struct sock *sk)
1080{
1081 struct rpc_xprt *xprt;
1082
1083 read_lock_bh(&sk->sk_callback_lock);
1084 dprintk("RPC: xs_data_ready...\n");
1085 xprt = xprt_from_sock(sk);
1086 if (xprt != NULL) {
1087 struct sock_xprt *transport = container_of(xprt,
1088 struct sock_xprt, xprt);
1089 queue_work(rpciod_workqueue, &transport->recv_worker);
1090 }
1032 read_unlock_bh(&sk->sk_callback_lock); 1091 read_unlock_bh(&sk->sk_callback_lock);
1033} 1092}
1034 1093
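The deferral above (sk_data_ready only queues recv_worker; the work function drains the socket under recv_mutex) depends on two sock_xprt members that live in include/linux/sunrpc/xprtsock.h, so they do not show up in this net/-only diffstat. An illustrative, heavily reduced view of that state; the real struct has many more fields:

/* Reduced sketch of struct sock_xprt; only the members used by the
 * receive path above are shown.
 */
struct sock_xprt {
        struct rpc_xprt         xprt;           /* generic transport state */
        struct socket           *sock;          /* the kernel socket */
        struct sock             *inet;          /* its struct sock */
        struct work_struct      recv_worker;    /* queued by ->sk_data_ready */
        struct mutex            recv_mutex;     /* serializes receive work
                                                 * against xs_reset_transport() */
};

Holding recv_mutex both here and around the socket teardown in xs_reset_transport() is what keeps the workqueue from reading a socket that is being released.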
@@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1243 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); 1302 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
1244 1303
1245 /* Find and lock the request corresponding to this xid */ 1304 /* Find and lock the request corresponding to this xid */
1246 spin_lock(&xprt->transport_lock); 1305 spin_lock_bh(&xprt->transport_lock);
1247 req = xprt_lookup_rqst(xprt, transport->tcp_xid); 1306 req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1248 if (!req) { 1307 if (!req) {
1249 dprintk("RPC: XID %08x request not found!\n", 1308 dprintk("RPC: XID %08x request not found!\n",
1250 ntohl(transport->tcp_xid)); 1309 ntohl(transport->tcp_xid));
1251 spin_unlock(&xprt->transport_lock); 1310 spin_unlock_bh(&xprt->transport_lock);
1252 return -1; 1311 return -1;
1253 } 1312 }
1254 1313
@@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1257 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1316 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1258 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1317 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1259 1318
1260 spin_unlock(&xprt->transport_lock); 1319 spin_unlock_bh(&xprt->transport_lock);
1261 return 0; 1320 return 0;
1262} 1321}
1263 1322
@@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1277 struct rpc_rqst *req; 1336 struct rpc_rqst *req;
1278 1337
1279 /* Look up and lock the request corresponding to the given XID */ 1338 /* Look up and lock the request corresponding to the given XID */
1280 spin_lock(&xprt->transport_lock); 1339 spin_lock_bh(&xprt->transport_lock);
1281 req = xprt_lookup_bc_request(xprt, transport->tcp_xid); 1340 req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
1282 if (req == NULL) { 1341 if (req == NULL) {
1283 spin_unlock(&xprt->transport_lock); 1342 spin_unlock_bh(&xprt->transport_lock);
1284 printk(KERN_WARNING "Callback slot table overflowed\n"); 1343 printk(KERN_WARNING "Callback slot table overflowed\n");
1285 xprt_force_disconnect(xprt); 1344 xprt_force_disconnect(xprt);
1286 return -1; 1345 return -1;
@@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1291 1350
1292 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1351 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1293 xprt_complete_bc_request(req, transport->tcp_copied); 1352 xprt_complete_bc_request(req, transport->tcp_copied);
1294 spin_unlock(&xprt->transport_lock); 1353 spin_unlock_bh(&xprt->transport_lock);
1295 1354
1296 return 0; 1355 return 0;
1297} 1356}
@@ -1306,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1306 xs_tcp_read_reply(xprt, desc) : 1365 xs_tcp_read_reply(xprt, desc) :
1307 xs_tcp_read_callback(xprt, desc); 1366 xs_tcp_read_callback(xprt, desc);
1308} 1367}
1368
1369static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
1370{
1371 int ret;
1372
1373 ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
1374 SVC_SOCK_ANONYMOUS);
1375 if (ret < 0)
1376 return ret;
1377 return 0;
1378}
1309#else 1379#else
1310static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1380static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1311 struct xdr_skb_reader *desc) 1381 struct xdr_skb_reader *desc)
@@ -1391,6 +1461,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1391 return len - desc.count; 1461 return len - desc.count;
1392} 1462}
1393 1463
1464static void xs_tcp_data_receive(struct sock_xprt *transport)
1465{
1466 struct rpc_xprt *xprt = &transport->xprt;
1467 struct sock *sk;
1468 read_descriptor_t rd_desc = {
1469 .count = 2*1024*1024,
1470 .arg.data = xprt,
1471 };
1472 unsigned long total = 0;
1473 int read = 0;
1474
1475 mutex_lock(&transport->recv_mutex);
1476 sk = transport->inet;
1477 if (sk == NULL)
1478 goto out;
1479
1480 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1481 for (;;) {
1482 lock_sock(sk);
1483 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1484 release_sock(sk);
1485 if (read <= 0)
1486 break;
1487 total += read;
1488 rd_desc.count = 65536;
1489 }
1490out:
1491 mutex_unlock(&transport->recv_mutex);
1492 trace_xs_tcp_data_ready(xprt, read, total);
1493}
1494
1495static void xs_tcp_data_receive_workfn(struct work_struct *work)
1496{
1497 struct sock_xprt *transport =
1498 container_of(work, struct sock_xprt, recv_worker);
1499 xs_tcp_data_receive(transport);
1500}
1501
1394/** 1502/**
1395 * xs_tcp_data_ready - "data ready" callback for TCP sockets 1503 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1396 * @sk: socket with data to read 1504 * @sk: socket with data to read
@@ -1398,34 +1506,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1398 */ 1506 */
1399static void xs_tcp_data_ready(struct sock *sk) 1507static void xs_tcp_data_ready(struct sock *sk)
1400{ 1508{
1509 struct sock_xprt *transport;
1401 struct rpc_xprt *xprt; 1510 struct rpc_xprt *xprt;
1402 read_descriptor_t rd_desc;
1403 int read;
1404 unsigned long total = 0;
1405 1511
1406 dprintk("RPC: xs_tcp_data_ready...\n"); 1512 dprintk("RPC: xs_tcp_data_ready...\n");
1407 1513
1408 read_lock_bh(&sk->sk_callback_lock); 1514 read_lock_bh(&sk->sk_callback_lock);
1409 if (!(xprt = xprt_from_sock(sk))) { 1515 if (!(xprt = xprt_from_sock(sk)))
1410 read = 0;
1411 goto out; 1516 goto out;
1412 } 1517 transport = container_of(xprt, struct sock_xprt, xprt);
1518
1413 /* Any data means we had a useful conversation, so 1519 /* Any data means we had a useful conversation, so
1414 * we don't need to delay the next reconnect 1520 * we don't need to delay the next reconnect
1415 */ 1521 */
1416 if (xprt->reestablish_timeout) 1522 if (xprt->reestablish_timeout)
1417 xprt->reestablish_timeout = 0; 1523 xprt->reestablish_timeout = 0;
1524 queue_work(rpciod_workqueue, &transport->recv_worker);
1418 1525
1419 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1420 rd_desc.arg.data = xprt;
1421 do {
1422 rd_desc.count = 65536;
1423 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1424 if (read > 0)
1425 total += read;
1426 } while (read > 0);
1427out: 1526out:
1428 trace_xs_tcp_data_ready(xprt, read, total);
1429 read_unlock_bh(&sk->sk_callback_lock); 1527 read_unlock_bh(&sk->sk_callback_lock);
1430} 1528}
1431 1529
@@ -1873,7 +1971,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1873 xs_save_old_callbacks(transport, sk); 1971 xs_save_old_callbacks(transport, sk);
1874 1972
1875 sk->sk_user_data = xprt; 1973 sk->sk_user_data = xprt;
1876 sk->sk_data_ready = xs_local_data_ready; 1974 sk->sk_data_ready = xs_data_ready;
1877 sk->sk_write_space = xs_udp_write_space; 1975 sk->sk_write_space = xs_udp_write_space;
1878 sk->sk_error_report = xs_error_report; 1976 sk->sk_error_report = xs_error_report;
1879 sk->sk_allocation = GFP_NOIO; 1977 sk->sk_allocation = GFP_NOIO;
@@ -2059,7 +2157,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2059 xs_save_old_callbacks(transport, sk); 2157 xs_save_old_callbacks(transport, sk);
2060 2158
2061 sk->sk_user_data = xprt; 2159 sk->sk_user_data = xprt;
2062 sk->sk_data_ready = xs_udp_data_ready; 2160 sk->sk_data_ready = xs_data_ready;
2063 sk->sk_write_space = xs_udp_write_space; 2161 sk->sk_write_space = xs_udp_write_space;
2064 sk->sk_allocation = GFP_NOIO; 2162 sk->sk_allocation = GFP_NOIO;
2065 2163
@@ -2472,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task)
2472{ 2570{
2473 struct rpc_rqst *req = task->tk_rqstp; 2571 struct rpc_rqst *req = task->tk_rqstp;
2474 struct svc_xprt *xprt; 2572 struct svc_xprt *xprt;
2475 u32 len; 2573 int len;
2476 2574
2477 dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); 2575 dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2478 /* 2576 /*
@@ -2580,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = {
2580 .enable_swap = xs_enable_swap, 2678 .enable_swap = xs_enable_swap,
2581 .disable_swap = xs_disable_swap, 2679 .disable_swap = xs_disable_swap,
2582 .inject_disconnect = xs_inject_disconnect, 2680 .inject_disconnect = xs_inject_disconnect,
2681#ifdef CONFIG_SUNRPC_BACKCHANNEL
2682 .bc_setup = xprt_setup_bc,
2683 .bc_up = xs_tcp_bc_up,
2684 .bc_free_rqst = xprt_free_bc_rqst,
2685 .bc_destroy = xprt_destroy_bc,
2686#endif
2583}; 2687};
2584 2688
2585/* 2689/*
@@ -2650,6 +2754,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2650 } 2754 }
2651 2755
2652 new = container_of(xprt, struct sock_xprt, xprt); 2756 new = container_of(xprt, struct sock_xprt, xprt);
2757 mutex_init(&new->recv_mutex);
2653 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 2758 memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2654 xprt->addrlen = args->addrlen; 2759 xprt->addrlen = args->addrlen;
2655 if (args->srcaddr) 2760 if (args->srcaddr)
@@ -2703,6 +2808,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2703 xprt->ops = &xs_local_ops; 2808 xprt->ops = &xs_local_ops;
2704 xprt->timeout = &xs_local_default_timeout; 2809 xprt->timeout = &xs_local_default_timeout;
2705 2810
2811 INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
2706 INIT_DELAYED_WORK(&transport->connect_worker, 2812 INIT_DELAYED_WORK(&transport->connect_worker,
2707 xs_dummy_setup_socket); 2813 xs_dummy_setup_socket);
2708 2814
@@ -2774,21 +2880,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2774 2880
2775 xprt->timeout = &xs_udp_default_timeout; 2881 xprt->timeout = &xs_udp_default_timeout;
2776 2882
2883 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
2884 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
2885
2777 switch (addr->sa_family) { 2886 switch (addr->sa_family) {
2778 case AF_INET: 2887 case AF_INET:
2779 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2888 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2780 xprt_set_bound(xprt); 2889 xprt_set_bound(xprt);
2781 2890
2782 INIT_DELAYED_WORK(&transport->connect_worker,
2783 xs_udp_setup_socket);
2784 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 2891 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2785 break; 2892 break;
2786 case AF_INET6: 2893 case AF_INET6:
2787 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2894 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2788 xprt_set_bound(xprt); 2895 xprt_set_bound(xprt);
2789 2896
2790 INIT_DELAYED_WORK(&transport->connect_worker,
2791 xs_udp_setup_socket);
2792 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 2897 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2793 break; 2898 break;
2794 default: 2899 default:
@@ -2853,21 +2958,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2853 xprt->ops = &xs_tcp_ops; 2958 xprt->ops = &xs_tcp_ops;
2854 xprt->timeout = &xs_tcp_default_timeout; 2959 xprt->timeout = &xs_tcp_default_timeout;
2855 2960
2961 INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
2962 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
2963
2856 switch (addr->sa_family) { 2964 switch (addr->sa_family) {
2857 case AF_INET: 2965 case AF_INET:
2858 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2966 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2859 xprt_set_bound(xprt); 2967 xprt_set_bound(xprt);
2860 2968
2861 INIT_DELAYED_WORK(&transport->connect_worker,
2862 xs_tcp_setup_socket);
2863 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 2969 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2864 break; 2970 break;
2865 case AF_INET6: 2971 case AF_INET6:
2866 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2972 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2867 xprt_set_bound(xprt); 2973 xprt_set_bound(xprt);
2868 2974
2869 INIT_DELAYED_WORK(&transport->connect_worker,
2870 xs_tcp_setup_socket);
2871 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 2975 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2872 break; 2976 break;
2873 default: 2977 default: