author	Trond Myklebust <trond.myklebust@primarydata.com>	2017-09-05 15:16:04 -0400
committer	Trond Myklebust <trond.myklebust@primarydata.com>	2017-09-05 15:16:04 -0400
commit	f9773b22a27a4234f436c9570afd62d905e00a13 (patch)
tree	1dd82dd838ffc7277f281b57d0e940dc970cd19f
parent	7af7a5963c40d8ed853d1004701c73a221d94644 (diff)
parent	67af6f652f9ccad772c48f7c959ad5aa23bdfb40 (diff)
Merge tag 'nfs-rdma-for-4.14-1' of git://git.linux-nfs.org/projects/anna/linux-nfs into linux-next
NFS-over-RDMA client updates for Linux 4.14

Bugfixes and cleanups:
- Constify rpc_xprt_ops
- Harden RPC call encoding and decoding
- Clean up rpc call decoding to use xdr_streams
- Remove unused variables from various structures
- Refactor code to remove imul instructions
- Rearrange rx_stats structure for better cacheline sharing
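For context on the hardened encoding mentioned above: the series builds the RPC-over-RDMA transport header through an xdr_stream, reserving space before each write so a too-small header buffer produces -EMSGSIZE instead of a silent overrun. Below is a minimal, stand-alone user-space sketch of that reserve-then-fill pattern; the sketch_* names and the simplified stream struct are illustrative stand-ins, not the kernel's xdr_stream API.

/* Sketch only: a user-space model of "reserve space, then fill it in".
 * The real code uses xdr_init_encode()/xdr_reserve_space(); these
 * sketch_* helpers are simplified stand-ins so the example compiles.
 */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <arpa/inet.h>		/* htonl() */

struct sketch_xdr_stream {
	uint32_t *p;		/* next free XDR word */
	uint32_t *end;		/* one past the last usable word */
};

/* Hand out @nwords 32-bit words, or NULL if the buffer would overflow. */
static uint32_t *sketch_reserve(struct sketch_xdr_stream *xdr, size_t nwords)
{
	uint32_t *p = xdr->p;

	if ((size_t)(xdr->end - p) < nwords)
		return NULL;
	xdr->p += nwords;
	return p;
}

/* Encode the four fixed header words: XID, version, credits, proc. */
static int sketch_encode_header(struct sketch_xdr_stream *xdr,
				uint32_t xid, uint32_t credits)
{
	uint32_t *p = sketch_reserve(xdr, 4);

	if (!p)
		return -1;		/* the kernel code returns -EMSGSIZE */
	*p++ = xid;			/* XID is already in network order */
	*p++ = htonl(1);		/* protocol version */
	*p++ = htonl(credits);		/* requested credits */
	*p = htonl(0);			/* proc word, filled in by the caller */
	return 0;
}

int main(void)
{
	uint32_t buf[4];
	struct sketch_xdr_stream xdr = { buf, buf + 4 };

	if (sketch_encode_header(&xdr, htonl(0x12345678), 128))
		return 1;
	printf("encoded %ld bytes\n", (long)((char *)xdr.p - (char *)buf));
	return 0;
}

rpcrdma_bc_marshal_reply() and rpcrdma_marshal_req() in the diff below follow the same shape using the real xdr_stream helpers.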
-rw-r--r--  include/linux/sunrpc/xdr.h | 13
-rw-r--r--  include/linux/sunrpc/xprt.h | 2
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c | 71
-rw-r--r--  net/sunrpc/xprtrdma/fmr_ops.c | 10
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c | 12
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c | 844
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 2
-rw-r--r--  net/sunrpc/xprtrdma/transport.c | 7
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c | 21
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h | 33
-rw-r--r--  net/sunrpc/xprtsock.c | 8
11 files changed, 597 insertions(+), 426 deletions(-)
diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index 261b48a2701d..86b59e3525a5 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -239,6 +239,19 @@ extern unsigned int xdr_read_pages(struct xdr_stream *xdr, unsigned int len);
 extern void xdr_enter_page(struct xdr_stream *xdr, unsigned int len);
 extern int xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len, int (*actor)(struct scatterlist *, void *), void *data);
 
+/**
+ * xdr_stream_remaining - Return the number of bytes remaining in the stream
+ * @xdr: pointer to struct xdr_stream
+ *
+ * Return value:
+ *   Number of bytes remaining in @xdr before xdr->end
+ */
+static inline size_t
+xdr_stream_remaining(const struct xdr_stream *xdr)
+{
+	return xdr->nwords << 2;
+}
+
 ssize_t xdr_stream_decode_string_dup(struct xdr_stream *xdr, char **str,
 		size_t maxlen, gfp_t gfp_flags);
 /**
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index a97e6de5f9f2..5a7bff41f6b7 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -174,7 +174,7 @@ enum xprt_transports {
 
 struct rpc_xprt {
 	struct kref kref;		/* Reference count	*/
-	struct rpc_xprt_ops *	ops;		/* transport methods */
+	const struct rpc_xprt_ops *ops;	/* transport methods */
 
 	const struct rpc_timeout *timeout;	/* timeout parms */
 	struct sockaddr_storage addr;		/* server address */
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 03f6b5840764..d31d0ac5ada9 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -49,6 +49,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
 	if (IS_ERR(rb))
 		goto out_fail;
 	req->rl_rdmabuf = rb;
+	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
 
 	size = r_xprt->rx_data.inline_rsize;
 	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
@@ -202,20 +203,24 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
  */
 int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 {
-	struct rpc_xprt *xprt = rqst->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	struct rpcrdma_msg *headerp;
+	__be32 *p;
 
-	headerp = rdmab_to_msg(req->rl_rdmabuf);
-	headerp->rm_xid = rqst->rq_xid;
-	headerp->rm_vers = rpcrdma_version;
-	headerp->rm_credit =
-		cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
-	headerp->rm_type = rdma_msg;
-	headerp->rm_body.rm_chunks[0] = xdr_zero;
-	headerp->rm_body.rm_chunks[1] = xdr_zero;
-	headerp->rm_body.rm_chunks[2] = xdr_zero;
+	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
+	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
+			req->rl_rdmabuf->rg_base);
+
+	p = xdr_reserve_space(&req->rl_stream, 28);
+	if (unlikely(!p))
+		return -EIO;
+	*p++ = rqst->rq_xid;
+	*p++ = rpcrdma_version;
+	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+	*p++ = rdma_msg;
+	*p++ = xdr_zero;
+	*p++ = xdr_zero;
+	*p = xdr_zero;
 
 	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
 				       &rqst->rq_snd_buf, rpcrdma_noch))
@@ -271,9 +276,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
  * @xprt: transport receiving the call
  * @rep: receive buffer containing the call
  *
- * Called in the RPC reply handler, which runs in a tasklet.
- * Be quick about it.
- *
  * Operational assumptions:
  *    o Backchannel credits are ignored, just as the NFS server
  *      forechannel currently does
@@ -284,7 +286,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 			       struct rpcrdma_rep *rep)
 {
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpcrdma_msg *headerp;
 	struct svc_serv *bc_serv;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
@@ -292,24 +293,15 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	size_t size;
 	__be32 *p;
 
-	headerp = rdmab_to_msg(rep->rr_rdmabuf);
+	p = xdr_inline_decode(&rep->rr_stream, 0);
+	size = xdr_stream_remaining(&rep->rr_stream);
+
 #ifdef RPCRDMA_BACKCHANNEL_DEBUG
 	pr_info("RPC: %s: callback XID %08x, length=%u\n",
-		__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
-	pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
+		__func__, be32_to_cpup(p), size);
+	pr_info("RPC: %s: %*ph\n", __func__, size, p);
 #endif
 
-	/* Sanity check:
-	 * Need at least enough bytes for RPC/RDMA header, as code
-	 * here references the header fields by array offset. Also,
-	 * backward calls are always inline, so ensure there
-	 * are some bytes beyond the RPC/RDMA header.
-	 */
-	if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
-		goto out_short;
-	p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
-	size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
-
 	/* Grab a free bc rqst */
 	spin_lock(&xprt->bc_pa_lock);
 	if (list_empty(&xprt->bc_pa_list)) {
@@ -325,7 +317,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	/* Prepare rqst */
 	rqst->rq_reply_bytes_recvd = 0;
 	rqst->rq_bytes_sent = 0;
-	rqst->rq_xid = headerp->rm_xid;
+	rqst->rq_xid = *p;
 
 	rqst->rq_private_buf.len = size;
 	set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
@@ -337,9 +329,9 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	buf->len = size;
 
 	/* The receive buffer has to be hooked to the rpcrdma_req
-	 * so that it can be reposted after the server is done
-	 * parsing it but just before sending the backward
-	 * direction reply.
+	 * so that it is not released while the req is pointing
+	 * to its buffer, and so that it can be reposted after
+	 * the Upper Layer is done decoding it.
 	 */
 	req = rpcr_to_rdmar(rqst);
 	dprintk("RPC: %s: attaching rep %p to req %p\n",
@@ -367,13 +359,4 @@ out_overflow:
  * when the connection is re-established.
  */
 	return;
-
-out_short:
-	pr_warn("RPC/RDMA short backward direction call\n");
-
-	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
-		xprt_disconnect_done(xprt);
-	else
-		pr_warn("RPC: %s: reposting rep %p\n",
-			__func__, rep);
 }
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index d3f84bb1d443..6c7151341194 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -177,7 +177,7 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
-static int
+static struct rpcrdma_mr_seg *
 fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	   int nsegs, bool writing, struct rpcrdma_mw **out)
 {
@@ -188,7 +188,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 	mw = rpcrdma_get_mw(r_xprt);
 	if (!mw)
-		return -ENOBUFS;
+		return ERR_PTR(-ENOBUFS);
 
 	pageoff = offset_in_page(seg1->mr_offset);
 	seg1->mr_offset -= pageoff;	/* start of page */
@@ -232,13 +232,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	mw->mw_offset = dma_pages[0] + pageoff;
 
 	*out = mw;
-	return mw->mw_nents;
+	return seg;
 
 out_dmamap_err:
 	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
 		mw->mw_sg, i);
 	rpcrdma_put_mw(r_xprt, mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 
 out_maperr:
 	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
@@ -247,7 +247,7 @@ out_maperr:
 	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
 			mw->mw_sg, mw->mw_nents, mw->mw_dir);
 	rpcrdma_put_mw(r_xprt, mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 }
 
 /* Invalidate all memory regions that were registered for "req".
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 6aea36a38bfd..5a936a6a31a3 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -344,7 +344,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 /* Post a REG_MR Work Request to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
-static int
+static struct rpcrdma_mr_seg *
 frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	    int nsegs, bool writing, struct rpcrdma_mw **out)
 {
@@ -364,7 +364,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 			rpcrdma_defer_mr_recovery(mw);
 		mw = rpcrdma_get_mw(r_xprt);
 		if (!mw)
-			return -ENOBUFS;
+			return ERR_PTR(-ENOBUFS);
 	} while (mw->frmr.fr_state != FRMR_IS_INVALID);
 	frmr = &mw->frmr;
 	frmr->fr_state = FRMR_IS_VALID;
@@ -429,25 +429,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	mw->mw_offset = mr->iova;
 
 	*out = mw;
-	return mw->mw_nents;
+	return seg;
 
 out_dmamap_err:
 	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
 		mw->mw_sg, i);
 	frmr->fr_state = FRMR_IS_INVALID;
 	rpcrdma_put_mw(r_xprt, mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 
 out_mapmr_err:
 	pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
 		frmr->fr_mr, n, mw->mw_nents);
 	rpcrdma_defer_mr_recovery(mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 
 out_senderr:
 	pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc);
 	rpcrdma_defer_mr_recovery(mw);
-	return -ENOTCONN;
+	return ERR_PTR(-ENOTCONN);
 }
 
 /* Invalidate all memory regions that were registered for "req".
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index dfa748a0c8de..7fec4039cd15 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -169,40 +169,41 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
-/* Split "vec" on page boundaries into segments. FMR registers pages,
- * not a byte range. Other modes coalesce these segments into a single
- * MR when they can.
+/* Split @vec on page boundaries into SGEs. FMR registers pages, not
+ * a byte range. Other modes coalesce these SGEs into a single MR
+ * when they can.
+ *
+ * Returns pointer to next available SGE, and bumps the total number
+ * of SGEs consumed.
  */
-static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
+static struct rpcrdma_mr_seg *
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+		     unsigned int *n)
 {
-	size_t page_offset;
-	u32 remaining;
+	u32 remaining, page_offset;
 	char *base;
 
 	base = vec->iov_base;
 	page_offset = offset_in_page(base);
 	remaining = vec->iov_len;
-	while (remaining && n < RPCRDMA_MAX_SEGS) {
-		seg[n].mr_page = NULL;
-		seg[n].mr_offset = base;
-		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
-		remaining -= seg[n].mr_len;
-		base += seg[n].mr_len;
-		++n;
+	while (remaining) {
+		seg->mr_page = NULL;
+		seg->mr_offset = base;
+		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+		remaining -= seg->mr_len;
+		base += seg->mr_len;
+		++seg;
+		++(*n);
 		page_offset = 0;
 	}
-	return n;
+	return seg;
 }
 
-/*
- * Chunk assembly from upper layer xdr_buf.
+/* Convert @xdrbuf into SGEs no larger than a page each. As they
+ * are registered, these SGEs are then coalesced into RDMA segments
+ * when the selected memreg mode supports it.
  *
- * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
- * elements. Segments are then coalesced when registered, if possible
- * within the selected memreg mode.
- *
- * Returns positive number of segments converted, or a negative errno.
+ * Returns positive number of SGEs consumed, or a negative errno.
  */
 
 static int
@@ -210,47 +211,41 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 		     unsigned int pos, enum rpcrdma_chunktype type,
 		     struct rpcrdma_mr_seg *seg)
 {
-	int len, n, p, page_base;
+	unsigned long page_base;
+	unsigned int len, n;
 	struct page **ppages;
 
 	n = 0;
-	if (pos == 0) {
-		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
-		if (n == RPCRDMA_MAX_SEGS)
-			goto out_overflow;
-	}
+	if (pos == 0)
+		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
 
 	len = xdrbuf->page_len;
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = offset_in_page(xdrbuf->page_base);
-	p = 0;
-	while (len && n < RPCRDMA_MAX_SEGS) {
-		if (!ppages[p]) {
-			/* alloc the pagelist for receiving buffer */
-			ppages[p] = alloc_page(GFP_ATOMIC);
-			if (!ppages[p])
+	while (len) {
+		if (unlikely(!*ppages)) {
+			/* XXX: Certain upper layer operations do
+			 * not provide receive buffer pages.
+			 */
+			*ppages = alloc_page(GFP_ATOMIC);
+			if (!*ppages)
 				return -EAGAIN;
 		}
-		seg[n].mr_page = ppages[p];
-		seg[n].mr_offset = (void *)(unsigned long) page_base;
-		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-		if (seg[n].mr_len > PAGE_SIZE)
-			goto out_overflow;
-		len -= seg[n].mr_len;
-		++n;
-		++p;
-		page_base = 0;	/* page offset only applies to first page */
+		seg->mr_page = *ppages;
+		seg->mr_offset = (char *)page_base;
+		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
+		len -= seg->mr_len;
+		++ppages;
+		++seg;
+		++n;
+		page_base = 0;
 	}
 
-	/* Message overflows the seg array */
-	if (len && n == RPCRDMA_MAX_SEGS)
-		goto out_overflow;
-
 	/* When encoding a Read chunk, the tail iovec contains an
 	 * XDR pad and may be omitted.
 	 */
 	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
-		return n;
+		goto out;
 
 	/* When encoding a Write chunk, some servers need to see an
 	 * extra segment for non-XDR-aligned Write chunks. The upper
@@ -258,30 +253,81 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 	 * for this purpose.
 	 */
 	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
-		return n;
+		goto out;
 
-	if (xdrbuf->tail[0].iov_len) {
-		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
-		if (n == RPCRDMA_MAX_SEGS)
-			goto out_overflow;
-	}
+	if (xdrbuf->tail[0].iov_len)
+		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
 
+out:
+	if (unlikely(n > RPCRDMA_MAX_SEGS))
+		return -EIO;
 	return n;
+}
+
+static inline int
+encode_item_present(struct xdr_stream *xdr)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
+
+	*p = xdr_one;
+	return 0;
+}
+
+static inline int
+encode_item_not_present(struct xdr_stream *xdr)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
 
-out_overflow:
-	pr_err("rpcrdma: segment array overflow\n");
-	return -EIO;
+	*p = xdr_zero;
+	return 0;
 }
 
-static inline __be32 *
+static void
 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
 {
 	*iptr++ = cpu_to_be32(mw->mw_handle);
 	*iptr++ = cpu_to_be32(mw->mw_length);
-	return xdr_encode_hyper(iptr, mw->mw_offset);
+	xdr_encode_hyper(iptr, mw->mw_offset);
+}
+
+static int
+encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
+
+	xdr_encode_rdma_segment(p, mw);
+	return 0;
+}
+
+static int
+encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw,
+		    u32 position)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
+
+	*p++ = xdr_one;			/* Item present */
+	*p++ = cpu_to_be32(position);
+	xdr_encode_rdma_segment(p, mw);
+	return 0;
 }
 
-/* XDR-encode the Read list. Supports encoding a list of read
+/* Register and XDR encode the Read list. Supports encoding a list of read
  * segments that belong to a single read chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
@@ -290,23 +336,20 @@ xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
  * N elements, position P (same P for all chunks of same arg!):
  *  1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Read list, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
+ *
+ * Only a single @pos value is currently supported.
  */
-static __be32 *
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
-			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
-			 __be32 *iptr, enum rpcrdma_chunktype rtype)
+static noinline int
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
 {
+	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
 	unsigned int pos;
-	int n, nsegs;
-
-	if (rtype == rpcrdma_noch) {
-		*iptr++ = xdr_zero;	/* item not present */
-		return iptr;
-	}
+	int nsegs;
 
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
@@ -315,40 +358,33 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
 				     rtype, seg);
 	if (nsegs < 0)
-		return ERR_PTR(nsegs);
+		return nsegs;
 
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						 false, &mw);
-		if (n < 0)
-			return ERR_PTR(n);
+		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						   false, &mw);
+		if (IS_ERR(seg))
+			return PTR_ERR(seg);
 		rpcrdma_push_mw(mw, &req->rl_registered);
 
-		*iptr++ = xdr_one;	/* item present */
-
-		/* All read segments in this chunk
-		 * have the same "position".
-		 */
-		*iptr++ = cpu_to_be32(pos);
-		iptr = xdr_encode_rdma_segment(iptr, mw);
+		if (encode_read_segment(xdr, mw, pos) < 0)
+			return -EMSGSIZE;
 
 		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__, pos,
 			mw->mw_length, (unsigned long long)mw->mw_offset,
-			mw->mw_handle, n < nsegs ? "more" : "last");
+			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.read_chunk_count++;
-		seg += n;
-		nsegs -= n;
+		nsegs -= mw->mw_nents;
 	} while (nsegs);
 
-	/* Finish Read list */
-	*iptr++ = xdr_zero;	/* Next item not present */
-	return iptr;
+	return 0;
 }
 
-/* XDR-encode the Write list. Supports encoding a list containing
- * one array of plain segments that belong to a single write chunk.
+/* Register and XDR encode the Write list. Supports encoding a list
+ * containing one array of plain segments that belong to a single
+ * write chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -356,66 +392,65 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
  * N elements:
  *  1 - N - HLOO - HLOO - ... - HLOO - 0
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Write list, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
+ *
+ * Only a single Write chunk is currently supported.
  */
-static __be32 *
+static noinline int
 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			  struct rpc_rqst *rqst, __be32 *iptr,
-			  enum rpcrdma_chunktype wtype)
+			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 {
+	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
-	int n, nsegs, nchunks;
+	int nsegs, nchunks;
 	__be32 *segcount;
 
-	if (wtype != rpcrdma_writech) {
-		*iptr++ = xdr_zero;	/* no Write list present */
-		return iptr;
-	}
-
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
 				     wtype, seg);
 	if (nsegs < 0)
-		return ERR_PTR(nsegs);
+		return nsegs;
 
-	*iptr++ = xdr_one;	/* Write list present */
-	segcount = iptr++;	/* save location of segment count */
+	if (encode_item_present(xdr) < 0)
+		return -EMSGSIZE;
+	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
+	if (unlikely(!segcount))
+		return -EMSGSIZE;
+	/* Actual value encoded below */
 
 	nchunks = 0;
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						 true, &mw);
-		if (n < 0)
-			return ERR_PTR(n);
+		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						   true, &mw);
+		if (IS_ERR(seg))
+			return PTR_ERR(seg);
 		rpcrdma_push_mw(mw, &req->rl_registered);
 
-		iptr = xdr_encode_rdma_segment(iptr, mw);
+		if (encode_rdma_segment(xdr, mw) < 0)
+			return -EMSGSIZE;
 
 		dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__,
 			mw->mw_length, (unsigned long long)mw->mw_offset,
-			mw->mw_handle, n < nsegs ? "more" : "last");
+			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.write_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
 		nchunks++;
-		seg += n;
-		nsegs -= n;
+		nsegs -= mw->mw_nents;
 	} while (nsegs);
 
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	/* Finish Write list */
-	*iptr++ = xdr_zero;	/* Next item not present */
-	return iptr;
+	return 0;
 }
 
-/* XDR-encode the Reply chunk. Supports encoding an array of plain
- * segments that belong to a single write (reply) chunk.
+/* Register and XDR encode the Reply chunk. Supports encoding an array
+ * of plain segments that belong to a single write (reply) chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -423,58 +458,57 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
  * N elements:
  *  1 - N - HLOO - HLOO - ... - HLOO
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Reply chunk, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
  */
-static __be32 *
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
-			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
-			   __be32 *iptr, enum rpcrdma_chunktype wtype)
+static noinline int
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 {
+	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
-	int n, nsegs, nchunks;
+	int nsegs, nchunks;
 	__be32 *segcount;
 
-	if (wtype != rpcrdma_replych) {
-		*iptr++ = xdr_zero;	/* no Reply chunk present */
-		return iptr;
-	}
-
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
-		return ERR_PTR(nsegs);
+		return nsegs;
 
-	*iptr++ = xdr_one;	/* Reply chunk present */
-	segcount = iptr++;	/* save location of segment count */
+	if (encode_item_present(xdr) < 0)
+		return -EMSGSIZE;
+	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
+	if (unlikely(!segcount))
+		return -EMSGSIZE;
+	/* Actual value encoded below */
 
 	nchunks = 0;
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						 true, &mw);
-		if (n < 0)
-			return ERR_PTR(n);
+		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						   true, &mw);
+		if (IS_ERR(seg))
+			return PTR_ERR(seg);
 		rpcrdma_push_mw(mw, &req->rl_registered);
 
-		iptr = xdr_encode_rdma_segment(iptr, mw);
+		if (encode_rdma_segment(xdr, mw) < 0)
+			return -EMSGSIZE;
 
 		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__,
 			mw->mw_length, (unsigned long long)mw->mw_offset,
-			mw->mw_handle, n < nsegs ? "more" : "last");
+			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.reply_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
 		nchunks++;
-		seg += n;
-		nsegs -= n;
+		nsegs -= mw->mw_nents;
 	} while (nsegs);
 
 	/* Update count of segments in the Reply chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	return iptr;
+	return 0;
 }
 
 /* Prepare the RPC-over-RDMA header SGE.
@@ -651,37 +685,52 @@ rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 	req->rl_mapped_sges = 0;
 }
 
-/*
- * Marshal a request: the primary job of this routine is to choose
- * the transfer modes. See comments below.
+/**
+ * rpcrdma_marshal_req - Marshal and send one RPC request
+ * @r_xprt: controlling transport
+ * @rqst: RPC request to be marshaled
+ *
+ * For the RPC in "rqst", this function:
+ *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
+ *  - Registers Read, Write, and Reply chunks
+ *  - Constructs the transport header
+ *  - Posts a Send WR to send the transport header and request
  *
- * Returns zero on success, otherwise a negative errno.
+ * Returns:
+ *	%0 if the RPC was sent successfully,
+ *	%-ENOTCONN if the connection was lost,
+ *	%-EAGAIN if not enough pages are available for on-demand reply buffer,
+ *	%-ENOBUFS if no MRs are available to register chunks,
+ *	%-EMSGSIZE if the transport header is too small,
+ *	%-EIO if a permanent problem occurred while marshaling.
  */
-
 int
-rpcrdma_marshal_req(struct rpc_rqst *rqst)
+rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 {
-	struct rpc_xprt *xprt = rqst->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct xdr_stream *xdr = &req->rl_stream;
 	enum rpcrdma_chunktype rtype, wtype;
-	struct rpcrdma_msg *headerp;
 	bool ddp_allowed;
-	ssize_t hdrlen;
-	size_t rpclen;
-	__be32 *iptr;
+	__be32 *p;
+	int ret;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
 		return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-	headerp = rdmab_to_msg(req->rl_rdmabuf);
-	/* don't byte-swap XID, it's already done in request */
-	headerp->rm_xid = rqst->rq_xid;
-	headerp->rm_vers = rpcrdma_version;
-	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
-	headerp->rm_type = rdma_msg;
+	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
+	xdr_init_encode(xdr, &req->rl_hdrbuf,
+			req->rl_rdmabuf->rg_base);
+
+	/* Fixed header fields */
+	ret = -EMSGSIZE;
+	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+	if (!p)
+		goto out_err;
+	*p++ = rqst->rq_xid;
+	*p++ = rpcrdma_version;
+	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
 
 	/* When the ULP employs a GSS flavor that guarantees integrity
 	 * or privacy, direct data placement of individual data items
@@ -721,17 +770,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * by themselves are larger than the inline threshold.
 	 */
 	if (rpcrdma_args_inline(r_xprt, rqst)) {
+		*p++ = rdma_msg;
 		rtype = rpcrdma_noch;
-		rpclen = rqst->rq_snd_buf.len;
 	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+		*p++ = rdma_msg;
 		rtype = rpcrdma_readch;
-		rpclen = rqst->rq_snd_buf.head[0].iov_len +
-			rqst->rq_snd_buf.tail[0].iov_len;
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
-		headerp->rm_type = htonl(RDMA_NOMSG);
+		*p++ = rdma_nomsg;
 		rtype = rpcrdma_areadch;
-		rpclen = 0;
 	}
 
 	req->rl_xid = rqst->rq_xid;
@@ -759,79 +806,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	iptr = headerp->rm_body.rm_chunks;
-	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
-	if (IS_ERR(iptr))
+	if (rtype != rpcrdma_noch) {
+		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+		if (ret)
+			goto out_err;
+	}
+	ret = encode_item_not_present(xdr);
+	if (ret)
 		goto out_err;
-	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
-	if (IS_ERR(iptr))
+
+	if (wtype == rpcrdma_writech) {
+		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+		if (ret)
+			goto out_err;
+	}
+	ret = encode_item_not_present(xdr);
+	if (ret)
 		goto out_err;
-	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
-	if (IS_ERR(iptr))
+
+	if (wtype != rpcrdma_replych)
+		ret = encode_item_not_present(xdr);
+	else
+		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+	if (ret)
 		goto out_err;
-	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
-	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+	dprintk("RPC: %5u %s: %s/%s: hdrlen %u rpclen\n",
 		rqst->rq_task->tk_pid, __func__,
 		transfertypes[rtype], transfertypes[wtype],
-		hdrlen, rpclen);
+		xdr_stream_pos(xdr));
 
-	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req,
+				       xdr_stream_pos(xdr),
 				       &rqst->rq_snd_buf, rtype)) {
-		iptr = ERR_PTR(-EIO);
+		ret = -EIO;
 		goto out_err;
 	}
 	return 0;
 
 out_err:
-	if (PTR_ERR(iptr) != -ENOBUFS) {
-		pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n",
-		       PTR_ERR(iptr));
+	if (ret != -ENOBUFS) {
+		pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
 		r_xprt->rx_stats.failed_marshal_count++;
 	}
-	return PTR_ERR(iptr);
-}
-
-/*
- * Chase down a received write or reply chunklist to get length
- * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
- */
-static int
-rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
-{
-	unsigned int i, total_len;
-	struct rpcrdma_write_chunk *cur_wchunk;
-	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
-
-	i = be32_to_cpu(**iptrp);
-	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
-	total_len = 0;
-	while (i--) {
-		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
-		ifdebug(FACILITY) {
-			u64 off;
-			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
-			dprintk("RPC: %s: chunk %d@0x%016llx:0x%08x\n",
-				__func__,
-				be32_to_cpu(seg->rs_length),
-				(unsigned long long)off,
-				be32_to_cpu(seg->rs_handle));
-		}
-		total_len += be32_to_cpu(seg->rs_length);
-		++cur_wchunk;
-	}
-	/* check and adjust for properly terminated write chunk */
-	if (wrchunk) {
-		__be32 *w = (__be32 *) cur_wchunk;
-		if (*w++ != xdr_zero)
-			return -1;
-		cur_wchunk = (struct rpcrdma_write_chunk *) w;
-	}
-	if ((char *)cur_wchunk > base + rep->rr_len)
-		return -1;
-
-	*iptrp = (__be32 *) cur_wchunk;
-	return total_len;
+	return ret;
 }
 
 /**
@@ -949,37 +967,254 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
 	}
 }
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  */
 static bool
-rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+		 __be32 xid, __be32 proc)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 {
-	__be32 *p = (__be32 *)headerp;
+	struct xdr_stream *xdr = &rep->rr_stream;
+	__be32 *p;
 
-	if (headerp->rm_type != rdma_msg)
+	if (proc != rdma_msg)
 		return false;
-	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+
+	/* Peek at stream contents without advancing. */
+	p = xdr_inline_decode(xdr, 0);
+
+	/* Chunk lists */
+	if (*p++ != xdr_zero)
 		return false;
-	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
-	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
 
-	/* sanity */
-	if (p[7] != headerp->rm_xid)
+	/* RPC header */
+	if (*p++ != xid)
 		return false;
-	/* call direction */
-	if (p[8] != cpu_to_be32(RPC_CALL))
+	if (*p != cpu_to_be32(RPC_CALL))
 		return false;
 
+	/* Now that we are sure this is a backchannel call,
+	 * advance to the RPC header.
+	 */
+	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
+	if (unlikely(!p))
+		goto out_short;
+
+	rpcrdma_bc_receive_call(r_xprt, rep);
+	return true;
+
+out_short:
+	pr_warn("RPC/RDMA short backward direction call\n");
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
+		xprt_disconnect_done(&r_xprt->rx_xprt);
 	return true;
 }
+#else	/* CONFIG_SUNRPC_BACKCHANNEL */
+{
+	return false;
+}
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
+static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
+{
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	ifdebug(FACILITY) {
+		u64 offset;
+		u32 handle;
+
+		handle = be32_to_cpup(p++);
+		*length = be32_to_cpup(p++);
+		xdr_decode_hyper(p, &offset);
+		dprintk("RPC: %s: segment %u@0x%016llx:0x%08x\n",
+			__func__, *length, (unsigned long long)offset,
+			handle);
+	} else {
+		*length = be32_to_cpup(p + 1);
+	}
+
+	return 0;
+}
+
+static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
+{
+	u32 segcount, seglength;
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	*length = 0;
+	segcount = be32_to_cpup(p);
+	while (segcount--) {
+		if (decode_rdma_segment(xdr, &seglength))
+			return -EIO;
+		*length += seglength;
+	}
+
+	dprintk("RPC: %s: segcount=%u, %u bytes\n",
+		__func__, be32_to_cpup(p), *length);
+	return 0;
+}
+
+/* In RPC-over-RDMA Version One replies, a Read list is never
+ * expected. This decoder is a stub that returns an error if
+ * a Read list is present.
+ */
+static int decode_read_list(struct xdr_stream *xdr)
+{
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+	if (unlikely(*p != xdr_zero))
+		return -EIO;
+	return 0;
+}
+
+/* Supports only one Write chunk in the Write list
+ */
+static int decode_write_list(struct xdr_stream *xdr, u32 *length)
+{
+	u32 chunklen;
+	bool first;
+	__be32 *p;
+
+	*length = 0;
+	first = true;
+	do {
+		p = xdr_inline_decode(xdr, sizeof(*p));
+		if (unlikely(!p))
+			return -EIO;
+		if (*p == xdr_zero)
+			break;
+		if (!first)
+			return -EIO;
+
+		if (decode_write_chunk(xdr, &chunklen))
+			return -EIO;
+		*length += chunklen;
+		first = false;
+	} while (true);
+	return 0;
+}
+
+static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
+{
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	*length = 0;
+	if (*p != xdr_zero)
+		if (decode_write_chunk(xdr, length))
+			return -EIO;
+	return 0;
+}
+
+static int
+rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+		   struct rpc_rqst *rqst)
+{
+	struct xdr_stream *xdr = &rep->rr_stream;
+	u32 writelist, replychunk, rpclen;
+	char *base;
+
+	/* Decode the chunk lists */
+	if (decode_read_list(xdr))
+		return -EIO;
+	if (decode_write_list(xdr, &writelist))
+		return -EIO;
+	if (decode_reply_chunk(xdr, &replychunk))
+		return -EIO;
+
+	/* RDMA_MSG sanity checks */
+	if (unlikely(replychunk))
+		return -EIO;
+
+	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
+	base = (char *)xdr_inline_decode(xdr, 0);
+	rpclen = xdr_stream_remaining(xdr);
+	r_xprt->rx_stats.fixup_copy_count +=
+		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
+
+	r_xprt->rx_stats.total_rdma_reply += writelist;
+	return rpclen + xdr_align_size(writelist);
+}
+
+static noinline int
+rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
+{
+	struct xdr_stream *xdr = &rep->rr_stream;
+	u32 writelist, replychunk;
+
+	/* Decode the chunk lists */
+	if (decode_read_list(xdr))
+		return -EIO;
+	if (decode_write_list(xdr, &writelist))
+		return -EIO;
+	if (decode_reply_chunk(xdr, &replychunk))
+		return -EIO;
+
+	/* RDMA_NOMSG sanity checks */
+	if (unlikely(writelist))
+		return -EIO;
+	if (unlikely(!replychunk))
+		return -EIO;
+
+	/* Reply chunk buffer already is the reply vector */
+	r_xprt->rx_stats.total_rdma_reply += replychunk;
+	return replychunk;
+}
+
+static noinline int
+rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+		     struct rpc_rqst *rqst)
+{
+	struct xdr_stream *xdr = &rep->rr_stream;
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	switch (*p) {
+	case err_vers:
+		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+		if (!p)
+			break;
+		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
+			rqst->rq_task->tk_pid, __func__,
+			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
+		break;
+	case err_chunk:
+		dprintk("RPC: %5u: %s: server reports header decoding error\n",
+			rqst->rq_task->tk_pid, __func__);
+		break;
+	default:
+		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
+			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+	}
+
+	r_xprt->rx_stats.bad_reply_count++;
+	return -EREMOTEIO;
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
@@ -993,33 +1228,39 @@ rpcrdma_reply_handler(struct work_struct *work)
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpcrdma_msg *headerp;
+	struct xdr_stream *xdr = &rep->rr_stream;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	__be32 *iptr;
-	int rdmalen, status, rmerr;
+	__be32 *p, xid, vers, proc;
 	unsigned long cwnd;
 	struct list_head mws;
+	int status;
 
 	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
 
-	if (rep->rr_len == RPCRDMA_BAD_LEN)
+	if (rep->rr_hdrbuf.head[0].iov_len == 0)
 		goto out_badstatus;
-	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+
+	xdr_init_decode(xdr, &rep->rr_hdrbuf,
+			rep->rr_hdrbuf.head[0].iov_base);
+
+	/* Fixed transport header fields */
+	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+	if (unlikely(!p))
 		goto out_shortreply;
+	xid = *p++;
+	vers = *p++;
+	p++;	/* credits */
+	proc = *p++;
 
-	headerp = rdmab_to_msg(rep->rr_rdmabuf);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-	if (rpcrdma_is_bcall(headerp))
-		goto out_bcall;
-#endif
+	if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
+		return;
 
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
 	spin_lock(&buf->rb_lock);
-	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
-					headerp->rm_xid);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf, xid);
 	if (!req)
 		goto out_nomatch;
 	if (req->rl_reply)
@@ -1035,7 +1276,7 @@ rpcrdma_reply_handler(struct work_struct *work)
 	spin_unlock(&buf->rb_lock);
 
 	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
-		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+		__func__, rep, req, be32_to_cpu(xid));
 
 	/* Invalidate and unmap the data payloads before waking the
 	 * waiting application. This guarantees the memory regions
@@ -1052,82 +1293,28 @@ rpcrdma_reply_handler(struct work_struct *work)
 	 * the rep, rqst, and rq_task pointers remain stable.
 	 */
 	spin_lock(&xprt->recv_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	rqst = xprt_lookup_rqst(xprt, xid);
 	if (!rqst)
 		goto out_norqst;
 	xprt->reestablish_timeout = 0;
-	if (headerp->rm_vers != rpcrdma_version)
+	if (vers != rpcrdma_version)
 		goto out_badversion;
 
-	/* check for expected message types */
-	/* The order of some of these tests is important. */
-	switch (headerp->rm_type) {
+	switch (proc) {
 	case rdma_msg:
-		/* never expect read chunks */
-		/* never expect reply chunks (two ways to check) */
-		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
-		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-		     headerp->rm_body.rm_chunks[2] != xdr_zero))
-			goto badheader;
-		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
-			/* count any expected write chunks in read reply */
-			/* start at write chunk array count */
-			iptr = &headerp->rm_body.rm_chunks[2];
-			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
-			/* check for validity, and no reply chunk after */
-			if (rdmalen < 0 || *iptr++ != xdr_zero)
-				goto badheader;
-			rep->rr_len -=
-				((unsigned char *)iptr - (unsigned char *)headerp);
-			status = rep->rr_len + rdmalen;
-			r_xprt->rx_stats.total_rdma_reply += rdmalen;
-			/* special case - last chunk may omit padding */
-			if (rdmalen &= 3) {
-				rdmalen = 4 - rdmalen;
-				status += rdmalen;
-			}
-		} else {
-			/* else ordinary inline */
-			rdmalen = 0;
-			iptr = (__be32 *)((unsigned char *)headerp +
-							RPCRDMA_HDRLEN_MIN);
-			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
-			status = rep->rr_len;
-		}
-
-		r_xprt->rx_stats.fixup_copy_count +=
-			rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
-					     rdmalen);
+		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
 		break;
-
 	case rdma_nomsg:
-		/* never expect read or write chunks, always reply chunks */
-		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
-		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
-		    headerp->rm_body.rm_chunks[2] != xdr_one)
-			goto badheader;
-		iptr = (__be32 *)((unsigned char *)headerp +
-							RPCRDMA_HDRLEN_MIN);
-		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
-		if (rdmalen < 0)
-			goto badheader;
-		r_xprt->rx_stats.total_rdma_reply += rdmalen;
-		/* Reply chunk buffer already is the reply vector - no fixup. */
-		status = rdmalen;
+		status = rpcrdma_decode_nomsg(r_xprt, rep);
 		break;
-
 	case rdma_error:
-		goto out_rdmaerr;
-
-badheader:
+		status = rpcrdma_decode_error(r_xprt, rep, rqst);
+		break;
 	default:
-		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
-			rqst->rq_task->tk_pid, __func__,
-			be32_to_cpu(headerp->rm_type));
 		status = -EIO;
-		r_xprt->rx_stats.bad_reply_count++;
-		break;
 	}
+	if (status < 0)
+		goto out_badheader;
 
 out:
 	cwnd = xprt->cwnd;
@@ -1149,42 +1336,22 @@ out_badstatus:
 	}
 	return;
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-out_bcall:
-	rpcrdma_bc_receive_call(r_xprt, rep);
-	return;
-#endif
-
 /* If the incoming reply terminated a pending RPC, the next
  * RPC call will post a replacement receive buffer as it is
  * being marshaled.
  */
 out_badversion:
 	dprintk("RPC: %s: invalid version %d\n",
-		__func__, be32_to_cpu(headerp->rm_vers));
+		__func__, be32_to_cpu(vers));
 	status = -EIO;
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-out_rdmaerr:
-	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
-	switch (rmerr) {
-	case ERR_VERS:
-		pr_err("%s: server reports header version error (%u-%u)\n",
-		       __func__,
-		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
-		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
-		break;
-	case ERR_CHUNK:
-		pr_err("%s: server reports header decoding error\n",
-		       __func__);
-		break;
-	default:
-		pr_err("%s: server reports unknown error %d\n",
-		       __func__, rmerr);
-	}
-	status = -EREMOTEIO;
+out_badheader:
+	dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+		rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
 	r_xprt->rx_stats.bad_reply_count++;
+	status = -EIO;
 	goto out;
 
 /* The req was still available, but by the time the recv_lock
@@ -1204,16 +1371,15 @@ out_shortreply:
1204 1371
1205out_nomatch: 1372out_nomatch:
1206 spin_unlock(&buf->rb_lock); 1373 spin_unlock(&buf->rb_lock);
1207 dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", 1374 dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
1208 __func__, be32_to_cpu(headerp->rm_xid), 1375 __func__, be32_to_cpu(xid));
1209 rep->rr_len);
1210 goto repost; 1376 goto repost;
1211 1377
1212out_duplicate: 1378out_duplicate:
1213 spin_unlock(&buf->rb_lock); 1379 spin_unlock(&buf->rb_lock);
1214 dprintk("RPC: %s: " 1380 dprintk("RPC: %s: "
1215 "duplicate reply %p to RPC request %p: xid 0x%08x\n", 1381 "duplicate reply %p to RPC request %p: xid 0x%08x\n",
1216 __func__, rep, req, be32_to_cpu(headerp->rm_xid)); 1382 __func__, rep, req, be32_to_cpu(xid));
1217 1383
1218/* If no pending RPC transaction was matched, post a replacement 1384/* If no pending RPC transaction was matched, post a replacement
1219 * receive buffer before returning. 1385 * receive buffer before returning.
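
The refactored reply handler replaces the open-coded chunk walking above with per-type decoders driven by the new rr_stream. A minimal sketch of that dispatch, assuming the fixed header words have just been pulled from rep->rr_hdrbuf (the rpcrdma_decode_msg() name and the exact label targets are taken on trust from the rest of the series, not verified here):

	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base);

	/* Fixed transport header: xid, version, credits, message type */
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	xid = *p++;
	vers = *p++;
	p++;			/* credit grant is handled at receive time */
	proc = *p++;

	if (vers != rpcrdma_version)
		goto out_badversion;

	switch (proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

Any decoder failure now funnels through the single out_badheader exit, replacing the per-case badheader label removed above.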
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 0d574cda242d..ec37ad83b068 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -269,7 +269,7 @@ xprt_rdma_bc_put(struct rpc_xprt *xprt)
269 module_put(THIS_MODULE); 269 module_put(THIS_MODULE);
270} 270}
271 271
272static struct rpc_xprt_ops xprt_rdma_bc_procs = { 272static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
273 .reserve_xprt = xprt_reserve_xprt_cong, 273 .reserve_xprt = xprt_reserve_xprt_cong,
274 .release_xprt = xprt_release_xprt_cong, 274 .release_xprt = xprt_release_xprt_cong,
275 .alloc_slot = xprt_alloc_slot, 275 .alloc_slot = xprt_alloc_slot,
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index d1c458e5ec4d..b680591f6763 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -149,7 +149,7 @@ static struct ctl_table sunrpc_table[] = {
149 149
150#endif 150#endif
151 151
152static struct rpc_xprt_ops xprt_rdma_procs; /*forward reference */ 152static const struct rpc_xprt_ops xprt_rdma_procs;
153 153
154static void 154static void
155xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap) 155xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
@@ -559,6 +559,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
559 559
560 r_xprt->rx_stats.hardway_register_count += size; 560 r_xprt->rx_stats.hardway_register_count += size;
561 req->rl_rdmabuf = rb; 561 req->rl_rdmabuf = rb;
562 xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
562 return true; 563 return true;
563} 564}
564 565
@@ -730,7 +731,7 @@ xprt_rdma_send_request(struct rpc_task *task)
730 if (unlikely(!list_empty(&req->rl_registered))) 731 if (unlikely(!list_empty(&req->rl_registered)))
731 r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); 732 r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
732 733
733 rc = rpcrdma_marshal_req(rqst); 734 rc = rpcrdma_marshal_req(r_xprt, rqst);
734 if (rc < 0) 735 if (rc < 0)
735 goto failed_marshal; 736 goto failed_marshal;
736 737
@@ -811,7 +812,7 @@ xprt_rdma_disable_swap(struct rpc_xprt *xprt)
811 * Plumbing for rpc transport switch and kernel module 812 * Plumbing for rpc transport switch and kernel module
812 */ 813 */
813 814
814static struct rpc_xprt_ops xprt_rdma_procs = { 815static const struct rpc_xprt_ops xprt_rdma_procs = {
815 .reserve_xprt = xprt_reserve_xprt_cong, 816 .reserve_xprt = xprt_reserve_xprt_cong,
816 .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */ 817 .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
817 .alloc_slot = xprt_alloc_slot, 818 .alloc_slot = xprt_alloc_slot,
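
With req->rl_hdrbuf initialized over the header regbuf at allocation time, rpcrdma_marshal_req() can build the transport header through an xdr_stream instead of writing struct rpcrdma_msg fields directly. A hedged sketch of the send-side setup (the exact encoding sequence and error values in the real rpcrdma_marshal_req() are not reproduced here):

	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	/* Fixed transport header: xid, version, credits requested, type */
	p = xdr_reserve_space(&req->rl_stream, 4 * sizeof(*p));
	if (!p)
		return -EMSGSIZE;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	*p   = rdma_msg;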
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e4171f2abe37..c78fb27c20ed 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -139,14 +139,11 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
139static void 139static void
140rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) 140rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
141{ 141{
142 struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
143 struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf; 142 struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
143 __be32 *p = rep->rr_rdmabuf->rg_base;
144 u32 credits; 144 u32 credits;
145 145
146 if (rep->rr_len < RPCRDMA_HDRLEN_ERR) 146 credits = be32_to_cpup(p + 2);
147 return;
148
149 credits = be32_to_cpu(rmsgp->rm_credit);
150 if (credits == 0) 147 if (credits == 0)
151 credits = 1; /* don't deadlock */ 148 credits = 1; /* don't deadlock */
152 else if (credits > buffer->rb_max_requests) 149 else if (credits > buffer->rb_max_requests)
@@ -173,21 +170,19 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
173 goto out_fail; 170 goto out_fail;
174 171
175 /* status == SUCCESS means all fields in wc are trustworthy */ 172 /* status == SUCCESS means all fields in wc are trustworthy */
176 if (wc->opcode != IB_WC_RECV)
177 return;
178
179 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n", 173 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
180 __func__, rep, wc->byte_len); 174 __func__, rep, wc->byte_len);
181 175
182 rep->rr_len = wc->byte_len; 176 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
183 rep->rr_wc_flags = wc->wc_flags; 177 rep->rr_wc_flags = wc->wc_flags;
184 rep->rr_inv_rkey = wc->ex.invalidate_rkey; 178 rep->rr_inv_rkey = wc->ex.invalidate_rkey;
185 179
186 ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf), 180 ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
187 rdmab_addr(rep->rr_rdmabuf), 181 rdmab_addr(rep->rr_rdmabuf),
188 rep->rr_len, DMA_FROM_DEVICE); 182 wc->byte_len, DMA_FROM_DEVICE);
189 183
190 rpcrdma_update_granted_credits(rep); 184 if (wc->byte_len >= RPCRDMA_HDRLEN_ERR)
185 rpcrdma_update_granted_credits(rep);
191 186
192out_schedule: 187out_schedule:
193 queue_work(rpcrdma_receive_wq, &rep->rr_work); 188 queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -198,7 +193,7 @@ out_fail:
198 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", 193 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
199 ib_wc_status_msg(wc->status), 194 ib_wc_status_msg(wc->status),
200 wc->status, wc->vendor_err); 195 wc->status, wc->vendor_err);
201 rep->rr_len = RPCRDMA_BAD_LEN; 196 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
202 goto out_schedule; 197 goto out_schedule;
203} 198}
204 199
@@ -974,6 +969,8 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
974 rc = PTR_ERR(rep->rr_rdmabuf); 969 rc = PTR_ERR(rep->rr_rdmabuf);
975 goto out_free; 970 goto out_free;
976 } 971 }
972 xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
973 rdmab_length(rep->rr_rdmabuf));
977 974
978 rep->rr_cqe.done = rpcrdma_wc_receive; 975 rep->rr_cqe.done = rpcrdma_wc_receive;
979 rep->rr_rxprt = r_xprt; 976 rep->rr_rxprt = r_xprt;
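
The credit update above now reads the grant straight out of the raw receive buffer before any xdr_stream exists, which works because the credit field sits at a fixed offset in the transport header. A small illustrative helper (the function name is made up for this note; the word layout follows struct rpcrdma_msg):

	static u32 rpcrdma_credits_from_header(struct rpcrdma_rep *rep)
	{
		__be32 *p = rep->rr_rdmabuf->rg_base;

		/* p[0] rm_xid, p[1] rm_vers, p[2] rm_credit, p[3] rm_type */
		return be32_to_cpup(p + 2);
	}

Because a flushed or runt receive no longer carries RPCRDMA_BAD_LEN, the completion handler instead skips the credit update whenever fewer than RPCRDMA_HDRLEN_ERR bytes arrived.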
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b282d3f8cdd8..45dab2475c99 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -218,18 +218,17 @@ enum {
218 218
219struct rpcrdma_rep { 219struct rpcrdma_rep {
220 struct ib_cqe rr_cqe; 220 struct ib_cqe rr_cqe;
221 unsigned int rr_len;
222 int rr_wc_flags; 221 int rr_wc_flags;
223 u32 rr_inv_rkey; 222 u32 rr_inv_rkey;
223 struct rpcrdma_regbuf *rr_rdmabuf;
224 struct rpcrdma_xprt *rr_rxprt; 224 struct rpcrdma_xprt *rr_rxprt;
225 struct work_struct rr_work; 225 struct work_struct rr_work;
226 struct xdr_buf rr_hdrbuf;
227 struct xdr_stream rr_stream;
226 struct list_head rr_list; 228 struct list_head rr_list;
227 struct ib_recv_wr rr_recv_wr; 229 struct ib_recv_wr rr_recv_wr;
228 struct rpcrdma_regbuf *rr_rdmabuf;
229}; 230};
230 231
231#define RPCRDMA_BAD_LEN (~0U)
232
233/* 232/*
234 * struct rpcrdma_mw - external memory region metadata 233 * struct rpcrdma_mw - external memory region metadata
235 * 234 *
@@ -346,6 +345,8 @@ struct rpcrdma_req {
346 unsigned int rl_connect_cookie; 345 unsigned int rl_connect_cookie;
347 struct rpcrdma_buffer *rl_buffer; 346 struct rpcrdma_buffer *rl_buffer;
348 struct rpcrdma_rep *rl_reply; 347 struct rpcrdma_rep *rl_reply;
348 struct xdr_stream rl_stream;
349 struct xdr_buf rl_hdrbuf;
349 struct ib_send_wr rl_send_wr; 350 struct ib_send_wr rl_send_wr;
350 struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES]; 351 struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES];
351 struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ 352 struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
@@ -440,24 +441,27 @@ struct rpcrdma_create_data_internal {
440 * Statistics for RPCRDMA 441 * Statistics for RPCRDMA
441 */ 442 */
442struct rpcrdma_stats { 443struct rpcrdma_stats {
444 /* accessed when sending a call */
443 unsigned long read_chunk_count; 445 unsigned long read_chunk_count;
444 unsigned long write_chunk_count; 446 unsigned long write_chunk_count;
445 unsigned long reply_chunk_count; 447 unsigned long reply_chunk_count;
446
447 unsigned long long total_rdma_request; 448 unsigned long long total_rdma_request;
448 unsigned long long total_rdma_reply;
449 449
450 /* rarely accessed error counters */
450 unsigned long long pullup_copy_count; 451 unsigned long long pullup_copy_count;
451 unsigned long long fixup_copy_count;
452 unsigned long hardway_register_count; 452 unsigned long hardway_register_count;
453 unsigned long failed_marshal_count; 453 unsigned long failed_marshal_count;
454 unsigned long bad_reply_count; 454 unsigned long bad_reply_count;
455 unsigned long nomsg_call_count;
456 unsigned long bcall_count;
457 unsigned long mrs_recovered; 455 unsigned long mrs_recovered;
458 unsigned long mrs_orphaned; 456 unsigned long mrs_orphaned;
459 unsigned long mrs_allocated; 457 unsigned long mrs_allocated;
458
459 /* accessed when receiving a reply */
460 unsigned long long total_rdma_reply;
461 unsigned long long fixup_copy_count;
460 unsigned long local_inv_needed; 462 unsigned long local_inv_needed;
463 unsigned long nomsg_call_count;
464 unsigned long bcall_count;
461}; 465};
462 466
463/* 467/*
@@ -465,7 +469,8 @@ struct rpcrdma_stats {
465 */ 469 */
466struct rpcrdma_xprt; 470struct rpcrdma_xprt;
467struct rpcrdma_memreg_ops { 471struct rpcrdma_memreg_ops {
468 int (*ro_map)(struct rpcrdma_xprt *, 472 struct rpcrdma_mr_seg *
473 (*ro_map)(struct rpcrdma_xprt *,
469 struct rpcrdma_mr_seg *, int, bool, 474 struct rpcrdma_mr_seg *, int, bool,
470 struct rpcrdma_mw **); 475 struct rpcrdma_mw **);
471 void (*ro_unmap_sync)(struct rpcrdma_xprt *, 476 void (*ro_unmap_sync)(struct rpcrdma_xprt *,
@@ -638,10 +643,16 @@ enum rpcrdma_chunktype {
638bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *, 643bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
639 u32, struct xdr_buf *, enum rpcrdma_chunktype); 644 u32, struct xdr_buf *, enum rpcrdma_chunktype);
640void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *); 645void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
641int rpcrdma_marshal_req(struct rpc_rqst *); 646int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
642void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); 647void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
643void rpcrdma_reply_handler(struct work_struct *work); 648void rpcrdma_reply_handler(struct work_struct *work);
644 649
650static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
651{
652 xdr->head[0].iov_len = len;
653 xdr->len = len;
654}
655
645/* RPC/RDMA module init - xprtrdma/transport.c 656/* RPC/RDMA module init - xprtrdma/transport.c
646 */ 657 */
647extern unsigned int xprt_rdma_max_inline_read; 658extern unsigned int xprt_rdma_max_inline_read;
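
The ro_map signature change above folds the error return into the returned segment pointer. A hedged sketch of what a caller in the marshaling path might look like under the ERR_PTR convention (the caller shape is an assumption, not copied from this patch):

	seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false, &mw);
	if (IS_ERR(seg))
		return PTR_ERR(seg);

On success the returned pointer presumably advances to the next unmapped segment, so a chunk encoder can loop until nsegs is exhausted while collecting each mw on req->rl_registered for the unmap path shown in transport.c above.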
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 2b918137aaa0..9b5de31aa429 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2728,7 +2728,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
2728 module_put(THIS_MODULE); 2728 module_put(THIS_MODULE);
2729} 2729}
2730 2730
2731static struct rpc_xprt_ops xs_local_ops = { 2731static const struct rpc_xprt_ops xs_local_ops = {
2732 .reserve_xprt = xprt_reserve_xprt, 2732 .reserve_xprt = xprt_reserve_xprt,
2733 .release_xprt = xs_tcp_release_xprt, 2733 .release_xprt = xs_tcp_release_xprt,
2734 .alloc_slot = xprt_alloc_slot, 2734 .alloc_slot = xprt_alloc_slot,
@@ -2746,7 +2746,7 @@ static struct rpc_xprt_ops xs_local_ops = {
2746 .disable_swap = xs_disable_swap, 2746 .disable_swap = xs_disable_swap,
2747}; 2747};
2748 2748
2749static struct rpc_xprt_ops xs_udp_ops = { 2749static const struct rpc_xprt_ops xs_udp_ops = {
2750 .set_buffer_size = xs_udp_set_buffer_size, 2750 .set_buffer_size = xs_udp_set_buffer_size,
2751 .reserve_xprt = xprt_reserve_xprt_cong, 2751 .reserve_xprt = xprt_reserve_xprt_cong,
2752 .release_xprt = xprt_release_xprt_cong, 2752 .release_xprt = xprt_release_xprt_cong,
@@ -2768,7 +2768,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
2768 .inject_disconnect = xs_inject_disconnect, 2768 .inject_disconnect = xs_inject_disconnect,
2769}; 2769};
2770 2770
2771static struct rpc_xprt_ops xs_tcp_ops = { 2771static const struct rpc_xprt_ops xs_tcp_ops = {
2772 .reserve_xprt = xprt_reserve_xprt, 2772 .reserve_xprt = xprt_reserve_xprt,
2773 .release_xprt = xs_tcp_release_xprt, 2773 .release_xprt = xs_tcp_release_xprt,
2774 .alloc_slot = xprt_lock_and_alloc_slot, 2774 .alloc_slot = xprt_lock_and_alloc_slot,
@@ -2799,7 +2799,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
2799 * The rpc_xprt_ops for the server backchannel 2799 * The rpc_xprt_ops for the server backchannel
2800 */ 2800 */
2801 2801
2802static struct rpc_xprt_ops bc_tcp_ops = { 2802static const struct rpc_xprt_ops bc_tcp_ops = {
2803 .reserve_xprt = xprt_reserve_xprt, 2803 .reserve_xprt = xprt_reserve_xprt,
2804 .release_xprt = xprt_release_xprt, 2804 .release_xprt = xprt_release_xprt,
2805 .alloc_slot = xprt_alloc_slot, 2805 .alloc_slot = xprt_alloc_slot,