author     Chuck Lever <chuck.lever@oracle.com>         2016-06-29 13:54:25 -0400
committer  Anna Schumaker <Anna.Schumaker@Netapp.com>   2016-07-11 15:50:43 -0400
commit     5ab8142839c714ed5ac9a9de1846ab71f87a3ed7 (patch)
tree       126164156d2a118ca7a5617cbe72ebfa96a430c2
parent     9d6b0409788287b64d8401ffba2ce11a5a86a879 (diff)
xprtrdma: Chunk list encoders no longer share one rl_segments array
Currently, all three chunk list encoders each use a portion of the one
rl_segments array in rpcrdma_req. This is because the MWs for each chunk
list were preserved in rl_segments so that ro_unmap could find and
invalidate them after the RPC was complete.

However, now that MWs are placed on a per-req linked list as they are
registered, there is no longer any information in rpcrdma_mr_seg that is
shared between ro_map and ro_unmap_{sync,safe}, and thus nothing in
rl_segments needs to be preserved after rpcrdma_marshal_req is complete.

Thus the rl_segments array can now be used just for the needs of each
rpcrdma_convert_iovs call. Once each chunk list is encoded, the next
chunk list encoder is free to re-use all of rl_segments.

This means all three chunk lists in one RPC request can now each encode
a full-size data payload with no increase in the size of rl_segments.

This is a key requirement for Kerberos support, since both the Call and
Reply for a single RPC transaction are conveyed via Long messages
(RDMA Read/Write). Both can be large.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
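The shape of the change, condensed into a before/after paraphrase of the three encoders (illustrative only; xdrbuf, pos, and type stand in for the per-encoder arguments, and registration and header encoding are elided; see the full diff below for the real call sites):

/* Before: the encoders partitioned one shared array and chained
 * through it via req->rl_nextseg and req->rl_nchunks.
 */
        seg   = req->rl_nextseg;
        nsegs = rpcrdma_convert_iovs(xdrbuf, pos, type, seg,
                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
        /* ... register and encode segments ... */
        req->rl_nextseg = seg;  /* leave the remainder for the next encoder */

/* After: every encoder starts at the top of rl_segments and may use the
 * whole array, because each registered MW is linked onto
 * req->rl_registered and nothing in seg[] has to outlive
 * rpcrdma_marshal_req().
 */
        seg   = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(xdrbuf, pos, type, seg);
        /* ... register and encode segments; seg[] is scratch space ... */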
-rw-r--r--   net/sunrpc/xprtrdma/rpc_rdma.c  |  61
-rw-r--r--   net/sunrpc/xprtrdma/xprt_rdma.h |  36
2 files changed, 44 insertions(+), 53 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 6d34c1f7908a..f60d229b78b4 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -196,8 +196,7 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
  * MR when they can.
  */
 static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
-                     int n, int nsegs)
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
 {
         size_t page_offset;
         u32 remaining;
@@ -206,7 +205,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
         base = vec->iov_base;
         page_offset = offset_in_page(base);
         remaining = vec->iov_len;
-        while (remaining && n < nsegs) {
+        while (remaining && n < RPCRDMA_MAX_SEGS) {
                 seg[n].mr_page = NULL;
                 seg[n].mr_offset = base;
                 seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
@@ -230,23 +229,23 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 
 static int
 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
-        enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+        enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
 {
-        int len, n = 0, p;
-        int page_base;
+        int len, n, p, page_base;
         struct page **ppages;
 
+        n = 0;
         if (pos == 0) {
-                n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
-                if (n == nsegs)
-                        return -EIO;
+                n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
+                if (n == RPCRDMA_MAX_SEGS)
+                        goto out_overflow;
         }
 
         len = xdrbuf->page_len;
         ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
         page_base = xdrbuf->page_base & ~PAGE_MASK;
         p = 0;
-        while (len && n < nsegs) {
+        while (len && n < RPCRDMA_MAX_SEGS) {
                 if (!ppages[p]) {
                         /* alloc the pagelist for receiving buffer */
                         ppages[p] = alloc_page(GFP_ATOMIC);
@@ -257,7 +256,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                 seg[n].mr_offset = (void *)(unsigned long) page_base;
                 seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
                 if (seg[n].mr_len > PAGE_SIZE)
-                        return -EIO;
+                        goto out_overflow;
                 len -= seg[n].mr_len;
                 ++n;
                 ++p;
@@ -265,8 +264,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
         }
 
         /* Message overflows the seg array */
-        if (len && n == nsegs)
-                return -EIO;
+        if (len && n == RPCRDMA_MAX_SEGS)
+                goto out_overflow;
 
         /* When encoding the read list, the tail is always sent inline */
         if (type == rpcrdma_readch)
@@ -277,12 +276,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                  * xdr pad bytes, saving the server an RDMA operation. */
                 if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
                         return n;
-                n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
-                if (n == nsegs)
-                        return -EIO;
+                n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
+                if (n == RPCRDMA_MAX_SEGS)
+                        goto out_overflow;
         }
 
         return n;
+
+out_overflow:
+        pr_err("rpcrdma: segment array overflow\n");
+        return -EIO;
 }
 
 static inline __be32 *
@@ -310,7 +313,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
                          struct rpcrdma_req *req, struct rpc_rqst *rqst,
                          __be32 *iptr, enum rpcrdma_chunktype rtype)
 {
-        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        struct rpcrdma_mr_seg *seg;
         struct rpcrdma_mw *mw;
         unsigned int pos;
         int n, nsegs;
@@ -323,8 +326,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
         pos = rqst->rq_snd_buf.head[0].iov_len;
         if (rtype == rpcrdma_areadch)
                 pos = 0;
-        nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
-                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        seg = req->rl_segments;
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
         if (nsegs < 0)
                 return ERR_PTR(nsegs);
 
@@ -349,11 +352,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
                         mw->mw_handle, n < nsegs ? "more" : "last");
 
                 r_xprt->rx_stats.read_chunk_count++;
-                req->rl_nchunks++;
                 seg += n;
                 nsegs -= n;
         } while (nsegs);
-        req->rl_nextseg = seg;
 
         /* Finish Read list */
         *iptr++ = xdr_zero;        /* Next item not present */
@@ -377,7 +378,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                           struct rpc_rqst *rqst, __be32 *iptr,
                           enum rpcrdma_chunktype wtype)
 {
-        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        struct rpcrdma_mr_seg *seg;
         struct rpcrdma_mw *mw;
         int n, nsegs, nchunks;
         __be32 *segcount;
@@ -387,10 +388,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                 return iptr;
         }
 
+        seg = req->rl_segments;
         nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
                                      rqst->rq_rcv_buf.head[0].iov_len,
-                                     wtype, seg,
-                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+                                     wtype, seg);
         if (nsegs < 0)
                 return ERR_PTR(nsegs);
 
@@ -414,12 +415,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
                 r_xprt->rx_stats.write_chunk_count++;
                 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-                req->rl_nchunks++;
                 nchunks++;
                 seg += n;
                 nsegs -= n;
         } while (nsegs);
-        req->rl_nextseg = seg;
 
         /* Update count of segments in this Write chunk */
         *segcount = cpu_to_be32(nchunks);
@@ -446,7 +445,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
                            struct rpcrdma_req *req, struct rpc_rqst *rqst,
                            __be32 *iptr, enum rpcrdma_chunktype wtype)
 {
-        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        struct rpcrdma_mr_seg *seg;
         struct rpcrdma_mw *mw;
         int n, nsegs, nchunks;
         __be32 *segcount;
@@ -456,8 +455,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
                 return iptr;
         }
 
-        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
-                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        seg = req->rl_segments;
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
         if (nsegs < 0)
                 return ERR_PTR(nsegs);
 
@@ -481,12 +480,10 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 
                 r_xprt->rx_stats.reply_chunk_count++;
                 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-                req->rl_nchunks++;
                 nchunks++;
                 seg += n;
                 nsegs -= n;
         } while (nsegs);
-        req->rl_nextseg = seg;
 
         /* Update count of segments in the Reply chunk */
         *segcount = cpu_to_be32(nchunks);
@@ -656,8 +653,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
          * send a Call message with a Position Zero Read chunk and a
          * regular Read chunk at the same time.
          */
-        req->rl_nchunks = 0;
-        req->rl_nextseg = req->rl_segments;
         iptr = headerp->rm_body.rm_chunks;
         iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
         if (IS_ERR(iptr))
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index f5d05110de9f..670fad57153a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -171,23 +171,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  *   o recv buffer (posted to provider)
  *   o ib_sge (also donated to provider)
  *   o status of reply (length, success or not)
- *   o bookkeeping state to get run by tasklet (list, etc)
+ *   o bookkeeping state to get run by reply handler (list, etc)
  *
- * These are allocated during initialization, per-transport instance;
- * however, the tasklet execution list itself is global, as it should
- * always be pretty short.
+ * These are allocated during initialization, per-transport instance.
  *
  * N of these are associated with a transport instance, and stored in
  * struct rpcrdma_buffer. N is the max number of outstanding requests.
  */
 
-#define RPCRDMA_MAX_DATA_SEGS        ((1 * 1024 * 1024) / PAGE_SIZE)
-
-/* data segments + head/tail for Call + head/tail for Reply */
-#define RPCRDMA_MAX_SEGS        (RPCRDMA_MAX_DATA_SEGS + 4)
-
-struct rpcrdma_buffer;
-
 struct rpcrdma_rep {
         struct ib_cqe        rr_cqe;
         unsigned int         rr_len;
@@ -267,13 +258,18 @@ struct rpcrdma_mw {
  * of iovs for send operations. The reason is that the iovs passed to
  * ib_post_{send,recv} must not be modified until the work request
  * completes.
- *
- * NOTES:
- *   o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
- *     marshal. The number needed varies depending on the iov lists that
- *     are passed to us and the memory registration mode we are in.
  */
 
+/* Maximum number of page-sized "segments" per chunk list to be
+ * registered or invalidated. Must handle a Reply chunk:
+ */
+enum {
+        RPCRDMA_MAX_IOV_SEGS  = 3,
+        RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1,
+        RPCRDMA_MAX_SEGS      = RPCRDMA_MAX_DATA_SEGS +
+                                RPCRDMA_MAX_IOV_SEGS,
+};
+
 struct rpcrdma_mr_seg {                /* chunk descriptors */
         u32              mr_len;       /* length of chunk or segment */
         struct page      *mr_page;     /* owning page, if any */
@@ -282,10 +278,10 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
 
 #define RPCRDMA_MAX_IOVS        (2)
 
+struct rpcrdma_buffer;
 struct rpcrdma_req {
         struct list_head        rl_free;
         unsigned int            rl_niovs;
-        unsigned int            rl_nchunks;
         unsigned int            rl_connect_cookie;
         struct rpc_task         *rl_task;
         struct rpcrdma_buffer   *rl_buffer;
@@ -293,13 +289,13 @@ struct rpcrdma_req {
         struct ib_sge           rl_send_iov[RPCRDMA_MAX_IOVS];
         struct rpcrdma_regbuf   *rl_rdmabuf;
         struct rpcrdma_regbuf   *rl_sendbuf;
-        struct list_head        rl_registered;        /* registered segments */
-        struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
-        struct rpcrdma_mr_seg   *rl_nextseg;
 
         struct ib_cqe           rl_cqe;
         struct list_head        rl_all;
         bool                    rl_backchannel;
+
+        struct list_head        rl_registered;        /* registered segments */
+        struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
 };
 
 static inline struct rpcrdma_req *
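A note on the new sizing constants in xprt_rdma.h: the arithmetic below is illustrative only and assumes 4 KiB pages (PAGE_SIZE is architecture-dependent, so the actual values vary).

/* Illustrative arithmetic only, assuming PAGE_SIZE == 4096:
 *
 *   RPCRDMA_MAX_DATA_SEGS = (1 * 1024 * 1024) / 4096 + 1 = 257
 *   RPCRDMA_MAX_SEGS      = 257 + RPCRDMA_MAX_IOV_SEGS   = 260
 *
 * Before this patch, RPCRDMA_MAX_SEGS (then RPCRDMA_MAX_DATA_SEGS + 4)
 * was shared across all three chunk lists of a single RPC; with this
 * patch each encoder may consume up to the full RPCRDMA_MAX_SEGS on
 * its own.
 */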