author      Trond Myklebust <trond.myklebust@primarydata.com>  2015-11-02 17:09:24 -0500
committer   Trond Myklebust <trond.myklebust@primarydata.com>  2015-11-02 17:09:24 -0500
commit      ac3c860c758a864adb1dd5c7d68dadebe6c86f64 (patch)
tree        62c2bd4214874d583f6ef5eb898604e4a1b3d44c /net
parent      260074cd8413489903d4484058e61649d6e08580 (diff)
parent      76566773a1f1c2295ed901b6f1241cfe10d99029 (diff)
Merge tag 'nfs-rdma-4.4-2' of git://git.linux-nfs.org/projects/anna/nfs-rdma
NFS: NFSoRDMA Client Side Changes

In addition to a variety of bugfixes, these patches are mostly geared at enabling both swap and backchannel support for the NFS over RDMA client.

Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Diffstat (limited to 'net')
-rw-r--r--  net/ceph/osd_client.c                       13
-rw-r--r--  net/sunrpc/backchannel_rqst.c               24
-rw-r--r--  net/sunrpc/svc.c                             5
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                 1
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c          394
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c               7
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c             148
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c               6
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c      8
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c    58
-rw-r--r--  net/sunrpc/xprtrdma/transport.c             18
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c                487
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h             54
-rw-r--r--  net/sunrpc/xprtsock.c                       17
14 files changed, 923 insertions, 317 deletions
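
The common thread in the generic sunrpc changes below is that backchannel setup and teardown now dispatch through optional per-transport callbacks instead of assuming TCP. The sketch that follows is condensed from the backchannel_rqst.c and transport.c hunks in this diff (unrelated fields and error handling omitted); it is illustrative only, not a complete listing:

/* Generic layer: delegate to the transport's backchannel hooks,
 * or quietly do nothing if the transport does not provide them. */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
	if (!xprt->ops->bc_setup)
		return 0;
	return xprt->ops->bc_setup(xprt, min_reqs);
}

void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	if (xprt->ops->bc_destroy)
		xprt->ops->bc_destroy(xprt, max_reqs);
}

/* RPC/RDMA side: supply those hooks when backchannel support is built in. */
static struct rpc_xprt_ops xprt_rdma_procs = {
	/* ... forechannel methods unchanged ... */
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	.bc_setup	= xprt_rdma_bc_setup,
	.bc_up		= xprt_rdma_bc_up,
	.bc_free_rqst	= xprt_rdma_bc_free_rqst,
	.bc_destroy	= xprt_rdma_bc_destroy,
#endif
};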
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 80b94e37c94a..f79ccac6699f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -285,6 +285,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
285 switch (op->op) { 285 switch (op->op) {
286 case CEPH_OSD_OP_READ: 286 case CEPH_OSD_OP_READ:
287 case CEPH_OSD_OP_WRITE: 287 case CEPH_OSD_OP_WRITE:
288 case CEPH_OSD_OP_WRITEFULL:
288 ceph_osd_data_release(&op->extent.osd_data); 289 ceph_osd_data_release(&op->extent.osd_data);
289 break; 290 break;
290 case CEPH_OSD_OP_CALL: 291 case CEPH_OSD_OP_CALL:
@@ -485,13 +486,14 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
485 size_t payload_len = 0; 486 size_t payload_len = 0;
486 487
487 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 488 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
488 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE); 489 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
490 opcode != CEPH_OSD_OP_TRUNCATE);
489 491
490 op->extent.offset = offset; 492 op->extent.offset = offset;
491 op->extent.length = length; 493 op->extent.length = length;
492 op->extent.truncate_size = truncate_size; 494 op->extent.truncate_size = truncate_size;
493 op->extent.truncate_seq = truncate_seq; 495 op->extent.truncate_seq = truncate_seq;
494 if (opcode == CEPH_OSD_OP_WRITE) 496 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
495 payload_len += length; 497 payload_len += length;
496 498
497 op->payload_len = payload_len; 499 op->payload_len = payload_len;
@@ -670,9 +672,11 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
670 break; 672 break;
671 case CEPH_OSD_OP_READ: 673 case CEPH_OSD_OP_READ:
672 case CEPH_OSD_OP_WRITE: 674 case CEPH_OSD_OP_WRITE:
675 case CEPH_OSD_OP_WRITEFULL:
673 case CEPH_OSD_OP_ZERO: 676 case CEPH_OSD_OP_ZERO:
674 case CEPH_OSD_OP_TRUNCATE: 677 case CEPH_OSD_OP_TRUNCATE:
675 if (src->op == CEPH_OSD_OP_WRITE) 678 if (src->op == CEPH_OSD_OP_WRITE ||
679 src->op == CEPH_OSD_OP_WRITEFULL)
676 request_data_len = src->extent.length; 680 request_data_len = src->extent.length;
677 dst->extent.offset = cpu_to_le64(src->extent.offset); 681 dst->extent.offset = cpu_to_le64(src->extent.offset);
678 dst->extent.length = cpu_to_le64(src->extent.length); 682 dst->extent.length = cpu_to_le64(src->extent.length);
@@ -681,7 +685,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
681 dst->extent.truncate_seq = 685 dst->extent.truncate_seq =
682 cpu_to_le32(src->extent.truncate_seq); 686 cpu_to_le32(src->extent.truncate_seq);
683 osd_data = &src->extent.osd_data; 687 osd_data = &src->extent.osd_data;
684 if (src->op == CEPH_OSD_OP_WRITE) 688 if (src->op == CEPH_OSD_OP_WRITE ||
689 src->op == CEPH_OSD_OP_WRITEFULL)
685 ceph_osdc_msg_data_add(req->r_request, osd_data); 690 ceph_osdc_msg_data_add(req->r_request, osd_data);
686 else 691 else
687 ceph_osdc_msg_data_add(req->r_reply, osd_data); 692 ceph_osdc_msg_data_add(req->r_reply, osd_data);
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d141133b..229956bf8457 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
138 */ 138 */
139int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 139int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
140{ 140{
141 if (!xprt->ops->bc_setup)
142 return 0;
143 return xprt->ops->bc_setup(xprt, min_reqs);
144}
145EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
146
147int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
148{
141 struct rpc_rqst *req; 149 struct rpc_rqst *req;
142 struct list_head tmp_list; 150 struct list_head tmp_list;
143 int i; 151 int i;
@@ -192,7 +200,6 @@ out_free:
192 dprintk("RPC: setup backchannel transport failed\n"); 200 dprintk("RPC: setup backchannel transport failed\n");
193 return -ENOMEM; 201 return -ENOMEM;
194} 202}
195EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
196 203
197/** 204/**
198 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 205 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
205 */ 212 */
206void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 213void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
207{ 214{
215 if (xprt->ops->bc_destroy)
216 xprt->ops->bc_destroy(xprt, max_reqs);
217}
218EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
219
220void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
221{
208 struct rpc_rqst *req = NULL, *tmp = NULL; 222 struct rpc_rqst *req = NULL, *tmp = NULL;
209 223
210 dprintk("RPC: destroy backchannel transport\n"); 224 dprintk("RPC: destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
227 dprintk("RPC: backchannel list empty= %s\n", 241 dprintk("RPC: backchannel list empty= %s\n",
228 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 242 list_empty(&xprt->bc_pa_list) ? "true" : "false");
229} 243}
230EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
231 244
232static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) 245static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
233{ 246{
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
264{ 277{
265 struct rpc_xprt *xprt = req->rq_xprt; 278 struct rpc_xprt *xprt = req->rq_xprt;
266 279
280 xprt->ops->bc_free_rqst(req);
281}
282
283void xprt_free_bc_rqst(struct rpc_rqst *req)
284{
285 struct rpc_xprt *xprt = req->rq_xprt;
286
267 dprintk("RPC: free backchannel req=%p\n", req); 287 dprintk("RPC: free backchannel req=%p\n", req);
268 288
269 req->rq_connect_cookie = xprt->connect_cookie - 1; 289 req->rq_connect_cookie = xprt->connect_cookie - 1;
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579df14d8..bc5b7b5032ca 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1367 /* reset result send buffer "put" position */ 1367 /* reset result send buffer "put" position */
1368 resv->iov_len = 0; 1368 resv->iov_len = 0;
1369 1369
1370 if (rqstp->rq_prot != IPPROTO_TCP) {
1371 printk(KERN_ERR "No support for Non-TCP transports!\n");
1372 BUG();
1373 }
1374
1375 /* 1370 /*
1376 * Skip the next two words because they've already been 1371 * Skip the next two words because they've already been
1377 * processed in the transport 1372 * processed in the transport
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de240bd..33f99d3004f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
5 svc_rdma.o svc_rdma_transport.o \ 5 svc_rdma.o svc_rdma_transport.o \
6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ 6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
7 module.o 7 module.o
8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 000000000000..2dcb44f69e53
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,394 @@
1/*
2 * Copyright (c) 2015 Oracle. All rights reserved.
3 *
4 * Support for backward direction RPCs on RPC/RDMA.
5 */
6
7#include <linux/module.h>
8#include <linux/sunrpc/xprt.h>
9#include <linux/sunrpc/svc.h>
10#include <linux/sunrpc/svc_xprt.h>
11
12#include "xprt_rdma.h"
13
14#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
15# define RPCDBG_FACILITY RPCDBG_TRANS
16#endif
17
18#define RPCRDMA_BACKCHANNEL_DEBUG
19
20static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
21 struct rpc_rqst *rqst)
22{
23 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
24 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
25
26 spin_lock(&buf->rb_reqslock);
27 list_del(&req->rl_all);
28 spin_unlock(&buf->rb_reqslock);
29
30 rpcrdma_destroy_req(&r_xprt->rx_ia, req);
31
32 kfree(rqst);
33}
34
35static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
36 struct rpc_rqst *rqst)
37{
38 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
39 struct rpcrdma_regbuf *rb;
40 struct rpcrdma_req *req;
41 struct xdr_buf *buf;
42 size_t size;
43
44 req = rpcrdma_create_req(r_xprt);
45 if (!req)
46 return -ENOMEM;
47 req->rl_backchannel = true;
48
49 size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
50 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
51 if (IS_ERR(rb))
52 goto out_fail;
53 req->rl_rdmabuf = rb;
54
55 size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
56 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
57 if (IS_ERR(rb))
58 goto out_fail;
59 rb->rg_owner = req;
60 req->rl_sendbuf = rb;
61 /* so that rpcr_to_rdmar works when receiving a request */
62 rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
63
64 buf = &rqst->rq_snd_buf;
65 buf->head[0].iov_base = rqst->rq_buffer;
66 buf->head[0].iov_len = 0;
67 buf->tail[0].iov_base = NULL;
68 buf->tail[0].iov_len = 0;
69 buf->page_len = 0;
70 buf->len = 0;
71 buf->buflen = size;
72
73 return 0;
74
75out_fail:
76 rpcrdma_bc_free_rqst(r_xprt, rqst);
77 return -ENOMEM;
78}
79
80/* Allocate and add receive buffers to the rpcrdma_buffer's
81 * existing list of rep's. These are released when the
82 * transport is destroyed.
83 */
84static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
85 unsigned int count)
86{
87 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
88 struct rpcrdma_rep *rep;
89 unsigned long flags;
90 int rc = 0;
91
92 while (count--) {
93 rep = rpcrdma_create_rep(r_xprt);
94 if (IS_ERR(rep)) {
95 pr_err("RPC: %s: reply buffer alloc failed\n",
96 __func__);
97 rc = PTR_ERR(rep);
98 break;
99 }
100
101 spin_lock_irqsave(&buffers->rb_lock, flags);
102 list_add(&rep->rr_list, &buffers->rb_recv_bufs);
103 spin_unlock_irqrestore(&buffers->rb_lock, flags);
104 }
105
106 return rc;
107}
108
109/**
110 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
111 * @xprt: transport associated with these backchannel resources
112 * @reqs: number of concurrent incoming requests to expect
113 *
114 * Returns 0 on success; otherwise a negative errno
115 */
116int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
117{
118 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
119 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
120 struct rpc_rqst *rqst;
121 unsigned int i;
122 int rc;
123
124 /* The backchannel reply path returns each rpc_rqst to the
125 * bc_pa_list _after_ the reply is sent. If the server is
126 * faster than the client, it can send another backward
127 * direction request before the rpc_rqst is returned to the
128 * list. The client rejects the request in this case.
129 *
130 * Twice as many rpc_rqsts are prepared to ensure there is
131 * always an rpc_rqst available as soon as a reply is sent.
132 */
133 if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
134 goto out_err;
135
136 for (i = 0; i < (reqs << 1); i++) {
137 rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
138 if (!rqst) {
139 pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
140 __func__);
141 goto out_free;
142 }
143
144 rqst->rq_xprt = &r_xprt->rx_xprt;
145 INIT_LIST_HEAD(&rqst->rq_list);
146 INIT_LIST_HEAD(&rqst->rq_bc_list);
147
148 if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
149 goto out_free;
150
151 spin_lock_bh(&xprt->bc_pa_lock);
152 list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
153 spin_unlock_bh(&xprt->bc_pa_lock);
154 }
155
156 rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
157 if (rc)
158 goto out_free;
159
160 rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
161 if (rc)
162 goto out_free;
163
164 buffer->rb_bc_srv_max_requests = reqs;
165 request_module("svcrdma");
166
167 return 0;
168
169out_free:
170 xprt_rdma_bc_destroy(xprt, reqs);
171
172out_err:
173 pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
174 return -ENOMEM;
175}
176
177/**
178 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
179 * @serv: server endpoint
180 * @net: network namespace
181 *
182 * The "xprt" is an implied argument: it supplies the name of the
183 * backchannel transport class.
184 *
185 * Returns zero on success, negative errno on failure
186 */
187int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
188{
189 int ret;
190
191 ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
192 if (ret < 0)
193 return ret;
194 return 0;
195}
196
197/**
198 * rpcrdma_bc_marshal_reply - Send backwards direction reply
199 * @rqst: buffer containing RPC reply data
200 *
201 * Returns zero on success.
202 */
203int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
204{
205 struct rpc_xprt *xprt = rqst->rq_xprt;
206 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
207 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
208 struct rpcrdma_msg *headerp;
209 size_t rpclen;
210
211 headerp = rdmab_to_msg(req->rl_rdmabuf);
212 headerp->rm_xid = rqst->rq_xid;
213 headerp->rm_vers = rpcrdma_version;
214 headerp->rm_credit =
215 cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
216 headerp->rm_type = rdma_msg;
217 headerp->rm_body.rm_chunks[0] = xdr_zero;
218 headerp->rm_body.rm_chunks[1] = xdr_zero;
219 headerp->rm_body.rm_chunks[2] = xdr_zero;
220
221 rpclen = rqst->rq_svec[0].iov_len;
222
223 pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
224 __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
225 pr_info("RPC: %s: RPC/RDMA: %*ph\n",
226 __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
227 pr_info("RPC: %s: RPC: %*ph\n",
228 __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
229
230 req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
231 req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
232 req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
233
234 req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
235 req->rl_send_iov[1].length = rpclen;
236 req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
237
238 req->rl_niovs = 2;
239 return 0;
240}
241
242/**
243 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
244 * @xprt: transport associated with these backchannel resources
245 * @reqs: number of incoming requests to destroy; ignored
246 */
247void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
248{
249 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
250 struct rpc_rqst *rqst, *tmp;
251
252 spin_lock_bh(&xprt->bc_pa_lock);
253 list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
254 list_del(&rqst->rq_bc_pa_list);
255 spin_unlock_bh(&xprt->bc_pa_lock);
256
257 rpcrdma_bc_free_rqst(r_xprt, rqst);
258
259 spin_lock_bh(&xprt->bc_pa_lock);
260 }
261 spin_unlock_bh(&xprt->bc_pa_lock);
262}
263
264/**
265 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
266 * @rqst: request to release
267 */
268void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
269{
270 struct rpc_xprt *xprt = rqst->rq_xprt;
271
272 smp_mb__before_atomic();
273 WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
274 clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
275 smp_mb__after_atomic();
276
277 spin_lock_bh(&xprt->bc_pa_lock);
278 list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
279 spin_unlock_bh(&xprt->bc_pa_lock);
280}
281
282/**
283 * rpcrdma_bc_receive_call - Handle a backward direction call
284 * @xprt: transport receiving the call
285 * @rep: receive buffer containing the call
286 *
287 * Called in the RPC reply handler, which runs in a tasklet.
288 * Be quick about it.
289 *
290 * Operational assumptions:
291 * o Backchannel credits are ignored, just as the NFS server
292 * forechannel currently does
293 * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
294 * No replay detection is done at the transport level
295 */
296void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
297 struct rpcrdma_rep *rep)
298{
299 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
300 struct rpcrdma_msg *headerp;
301 struct svc_serv *bc_serv;
302 struct rpcrdma_req *req;
303 struct rpc_rqst *rqst;
304 struct xdr_buf *buf;
305 size_t size;
306 __be32 *p;
307
308 headerp = rdmab_to_msg(rep->rr_rdmabuf);
309#ifdef RPCRDMA_BACKCHANNEL_DEBUG
310 pr_info("RPC: %s: callback XID %08x, length=%u\n",
311 __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
312 pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
313#endif
314
315 /* Sanity check:
316 * Need at least enough bytes for RPC/RDMA header, as code
317 * here references the header fields by array offset. Also,
318 * backward calls are always inline, so ensure there
319 * are some bytes beyond the RPC/RDMA header.
320 */
321 if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
322 goto out_short;
323 p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
324 size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
325
326 /* Grab a free bc rqst */
327 spin_lock(&xprt->bc_pa_lock);
328 if (list_empty(&xprt->bc_pa_list)) {
329 spin_unlock(&xprt->bc_pa_lock);
330 goto out_overflow;
331 }
332 rqst = list_first_entry(&xprt->bc_pa_list,
333 struct rpc_rqst, rq_bc_pa_list);
334 list_del(&rqst->rq_bc_pa_list);
335 spin_unlock(&xprt->bc_pa_lock);
336#ifdef RPCRDMA_BACKCHANNEL_DEBUG
337 pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
338#endif
339
340 /* Prepare rqst */
341 rqst->rq_reply_bytes_recvd = 0;
342 rqst->rq_bytes_sent = 0;
343 rqst->rq_xid = headerp->rm_xid;
344 set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
345
346 buf = &rqst->rq_rcv_buf;
347 memset(buf, 0, sizeof(*buf));
348 buf->head[0].iov_base = p;
349 buf->head[0].iov_len = size;
350 buf->len = size;
351
352 /* The receive buffer has to be hooked to the rpcrdma_req
353 * so that it can be reposted after the server is done
354 * parsing it but just before sending the backward
355 * direction reply.
356 */
357 req = rpcr_to_rdmar(rqst);
358#ifdef RPCRDMA_BACKCHANNEL_DEBUG
359 pr_info("RPC: %s: attaching rep %p to req %p\n",
360 __func__, rep, req);
361#endif
362 req->rl_reply = rep;
363
364 /* Defeat the retransmit detection logic in send_request */
365 req->rl_connect_cookie = 0;
366
367 /* Queue rqst for ULP's callback service */
368 bc_serv = xprt->bc_serv;
369 spin_lock(&bc_serv->sv_cb_lock);
370 list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
371 spin_unlock(&bc_serv->sv_cb_lock);
372
373 wake_up(&bc_serv->sv_cb_waitq);
374
375 r_xprt->rx_stats.bcall_count++;
376 return;
377
378out_overflow:
379 pr_warn("RPC/RDMA backchannel overflow\n");
380 xprt_disconnect_done(xprt);
381 /* This receive buffer gets reposted automatically
382 * when the connection is re-established.
383 */
384 return;
385
386out_short:
387 pr_warn("RPC/RDMA short backward direction call\n");
388
389 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
390 xprt_disconnect_done(xprt);
391 else
392 pr_warn("RPC: %s: reposting rep %p\n",
393 __func__, rep);
394}
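
As an editorial summary of the new file above, the outline below strings its entry points together in the order they run for one backward-direction call; it is not literal kernel code, just an annotated call sequence using the functions defined in backchannel.c:

/* 1. Provisioning (transport setup): allocate 2*reqs rpc_rqsts plus
 *    extra receive buffers, and post the extra receives. */
xprt_rdma_bc_setup(xprt, reqs);

/* 2. The reply handler spots an incoming CALL and hands it over:
 *    an rpc_rqst is taken from xprt->bc_pa_list and queued on
 *    xprt->bc_serv->sv_cb_list for the callback service. */
rpcrdma_bc_receive_call(r_xprt, rep);

/* 3. When the callback service sends its reply, marshaling builds a
 *    chunk-less rdma_msg header in front of the RPC reply. */
rpcrdma_bc_marshal_reply(rqst);

/* 4. After the reply is sent, the rqst goes back on bc_pa_list for
 *    the next backward-direction call. */
xprt_rdma_bc_free_rqst(rqst);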
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 5318951b3b53..0a362397e434 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -252,8 +252,11 @@ frwr_sendcompletion(struct ib_wc *wc)
252 252
253 /* WARNING: Only wr_id and status are reliable at this point */ 253 /* WARNING: Only wr_id and status are reliable at this point */
254 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; 254 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
255 pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", 255 if (wc->status == IB_WC_WR_FLUSH_ERR)
256 __func__, r, ib_wc_status_msg(wc->status), wc->status); 256 dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
257 else
258 pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
259 __func__, r, ib_wc_status_msg(wc->status), wc->status);
257 r->r.frmr.fr_state = FRMR_IS_STALE; 260 r->r.frmr.fr_state = FRMR_IS_STALE;
258} 261}
259 262
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd6577467..c10d9699441c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
441 enum rpcrdma_chunktype rtype, wtype; 441 enum rpcrdma_chunktype rtype, wtype;
442 struct rpcrdma_msg *headerp; 442 struct rpcrdma_msg *headerp;
443 443
444#if defined(CONFIG_SUNRPC_BACKCHANNEL)
445 if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
446 return rpcrdma_bc_marshal_reply(rqst);
447#endif
448
444 /* 449 /*
445 * rpclen gets amount of data in first buffer, which is the 450 * rpclen gets amount of data in first buffer, which is the
446 * pre-registered buffer. 451 * pre-registered buffer.
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
711 spin_unlock_bh(&xprt->transport_lock); 716 spin_unlock_bh(&xprt->transport_lock);
712} 717}
713 718
719#if defined(CONFIG_SUNRPC_BACKCHANNEL)
720/* By convention, backchannel calls arrive via rdma_msg type
721 * messages, and never populate the chunk lists. This makes
722 * the RPC/RDMA header small and fixed in size, so it is
723 * straightforward to check the RPC header's direction field.
724 */
725static bool
726rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
727{
728 __be32 *p = (__be32 *)headerp;
729
730 if (headerp->rm_type != rdma_msg)
731 return false;
732 if (headerp->rm_body.rm_chunks[0] != xdr_zero)
733 return false;
734 if (headerp->rm_body.rm_chunks[1] != xdr_zero)
735 return false;
736 if (headerp->rm_body.rm_chunks[2] != xdr_zero)
737 return false;
738
739 /* sanity */
740 if (p[7] != headerp->rm_xid)
741 return false;
742 /* call direction */
743 if (p[8] != cpu_to_be32(RPC_CALL))
744 return false;
745
746 return true;
747}
748#endif /* CONFIG_SUNRPC_BACKCHANNEL */
749
714/* 750/*
715 * This function is called when an async event is posted to 751 * This function is called when an async event is posted to
716 * the connection which changes the connection state. All it 752 * the connection which changes the connection state. All it
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
723 schedule_delayed_work(&ep->rep_connect_worker, 0); 759 schedule_delayed_work(&ep->rep_connect_worker, 0);
724} 760}
725 761
726/* 762/* Process received RPC/RDMA messages.
727 * Called as a tasklet to do req/reply match and complete a request 763 *
728 * Errors must result in the RPC task either being awakened, or 764 * Errors must result in the RPC task either being awakened, or
729 * allowed to timeout, to discover the errors at that time. 765 * allowed to timeout, to discover the errors at that time.
730 */ 766 */
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
741 unsigned long cwnd; 777 unsigned long cwnd;
742 u32 credits; 778 u32 credits;
743 779
744 /* Check status. If bad, signal disconnect and return rep to pool */ 780 dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
745 if (rep->rr_len == ~0U) { 781
746 rpcrdma_recv_buffer_put(rep); 782 if (rep->rr_len == RPCRDMA_BAD_LEN)
747 if (r_xprt->rx_ep.rep_connected == 1) { 783 goto out_badstatus;
748 r_xprt->rx_ep.rep_connected = -EIO; 784 if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
749 rpcrdma_conn_func(&r_xprt->rx_ep); 785 goto out_shortreply;
750 } 786
751 return;
752 }
753 if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
754 dprintk("RPC: %s: short/invalid reply\n", __func__);
755 goto repost;
756 }
757 headerp = rdmab_to_msg(rep->rr_rdmabuf); 787 headerp = rdmab_to_msg(rep->rr_rdmabuf);
758 if (headerp->rm_vers != rpcrdma_version) { 788 if (headerp->rm_vers != rpcrdma_version)
759 dprintk("RPC: %s: invalid version %d\n", 789 goto out_badversion;
760 __func__, be32_to_cpu(headerp->rm_vers)); 790#if defined(CONFIG_SUNRPC_BACKCHANNEL)
761 goto repost; 791 if (rpcrdma_is_bcall(headerp))
762 } 792 goto out_bcall;
793#endif
763 794
764 /* Get XID and try for a match. */ 795 /* Match incoming rpcrdma_rep to an rpcrdma_req to
765 spin_lock(&xprt->transport_lock); 796 * get context for handling any incoming chunks.
797 */
798 spin_lock_bh(&xprt->transport_lock);
766 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); 799 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
767 if (rqst == NULL) { 800 if (!rqst)
768 spin_unlock(&xprt->transport_lock); 801 goto out_nomatch;
769 dprintk("RPC: %s: reply 0x%p failed "
770 "to match any request xid 0x%08x len %d\n",
771 __func__, rep, be32_to_cpu(headerp->rm_xid),
772 rep->rr_len);
773repost:
774 r_xprt->rx_stats.bad_reply_count++;
775 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
776 rpcrdma_recv_buffer_put(rep);
777 802
778 return;
779 }
780
781 /* get request object */
782 req = rpcr_to_rdmar(rqst); 803 req = rpcr_to_rdmar(rqst);
783 if (req->rl_reply) { 804 if (req->rl_reply)
784 spin_unlock(&xprt->transport_lock); 805 goto out_duplicate;
785 dprintk("RPC: %s: duplicate reply 0x%p to RPC "
786 "request 0x%p: xid 0x%08x\n", __func__, rep, req,
787 be32_to_cpu(headerp->rm_xid));
788 goto repost;
789 }
790 806
791 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" 807 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
792 " RPC request 0x%p xid 0x%08x\n", 808 " RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +899,50 @@ badheader:
883 if (xprt->cwnd > cwnd) 899 if (xprt->cwnd > cwnd)
884 xprt_release_rqst_cong(rqst->rq_task); 900 xprt_release_rqst_cong(rqst->rq_task);
885 901
902 xprt_complete_rqst(rqst->rq_task, status);
903 spin_unlock_bh(&xprt->transport_lock);
886 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", 904 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
887 __func__, xprt, rqst, status); 905 __func__, xprt, rqst, status);
888 xprt_complete_rqst(rqst->rq_task, status); 906 return;
889 spin_unlock(&xprt->transport_lock); 907
908out_badstatus:
909 rpcrdma_recv_buffer_put(rep);
910 if (r_xprt->rx_ep.rep_connected == 1) {
911 r_xprt->rx_ep.rep_connected = -EIO;
912 rpcrdma_conn_func(&r_xprt->rx_ep);
913 }
914 return;
915
916#if defined(CONFIG_SUNRPC_BACKCHANNEL)
917out_bcall:
918 rpcrdma_bc_receive_call(r_xprt, rep);
919 return;
920#endif
921
922out_shortreply:
923 dprintk("RPC: %s: short/invalid reply\n", __func__);
924 goto repost;
925
926out_badversion:
927 dprintk("RPC: %s: invalid version %d\n",
928 __func__, be32_to_cpu(headerp->rm_vers));
929 goto repost;
930
931out_nomatch:
932 spin_unlock_bh(&xprt->transport_lock);
933 dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
934 __func__, be32_to_cpu(headerp->rm_xid),
935 rep->rr_len);
936 goto repost;
937
938out_duplicate:
939 spin_unlock_bh(&xprt->transport_lock);
940 dprintk("RPC: %s: "
941 "duplicate reply %p to RPC request %p: xid 0x%08x\n",
942 __func__, rep, req, be32_to_cpu(headerp->rm_xid));
943
944repost:
945 r_xprt->rx_stats.bad_reply_count++;
946 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
947 rpcrdma_recv_buffer_put(rep);
890} 948}
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f023a5..1b7051bdbdc8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
239 unregister_sysctl_table(svcrdma_table_header); 239 unregister_sysctl_table(svcrdma_table_header);
240 svcrdma_table_header = NULL; 240 svcrdma_table_header = NULL;
241 } 241 }
242#if defined(CONFIG_SUNRPC_BACKCHANNEL)
243 svc_unreg_xprt_class(&svc_rdma_bc_class);
244#endif
242 svc_unreg_xprt_class(&svc_rdma_class); 245 svc_unreg_xprt_class(&svc_rdma_class);
243 kmem_cache_destroy(svc_rdma_map_cachep); 246 kmem_cache_destroy(svc_rdma_map_cachep);
244 kmem_cache_destroy(svc_rdma_ctxt_cachep); 247 kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
286 289
287 /* Register RDMA with the SVC transport switch */ 290 /* Register RDMA with the SVC transport switch */
288 svc_reg_xprt_class(&svc_rdma_class); 291 svc_reg_xprt_class(&svc_rdma_class);
292#if defined(CONFIG_SUNRPC_BACKCHANNEL)
293 svc_reg_xprt_class(&svc_rdma_bc_class);
294#endif
289 return 0; 295 return 0;
290 err1: 296 err1:
291 kmem_cache_destroy(svc_rdma_map_cachep); 297 kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index cb5174284074..f0c3ff67ca98 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -136,7 +136,8 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
136 ctxt->direction = DMA_FROM_DEVICE; 136 ctxt->direction = DMA_FROM_DEVICE;
137 ctxt->read_hdr = head; 137 ctxt->read_hdr = head;
138 pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd); 138 pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd);
139 read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); 139 read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
140 rs_length);
140 141
141 for (pno = 0; pno < pages_needed; pno++) { 142 for (pno = 0; pno < pages_needed; pno++) {
142 int len = min_t(int, rs_length, PAGE_SIZE - pg_off); 143 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
@@ -235,7 +236,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
235 ctxt->direction = DMA_FROM_DEVICE; 236 ctxt->direction = DMA_FROM_DEVICE;
236 ctxt->frmr = frmr; 237 ctxt->frmr = frmr;
237 pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len); 238 pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
238 read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); 239 read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
240 rs_length);
239 241
240 frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]); 242 frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
241 frmr->direction = DMA_FROM_DEVICE; 243 frmr->direction = DMA_FROM_DEVICE;
@@ -531,7 +533,7 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
531 rqstp->rq_arg.page_base = head->arg.page_base; 533 rqstp->rq_arg.page_base = head->arg.page_base;
532 534
533 /* rq_respages starts after the last arg page */ 535 /* rq_respages starts after the last arg page */
534 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; 536 rqstp->rq_respages = &rqstp->rq_pages[page_no];
535 rqstp->rq_next_page = rqstp->rq_respages + 1; 537 rqstp->rq_next_page = rqstp->rq_respages + 1;
536 538
537 /* Rebuild rq_arg head and tail. */ 539 /* Rebuild rq_arg head and tail. */
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fcc3eb80c265..a133b1e5b5f6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
56 56
57#define RPCDBG_FACILITY RPCDBG_SVCXPRT 57#define RPCDBG_FACILITY RPCDBG_SVCXPRT
58 58
59static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
59static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 60static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
60 struct net *net, 61 struct net *net,
61 struct sockaddr *sa, int salen, 62 struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
95 .xcl_ident = XPRT_TRANSPORT_RDMA, 96 .xcl_ident = XPRT_TRANSPORT_RDMA,
96}; 97};
97 98
99#if defined(CONFIG_SUNRPC_BACKCHANNEL)
100static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
101 struct sockaddr *, int, int);
102static void svc_rdma_bc_detach(struct svc_xprt *);
103static void svc_rdma_bc_free(struct svc_xprt *);
104
105static struct svc_xprt_ops svc_rdma_bc_ops = {
106 .xpo_create = svc_rdma_bc_create,
107 .xpo_detach = svc_rdma_bc_detach,
108 .xpo_free = svc_rdma_bc_free,
109 .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
110 .xpo_secure_port = svc_rdma_secure_port,
111};
112
113struct svc_xprt_class svc_rdma_bc_class = {
114 .xcl_name = "rdma-bc",
115 .xcl_owner = THIS_MODULE,
116 .xcl_ops = &svc_rdma_bc_ops,
117 .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
118};
119
120static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
121 struct net *net,
122 struct sockaddr *sa, int salen,
123 int flags)
124{
125 struct svcxprt_rdma *cma_xprt;
126 struct svc_xprt *xprt;
127
128 cma_xprt = rdma_create_xprt(serv, 0);
129 if (!cma_xprt)
130 return ERR_PTR(-ENOMEM);
131 xprt = &cma_xprt->sc_xprt;
132
133 svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
134 serv->sv_bc_xprt = xprt;
135
136 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
137 return xprt;
138}
139
140static void svc_rdma_bc_detach(struct svc_xprt *xprt)
141{
142 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
143}
144
145static void svc_rdma_bc_free(struct svc_xprt *xprt)
146{
147 struct svcxprt_rdma *rdma =
148 container_of(xprt, struct svcxprt_rdma, sc_xprt);
149
150 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
151 if (xprt)
152 kfree(rdma);
153}
154#endif /* CONFIG_SUNRPC_BACKCHANNEL */
155
98struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) 156struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
99{ 157{
100 struct svc_rdma_op_ctxt *ctxt; 158 struct svc_rdma_op_ctxt *ctxt;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452bc580c..8c545f7d7525 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
676static int 676static int
677xprt_rdma_enable_swap(struct rpc_xprt *xprt) 677xprt_rdma_enable_swap(struct rpc_xprt *xprt)
678{ 678{
679 return -EINVAL; 679 return 0;
680} 680}
681 681
682static void 682static void
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
705 .print_stats = xprt_rdma_print_stats, 705 .print_stats = xprt_rdma_print_stats,
706 .enable_swap = xprt_rdma_enable_swap, 706 .enable_swap = xprt_rdma_enable_swap,
707 .disable_swap = xprt_rdma_disable_swap, 707 .disable_swap = xprt_rdma_disable_swap,
708 .inject_disconnect = xprt_rdma_inject_disconnect 708 .inject_disconnect = xprt_rdma_inject_disconnect,
709#if defined(CONFIG_SUNRPC_BACKCHANNEL)
710 .bc_setup = xprt_rdma_bc_setup,
711 .bc_up = xprt_rdma_bc_up,
712 .bc_free_rqst = xprt_rdma_bc_free_rqst,
713 .bc_destroy = xprt_rdma_bc_destroy,
714#endif
709}; 715};
710 716
711static struct xprt_class xprt_rdma = { 717static struct xprt_class xprt_rdma = {
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
732 dprintk("RPC: %s: xprt_unregister returned %i\n", 738 dprintk("RPC: %s: xprt_unregister returned %i\n",
733 __func__, rc); 739 __func__, rc);
734 740
741 rpcrdma_destroy_wq();
735 frwr_destroy_recovery_wq(); 742 frwr_destroy_recovery_wq();
736} 743}
737 744
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
743 if (rc) 750 if (rc)
744 return rc; 751 return rc;
745 752
753 rc = rpcrdma_alloc_wq();
754 if (rc) {
755 frwr_destroy_recovery_wq();
756 return rc;
757 }
758
746 rc = xprt_register_transport(&xprt_rdma); 759 rc = xprt_register_transport(&xprt_rdma);
747 if (rc) { 760 if (rc) {
761 rpcrdma_destroy_wq();
748 frwr_destroy_recovery_wq(); 762 frwr_destroy_recovery_wq();
749 return rc; 763 return rc;
750 } 764 }
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8a477e27bad7..93883ffb86e0 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,47 +68,33 @@
68 * internal functions 68 * internal functions
69 */ 69 */
70 70
71/* 71static struct workqueue_struct *rpcrdma_receive_wq;
72 * handle replies in tasklet context, using a single, global list
73 * rdma tasklet function -- just turn around and call the func
74 * for all replies on the list
75 */
76
77static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
78static LIST_HEAD(rpcrdma_tasklets_g);
79 72
80static void 73int
81rpcrdma_run_tasklet(unsigned long data) 74rpcrdma_alloc_wq(void)
82{ 75{
83 struct rpcrdma_rep *rep; 76 struct workqueue_struct *recv_wq;
84 unsigned long flags;
85
86 data = data;
87 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88 while (!list_empty(&rpcrdma_tasklets_g)) {
89 rep = list_entry(rpcrdma_tasklets_g.next,
90 struct rpcrdma_rep, rr_list);
91 list_del(&rep->rr_list);
92 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 77
94 rpcrdma_reply_handler(rep); 78 recv_wq = alloc_workqueue("xprtrdma_receive",
79 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
80 0);
81 if (!recv_wq)
82 return -ENOMEM;
95 83
96 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); 84 rpcrdma_receive_wq = recv_wq;
97 } 85 return 0;
98 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
99} 86}
100 87
101static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); 88void
102 89rpcrdma_destroy_wq(void)
103static void
104rpcrdma_schedule_tasklet(struct list_head *sched_list)
105{ 90{
106 unsigned long flags; 91 struct workqueue_struct *wq;
107 92
108 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); 93 if (rpcrdma_receive_wq) {
109 list_splice_tail(sched_list, &rpcrdma_tasklets_g); 94 wq = rpcrdma_receive_wq;
110 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); 95 rpcrdma_receive_wq = NULL;
111 tasklet_schedule(&rpcrdma_tasklet_g); 96 destroy_workqueue(wq);
97 }
112} 98}
113 99
114static void 100static void
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
158 } 144 }
159} 145}
160 146
161static int 147/* The common case is a single send completion is waiting. By
162rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 148 * passing two WC entries to ib_poll_cq, a return code of 1
149 * means there is exactly one WC waiting and no more. We don't
150 * have to invoke ib_poll_cq again to know that the CQ has been
151 * properly drained.
152 */
153static void
154rpcrdma_sendcq_poll(struct ib_cq *cq)
163{ 155{
164 struct ib_wc *wcs; 156 struct ib_wc *pos, wcs[2];
165 int budget, count, rc; 157 int count, rc;
166 158
167 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
168 do { 159 do {
169 wcs = ep->rep_send_wcs; 160 pos = wcs;
170 161
171 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 162 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
172 if (rc <= 0) 163 if (rc < 0)
173 return rc; 164 break;
174 165
175 count = rc; 166 count = rc;
176 while (count-- > 0) 167 while (count-- > 0)
177 rpcrdma_sendcq_process_wc(wcs++); 168 rpcrdma_sendcq_process_wc(pos++);
178 } while (rc == RPCRDMA_POLLSIZE && --budget); 169 } while (rc == ARRAY_SIZE(wcs));
179 return 0; 170 return;
180} 171}
181 172
182/* 173/* Handle provider send completion upcalls.
183 * Handle send, fast_reg_mr, and local_inv completions.
184 *
185 * Send events are typically suppressed and thus do not result
186 * in an upcall. Occasionally one is signaled, however. This
187 * prevents the provider's completion queue from wrapping and
188 * losing a completion.
189 */ 174 */
190static void 175static void
191rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) 176rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
192{ 177{
193 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; 178 do {
194 int rc; 179 rpcrdma_sendcq_poll(cq);
195 180 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
196 rc = rpcrdma_sendcq_poll(cq, ep); 181 IB_CQ_REPORT_MISSED_EVENTS) > 0);
197 if (rc) { 182}
198 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
199 __func__, rc);
200 return;
201 }
202 183
203 rc = ib_req_notify_cq(cq, 184static void
204 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); 185rpcrdma_receive_worker(struct work_struct *work)
205 if (rc == 0) 186{
206 return; 187 struct rpcrdma_rep *rep =
207 if (rc < 0) { 188 container_of(work, struct rpcrdma_rep, rr_work);
208 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
209 __func__, rc);
210 return;
211 }
212 189
213 rpcrdma_sendcq_poll(cq, ep); 190 rpcrdma_reply_handler(rep);
214} 191}
215 192
216static void 193static void
217rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) 194rpcrdma_recvcq_process_wc(struct ib_wc *wc)
218{ 195{
219 struct rpcrdma_rep *rep = 196 struct rpcrdma_rep *rep =
220 (struct rpcrdma_rep *)(unsigned long)wc->wr_id; 197 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
237 prefetch(rdmab_to_msg(rep->rr_rdmabuf)); 214 prefetch(rdmab_to_msg(rep->rr_rdmabuf));
238 215
239out_schedule: 216out_schedule:
240 list_add_tail(&rep->rr_list, sched_list); 217 queue_work(rpcrdma_receive_wq, &rep->rr_work);
241 return; 218 return;
219
242out_fail: 220out_fail:
243 if (wc->status != IB_WC_WR_FLUSH_ERR) 221 if (wc->status != IB_WC_WR_FLUSH_ERR)
244 pr_err("RPC: %s: rep %p: %s\n", 222 pr_err("RPC: %s: rep %p: %s\n",
245 __func__, rep, ib_wc_status_msg(wc->status)); 223 __func__, rep, ib_wc_status_msg(wc->status));
246 rep->rr_len = ~0U; 224 rep->rr_len = RPCRDMA_BAD_LEN;
247 goto out_schedule; 225 goto out_schedule;
248} 226}
249 227
250static int 228/* The wc array is on stack: automatic memory is always CPU-local.
251rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 229 *
230 * struct ib_wc is 64 bytes, making the poll array potentially
231 * large. But this is at the bottom of the call chain. Further
232 * substantial work is done in another thread.
233 */
234static void
235rpcrdma_recvcq_poll(struct ib_cq *cq)
252{ 236{
253 struct list_head sched_list; 237 struct ib_wc *pos, wcs[4];
254 struct ib_wc *wcs; 238 int count, rc;
255 int budget, count, rc;
256 239
257 INIT_LIST_HEAD(&sched_list);
258 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
259 do { 240 do {
260 wcs = ep->rep_recv_wcs; 241 pos = wcs;
261 242
262 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 243 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
263 if (rc <= 0) 244 if (rc < 0)
264 goto out_schedule; 245 break;
265 246
266 count = rc; 247 count = rc;
267 while (count-- > 0) 248 while (count-- > 0)
268 rpcrdma_recvcq_process_wc(wcs++, &sched_list); 249 rpcrdma_recvcq_process_wc(pos++);
269 } while (rc == RPCRDMA_POLLSIZE && --budget); 250 } while (rc == ARRAY_SIZE(wcs));
270 rc = 0;
271
272out_schedule:
273 rpcrdma_schedule_tasklet(&sched_list);
274 return rc;
275} 251}
276 252
277/* 253/* Handle provider receive completion upcalls.
278 * Handle receive completions.
279 *
280 * It is reentrant but processes single events in order to maintain
281 * ordering of receives to keep server credits.
282 *
283 * It is the responsibility of the scheduled tasklet to return
284 * recv buffers to the pool. NOTE: this affects synchronization of
285 * connection shutdown. That is, the structures required for
286 * the completion of the reply handler must remain intact until
287 * all memory has been reclaimed.
288 */ 254 */
289static void 255static void
290rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) 256rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
291{ 257{
292 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; 258 do {
293 int rc; 259 rpcrdma_recvcq_poll(cq);
294 260 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
295 rc = rpcrdma_recvcq_poll(cq, ep); 261 IB_CQ_REPORT_MISSED_EVENTS) > 0);
296 if (rc) {
297 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
298 __func__, rc);
299 return;
300 }
301
302 rc = ib_req_notify_cq(cq,
303 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
304 if (rc == 0)
305 return;
306 if (rc < 0) {
307 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
308 __func__, rc);
309 return;
310 }
311
312 rpcrdma_recvcq_poll(cq, ep);
313} 262}
314 263
315static void 264static void
316rpcrdma_flush_cqs(struct rpcrdma_ep *ep) 265rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
317{ 266{
318 struct ib_wc wc; 267 struct ib_wc wc;
319 LIST_HEAD(sched_list);
320 268
321 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) 269 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
322 rpcrdma_recvcq_process_wc(&wc, &sched_list); 270 rpcrdma_recvcq_process_wc(&wc);
323 if (!list_empty(&sched_list))
324 rpcrdma_schedule_tasklet(&sched_list);
325 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) 271 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
326 rpcrdma_sendcq_process_wc(&wc); 272 rpcrdma_sendcq_process_wc(&wc);
327} 273}
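
The reworked send and receive upcalls above share one drain-then-re-arm idiom: poll until the CQ is empty, re-arm notification, and loop again if the provider reports that completions were missed while re-arming. A minimal standalone rendering of that idiom is sketched below; process_wc() is a hypothetical stand-in for rpcrdma_sendcq_process_wc()/rpcrdma_recvcq_process_wc(), and the batched polling used in the patch is dropped for brevity:

static void example_cq_upcall(struct ib_cq *cq, void *cq_context)
{
	struct ib_wc wc;

	do {
		/* Drain everything currently queued on the CQ. */
		while (ib_poll_cq(cq, 1, &wc) > 0)
			process_wc(&wc);

		/* Re-arm; a positive return value means completions arrived
		 * while re-arming, so poll again before going idle. */
	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
				       IB_CQ_REPORT_MISSED_EVENTS) > 0);
}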
@@ -543,11 +489,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
543 } 489 }
544 490
545 if (memreg == RPCRDMA_FRMR) { 491 if (memreg == RPCRDMA_FRMR) {
546 /* Requires both frmr reg and local dma lkey */ 492 if (!(devattr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
547 if (((devattr->device_cap_flags & 493 (devattr->max_fast_reg_page_list_len == 0)) {
548 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
549 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
550 (devattr->max_fast_reg_page_list_len == 0)) {
551 dprintk("RPC: %s: FRMR registration " 494 dprintk("RPC: %s: FRMR registration "
552 "not supported by HCA\n", __func__); 495 "not supported by HCA\n", __func__);
553 memreg = RPCRDMA_MTHCAFMR; 496 memreg = RPCRDMA_MTHCAFMR;
@@ -557,6 +500,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
557 if (!ia->ri_device->alloc_fmr) { 500 if (!ia->ri_device->alloc_fmr) {
558 dprintk("RPC: %s: MTHCAFMR registration " 501 dprintk("RPC: %s: MTHCAFMR registration "
559 "not supported by HCA\n", __func__); 502 "not supported by HCA\n", __func__);
503 rc = -EINVAL;
560 goto out3; 504 goto out3;
561 } 505 }
562 } 506 }
@@ -624,6 +568,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
624 struct ib_device_attr *devattr = &ia->ri_devattr; 568 struct ib_device_attr *devattr = &ia->ri_devattr;
625 struct ib_cq *sendcq, *recvcq; 569 struct ib_cq *sendcq, *recvcq;
626 struct ib_cq_init_attr cq_attr = {}; 570 struct ib_cq_init_attr cq_attr = {};
571 unsigned int max_qp_wr;
627 int rc, err; 572 int rc, err;
628 573
629 if (devattr->max_sge < RPCRDMA_MAX_IOVS) { 574 if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -632,18 +577,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
632 return -ENOMEM; 577 return -ENOMEM;
633 } 578 }
634 579
580 if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
581 dprintk("RPC: %s: insufficient wqe's available\n",
582 __func__);
583 return -ENOMEM;
584 }
585 max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
586
635 /* check provider's send/recv wr limits */ 587 /* check provider's send/recv wr limits */
636 if (cdata->max_requests > devattr->max_qp_wr) 588 if (cdata->max_requests > max_qp_wr)
637 cdata->max_requests = devattr->max_qp_wr; 589 cdata->max_requests = max_qp_wr;
638 590
639 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; 591 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
640 ep->rep_attr.qp_context = ep; 592 ep->rep_attr.qp_context = ep;
641 ep->rep_attr.srq = NULL; 593 ep->rep_attr.srq = NULL;
642 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 594 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
595 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
643 rc = ia->ri_ops->ro_open(ia, ep, cdata); 596 rc = ia->ri_ops->ro_open(ia, ep, cdata);
644 if (rc) 597 if (rc)
645 return rc; 598 return rc;
646 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 599 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
600 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
647 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; 601 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
648 ep->rep_attr.cap.max_recv_sge = 1; 602 ep->rep_attr.cap.max_recv_sge = 1;
649 ep->rep_attr.cap.max_inline_data = 0; 603 ep->rep_attr.cap.max_inline_data = 0;
@@ -671,7 +625,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
671 625
672 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; 626 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
673 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, 627 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
674 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 628 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
675 if (IS_ERR(sendcq)) { 629 if (IS_ERR(sendcq)) {
676 rc = PTR_ERR(sendcq); 630 rc = PTR_ERR(sendcq);
677 dprintk("RPC: %s: failed to create send CQ: %i\n", 631 dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -688,7 +642,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
688 642
689 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; 643 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
690 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, 644 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
691 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 645 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
692 if (IS_ERR(recvcq)) { 646 if (IS_ERR(recvcq)) {
693 rc = PTR_ERR(recvcq); 647 rc = PTR_ERR(recvcq);
694 dprintk("RPC: %s: failed to create recv CQ: %i\n", 648 dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -887,7 +841,21 @@ retry:
887 } 841 }
888 rc = ep->rep_connected; 842 rc = ep->rep_connected;
889 } else { 843 } else {
844 struct rpcrdma_xprt *r_xprt;
845 unsigned int extras;
846
890 dprintk("RPC: %s: connected\n", __func__); 847 dprintk("RPC: %s: connected\n", __func__);
848
849 r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
850 extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
851
852 if (extras) {
853 rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
854 if (rc)
855 pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
856 __func__, rc);
857 rc = 0;
858 }
891 } 859 }
892 860
893out: 861out:
@@ -924,20 +892,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
924 } 892 }
925} 893}
926 894
927static struct rpcrdma_req * 895struct rpcrdma_req *
928rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 896rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
929{ 897{
898 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
930 struct rpcrdma_req *req; 899 struct rpcrdma_req *req;
931 900
932 req = kzalloc(sizeof(*req), GFP_KERNEL); 901 req = kzalloc(sizeof(*req), GFP_KERNEL);
933 if (req == NULL) 902 if (req == NULL)
934 return ERR_PTR(-ENOMEM); 903 return ERR_PTR(-ENOMEM);
935 904
905 INIT_LIST_HEAD(&req->rl_free);
906 spin_lock(&buffer->rb_reqslock);
907 list_add(&req->rl_all, &buffer->rb_allreqs);
908 spin_unlock(&buffer->rb_reqslock);
936 req->rl_buffer = &r_xprt->rx_buf; 909 req->rl_buffer = &r_xprt->rx_buf;
937 return req; 910 return req;
938} 911}
939 912
940static struct rpcrdma_rep * 913struct rpcrdma_rep *
941rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 914rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
942{ 915{
943 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 916 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -959,6 +932,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
959 932
960 rep->rr_device = ia->ri_device; 933 rep->rr_device = ia->ri_device;
961 rep->rr_rxprt = r_xprt; 934 rep->rr_rxprt = r_xprt;
935 INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
962 return rep; 936 return rep;
963 937
964out_free: 938out_free:
@@ -972,44 +946,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
972{ 946{
973 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 947 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
974 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 948 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
975 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
976 char *p;
977 size_t len;
978 int i, rc; 949 int i, rc;
979 950
980 buf->rb_max_requests = cdata->max_requests; 951 buf->rb_max_requests = r_xprt->rx_data.max_requests;
952 buf->rb_bc_srv_max_requests = 0;
981 spin_lock_init(&buf->rb_lock); 953 spin_lock_init(&buf->rb_lock);
982 954
983 /* Need to allocate:
984 * 1. arrays for send and recv pointers
985 * 2. arrays of struct rpcrdma_req to fill in pointers
986 * 3. array of struct rpcrdma_rep for replies
987 * Send/recv buffers in req/rep need to be registered
988 */
989 len = buf->rb_max_requests *
990 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
991
992 p = kzalloc(len, GFP_KERNEL);
993 if (p == NULL) {
994 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
995 __func__, len);
996 rc = -ENOMEM;
997 goto out;
998 }
999 buf->rb_pool = p; /* for freeing it later */
1000
1001 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1002 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1003 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1004 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1005
1006 rc = ia->ri_ops->ro_init(r_xprt); 955 rc = ia->ri_ops->ro_init(r_xprt);
1007 if (rc) 956 if (rc)
1008 goto out; 957 goto out;
1009 958
959 INIT_LIST_HEAD(&buf->rb_send_bufs);
960 INIT_LIST_HEAD(&buf->rb_allreqs);
961 spin_lock_init(&buf->rb_reqslock);
1010 for (i = 0; i < buf->rb_max_requests; i++) { 962 for (i = 0; i < buf->rb_max_requests; i++) {
1011 struct rpcrdma_req *req; 963 struct rpcrdma_req *req;
1012 struct rpcrdma_rep *rep;
1013 964
1014 req = rpcrdma_create_req(r_xprt); 965 req = rpcrdma_create_req(r_xprt);
1015 if (IS_ERR(req)) { 966 if (IS_ERR(req)) {
@@ -1018,7 +969,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1018 rc = PTR_ERR(req); 969 rc = PTR_ERR(req);
1019 goto out; 970 goto out;
1020 } 971 }
1021 buf->rb_send_bufs[i] = req; 972 req->rl_backchannel = false;
973 list_add(&req->rl_free, &buf->rb_send_bufs);
974 }
975
976 INIT_LIST_HEAD(&buf->rb_recv_bufs);
977 for (i = 0; i < buf->rb_max_requests + 2; i++) {
978 struct rpcrdma_rep *rep;
1022 979
1023 rep = rpcrdma_create_rep(r_xprt); 980 rep = rpcrdma_create_rep(r_xprt);
1024 if (IS_ERR(rep)) { 981 if (IS_ERR(rep)) {
@@ -1027,7 +984,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1027 rc = PTR_ERR(rep); 984 rc = PTR_ERR(rep);
1028 goto out; 985 goto out;
1029 } 986 }
1030 buf->rb_recv_bufs[i] = rep; 987 list_add(&rep->rr_list, &buf->rb_recv_bufs);
1031 } 988 }
1032 989
1033 return 0; 990 return 0;
@@ -1036,22 +993,38 @@ out:
1036 return rc; 993 return rc;
1037} 994}
1038 995
996static struct rpcrdma_req *
997rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
998{
999 struct rpcrdma_req *req;
1000
1001 req = list_first_entry(&buf->rb_send_bufs,
1002 struct rpcrdma_req, rl_free);
1003 list_del(&req->rl_free);
1004 return req;
1005}
1006
1007static struct rpcrdma_rep *
1008rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
1009{
1010 struct rpcrdma_rep *rep;
1011
1012 rep = list_first_entry(&buf->rb_recv_bufs,
1013 struct rpcrdma_rep, rr_list);
1014 list_del(&rep->rr_list);
1015 return rep;
1016}
1017
1039static void 1018static void
1040rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) 1019rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1041{ 1020{
1042 if (!rep)
1043 return;
1044
1045 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); 1021 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1046 kfree(rep); 1022 kfree(rep);
1047} 1023}
1048 1024
1049static void 1025void
1050rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) 1026rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1051{ 1027{
1052 if (!req)
1053 return;
1054
1055 rpcrdma_free_regbuf(ia, req->rl_sendbuf); 1028 rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1056 rpcrdma_free_regbuf(ia, req->rl_rdmabuf); 1029 rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1057 kfree(req); 1030 kfree(req);
@@ -1061,25 +1034,29 @@ void
1061rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) 1034rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1062{ 1035{
1063 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 1036 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1064 int i;
1065 1037
1066 /* clean up in reverse order from create 1038 while (!list_empty(&buf->rb_recv_bufs)) {
1067 * 1. recv mr memory (mr free, then kfree) 1039 struct rpcrdma_rep *rep;
1068 * 2. send mr memory (mr free, then kfree)
1069 * 3. MWs
1070 */
1071 dprintk("RPC: %s: entering\n", __func__);
1072 1040
1073 for (i = 0; i < buf->rb_max_requests; i++) { 1041 rep = rpcrdma_buffer_get_rep_locked(buf);
1074 if (buf->rb_recv_bufs) 1042 rpcrdma_destroy_rep(ia, rep);
1075 rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1076 if (buf->rb_send_bufs)
1077 rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1078 } 1043 }
1079 1044
1080 ia->ri_ops->ro_destroy(buf); 1045 spin_lock(&buf->rb_reqslock);
1046 while (!list_empty(&buf->rb_allreqs)) {
1047 struct rpcrdma_req *req;
1048
1049 req = list_first_entry(&buf->rb_allreqs,
1050 struct rpcrdma_req, rl_all);
1051 list_del(&req->rl_all);
1052
1053 spin_unlock(&buf->rb_reqslock);
1054 rpcrdma_destroy_req(ia, req);
1055 spin_lock(&buf->rb_reqslock);
1056 }
1057 spin_unlock(&buf->rb_reqslock);
1081 1058
1082 kfree(buf->rb_pool); 1059 ia->ri_ops->ro_destroy(buf);
1083} 1060}
1084 1061
1085struct rpcrdma_mw * 1062struct rpcrdma_mw *
@@ -1111,53 +1088,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1111 spin_unlock(&buf->rb_mwlock); 1088 spin_unlock(&buf->rb_mwlock);
1112} 1089}
1113 1090
1114static void
1115rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1116{
1117 buf->rb_send_bufs[--buf->rb_send_index] = req;
1118 req->rl_niovs = 0;
1119 if (req->rl_reply) {
1120 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1121 req->rl_reply = NULL;
1122 }
1123}
1124
1125/* 1091/*
1126 * Get a set of request/reply buffers. 1092 * Get a set of request/reply buffers.
1127 * 1093 *
1128 * Reply buffer (if needed) is attached to send buffer upon return. 1094 * Reply buffer (if available) is attached to send buffer upon return.
1129 * Rule:
1130 * rb_send_index and rb_recv_index MUST always be pointing to the
1131 * *next* available buffer (non-NULL). They are incremented after
1132 * removing buffers, and decremented *before* returning them.
1133 */ 1095 */
1134struct rpcrdma_req * 1096struct rpcrdma_req *
1135rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) 1097rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1136{ 1098{
1137 struct rpcrdma_req *req; 1099 struct rpcrdma_req *req;
1138 unsigned long flags;
1139
1140 spin_lock_irqsave(&buffers->rb_lock, flags);
1141 1100
1142 if (buffers->rb_send_index == buffers->rb_max_requests) { 1101 spin_lock(&buffers->rb_lock);
1143 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1102 if (list_empty(&buffers->rb_send_bufs))
1144 dprintk("RPC: %s: out of request buffers\n", __func__); 1103 goto out_reqbuf;
1145 return ((struct rpcrdma_req *)NULL); 1104 req = rpcrdma_buffer_get_req_locked(buffers);
1146 } 1105 if (list_empty(&buffers->rb_recv_bufs))
1147 1106 goto out_repbuf;
1148 req = buffers->rb_send_bufs[buffers->rb_send_index]; 1107 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1149 if (buffers->rb_send_index < buffers->rb_recv_index) { 1108 spin_unlock(&buffers->rb_lock);
1150 dprintk("RPC: %s: %d extra receives outstanding (ok)\n", 1109 return req;
1151 __func__,
1152 buffers->rb_recv_index - buffers->rb_send_index);
1153 req->rl_reply = NULL;
1154 } else {
1155 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1156 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1157 }
1158 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1159 1110
1160 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1111out_reqbuf:
1112 spin_unlock(&buffers->rb_lock);
1113 pr_warn("RPC: %s: out of request buffers\n", __func__);
1114 return NULL;
1115out_repbuf:
1116 spin_unlock(&buffers->rb_lock);
1117 pr_warn("RPC: %s: out of reply buffers\n", __func__);
1118 req->rl_reply = NULL;
1161 return req; 1119 return req;
1162} 1120}
1163 1121
@@ -1169,30 +1127,31 @@ void
1169rpcrdma_buffer_put(struct rpcrdma_req *req) 1127rpcrdma_buffer_put(struct rpcrdma_req *req)
1170{ 1128{
1171 struct rpcrdma_buffer *buffers = req->rl_buffer; 1129 struct rpcrdma_buffer *buffers = req->rl_buffer;
1172 unsigned long flags; 1130 struct rpcrdma_rep *rep = req->rl_reply;
1173 1131
1174 spin_lock_irqsave(&buffers->rb_lock, flags); 1132 req->rl_niovs = 0;
1175 rpcrdma_buffer_put_sendbuf(req, buffers); 1133 req->rl_reply = NULL;
1176 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1134
1135 spin_lock(&buffers->rb_lock);
1136 list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1137 if (rep)
1138 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1139 spin_unlock(&buffers->rb_lock);
1177} 1140}
1178 1141
1179/* 1142/*
1180 * Recover reply buffers from pool. 1143 * Recover reply buffers from pool.
1181 * This happens when recovering from error conditions. 1144 * This happens when recovering from disconnect.
1182 * Post-increment counter/array index.
1183 */ 1145 */
1184void 1146void
1185rpcrdma_recv_buffer_get(struct rpcrdma_req *req) 1147rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1186{ 1148{
1187 struct rpcrdma_buffer *buffers = req->rl_buffer; 1149 struct rpcrdma_buffer *buffers = req->rl_buffer;
1188 unsigned long flags;
1189 1150
1190 spin_lock_irqsave(&buffers->rb_lock, flags); 1151 spin_lock(&buffers->rb_lock);
1191 if (buffers->rb_recv_index < buffers->rb_max_requests) { 1152 if (!list_empty(&buffers->rb_recv_bufs))
1192 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; 1153 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1193 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; 1154 spin_unlock(&buffers->rb_lock);
1194 }
1195 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1196} 1155}
1197 1156
1198/* 1157/*
@@ -1203,11 +1162,10 @@ void
1203rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 1162rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1204{ 1163{
1205 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; 1164 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1206 unsigned long flags;
1207 1165
1208 spin_lock_irqsave(&buffers->rb_lock, flags); 1166 spin_lock(&buffers->rb_lock);
1209 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; 1167 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1210 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1168 spin_unlock(&buffers->rb_lock);
1211} 1169}
1212 1170
1213/* 1171/*
@@ -1364,6 +1322,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1364 return rc; 1322 return rc;
1365} 1323}
1366 1324
1325/**
1326 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1327 * @r_xprt: transport associated with these backchannel resources
 1328 * @count: minimum number of incoming requests expected
1329 *
1330 * Returns zero if all requested buffers were posted, or a negative errno.
1331 */
1332int
1333rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1334{
1335 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1336 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1337 struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1338 struct rpcrdma_rep *rep;
1339 unsigned long flags;
1340 int rc;
1341
1342 while (count--) {
1343 spin_lock_irqsave(&buffers->rb_lock, flags);
1344 if (list_empty(&buffers->rb_recv_bufs))
1345 goto out_reqbuf;
1346 rep = rpcrdma_buffer_get_rep_locked(buffers);
1347 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1348
1349 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1350 if (rc)
1351 goto out_rc;
1352 }
1353
1354 return 0;
1355
1356out_reqbuf:
1357 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1358 pr_warn("%s: no extra receive buffers\n", __func__);
1359 return -ENOMEM;
1360
1361out_rc:
1362 rpcrdma_recv_buffer_put(rep);
1363 return rc;
1364}
1365
1367/* How many chunk list items fit within our inline buffers? 1366/* How many chunk list items fit within our inline buffers?
1368 */ 1367 */
1369unsigned int 1368unsigned int
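
The verbs.c hunks above replace the fixed rb_send_bufs/rb_recv_bufs pointer arrays and their rb_send_index/rb_recv_index counters with free lists protected by a plain spin_lock. The sketch below is a minimal userspace analogue of that get/put pattern, assuming nothing beyond what the diff shows; struct pool, struct buf, pool_get() and pool_put() are invented names, and a pthread mutex with a hand-rolled singly linked list stands in for spinlock_t and list_head.

/* Minimal userspace sketch of a list-based buffer pool, loosely modelled
 * on the rb_send_bufs/rb_recv_bufs rework above. Illustrative only. */
#include <pthread.h>
#include <stdio.h>

struct buf {
    struct buf *next;       /* singly linked free list */
    int id;
};

struct pool {
    pthread_mutex_t lock;   /* plays the role of rb_lock */
    struct buf *free_send;  /* plays the role of rb_send_bufs */
    struct buf *free_recv;  /* plays the role of rb_recv_bufs */
};

static struct buf *pool_pop(struct buf **head)
{
    struct buf *b = *head;

    if (b)
        *head = b->next;
    return b;
}

static void pool_push(struct buf **head, struct buf *b)
{
    b->next = *head;
    *head = b;
}

/* Mirrors rpcrdma_buffer_get(): take a send buffer, attach a receive
 * buffer if one is available, and run dry gracefully otherwise. */
static struct buf *pool_get(struct pool *p, struct buf **reply)
{
    struct buf *req;

    pthread_mutex_lock(&p->lock);
    req = pool_pop(&p->free_send);
    *reply = req ? pool_pop(&p->free_recv) : NULL;
    pthread_mutex_unlock(&p->lock);
    return req;
}

/* Mirrors rpcrdma_buffer_put(): return both buffers to their lists. */
static void pool_put(struct pool *p, struct buf *req, struct buf *reply)
{
    pthread_mutex_lock(&p->lock);
    pool_push(&p->free_send, req);
    if (reply)
        pool_push(&p->free_recv, reply);
    pthread_mutex_unlock(&p->lock);
}

int main(void)
{
    struct pool p = { .lock = PTHREAD_MUTEX_INITIALIZER };
    struct buf bufs[4];
    struct buf *req, *reply;
    int i;

    for (i = 0; i < 2; i++) {
        bufs[i].id = i;
        pool_push(&p.free_send, &bufs[i]);
        bufs[i + 2].id = i + 2;
        pool_push(&p.free_recv, &bufs[i + 2]);
    }

    req = pool_get(&p, &reply);
    printf("got send %d, recv %d\n", req->id, reply ? reply->id : -1);
    pool_put(&p, req, reply);
    return 0;
}

As in rpcrdma_buffer_get(), running out of receive buffers is not fatal here: the caller simply gets a request with no reply buffer attached, while an empty send list returns NULL.
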
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c09414e6f91b..f8dd17be9f43 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
77 * RDMA Endpoint -- one per transport instance 77 * RDMA Endpoint -- one per transport instance
78 */ 78 */
79 79
80#define RPCRDMA_WC_BUDGET (128)
81#define RPCRDMA_POLLSIZE (16)
82
83struct rpcrdma_ep { 80struct rpcrdma_ep {
84 atomic_t rep_cqcount; 81 atomic_t rep_cqcount;
85 int rep_cqinit; 82 int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
89 struct rdma_conn_param rep_remote_cma; 86 struct rdma_conn_param rep_remote_cma;
90 struct sockaddr_storage rep_remote_addr; 87 struct sockaddr_storage rep_remote_addr;
91 struct delayed_work rep_connect_worker; 88 struct delayed_work rep_connect_worker;
92 struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
93 struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
94}; 89};
95 90
96/* 91/*
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
106 */ 101 */
107#define RPCRDMA_IGNORE_COMPLETION (0ULL) 102#define RPCRDMA_IGNORE_COMPLETION (0ULL)
108 103
104/* Pre-allocate extra Work Requests for handling backward receives
105 * and sends. This is a fixed value because the Work Queues are
106 * allocated when the forward channel is set up.
107 */
108#if defined(CONFIG_SUNRPC_BACKCHANNEL)
109#define RPCRDMA_BACKWARD_WRS (8)
110#else
111#define RPCRDMA_BACKWARD_WRS (0)
112#endif
113
109/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV 114/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
110 * 115 *
111 * The below structure appears at the front of a large region of kmalloc'd 116 * The below structure appears at the front of a large region of kmalloc'd
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
169 unsigned int rr_len; 174 unsigned int rr_len;
170 struct ib_device *rr_device; 175 struct ib_device *rr_device;
171 struct rpcrdma_xprt *rr_rxprt; 176 struct rpcrdma_xprt *rr_rxprt;
177 struct work_struct rr_work;
172 struct list_head rr_list; 178 struct list_head rr_list;
173 struct rpcrdma_regbuf *rr_rdmabuf; 179 struct rpcrdma_regbuf *rr_rdmabuf;
174}; 180};
175 181
182#define RPCRDMA_BAD_LEN (~0U)
183
176/* 184/*
177 * struct rpcrdma_mw - external memory region metadata 185 * struct rpcrdma_mw - external memory region metadata
178 * 186 *
@@ -255,6 +263,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
255#define RPCRDMA_MAX_IOVS (2) 263#define RPCRDMA_MAX_IOVS (2)
256 264
257struct rpcrdma_req { 265struct rpcrdma_req {
266 struct list_head rl_free;
258 unsigned int rl_niovs; 267 unsigned int rl_niovs;
259 unsigned int rl_nchunks; 268 unsigned int rl_nchunks;
260 unsigned int rl_connect_cookie; 269 unsigned int rl_connect_cookie;
@@ -264,6 +273,9 @@ struct rpcrdma_req {
264 struct rpcrdma_regbuf *rl_rdmabuf; 273 struct rpcrdma_regbuf *rl_rdmabuf;
265 struct rpcrdma_regbuf *rl_sendbuf; 274 struct rpcrdma_regbuf *rl_sendbuf;
266 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; 275 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
276
277 struct list_head rl_all;
278 bool rl_backchannel;
267}; 279};
268 280
269static inline struct rpcrdma_req * 281static inline struct rpcrdma_req *
@@ -288,12 +300,14 @@ struct rpcrdma_buffer {
288 struct list_head rb_all; 300 struct list_head rb_all;
289 char *rb_pool; 301 char *rb_pool;
290 302
291 spinlock_t rb_lock; /* protect buf arrays */ 303 spinlock_t rb_lock; /* protect buf lists */
304 struct list_head rb_send_bufs;
305 struct list_head rb_recv_bufs;
292 u32 rb_max_requests; 306 u32 rb_max_requests;
293 int rb_send_index; 307
294 int rb_recv_index; 308 u32 rb_bc_srv_max_requests;
295 struct rpcrdma_req **rb_send_bufs; 309 spinlock_t rb_reqslock; /* protect rb_allreqs */
296 struct rpcrdma_rep **rb_recv_bufs; 310 struct list_head rb_allreqs;
297}; 311};
298#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) 312#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
299 313
@@ -339,6 +353,7 @@ struct rpcrdma_stats {
339 unsigned long failed_marshal_count; 353 unsigned long failed_marshal_count;
340 unsigned long bad_reply_count; 354 unsigned long bad_reply_count;
341 unsigned long nomsg_call_count; 355 unsigned long nomsg_call_count;
356 unsigned long bcall_count;
342}; 357};
343 358
344/* 359/*
@@ -414,6 +429,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
414/* 429/*
415 * Buffer calls - xprtrdma/verbs.c 430 * Buffer calls - xprtrdma/verbs.c
416 */ 431 */
432struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
433struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
434void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
417int rpcrdma_buffer_create(struct rpcrdma_xprt *); 435int rpcrdma_buffer_create(struct rpcrdma_xprt *);
418void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); 436void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
419 437
@@ -430,10 +448,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
430 struct rpcrdma_regbuf *); 448 struct rpcrdma_regbuf *);
431 449
432unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); 450unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
451int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
433 452
434int frwr_alloc_recovery_wq(void); 453int frwr_alloc_recovery_wq(void);
435void frwr_destroy_recovery_wq(void); 454void frwr_destroy_recovery_wq(void);
436 455
456int rpcrdma_alloc_wq(void);
457void rpcrdma_destroy_wq(void);
458
437/* 459/*
438 * Wrappers for chunk registration, shared by read/write chunk code. 460 * Wrappers for chunk registration, shared by read/write chunk code.
439 */ 461 */
@@ -494,6 +516,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
494int xprt_rdma_init(void); 516int xprt_rdma_init(void);
495void xprt_rdma_cleanup(void); 517void xprt_rdma_cleanup(void);
496 518
519/* Backchannel calls - xprtrdma/backchannel.c
520 */
521#if defined(CONFIG_SUNRPC_BACKCHANNEL)
522int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
523int xprt_rdma_bc_up(struct svc_serv *, struct net *);
524int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
525void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
526int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
527void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
528void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
529#endif /* CONFIG_SUNRPC_BACKCHANNEL */
530
497/* Temporary NFS request map cache. Created in svc_rdma.c */ 531/* Temporary NFS request map cache. Created in svc_rdma.c */
498extern struct kmem_cache *svc_rdma_map_cachep; 532extern struct kmem_cache *svc_rdma_map_cachep;
499/* WR context cache. Created in svc_rdma.c */ 533/* WR context cache. Created in svc_rdma.c */
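
The xprt_rdma.h changes above add rl_free/rl_all to rpcrdma_req and rb_allreqs plus rb_reqslock to rpcrdma_buffer, so teardown can reach every request, including backchannel requests that never sit on the send free list, while each destructor runs with the lock dropped (see rpcrdma_buffer_destroy() earlier in this diff). Below is a hedged userspace sketch of that drop-the-lock-around-destroy loop; all names (struct pool, destroy_req() and so on) are illustrative, not SUNRPC APIs.

/* Userspace sketch of the rb_allreqs/rb_reqslock teardown pattern:
 * every request sits on one "all requests" list, and the lock is
 * released around each destructor call. Illustrative only. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct req {
    struct req *next;       /* linkage on the all-requests list */
    int backchannel;        /* like rl_backchannel */
};

struct pool {
    pthread_mutex_t reqslock;   /* like rb_reqslock */
    struct req *allreqs;        /* like rb_allreqs */
};

static void destroy_req(struct req *r)
{
    /* may do heavyweight work, so it is called unlocked */
    free(r);
}

static void pool_destroy(struct pool *p)
{
    pthread_mutex_lock(&p->reqslock);
    while (p->allreqs) {
        struct req *r = p->allreqs;

        p->allreqs = r->next;       /* like list_del(&req->rl_all) */

        pthread_mutex_unlock(&p->reqslock);
        destroy_req(r);
        pthread_mutex_lock(&p->reqslock);
    }
    pthread_mutex_unlock(&p->reqslock);
}

int main(void)
{
    struct pool p = { .reqslock = PTHREAD_MUTEX_INITIALIZER };
    int i;

    for (i = 0; i < 3; i++) {
        struct req *r = calloc(1, sizeof(*r));

        r->backchannel = (i == 0);
        r->next = p.allreqs;
        p.allreqs = r;
    }
    pool_destroy(&p);
    printf("pool torn down\n");
    return 0;
}

The lock is dropped around destroy_req() presumably because the real destructor releases registered memory and should not run under a spinlock; the loop re-takes the lock before inspecting the list again, exactly as rpcrdma_buffer_destroy() does above.
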
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index e71aff251ac1..94824ff02db3 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1365,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1365 xs_tcp_read_reply(xprt, desc) : 1365 xs_tcp_read_reply(xprt, desc) :
1366 xs_tcp_read_callback(xprt, desc); 1366 xs_tcp_read_callback(xprt, desc);
1367} 1367}
1368
1369static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
1370{
1371 int ret;
1372
1373 ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
1374 SVC_SOCK_ANONYMOUS);
1375 if (ret < 0)
1376 return ret;
1377 return 0;
1378}
1368#else 1379#else
1369static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1380static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1370 struct xdr_skb_reader *desc) 1381 struct xdr_skb_reader *desc)
@@ -2667,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = {
2667 .enable_swap = xs_enable_swap, 2678 .enable_swap = xs_enable_swap,
2668 .disable_swap = xs_disable_swap, 2679 .disable_swap = xs_disable_swap,
2669 .inject_disconnect = xs_inject_disconnect, 2680 .inject_disconnect = xs_inject_disconnect,
2681#ifdef CONFIG_SUNRPC_BACKCHANNEL
2682 .bc_setup = xprt_setup_bc,
2683 .bc_up = xs_tcp_bc_up,
2684 .bc_free_rqst = xprt_free_bc_rqst,
2685 .bc_destroy = xprt_destroy_bc,
2686#endif
2670}; 2687};
2671 2688
2672/* 2689/*
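
The xprtsock.c hunk registers xs_tcp_bc_up() together with the generic xprt_setup_bc/xprt_free_bc_rqst/xprt_destroy_bc helpers in xs_tcp_ops only when CONFIG_SUNRPC_BACKCHANNEL is enabled. The fragment below is a small, self-contained illustration of that conditionally populated ops-table pattern; the WANT_BACKCHANNEL macro, struct transport_ops and the demo_* functions are invented for the example and do not correspond to SUNRPC symbols.

/* Sketch of an ops table whose backchannel callbacks exist only when
 * the feature is compiled in. Illustrative only. */
#include <stdio.h>

#define WANT_BACKCHANNEL 1      /* stands in for CONFIG_SUNRPC_BACKCHANNEL */

struct transport_ops {
    int (*connect)(void);
#if WANT_BACKCHANNEL
    int (*bc_up)(void);         /* optional: bring up the backchannel */
#endif
};

static int demo_connect(void)
{
    puts("forward channel up");
    return 0;
}

#if WANT_BACKCHANNEL
static int demo_bc_up(void)
{
    puts("backchannel service registered");
    return 0;
}
#endif

static const struct transport_ops tcp_ops = {
    .connect = demo_connect,
#if WANT_BACKCHANNEL
    .bc_up = demo_bc_up,
#endif
};

int main(void)
{
    int rc = tcp_ops.connect();

#if WANT_BACKCHANNEL
    if (!rc && tcp_ops.bc_up)
        rc = tcp_ops.bc_up();
#endif
    return rc;
}

In the sketch, a build with WANT_BACKCHANNEL set to 0 compiles out both the pointer and its caller, mirroring the #ifdef placement in the hunk above.
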