author	Linus Torvalds <torvalds@linux-foundation.org>	2016-03-24 13:41:00 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2016-03-24 13:41:00 -0400
commit	5b1e167d8de86d698114a0a8de61e9d1365d3e8a (patch)
tree	eb54b9afde920a97fc49f202d0810d0ad39cb500 /net
parent	8b97be054572fc769619184dcc174e280a5c851c (diff)
parent	a6ab1e8126d205238defbb55d23661a3a5c6a0d8 (diff)
Merge tag 'nfsd-4.6' of git://linux-nfs.org/~bfields/linux
Pull nfsd updates from Bruce Fields:
 "Various bugfixes, a RDMA update from Chuck Lever, and support for a
  new pnfs layout type from Christoph Hellwig. The new layout type is a
  variant of the block layout which uses SCSI features to offer improved
  fencing and device identification.

  (Also: note this pull request also includes the client side of SCSI
  layout, with Trond's permission.)"

* tag 'nfsd-4.6' of git://linux-nfs.org/~bfields/linux:
  sunrpc/cache: drop reference when sunrpc_cache_pipe_upcall() detects a race
  nfsd: recover: fix memory leak
  nfsd: fix deadlock secinfo+readdir compound
  nfsd4: resfh unused in nfsd4_secinfo
  svcrdma: Use new CQ API for RPC-over-RDMA server send CQs
  svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
  svcrdma: Remove close_out exit path
  svcrdma: Hook up the logic to return ERR_CHUNK
  svcrdma: Use correct XID in error replies
  svcrdma: Make RDMA_ERROR messages work
  rpcrdma: Add RPCRDMA_HDRLEN_ERR
  svcrdma: svc_rdma_post_recv() should close connection on error
  svcrdma: Close connection when a send error occurs
  nfsd: Lower NFSv4.1 callback message size limit
  svcrdma: Do not send Write chunk XDR pad with inline content
  svcrdma: Do not write xdr_buf::tail in a Write chunk
  svcrdma: Find client-provided write and reply chunks once per reply
  nfsd: Update NFS server comments related to RDMA support
  nfsd: Fix a memory leak when meeting unsupported state_protect_how4
  nfsd4: fix bad bounds checking
Diffstat (limited to 'net')
-rw-r--r--	net/sunrpc/auth_null.c	4
-rw-r--r--	net/sunrpc/auth_unix.c	6
-rw-r--r--	net/sunrpc/cache.c	6
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_backchannel.c	17
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_marshal.c	64
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_recvfrom.c	60
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_sendto.c	196
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	445
8 files changed, 359 insertions(+), 439 deletions(-)
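Most of the svcrdma churn below comes from converting the server's completion handling from wr_id cookies plus hand-rolled CQ polling to the ib_cqe-based verbs API. The sketch that follows is illustrative only: the struct and function names outside the ib_* interface are made up, not the kernel's. The pattern is that each per-operation context embeds a struct ib_cqe, the work request points at it through wr_cqe, and the CQ layer invokes the cqe's done method with the matching ib_wc.

#include <rdma/ib_verbs.h>

/* Hypothetical per-operation context; svc_rdma_op_ctxt embeds an ib_cqe
 * in the same way, but this struct exists only for illustration.
 */
struct demo_ctxt {
	struct ib_cqe	cqe;	/* completion "cookie" handed back in the WC */
	/* ... per-WR state ... */
};

/* Completion handler: the core hands back the ib_cqe that was attached
 * to the work request, and container_of() recovers the owning context.
 */
static void demo_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct demo_ctxt *ctxt =
		container_of(wc->wr_cqe, struct demo_ctxt, cqe);

	if (wc->status != IB_WC_SUCCESS)
		pr_err("demo: send failed: %s\n",
		       ib_wc_status_msg(wc->status));
	/* release ctxt resources here */
}

/* Posting side: instead of stuffing a pointer into wr_id, point the WR
 * at the embedded ib_cqe and let the core dispatch to its ->done method.
 */
static int demo_post_send(struct ib_qp *qp, struct demo_ctxt *ctxt,
			  struct ib_sge *sge)
{
	struct ib_send_wr wr, *bad_wr;

	memset(&wr, 0, sizeof(wr));
	ctxt->cqe.done = demo_wc_send;
	wr.wr_cqe = &ctxt->cqe;
	wr.sg_list = sge;
	wr.num_sge = 1;
	wr.opcode = IB_WR_SEND;
	wr.send_flags = IB_SEND_SIGNALED;
	return ib_post_send(qp, &wr, &bad_wr);
}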
diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
index c2a2b584a056..8d9eb4d5ddd8 100644
--- a/net/sunrpc/auth_null.c
+++ b/net/sunrpc/auth_null.c
@@ -113,8 +113,8 @@ const struct rpc_authops authnull_ops = {
 
 static
 struct rpc_auth null_auth = {
-	.au_cslack = 4,
-	.au_rslack = 2,
+	.au_cslack = NUL_CALLSLACK,
+	.au_rslack = NUL_REPLYSLACK,
 	.au_ops = &authnull_ops,
 	.au_flavor = RPC_AUTH_NULL,
 	.au_count = ATOMIC_INIT(0),
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 548240dd15fc..0d3dd364c22f 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -23,8 +23,6 @@ struct unx_cred {
 };
 #define uc_uid uc_base.cr_uid
 
-#define UNX_WRITESLACK (21 + XDR_QUADLEN(UNX_MAXNODENAME))
-
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 # define RPCDBG_FACILITY RPCDBG_AUTH
 #endif
@@ -228,8 +226,8 @@ const struct rpc_authops authunix_ops = {
 
 static
 struct rpc_auth unix_auth = {
-	.au_cslack = UNX_WRITESLACK,
-	.au_rslack = 2,	/* assume AUTH_NULL verf */
+	.au_cslack = UNX_CALLSLACK,
+	.au_rslack = NUL_REPLYSLACK,
 	.au_ops = &authunix_ops,
 	.au_flavor = RPC_AUTH_UNIX,
 	.au_count = ATOMIC_INIT(0),
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 273bc3a35425..008c25d1b9f9 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -1182,14 +1182,14 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
 	}
 
 	crq->q.reader = 0;
-	crq->item = cache_get(h);
 	crq->buf = buf;
 	crq->len = 0;
 	crq->readers = 0;
 	spin_lock(&queue_lock);
-	if (test_bit(CACHE_PENDING, &h->flags))
+	if (test_bit(CACHE_PENDING, &h->flags)) {
+		crq->item = cache_get(h);
 		list_add_tail(&crq->q.list, &detail->queue);
-	else
+	} else
 		/* Lost a race, no longer PENDING, so don't enqueue */
 		ret = -EAGAIN;
 	spin_unlock(&queue_lock);
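The cache.c hunk above narrows the window in which a reference to the cache item is taken: the upcall request now pins the item only when it is actually queued, so the lost-race (-EAGAIN) path has no reference to remember to drop. A hedged, self-contained sketch of the same pattern with made-up demo_* names (not kernel code):

#include <linux/bitops.h>
#include <linux/errno.h>
#include <linux/kref.h>
#include <linux/list.h>
#include <linux/spinlock.h>

struct demo_item {
	struct kref	ref;
	unsigned long	flags;
};
#define DEMO_PENDING	0

struct demo_request {
	struct list_head list;
	struct demo_item *item;
};

static DEFINE_SPINLOCK(demo_queue_lock);
static LIST_HEAD(demo_queue);

static int demo_enqueue(struct demo_item *item, struct demo_request *rq)
{
	int ret = 0;

	spin_lock(&demo_queue_lock);
	if (test_bit(DEMO_PENDING, &item->flags)) {
		/* Reference is owned by the queued request. */
		kref_get(&item->ref);
		rq->item = item;
		list_add_tail(&rq->list, &demo_queue);
	} else {
		/* Lost the race: nothing queued, nothing held. */
		ret = -EAGAIN;
	}
	spin_unlock(&demo_queue_lock);
	return ret;
}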
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 65a7c232a345..a2a7519b0f23 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -107,26 +107,18 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	int ret;
 
 	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, sndbuf, vec);
+	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
 	if (ret)
 		goto out_err;
 
-	/* Post a recv buffer to handle the reply for this request. */
-	ret = svc_rdma_post_recv(rdma, GFP_NOIO);
-	if (ret) {
-		pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
-		       ret);
-		pr_err("svcrdma: closing transport %p.\n", rdma);
-		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		ret = -ENOTCONN;
+	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
+	if (ret)
 		goto out_err;
-	}
 
 	ctxt = svc_rdma_get_context(rdma);
 	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
 	ctxt->count = 1;
 
-	ctxt->wr_op = IB_WR_SEND;
 	ctxt->direction = DMA_TO_DEVICE;
 	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
 	ctxt->sge[0].length = sndbuf->len;
@@ -140,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	atomic_inc(&rdma->sc_dma_used);
 
 	memset(&send_wr, 0, sizeof(send_wr));
-	send_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = 1;
 	send_wr.opcode = IB_WR_SEND;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index e2fca7617242..765bca47c74d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -145,29 +145,44 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
 	return (__be32 *)&ary->wc_array[nchunks];
 }
 
-int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
-			    struct svc_rqst *rqstp)
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
 {
-	struct rpcrdma_msg *rmsgp = NULL;
 	__be32 *va, *vaend;
+	unsigned int len;
 	u32 hdr_len;
 
-	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
 	/* Verify that there's enough bytes for header + something */
-	if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+	if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) {
 		dprintk("svcrdma: header too short = %d\n",
 			rqstp->rq_arg.len);
 		return -EINVAL;
 	}
 
-	if (rmsgp->rm_vers != rpcrdma_version)
-		return -ENOSYS;
-
-	/* Pull in the extra for the padded case and bump our pointer */
-	if (rmsgp->rm_type == rdma_msgp) {
-		int hdrlen;
+	if (rmsgp->rm_vers != rpcrdma_version) {
+		dprintk("%s: bad version %u\n", __func__,
			be32_to_cpu(rmsgp->rm_vers));
+		return -EPROTONOSUPPORT;
+	}
 
+	switch (be32_to_cpu(rmsgp->rm_type)) {
+	case RDMA_MSG:
+	case RDMA_NOMSG:
+		break;
+
+	case RDMA_DONE:
+		/* Just drop it */
+		dprintk("svcrdma: dropping RDMA_DONE message\n");
+		return 0;
+
+	case RDMA_ERROR:
+		/* Possible if this is a backchannel reply.
+		 * XXX: We should cancel this XID, though.
+		 */
+		dprintk("svcrdma: dropping RDMA_ERROR message\n");
+		return 0;
+
+	case RDMA_MSGP:
+		/* Pull in the extra for the padded case, bump our pointer */
 		rmsgp->rm_body.rm_padded.rm_align =
 			be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
 		rmsgp->rm_body.rm_padded.rm_thresh =
@@ -175,11 +190,15 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
 
 		va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
 		rqstp->rq_arg.head[0].iov_base = va;
-		hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
-		rqstp->rq_arg.head[0].iov_len -= hdrlen;
-		if (hdrlen > rqstp->rq_arg.len)
+		len = (u32)((unsigned long)va - (unsigned long)rmsgp);
+		rqstp->rq_arg.head[0].iov_len -= len;
+		if (len > rqstp->rq_arg.len)
 			return -EINVAL;
-		return hdrlen;
+		return len;
+	default:
+		dprintk("svcrdma: bad rdma procedure (%u)\n",
+			be32_to_cpu(rmsgp->rm_type));
+		return -EINVAL;
 	}
 
 	/* The chunk list may contain either a read chunk list or a write
@@ -188,20 +207,25 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
 	va = &rmsgp->rm_body.rm_chunks[0];
 	vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
 	va = decode_read_list(va, vaend);
-	if (!va)
+	if (!va) {
+		dprintk("svcrdma: failed to decode read list\n");
 		return -EINVAL;
+	}
 	va = decode_write_list(va, vaend);
-	if (!va)
+	if (!va) {
+		dprintk("svcrdma: failed to decode write list\n");
 		return -EINVAL;
+	}
 	va = decode_reply_array(va, vaend);
-	if (!va)
+	if (!va) {
+		dprintk("svcrdma: failed to decode reply chunk\n");
 		return -EINVAL;
+	}
 
 	rqstp->rq_arg.head[0].iov_base = va;
 	hdr_len = (unsigned long)va - (unsigned long)rmsgp;
 	rqstp->rq_arg.head[0].iov_len -= hdr_len;
 
-	*rdma_req = rmsgp;
 	return hdr_len;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c8b8a8b4181e..3b24a646eb46 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
 	clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
 
 	memset(&read_wr, 0, sizeof(read_wr));
-	read_wr.wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_read;
+	read_wr.wr.wr_cqe = &ctxt->cqe;
 	read_wr.wr.opcode = IB_WR_RDMA_READ;
-	ctxt->wr_op = read_wr.wr.opcode;
 	read_wr.wr.send_flags = IB_SEND_SIGNALED;
 	read_wr.rkey = rs_handle;
 	read_wr.remote_addr = rs_offset;
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 	ctxt->read_hdr = head;
 
 	/* Prepare REG WR */
+	ctxt->reg_cqe.done = svc_rdma_wc_reg;
+	reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
 	reg_wr.wr.opcode = IB_WR_REG_MR;
-	reg_wr.wr.wr_id = 0;
 	reg_wr.wr.send_flags = IB_SEND_SIGNALED;
 	reg_wr.wr.num_sge = 0;
 	reg_wr.mr = frmr->mr;
@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 
 	/* Prepare RDMA_READ */
 	memset(&read_wr, 0, sizeof(read_wr));
+	ctxt->cqe.done = svc_rdma_wc_read;
+	read_wr.wr.wr_cqe = &ctxt->cqe;
 	read_wr.wr.send_flags = IB_SEND_SIGNALED;
 	read_wr.rkey = rs_handle;
 	read_wr.remote_addr = rs_offset;
@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 	read_wr.wr.num_sge = 1;
 	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
 		read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
-		read_wr.wr.wr_id = (unsigned long)ctxt;
 		read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
 	} else {
 		read_wr.wr.opcode = IB_WR_RDMA_READ;
 		read_wr.wr.next = &inv_wr;
 		/* Prepare invalidate */
 		memset(&inv_wr, 0, sizeof(inv_wr));
-		inv_wr.wr_id = (unsigned long)ctxt;
+		ctxt->inv_cqe.done = svc_rdma_wc_inv;
+		inv_wr.wr_cqe = &ctxt->inv_cqe;
 		inv_wr.opcode = IB_WR_LOCAL_INV;
 		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
 		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
 	}
-	ctxt->wr_op = read_wr.wr.opcode;
 
 	/* Post the chain */
 	ret = svc_rdma_send(xprt, &reg_wr.wr);
@@ -612,7 +614,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 	struct rpcrdma_msg *rmsgp;
 	int ret = 0;
-	int len;
 
 	dprintk("svcrdma: rqstp=%p\n", rqstp);
 
@@ -642,8 +643,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		 * transport list
 		 */
 		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
-			goto close_out;
-
+			goto defer;
 		goto out;
 	}
 	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -654,15 +654,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
 
 	/* Decode the RDMA header. */
-	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
-	rqstp->rq_xprt_hlen = len;
-
-	/* If the request is invalid, reply with an error */
-	if (len < 0) {
-		if (len == -ENOSYS)
-			svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
-		goto close_out;
-	}
+	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+	ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
+	if (ret < 0)
+		goto out_err;
+	if (ret == 0)
+		goto out_drop;
+	rqstp->rq_xprt_hlen = ret;
 
 	if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
 		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
@@ -698,26 +696,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	svc_xprt_copy_addrs(rqstp, xprt);
 	return ret;
 
-close_out:
-	if (ctxt)
-		svc_rdma_put_context(ctxt, 1);
-	dprintk("svcrdma: transport %p is closing\n", xprt);
-	/*
-	 * Set the close bit and enqueue it. svc_recv will see the
-	 * close bit and call svc_xprt_delete
-	 */
-	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+out_err:
+	svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+	svc_rdma_put_context(ctxt, 0);
+	return 0;
+
 defer:
 	return 0;
 
+out_drop:
+	svc_rdma_put_context(ctxt, 1);
 repost:
-	ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
-	if (ret) {
-		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
-		       ret);
-		pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
-		set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
-		ret = -ENOTCONN;
-	}
-	return ret;
+	return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index df57f3ce6cd2..4f1b1c4f45f9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -50,9 +50,15 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static u32 xdr_padsize(u32 len)
+{
+	return (len & 3) ? (4 - (len & 3)) : 0;
+}
+
 int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 		     struct xdr_buf *xdr,
-		     struct svc_rdma_req_map *vec)
+		     struct svc_rdma_req_map *vec,
+		     bool write_chunk_present)
 {
 	int sge_no;
 	u32 sge_bytes;
@@ -92,9 +98,20 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 
 	/* Tail SGE */
 	if (xdr->tail[0].iov_len) {
-		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
-		vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
-		sge_no++;
+		unsigned char *base = xdr->tail[0].iov_base;
+		size_t len = xdr->tail[0].iov_len;
+		u32 xdr_pad = xdr_padsize(xdr->page_len);
+
+		if (write_chunk_present && xdr_pad) {
+			base += xdr_pad;
+			len -= xdr_pad;
+		}
+
+		if (len) {
+			vec->sge[sge_no].iov_base = base;
+			vec->sge[sge_no].iov_len = len;
+			sge_no++;
+		}
 	}
 
 	dprintk("svcrdma: %s: sge_no %d page_no %d "
@@ -166,10 +183,10 @@ svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
  * reply array is present
  */
 static struct rpcrdma_write_array *
-svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
+svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
+			 struct rpcrdma_write_array *wr_ary)
 {
 	struct rpcrdma_read_chunk *rch;
-	struct rpcrdma_write_array *wr_ary;
 	struct rpcrdma_write_array *rp_ary;
 
 	/* XXX: Need to fix when reply chunk may occur with read list
@@ -191,7 +208,6 @@ svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
 		goto found_it;
 	}
 
-	wr_ary = svc_rdma_get_write_array(rmsgp);
 	if (wr_ary) {
 		int chunk = be32_to_cpu(wr_ary->wc_nchunks);
 
@@ -281,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 
 	/* Prepare WRITE WR */
 	memset(&write_wr, 0, sizeof write_wr);
-	ctxt->wr_op = IB_WR_RDMA_WRITE;
-	write_wr.wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_write;
+	write_wr.wr.wr_cqe = &ctxt->cqe;
 	write_wr.wr.sg_list = &sge[0];
 	write_wr.wr.num_sge = sge_no;
 	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -298,41 +314,37 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 err:
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 0);
-	/* Fatal error, close transport */
 	return -EIO;
 }
 
+noinline
 static int send_write_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_write_array *wr_ary,
 			     struct rpcrdma_msg *rdma_resp,
 			     struct svc_rqst *rqstp,
 			     struct svc_rdma_req_map *vec)
 {
-	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+	u32 xfer_len = rqstp->rq_res.page_len;
 	int write_len;
 	u32 xdr_off;
 	int chunk_off;
 	int chunk_no;
 	int nchunks;
-	struct rpcrdma_write_array *arg_ary;
 	struct rpcrdma_write_array *res_ary;
 	int ret;
 
-	arg_ary = svc_rdma_get_write_array(rdma_argp);
-	if (!arg_ary)
-		return 0;
 	res_ary = (struct rpcrdma_write_array *)
 		&rdma_resp->rm_body.rm_chunks[1];
 
 	/* Write chunks start at the pagelist */
-	nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+	nchunks = be32_to_cpu(wr_ary->wc_nchunks);
 	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
 	     xfer_len && chunk_no < nchunks;
 	     chunk_no++) {
 		struct rpcrdma_segment *arg_ch;
 		u64 rs_offset;
 
-		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+		arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
 		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
 
 		/* Prepare the response chunk given the length actually
@@ -350,11 +362,8 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
 				xdr_off,
 				write_len,
 				vec);
-		if (ret <= 0) {
-			dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
-				ret);
-			return -EIO;
-		}
+		if (ret <= 0)
+			goto out_err;
 		chunk_off += ret;
 		xdr_off += ret;
 		xfer_len -= ret;
@@ -364,11 +373,16 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
 	/* Update the req with the number of chunks actually used */
 	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
-	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+	return rqstp->rq_res.page_len;
+
+out_err:
+	pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
+	return -EIO;
 }
 
+noinline
 static int send_reply_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_write_array *rp_ary,
 			     struct rpcrdma_msg *rdma_resp,
 			     struct svc_rqst *rqstp,
 			     struct svc_rdma_req_map *vec)
@@ -380,25 +394,21 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 	int chunk_off;
 	int nchunks;
 	struct rpcrdma_segment *ch;
-	struct rpcrdma_write_array *arg_ary;
 	struct rpcrdma_write_array *res_ary;
 	int ret;
 
-	arg_ary = svc_rdma_get_reply_array(rdma_argp);
-	if (!arg_ary)
-		return 0;
 	/* XXX: need to fix when reply lists occur with read-list and or
 	 * write-list */
 	res_ary = (struct rpcrdma_write_array *)
 		&rdma_resp->rm_body.rm_chunks[2];
 
 	/* xdr offset starts at RPC message */
-	nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+	nchunks = be32_to_cpu(rp_ary->wc_nchunks);
 	for (xdr_off = 0, chunk_no = 0;
 	     xfer_len && chunk_no < nchunks;
 	     chunk_no++) {
 		u64 rs_offset;
-		ch = &arg_ary->wc_array[chunk_no].wc_target;
+		ch = &rp_ary->wc_array[chunk_no].wc_target;
 		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
 
 		/* Prepare the reply chunk given the length actually
@@ -415,11 +425,8 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 				xdr_off,
 				write_len,
 				vec);
-		if (ret <= 0) {
-			dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
-				ret);
-			return -EIO;
-		}
+		if (ret <= 0)
+			goto out_err;
 		chunk_off += ret;
 		xdr_off += ret;
 		xfer_len -= ret;
@@ -430,6 +437,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
 
 	return rqstp->rq_res.len;
+
+out_err:
+	pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
+	return -EIO;
 }
 
 /* This function prepares the portion of the RPCRDMA message to be
@@ -464,13 +475,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	int pages;
 	int ret;
 
-	/* Post a recv buffer to handle another request. */
-	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+	ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
 	if (ret) {
-		printk(KERN_INFO
-		       "svcrdma: could not post a receive buffer, err=%d."
-		       "Closing transport %p.\n", ret, rdma);
-		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 		svc_rdma_put_context(ctxt, 0);
 		return -ENOTCONN;
 	}
@@ -543,8 +549,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		goto err;
 	}
 	memset(&send_wr, 0, sizeof send_wr);
-	ctxt->wr_op = IB_WR_SEND;
-	send_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = sge_no;
 	send_wr.opcode = IB_WR_SEND;
@@ -559,6 +565,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
 err:
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
+	pr_err("svcrdma: failed to send reply, rc=%d\n", ret);
 	return -EIO;
 }
 
@@ -573,7 +580,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct rpcrdma_msg *rdma_argp;
 	struct rpcrdma_msg *rdma_resp;
-	struct rpcrdma_write_array *reply_ary;
+	struct rpcrdma_write_array *wr_ary, *rp_ary;
 	enum rpcrdma_proc reply_type;
 	int ret;
 	int inline_bytes;
@@ -587,12 +594,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	 * places this at the start of page 0.
 	 */
 	rdma_argp = page_address(rqstp->rq_pages[0]);
+	wr_ary = svc_rdma_get_write_array(rdma_argp);
+	rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);
 
 	/* Build an req vec for the XDR */
 	ctxt = svc_rdma_get_context(rdma);
 	ctxt->direction = DMA_TO_DEVICE;
 	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec);
+	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
 	if (ret)
 		goto err0;
 	inline_bytes = rqstp->rq_res.len;
@@ -603,8 +612,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	if (!res_page)
 		goto err0;
 	rdma_resp = page_address(res_page);
-	reply_ary = svc_rdma_get_reply_array(rdma_argp);
-	if (reply_ary)
+	if (rp_ary)
 		reply_type = RDMA_NOMSG;
 	else
 		reply_type = RDMA_MSG;
@@ -612,27 +620,26 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 					 rdma_resp, reply_type);
 
 	/* Send any write-chunk data and build resp write-list */
-	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
-				rqstp, vec);
-	if (ret < 0) {
-		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
-		       ret);
-		goto err1;
+	if (wr_ary) {
+		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+		if (ret < 0)
+			goto err1;
+		inline_bytes -= ret + xdr_padsize(ret);
 	}
-	inline_bytes -= ret;
 
 	/* Send any reply-list data and update resp reply-list */
-	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
-				rqstp, vec);
-	if (ret < 0) {
-		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
-		       ret);
-		goto err1;
+	if (rp_ary) {
+		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+		if (ret < 0)
+			goto err1;
+		inline_bytes -= ret;
 	}
-	inline_bytes -= ret;
 
 	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
 			 inline_bytes);
+	if (ret < 0)
+		goto err1;
+
 	svc_rdma_put_req_map(rdma, vec);
 	dprintk("svcrdma: send_reply returns %d\n", ret);
 	return ret;
@@ -642,5 +649,68 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 err0:
 	svc_rdma_put_req_map(rdma, vec);
 	svc_rdma_put_context(ctxt, 0);
-	return ret;
+	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	return -ENOTCONN;
+}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+			 int status)
+{
+	struct ib_send_wr err_wr;
+	struct page *p;
+	struct svc_rdma_op_ctxt *ctxt;
+	enum rpcrdma_errcode err;
+	__be32 *va;
+	int length;
+	int ret;
+
+	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+	if (ret)
+		return;
+
+	p = alloc_page(GFP_KERNEL);
+	if (!p)
+		return;
+	va = page_address(p);
+
+	/* XDR encode an error reply */
+	err = ERR_CHUNK;
+	if (status == -EPROTONOSUPPORT)
+		err = ERR_VERS;
+	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+	ctxt = svc_rdma_get_context(xprt);
+	ctxt->direction = DMA_TO_DEVICE;
+	ctxt->count = 1;
+	ctxt->pages[0] = p;
+
+	/* Prepare SGE for local address */
+	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
+	ctxt->sge[0].length = length;
+	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+					    p, 0, length, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+		dprintk("svcrdma: Error mapping buffer for protocol error\n");
+		svc_rdma_put_context(ctxt, 1);
+		return;
+	}
+	atomic_inc(&xprt->sc_dma_used);
+
+	/* Prepare SEND WR */
+	memset(&err_wr, 0, sizeof(err_wr));
+	ctxt->cqe.done = svc_rdma_wc_send;
+	err_wr.wr_cqe = &ctxt->cqe;
+	err_wr.sg_list = ctxt->sge;
+	err_wr.num_sge = 1;
+	err_wr.opcode = IB_WR_SEND;
+	err_wr.send_flags = IB_SEND_SIGNALED;
+
+	/* Post It */
+	ret = svc_rdma_send(xprt, &err_wr);
+	if (ret) {
+		dprintk("svcrdma: Error %d posting send for protocol error\n",
+			ret);
+		svc_rdma_unmap_dma(ctxt);
+		svc_rdma_put_context(ctxt, 1);
+	}
 }
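The write-chunk accounting above leans on XDR's 4-byte alignment rule: when the pagelist is returned via a Write chunk, its trailing pad must not also be sent inline, so svc_rdma_sendto() subtracts both the chunk payload and xdr_padsize() of it from the inline byte count. A small standalone C illustration of that arithmetic; xdr_padsize() is copied from the hunk above, while the sample numbers are made up for demonstration:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Same helper as added to svc_rdma_sendto.c above. */
static uint32_t xdr_padsize(uint32_t len)
{
	return (len & 3) ? (4 - (len & 3)) : 0;
}

int main(void)
{
	/* XDR rounds every opaque up to a 4-byte boundary. */
	assert(xdr_padsize(0) == 0);
	assert(xdr_padsize(1) == 3);
	assert(xdr_padsize(4) == 0);
	assert(xdr_padsize(5) == 3);
	assert(xdr_padsize(6) == 2);

	/* Illustrative numbers only: a 1301-byte pagelist moved via a
	 * Write chunk removes the payload *and* its 3 pad bytes from the
	 * inline reply, mirroring "inline_bytes -= ret + xdr_padsize(ret)"
	 * in svc_rdma_sendto() above.
	 */
	uint32_t inline_bytes = 2048, ret = 1301;
	inline_bytes -= ret + xdr_padsize(ret);
	printf("inline bytes left: %u\n", inline_bytes);	/* prints 744 */
	return 0;
}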
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5763825d09bf..90668969d559 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -63,17 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 					int flags);
 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
 static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
 static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);
 
 static struct svc_xprt_ops svc_rdma_ops = {
 	.xpo_create = svc_rdma_create,
@@ -352,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
 	}
 }
 
-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
-	struct svc_xprt *xprt = context;
-	dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
-		ib_event_msg(event->event), event->event, context);
-	set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -392,251 +376,171 @@ static void qp_event_handler(struct ib_event *event, void *context)
 	}
 }
 
-/*
- * Data Transfer Operation Tasklet
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq: completion queue
+ * @wc: completed WR
 *
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
 */
-static void dto_tasklet_func(unsigned long data)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svcxprt_rdma *xprt;
-	unsigned long flags;
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	spin_lock_irqsave(&dto_lock, flags);
-	while (!list_empty(&dto_xprt_q)) {
-		xprt = list_entry(dto_xprt_q.next,
-				  struct svcxprt_rdma, sc_dto_q);
-		list_del_init(&xprt->sc_dto_q);
-		spin_unlock_irqrestore(&dto_lock, flags);
+	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	ctxt->wc_status = wc->status;
+	svc_rdma_unmap_dma(ctxt);
 
-		rq_cq_reap(xprt);
-		sq_cq_reap(xprt);
+	if (wc->status != IB_WC_SUCCESS)
+		goto flushed;
 
-		svc_xprt_put(&xprt->sc_xprt);
-		spin_lock_irqsave(&dto_lock, flags);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
+	/* All wc fields are now known to be valid */
+	ctxt->byte_len = wc->byte_len;
+	spin_lock(&xprt->sc_rq_dto_lock);
+	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+	spin_unlock(&xprt->sc_rq_dto_lock);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		goto out;
+	svc_xprt_enqueue(&xprt->sc_xprt);
+	goto out;
+
+flushed:
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+			ib_wc_status_msg(wc->status),
+			wc->status, wc->vendor_err);
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	svc_rdma_put_context(ctxt, 1);
+
+out:
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+				    struct ib_wc *wc,
+				    const char *opname)
 {
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
+	if (wc->status != IB_WC_SUCCESS)
+		goto err;
 
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an SQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+out:
+	atomic_dec(&xprt->sc_sq_count);
+	wake_up(&xprt->sc_send_wait);
+	return;
+
+err:
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
+		       opname, ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
+	goto out;
+}
 
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
+					const char *opname)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;
 
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
+	svc_rdma_send_wc_common(xprt, wc, opname);
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * rq_cq_reap - Process the RQ CQ.
- *
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: completion queue
+ * @wc: completed WR
 *
- * Note that caller must hold a transport reference.
 */
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
-	int ret;
-	struct ib_wc wc;
-	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
-		return;
+	svc_rdma_send_wc_common_put(cq, wc, "send");
 
-	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_rq_poll);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 1);
+}
 
-	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
-		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
-		ctxt->wc_status = wc.status;
-		ctxt->byte_len = wc.byte_len;
-		svc_rdma_unmap_dma(ctxt);
-		if (wc.status != IB_WC_SUCCESS) {
-			/* Close the transport */
-			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			svc_rdma_put_context(ctxt, 1);
-			svc_xprt_put(&xprt->sc_xprt);
-			continue;
-		}
-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_put(&xprt->sc_xprt);
-	}
+/**
+ * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	if (ctxt)
-		atomic_inc(&rdma_stat_rq_prod);
+	svc_rdma_send_wc_common_put(cq, wc, "write");
 
-	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-	/*
-	 * If data arrived before established event,
-	 * don't enqueue. This defers RPC I/O until the
-	 * RDMA connection is complete.
-	 */
-	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-		svc_xprt_enqueue(&xprt->sc_xprt);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 0);
 }
 
-/*
- * Process a completion context
+/**
+ * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
 */
-static void process_context(struct svcxprt_rdma *xprt,
-			    struct svc_rdma_op_ctxt *ctxt)
+void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svc_rdma_op_ctxt *read_hdr;
-	int free_pages = 0;
-
-	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
+}
 
-	switch (ctxt->wr_op) {
-	case IB_WR_SEND:
-		free_pages = 1;
-		break;
+/**
+ * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
+ * @cq: completion queue
+ * @wc: completed WR
+ *
+ */
+void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	case IB_WR_RDMA_WRITE:
-		break;
+	svc_rdma_send_wc_common(xprt, wc, "read");
 
-	case IB_WR_RDMA_READ:
-	case IB_WR_RDMA_READ_WITH_INV:
-		svc_rdma_put_frmr(xprt, ctxt->frmr);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_frmr(xprt, ctxt->frmr);
 
-		if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
-			break;
+	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+		struct svc_rdma_op_ctxt *read_hdr;
 
 		read_hdr = ctxt->read_hdr;
-		svc_rdma_put_context(ctxt, 0);
-
-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+		spin_lock(&xprt->sc_rq_dto_lock);
 		list_add_tail(&read_hdr->dto_q,
 			      &xprt->sc_read_complete_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_enqueue(&xprt->sc_xprt);
-		return;
+		spin_unlock(&xprt->sc_rq_dto_lock);
 
-	default:
-		dprintk("svcrdma: unexpected completion opcode=%d\n",
-			ctxt->wr_op);
-		break;
+		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&xprt->sc_xprt);
 	}
 
-	svc_rdma_put_context(ctxt, free_pages);
+	svc_rdma_put_context(ctxt, 0);
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: completed WR
 *
- * Note that caller must hold a transport reference.
 */
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
-{
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-	struct ib_wc wc_a[6];
-	struct ib_wc *wc;
-	struct ib_cq *cq = xprt->sc_sq_cq;
-	int ret;
-
-	memset(wc_a, 0, sizeof(wc_a));
-
-	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
-		return;
-
-	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_sq_poll);
-	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
-		int i;
-
-		for (i = 0; i < ret; i++) {
-			wc = &wc_a[i];
-			if (wc->status != IB_WC_SUCCESS) {
-				dprintk("svcrdma: sq wc err status %s (%d)\n",
-					ib_wc_status_msg(wc->status),
-					wc->status);
-
-				/* Close the transport */
-				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			}
-
-			/* Decrement used SQ WR count */
-			atomic_dec(&xprt->sc_sq_count);
-			wake_up(&xprt->sc_send_wait);
-
-			ctxt = (struct svc_rdma_op_ctxt *)
-				(unsigned long)wc->wr_id;
-			if (ctxt)
-				process_context(xprt, ctxt);
-
-			svc_xprt_put(&xprt->sc_xprt);
-		}
-	}
-
-	if (ctxt)
-		atomic_inc(&rdma_stat_sq_prod);
-}
-
-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
-
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an RQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
+	svc_rdma_send_wc_common_put(cq, wc, "localInv");
 }
 
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -681,6 +585,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	ctxt = svc_rdma_get_context(xprt);
 	buflen = 0;
 	ctxt->direction = DMA_FROM_DEVICE;
+	ctxt->cqe.done = svc_rdma_wc_receive;
 	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 		if (sge_no >= xprt->sc_max_sge) {
 			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +610,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	recv_wr.next = NULL;
 	recv_wr.sg_list = &ctxt->sge[0];
 	recv_wr.num_sge = ctxt->count;
-	recv_wr.wr_id = (u64)(unsigned long)ctxt;
+	recv_wr.wr_cqe = &ctxt->cqe;
 
 	svc_xprt_get(&xprt->sc_xprt);
 	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -722,6 +627,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	return -ENOMEM;
 }
 
+int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
+{
+	int ret = 0;
+
+	ret = svc_rdma_post_recv(xprt, flags);
+	if (ret) {
+		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
+		       ret);
+		pr_err("svcrdma: closing transport %p.\n", xprt);
+		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+		ret = -ENOTCONN;
+	}
+	return ret;
+}
+
 /*
  * This function handles the CONNECT_REQUEST event on a listening
  * endpoint. It is passed the cma_id for the _new_ connection. The context in
@@ -1011,7 +931,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	struct svcxprt_rdma *listen_rdma;
 	struct svcxprt_rdma *newxprt = NULL;
 	struct rdma_conn_param conn_param;
-	struct ib_cq_init_attr cq_attr = {};
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device *dev;
 	unsigned int i;
@@ -1069,22 +988,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		dprintk("svcrdma: error creating PD for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_sq_depth;
-	newxprt->sc_sq_cq = ib_create_cq(dev,
-					 sq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_sq_cq)) {
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_rq_depth;
-	newxprt->sc_rq_cq = ib_create_cq(dev,
-					 rq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_rq_cq)) {
 		dprintk("svcrdma: error creating RQ CQ for connect request\n");
 		goto errout;
@@ -1173,13 +1084,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 
-	/*
-	 * Arm the CQs for the SQ and RQ before accepting so we can't
-	 * miss the first message
-	 */
-	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 	memset(&conn_param, 0, sizeof conn_param);
@@ -1319,10 +1223,10 @@ static void __svc_rdma_free(struct work_struct *work)
 		ib_destroy_qp(rdma->sc_qp);
 
 	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
-		ib_destroy_cq(rdma->sc_sq_cq);
+		ib_free_cq(rdma->sc_sq_cq);
 
 	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
-		ib_destroy_cq(rdma->sc_rq_cq);
+		ib_free_cq(rdma->sc_rq_cq);
 
 	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
 		ib_dealloc_pd(rdma->sc_pd);
@@ -1383,9 +1287,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);
 
-			/* See if we can opportunistically reap SQ WR to make room */
-			sq_cq_reap(xprt);
-
 			/* Wait until SQ WR available if SQ still full */
 			wait_event(xprt->sc_send_wait,
 				   atomic_read(&xprt->sc_sq_count) <
@@ -1418,57 +1319,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 	}
 	return ret;
 }
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-			 enum rpcrdma_errcode err)
-{
-	struct ib_send_wr err_wr;
-	struct page *p;
-	struct svc_rdma_op_ctxt *ctxt;
-	__be32 *va;
-	int length;
-	int ret;
-
-	p = alloc_page(GFP_KERNEL);
-	if (!p)
-		return;
-	va = page_address(p);
-
-	/* XDR encode error */
-	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
-	ctxt = svc_rdma_get_context(xprt);
-	ctxt->direction = DMA_FROM_DEVICE;
-	ctxt->count = 1;
-	ctxt->pages[0] = p;
-
-	/* Prepare SGE for local address */
-	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
-					    p, 0, length, DMA_FROM_DEVICE);
-	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
-		put_page(p);
-		svc_rdma_put_context(ctxt, 1);
-		return;
-	}
-	atomic_inc(&xprt->sc_dma_used);
-	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length = length;
-
-	/* Prepare SEND WR */
-	memset(&err_wr, 0, sizeof err_wr);
-	ctxt->wr_op = IB_WR_SEND;
-	err_wr.wr_id = (unsigned long)ctxt;
-	err_wr.sg_list = ctxt->sge;
-	err_wr.num_sge = 1;
-	err_wr.opcode = IB_WR_SEND;
-	err_wr.send_flags = IB_SEND_SIGNALED;
-
-	/* Post It */
-	ret = svc_rdma_send(xprt, &err_wr);
-	if (ret) {
-		dprintk("svcrdma: Error %d posting send for protocol error\n",
-			ret);
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-	}
-}
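Almost everything deleted from svc_rdma_transport.c (the DTO tasklet, rq_cq_reap()/sq_cq_reap(), the explicit ib_req_notify_cq() re-arming) existed only to poll the completion queues by hand. ib_alloc_cq() takes that over: the core polls in the requested context and dispatches each completion to its wr_cqe->done handler, with ib_free_cq() as the matching teardown. A minimal hedged sketch of the allocation-side difference; error handling is trimmed and the function names are illustrative, not from the patch:

#include <rdma/ib_verbs.h>

/* Before: create the CQ, wire up handlers, and arm it by hand; the
 * driver then had to drain it itself with ib_poll_cq() from a tasklet.
 */
static struct ib_cq *old_style_cq(struct ib_device *dev, void *ctx,
				  ib_comp_handler handler, int depth)
{
	struct ib_cq_init_attr cq_attr = { .cqe = depth };
	struct ib_cq *cq;

	cq = ib_create_cq(dev, handler, NULL, ctx, &cq_attr);
	if (IS_ERR(cq))
		return cq;
	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
	return cq;
}

/* After: ib_alloc_cq() owns polling and re-arming; completions are
 * delivered to each WR's wr_cqe->done() in the requested context
 * (softirq here, matching the svcrdma conversion above).
 */
static struct ib_cq *new_style_cq(struct ib_device *dev, void *ctx, int depth)
{
	return ib_alloc_cq(dev, ctx, depth, 0, IB_POLL_SOFTIRQ);
}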