author     Linus Torvalds <torvalds@linux-foundation.org>  2016-05-26 13:33:33 -0400
committer  Linus Torvalds <torvalds@linux-foundation.org>  2016-05-26 13:33:33 -0400
commit     ea8ea737c46cffa5d0ee74309f81e55a7e5e9c2a (patch)
tree       ae159b2c5968fa3c2a5a4ab7176584bc9a17b889 /net
parent     0b9210c9c86e46a7a62bbc7b69b84001315072ff (diff)
parent     c7d73af2d249f0323f5cdb171a59497ce80011fb (diff)
Merge tag 'nfs-for-4.7-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:
 "Highlights include:

  Features:
   - Add support for the NFS v4.2 COPY operation
   - Add support for NFS/RDMA over IPv6

  Bugfixes and cleanups:
   - Avoid race that crashes nfs_init_commit()
   - Fix oops in callback path
   - Fix LOCK/OPEN race when unlinking an open file
   - Choose correct stateids when using delegations in setattr, read and write
   - Don't send empty SETATTR after OPEN_CREATE
   - xprtrdma: Prevent server from writing a reply into memory client has released
   - xprtrdma: Support using Read list and Reply chunk in one RPC call"

* tag 'nfs-for-4.7-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (61 commits)
  pnfs: pnfs_update_layout needs to consider if strict iomode checking is on
  nfs/flexfiles: Use the layout segment for reading unless it a IOMODE_RW and reading is disabled
  nfs/flexfiles: Helper function to detect FF_FLAGS_NO_READ_IO
  nfs: avoid race that crashes nfs_init_commit
  NFS: checking for NULL instead of IS_ERR() in nfs_commit_file()
  pnfs: make pnfs_layout_process more robust
  pnfs: rework LAYOUTGET retry handling
  pnfs: lift retry logic from send_layoutget to pnfs_update_layout
  pnfs: fix bad error handling in send_layoutget
  flexfiles: add kerneldoc header to nfs4_ff_layout_prepare_ds
  flexfiles: remove pointless setting of NFS_LAYOUT_RETURN_REQUESTED
  pnfs: only tear down lsegs that precede seqid in LAYOUTRETURN args
  pnfs: keep track of the return sequence number in pnfs_layout_hdr
  pnfs: record sequence in pnfs_layout_segment when it's created
  pnfs: don't merge new ff lsegs with ones that have LAYOUTRETURN bit set
  pNFS/flexfiles: When initing reads or writes, we might have to retry connecting to DSes
  pNFS/flexfiles: When checking for available DSes, conditionally check for MDS io
  pNFS/flexfile: Fix erroneous fall back to read/write through the MDS
  NFS: Reclaim writes via writepage are opportunistic
  NFSv4: Use the right stateid for delegations in setattr, read and write
  ...
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth.c9
-rw-r--r--net/sunrpc/auth_generic.c13
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c6
-rw-r--r--net/sunrpc/auth_unix.c6
-rw-r--r--net/sunrpc/clnt.c17
-rw-r--r--net/sunrpc/xdr.c2
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c16
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c134
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c214
-rw-r--r--net/sunrpc/xprtrdma/physical_ops.c39
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c517
-rw-r--r--net/sunrpc/xprtrdma/transport.c16
-rw-r--r--net/sunrpc/xprtrdma/verbs.c78
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h47
-rw-r--r--net/sunrpc/xprtsock.c6
15 files changed, 676 insertions, 444 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 02f53674dc39..040ff627c18a 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void)
543 */ 543 */
544struct rpc_cred * 544struct rpc_cred *
545rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, 545rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
546 int flags) 546 int flags, gfp_t gfp)
547{ 547{
548 LIST_HEAD(free); 548 LIST_HEAD(free);
549 struct rpc_cred_cache *cache = auth->au_credcache; 549 struct rpc_cred_cache *cache = auth->au_credcache;
@@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
580 if (flags & RPCAUTH_LOOKUP_RCU) 580 if (flags & RPCAUTH_LOOKUP_RCU)
581 return ERR_PTR(-ECHILD); 581 return ERR_PTR(-ECHILD);
582 582
583 new = auth->au_ops->crcreate(auth, acred, flags); 583 new = auth->au_ops->crcreate(auth, acred, flags, gfp);
584 if (IS_ERR(new)) { 584 if (IS_ERR(new)) {
585 cred = new; 585 cred = new;
586 goto out; 586 goto out;
@@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
703 new = rpcauth_bind_new_cred(task, lookupflags); 703 new = rpcauth_bind_new_cred(task, lookupflags);
704 if (IS_ERR(new)) 704 if (IS_ERR(new))
705 return PTR_ERR(new); 705 return PTR_ERR(new);
706 if (req->rq_cred != NULL) 706 put_rpccred(req->rq_cred);
707 put_rpccred(req->rq_cred);
708 req->rq_cred = new; 707 req->rq_cred = new;
709 return 0; 708 return 0;
710} 709}
@@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
712void 711void
713put_rpccred(struct rpc_cred *cred) 712put_rpccred(struct rpc_cred *cred)
714{ 713{
714 if (cred == NULL)
715 return;
715 /* Fast path for unhashed credentials */ 716 /* Fast path for unhashed credentials */
716 if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) { 717 if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) {
717 if (atomic_dec_and_test(&cred->cr_count)) 718 if (atomic_dec_and_test(&cred->cr_count))
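
The auth.c hunks above make two related changes: rpcauth_lookup_credcache() and the ->crcreate method gain a gfp_t argument, and put_rpccred() now tolerates a NULL credential, which lets rpcauth_bindcred() drop its own NULL check. A minimal userspace sketch of the NULL-tolerant release pattern (the struct, refcounting and names below are stand-ins, not the kernel's):

#include <stdlib.h>

struct cred { int refcount; };

static void put_cred(struct cred *c)
{
        if (c == NULL)                  /* new fast path: NULL is a no-op */
                return;
        if (--c->refcount == 0)
                free(c);
}

static void rebind(struct cred **slot, struct cred *new)
{
        put_cred(*slot);                /* safe even while *slot is still NULL */
        *slot = new;
}

int main(void)
{
        struct cred *slot = NULL;
        struct cred *fresh = calloc(1, sizeof(*fresh));

        if (!fresh)
                return 1;
        fresh->refcount = 1;
        rebind(&slot, fresh);           /* old value is NULL: put_cred() is a no-op */
        rebind(&slot, NULL);            /* drops the only reference and frees it */
        return 0;
}
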
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 41248b1820c7..54dd3fdead54 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void)
38} 38}
39EXPORT_SYMBOL_GPL(rpc_lookup_cred); 39EXPORT_SYMBOL_GPL(rpc_lookup_cred);
40 40
41struct rpc_cred *
42rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp)
43{
44 return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp);
45}
46EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred);
47
41struct rpc_cred *rpc_lookup_cred_nonblock(void) 48struct rpc_cred *rpc_lookup_cred_nonblock(void)
42{ 49{
43 return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU); 50 return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU);
@@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task,
77static struct rpc_cred * 84static struct rpc_cred *
78generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) 85generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
79{ 86{
80 return rpcauth_lookup_credcache(&generic_auth, acred, flags); 87 return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL);
81} 88}
82 89
83static struct rpc_cred * 90static struct rpc_cred *
84generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) 91generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
85{ 92{
86 struct generic_cred *gcred; 93 struct generic_cred *gcred;
87 94
88 gcred = kmalloc(sizeof(*gcred), GFP_KERNEL); 95 gcred = kmalloc(sizeof(*gcred), gfp);
89 if (gcred == NULL) 96 if (gcred == NULL)
90 return ERR_PTR(-ENOMEM); 97 return ERR_PTR(-ENOMEM);
91 98
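
Here the generic flavor forwards GFP_KERNEL into the shared credential cache, while the new rpc_lookup_generic_cred() export lets callers pick their own allocation constraint and have it forwarded, unchanged, into the flavor's create routine. A userspace sketch of that plumbing, assuming a stand-in flags enum in place of gfp_t:

#include <stdio.h>
#include <stdlib.h>

typedef enum { ALLOC_MAY_BLOCK_ON_FS, ALLOC_NO_FS_RECLAIM } gfp_like_t;

struct cred { gfp_like_t how; };

static struct cred *create_cred(gfp_like_t gfp)
{
        struct cred *c = malloc(sizeof(*c));    /* kmalloc(sizeof(*gcred), gfp) */

        if (c)
                c->how = gfp;
        return c;
}

static struct cred *lookup_cred(gfp_like_t gfp)
{
        /* cache-miss path: forward the caller's constraint to ->crcreate */
        return create_cred(gfp);
}

int main(void)
{
        struct cred *c = lookup_cred(ALLOC_NO_FS_RECLAIM);

        printf("allocated with constraint %d\n", c ? (int)c->how : -1);
        free(c);
        return 0;
}
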
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 15612ffa8d57..e64ae93d5b4f 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred)
1299static struct rpc_cred * 1299static struct rpc_cred *
1300gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) 1300gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
1301{ 1301{
1302 return rpcauth_lookup_credcache(auth, acred, flags); 1302 return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
1303} 1303}
1304 1304
1305static struct rpc_cred * 1305static struct rpc_cred *
1306gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) 1306gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
1307{ 1307{
1308 struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth); 1308 struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth);
1309 struct gss_cred *cred = NULL; 1309 struct gss_cred *cred = NULL;
@@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
1313 __func__, from_kuid(&init_user_ns, acred->uid), 1313 __func__, from_kuid(&init_user_ns, acred->uid),
1314 auth->au_flavor); 1314 auth->au_flavor);
1315 1315
1316 if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS))) 1316 if (!(cred = kzalloc(sizeof(*cred), gfp)))
1317 goto out_err; 1317 goto out_err;
1318 1318
1319 rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops); 1319 rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops);
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 0d3dd364c22f..9f65452b7cbc 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -52,11 +52,11 @@ unx_destroy(struct rpc_auth *auth)
52static struct rpc_cred * 52static struct rpc_cred *
53unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) 53unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
54{ 54{
55 return rpcauth_lookup_credcache(auth, acred, flags); 55 return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
56} 56}
57 57
58static struct rpc_cred * 58static struct rpc_cred *
59unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) 59unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
60{ 60{
61 struct unx_cred *cred; 61 struct unx_cred *cred;
62 unsigned int groups = 0; 62 unsigned int groups = 0;
@@ -66,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
66 from_kuid(&init_user_ns, acred->uid), 66 from_kuid(&init_user_ns, acred->uid),
67 from_kgid(&init_user_ns, acred->gid)); 67 from_kgid(&init_user_ns, acred->gid));
68 68
69 if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS))) 69 if (!(cred = kmalloc(sizeof(*cred), gfp)))
70 return ERR_PTR(-ENOMEM); 70 return ERR_PTR(-ENOMEM);
71 71
72 rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops); 72 rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 7e0c9bf22df8..06b4df9faaa1 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1414,6 +1414,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
1414EXPORT_SYMBOL_GPL(rpc_max_payload); 1414EXPORT_SYMBOL_GPL(rpc_max_payload);
1415 1415
1416/** 1416/**
1417 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1418 * @clnt: RPC client to query
1419 */
1420size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1421{
1422 struct rpc_xprt *xprt;
1423 size_t ret;
1424
1425 rcu_read_lock();
1426 xprt = rcu_dereference(clnt->cl_xprt);
1427 ret = xprt->ops->bc_maxpayload(xprt);
1428 rcu_read_unlock();
1429 return ret;
1430}
1431EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1432
1433/**
1417 * rpc_get_timeout - Get timeout for transport in units of HZ 1434 * rpc_get_timeout - Get timeout for transport in units of HZ
1418 * @clnt: RPC client to query 1435 * @clnt: RPC client to query
1419 */ 1436 */
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 6bdb3865212d..c4f3cc0c0775 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
797 xdr_set_iov(xdr, buf->head, buf->len); 797 xdr_set_iov(xdr, buf->head, buf->len);
798 else if (buf->page_len != 0) 798 else if (buf->page_len != 0)
799 xdr_set_page_base(xdr, 0, buf->len); 799 xdr_set_page_base(xdr, 0, buf->len);
800 else
801 xdr_set_iov(xdr, buf->head, buf->len);
800 if (p != NULL && p > xdr->p && xdr->end >= p) { 802 if (p != NULL && p > xdr->p && xdr->end >= p) {
801 xdr->nwords -= p - xdr->p; 803 xdr->nwords -= p - xdr->p;
802 xdr->p = p; 804 xdr->p = p;
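
The extra else branch guarantees that the decode cursor is always initialized, even when both the head iovec and the page list are empty. A compact sketch of that control flow, with hypothetical types in place of xdr_stream/xdr_buf:

struct iov    { char *base; unsigned int len; };
struct stream { char *p, *end; };

static void set_iov(struct stream *s, const struct iov *iov)
{
        s->p   = iov->base;
        s->end = iov->base + iov->len;
}

static void init_decode(struct stream *s, const struct iov *head,
                        unsigned int page_len)
{
        if (head->len)
                set_iov(s, head);       /* decode from the head buffer */
        else if (page_len)
                s->p = s->end = 0;      /* xdr_set_page_base() in the kernel */
        else
                set_iov(s, head);       /* new: never leave the cursor stale */
}
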
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 2dcd7640eeb5..87762d976b63 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
192} 192}
193 193
194/** 194/**
195 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
196 * @xprt: transport
197 *
198 * Returns maximum size, in bytes, of a backchannel message
199 */
200size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
201{
202 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
203 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
204 size_t maxmsg;
205
206 maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
207 return maxmsg - RPCRDMA_HDRLEN_MIN;
208}
209
210/**
195 * rpcrdma_bc_marshal_reply - Send backwards direction reply 211 * rpcrdma_bc_marshal_reply - Send backwards direction reply
196 * @rqst: buffer containing RPC reply data 212 * @rqst: buffer containing RPC reply data
197 * 213 *
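
xprt_rdma_bc_maxpayload() above bounds a backchannel message by the smaller of the inline send and receive sizes, minus the minimal RPC-over-RDMA header. A worked example of that arithmetic, using assumed sizes (the 1024-byte inline buffers and 28-byte minimal header are illustrative, not guaranteed defaults):

#include <stdio.h>

#define HDRLEN_MIN 28   /* assumed minimal RPC-over-RDMA header size */

static unsigned int bc_maxpayload(unsigned int inline_rsize,
                                  unsigned int inline_wsize)
{
        unsigned int maxmsg = inline_rsize < inline_wsize ? inline_rsize
                                                          : inline_wsize;

        return maxmsg - HDRLEN_MIN;
}

int main(void)
{
        printf("%u\n", bc_maxpayload(1024, 1024));  /* 996 with these example sizes */
        return 0;
}
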
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b289e106540b..6326ebe8b595 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,10 +35,71 @@
35/* Maximum scatter/gather per FMR */ 35/* Maximum scatter/gather per FMR */
36#define RPCRDMA_MAX_FMR_SGES (64) 36#define RPCRDMA_MAX_FMR_SGES (64)
37 37
38static struct workqueue_struct *fmr_recovery_wq;
39
40#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND)
41
42int
43fmr_alloc_recovery_wq(void)
44{
45 fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
46 return !fmr_recovery_wq ? -ENOMEM : 0;
47}
48
49void
50fmr_destroy_recovery_wq(void)
51{
52 struct workqueue_struct *wq;
53
54 if (!fmr_recovery_wq)
55 return;
56
57 wq = fmr_recovery_wq;
58 fmr_recovery_wq = NULL;
59 destroy_workqueue(wq);
60}
61
62static int
63__fmr_unmap(struct rpcrdma_mw *mw)
64{
65 LIST_HEAD(l);
66
67 list_add(&mw->fmr.fmr->list, &l);
68 return ib_unmap_fmr(&l);
69}
70
71/* Deferred reset of a single FMR. Generate a fresh rkey by
72 * replacing the MR. There's no recovery if this fails.
73 */
74static void
75__fmr_recovery_worker(struct work_struct *work)
76{
77 struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
78 mw_work);
79 struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
80
81 __fmr_unmap(mw);
82 rpcrdma_put_mw(r_xprt, mw);
83 return;
84}
85
86/* A broken MR was discovered in a context that can't sleep.
87 * Defer recovery to the recovery worker.
88 */
89static void
90__fmr_queue_recovery(struct rpcrdma_mw *mw)
91{
92 INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
93 queue_work(fmr_recovery_wq, &mw->mw_work);
94}
95
38static int 96static int
39fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, 97fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
40 struct rpcrdma_create_data_internal *cdata) 98 struct rpcrdma_create_data_internal *cdata)
41{ 99{
100 rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
101 RPCRDMA_MAX_DATA_SEGS /
102 RPCRDMA_MAX_FMR_SGES));
42 return 0; 103 return 0;
43} 104}
44 105
@@ -48,7 +109,7 @@ static size_t
48fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) 109fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
49{ 110{
50 return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, 111 return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
51 rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES); 112 RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
52} 113}
53 114
54static int 115static int
@@ -89,6 +150,7 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
89 if (IS_ERR(r->fmr.fmr)) 150 if (IS_ERR(r->fmr.fmr))
90 goto out_fmr_err; 151 goto out_fmr_err;
91 152
153 r->mw_xprt = r_xprt;
92 list_add(&r->mw_list, &buf->rb_mws); 154 list_add(&r->mw_list, &buf->rb_mws);
93 list_add(&r->mw_all, &buf->rb_all); 155 list_add(&r->mw_all, &buf->rb_all);
94 } 156 }
@@ -104,15 +166,6 @@ out:
104 return rc; 166 return rc;
105} 167}
106 168
107static int
108__fmr_unmap(struct rpcrdma_mw *r)
109{
110 LIST_HEAD(l);
111
112 list_add(&r->fmr.fmr->list, &l);
113 return ib_unmap_fmr(&l);
114}
115
116/* Use the ib_map_phys_fmr() verb to register a memory region 169/* Use the ib_map_phys_fmr() verb to register a memory region
117 * for remote access via RDMA READ or RDMA WRITE. 170 * for remote access via RDMA READ or RDMA WRITE.
118 */ 171 */
@@ -183,15 +236,10 @@ static void
183__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) 236__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
184{ 237{
185 struct ib_device *device = r_xprt->rx_ia.ri_device; 238 struct ib_device *device = r_xprt->rx_ia.ri_device;
186 struct rpcrdma_mw *mw = seg->rl_mw;
187 int nsegs = seg->mr_nsegs; 239 int nsegs = seg->mr_nsegs;
188 240
189 seg->rl_mw = NULL;
190
191 while (nsegs--) 241 while (nsegs--)
192 rpcrdma_unmap_one(device, seg++); 242 rpcrdma_unmap_one(device, seg++);
193
194 rpcrdma_put_mw(r_xprt, mw);
195} 243}
196 244
197/* Invalidate all memory regions that were registered for "req". 245/* Invalidate all memory regions that were registered for "req".
@@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
234 seg = &req->rl_segments[i]; 282 seg = &req->rl_segments[i];
235 283
236 __fmr_dma_unmap(r_xprt, seg); 284 __fmr_dma_unmap(r_xprt, seg);
285 rpcrdma_put_mw(r_xprt, seg->rl_mw);
237 286
238 i += seg->mr_nsegs; 287 i += seg->mr_nsegs;
239 seg->mr_nsegs = 0; 288 seg->mr_nsegs = 0;
289 seg->rl_mw = NULL;
240 } 290 }
241 291
242 req->rl_nchunks = 0; 292 req->rl_nchunks = 0;
243} 293}
244 294
245/* Use the ib_unmap_fmr() verb to prevent further remote 295/* Use a slow, safe mechanism to invalidate all memory regions
246 * access via RDMA READ or RDMA WRITE. 296 * that were registered for "req".
297 *
298 * In the asynchronous case, DMA unmapping occurs first here
299 * because the rpcrdma_mr_seg is released immediately after this
300 * call. It's contents won't be available in __fmr_dma_unmap later.
301 * FIXME.
247 */ 302 */
248static int 303static void
249fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) 304fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
305 bool sync)
250{ 306{
251 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 307 struct rpcrdma_mr_seg *seg;
252 struct rpcrdma_mr_seg *seg1 = seg; 308 struct rpcrdma_mw *mw;
253 struct rpcrdma_mw *mw = seg1->rl_mw; 309 unsigned int i;
254 int rc, nsegs = seg->mr_nsegs;
255 310
256 dprintk("RPC: %s: FMR %p\n", __func__, mw); 311 for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
312 seg = &req->rl_segments[i];
313 mw = seg->rl_mw;
257 314
258 seg1->rl_mw = NULL; 315 if (sync) {
259 while (seg1->mr_nsegs--) 316 /* ORDER */
260 rpcrdma_unmap_one(ia->ri_device, seg++); 317 __fmr_unmap(mw);
261 rc = __fmr_unmap(mw); 318 __fmr_dma_unmap(r_xprt, seg);
262 if (rc) 319 rpcrdma_put_mw(r_xprt, mw);
263 goto out_err; 320 } else {
264 rpcrdma_put_mw(r_xprt, mw); 321 __fmr_dma_unmap(r_xprt, seg);
265 return nsegs; 322 __fmr_queue_recovery(mw);
323 }
266 324
267out_err: 325 i += seg->mr_nsegs;
268 /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy 326 seg->mr_nsegs = 0;
269 * will attempt to release it when the transport is destroyed. 327 seg->rl_mw = NULL;
270 */ 328 }
271 dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc);
272 return nsegs;
273} 329}
274 330
275static void 331static void
@@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
295const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { 351const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
296 .ro_map = fmr_op_map, 352 .ro_map = fmr_op_map,
297 .ro_unmap_sync = fmr_op_unmap_sync, 353 .ro_unmap_sync = fmr_op_unmap_sync,
298 .ro_unmap = fmr_op_unmap, 354 .ro_unmap_safe = fmr_op_unmap_safe,
299 .ro_open = fmr_op_open, 355 .ro_open = fmr_op_open,
300 .ro_maxpages = fmr_op_maxpages, 356 .ro_maxpages = fmr_op_maxpages,
301 .ro_init = fmr_op_init, 357 .ro_init = fmr_op_init,
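
The FMR changes replace ->ro_unmap with ->ro_unmap_safe: a sleeping caller invalidates and unmaps inline, while a caller that cannot sleep only DMA-unmaps and hands the MR to the new fmr_recovery workqueue. A small sketch of that contract (the helpers and printf calls stand in for the verbs and workqueue APIs):

#include <stdbool.h>
#include <stdio.h>

struct mw { int rkey; };

static void dma_unmap(struct mw *mw)      { printf("dma-unmap rkey %d\n", mw->rkey); }
static void unmap_fmr(struct mw *mw)      { printf("ib_unmap_fmr rkey %d\n", mw->rkey); }
static void put_mw(struct mw *mw)         { printf("return rkey %d to pool\n", mw->rkey); }
static void queue_recovery(struct mw *mw) { printf("defer reset of rkey %d\n", mw->rkey); }

static void unmap_safe(struct mw *mw, bool sync)
{
        if (sync) {
                unmap_fmr(mw);          /* may sleep: invalidate right away */
                dma_unmap(mw);
                put_mw(mw);
        } else {
                dma_unmap(mw);          /* cannot sleep: recover in a worker */
                queue_recovery(mw);
        }
}

int main(void)
{
        struct mw mw = { .rkey = 42 };

        unmap_safe(&mw, true);
        unmap_safe(&mw, false);
        return 0;
}
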
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 94c3fa910b85..c0947544babe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void)
98 destroy_workqueue(wq); 98 destroy_workqueue(wq);
99} 99}
100 100
101static int
102__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
103{
104 struct rpcrdma_frmr *f = &r->frmr;
105 int rc;
106
107 rc = ib_dereg_mr(f->fr_mr);
108 if (rc) {
109 pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
110 rc, r);
111 return rc;
112 }
113
114 f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
115 ia->ri_max_frmr_depth);
116 if (IS_ERR(f->fr_mr)) {
117 pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
118 PTR_ERR(f->fr_mr), r);
119 return PTR_ERR(f->fr_mr);
120 }
121
122 dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
123 f->fr_state = FRMR_IS_INVALID;
124 return 0;
125}
126
127static void
128__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
129{
130 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
131 struct rpcrdma_frmr *f = &mw->frmr;
132 int rc;
133
134 rc = __frwr_reset_mr(ia, mw);
135 ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
136 if (rc)
137 return;
138
139 rpcrdma_put_mw(r_xprt, mw);
140}
141
101/* Deferred reset of a single FRMR. Generate a fresh rkey by 142/* Deferred reset of a single FRMR. Generate a fresh rkey by
102 * replacing the MR. 143 * replacing the MR.
103 * 144 *
@@ -109,26 +150,10 @@ static void
109__frwr_recovery_worker(struct work_struct *work) 150__frwr_recovery_worker(struct work_struct *work)
110{ 151{
111 struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, 152 struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
112 frmr.fr_work); 153 mw_work);
113 struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
114 unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
115 struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
116
117 if (ib_dereg_mr(r->frmr.fr_mr))
118 goto out_fail;
119 154
120 r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); 155 __frwr_reset_and_unmap(r->mw_xprt, r);
121 if (IS_ERR(r->frmr.fr_mr))
122 goto out_fail;
123
124 dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
125 r->frmr.fr_state = FRMR_IS_INVALID;
126 rpcrdma_put_mw(r_xprt, r);
127 return; 156 return;
128
129out_fail:
130 pr_warn("RPC: %s: FRMR %p unrecovered\n",
131 __func__, r);
132} 157}
133 158
134/* A broken MR was discovered in a context that can't sleep. 159/* A broken MR was discovered in a context that can't sleep.
@@ -137,8 +162,8 @@ out_fail:
137static void 162static void
138__frwr_queue_recovery(struct rpcrdma_mw *r) 163__frwr_queue_recovery(struct rpcrdma_mw *r)
139{ 164{
140 INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); 165 INIT_WORK(&r->mw_work, __frwr_recovery_worker);
141 queue_work(frwr_recovery_wq, &r->frmr.fr_work); 166 queue_work(frwr_recovery_wq, &r->mw_work);
142} 167}
143 168
144static int 169static int
@@ -152,11 +177,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
152 if (IS_ERR(f->fr_mr)) 177 if (IS_ERR(f->fr_mr))
153 goto out_mr_err; 178 goto out_mr_err;
154 179
155 f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL); 180 f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
156 if (!f->sg) 181 if (!f->fr_sg)
157 goto out_list_err; 182 goto out_list_err;
158 183
159 sg_init_table(f->sg, depth); 184 sg_init_table(f->fr_sg, depth);
160 185
161 init_completion(&f->fr_linv_done); 186 init_completion(&f->fr_linv_done);
162 187
@@ -185,7 +210,7 @@ __frwr_release(struct rpcrdma_mw *r)
185 if (rc) 210 if (rc)
186 dprintk("RPC: %s: ib_dereg_mr status %i\n", 211 dprintk("RPC: %s: ib_dereg_mr status %i\n",
187 __func__, rc); 212 __func__, rc);
188 kfree(r->frmr.sg); 213 kfree(r->frmr.fr_sg);
189} 214}
190 215
191static int 216static int
@@ -231,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
231 depth; 256 depth;
232 } 257 }
233 258
259 rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
260 RPCRDMA_MAX_DATA_SEGS /
261 ia->ri_max_frmr_depth));
234 return 0; 262 return 0;
235} 263}
236 264
@@ -243,7 +271,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
243 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 271 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
244 272
245 return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, 273 return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
246 rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); 274 RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth);
247} 275}
248 276
249static void 277static void
@@ -350,9 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
350 return rc; 378 return rc;
351 } 379 }
352 380
381 r->mw_xprt = r_xprt;
353 list_add(&r->mw_list, &buf->rb_mws); 382 list_add(&r->mw_list, &buf->rb_mws);
354 list_add(&r->mw_all, &buf->rb_all); 383 list_add(&r->mw_all, &buf->rb_all);
355 r->frmr.fr_xprt = r_xprt;
356 } 384 }
357 385
358 return 0; 386 return 0;
@@ -396,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
396 424
397 for (i = 0; i < nsegs;) { 425 for (i = 0; i < nsegs;) {
398 if (seg->mr_page) 426 if (seg->mr_page)
399 sg_set_page(&frmr->sg[i], 427 sg_set_page(&frmr->fr_sg[i],
400 seg->mr_page, 428 seg->mr_page,
401 seg->mr_len, 429 seg->mr_len,
402 offset_in_page(seg->mr_offset)); 430 offset_in_page(seg->mr_offset));
403 else 431 else
404 sg_set_buf(&frmr->sg[i], seg->mr_offset, 432 sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
405 seg->mr_len); 433 seg->mr_len);
406 434
407 ++seg; 435 ++seg;
@@ -412,25 +440,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
412 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) 440 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
413 break; 441 break;
414 } 442 }
415 frmr->sg_nents = i; 443 frmr->fr_nents = i;
444 frmr->fr_dir = direction;
416 445
417 dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction); 446 dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
418 if (!dma_nents) { 447 if (!dma_nents) {
419 pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", 448 pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n",
420 __func__, frmr->sg, frmr->sg_nents); 449 __func__, frmr->fr_sg, frmr->fr_nents);
421 return -ENOMEM; 450 return -ENOMEM;
422 } 451 }
423 452
424 n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); 453 n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
425 if (unlikely(n != frmr->sg_nents)) { 454 if (unlikely(n != frmr->fr_nents)) {
426 pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", 455 pr_err("RPC: %s: failed to map mr %p (%u/%u)\n",
427 __func__, frmr->fr_mr, n, frmr->sg_nents); 456 __func__, frmr->fr_mr, n, frmr->fr_nents);
428 rc = n < 0 ? n : -EINVAL; 457 rc = n < 0 ? n : -EINVAL;
429 goto out_senderr; 458 goto out_senderr;
430 } 459 }
431 460
432 dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", 461 dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
433 __func__, mw, frmr->sg_nents, mr->length); 462 __func__, mw, frmr->fr_nents, mr->length);
434 463
435 key = (u8)(mr->rkey & 0x000000FF); 464 key = (u8)(mr->rkey & 0x000000FF);
436 ib_update_fast_reg_key(mr, ++key); 465 ib_update_fast_reg_key(mr, ++key);
@@ -452,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
452 if (rc) 481 if (rc)
453 goto out_senderr; 482 goto out_senderr;
454 483
455 seg1->mr_dir = direction;
456 seg1->rl_mw = mw; 484 seg1->rl_mw = mw;
457 seg1->mr_rkey = mr->rkey; 485 seg1->mr_rkey = mr->rkey;
458 seg1->mr_base = mr->iova; 486 seg1->mr_base = mr->iova;
459 seg1->mr_nsegs = frmr->sg_nents; 487 seg1->mr_nsegs = frmr->fr_nents;
460 seg1->mr_len = mr->length; 488 seg1->mr_len = mr->length;
461 489
462 return frmr->sg_nents; 490 return frmr->fr_nents;
463 491
464out_senderr: 492out_senderr:
465 dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); 493 dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
466 ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
467 __frwr_queue_recovery(mw); 494 __frwr_queue_recovery(mw);
468 return rc; 495 return rc;
469} 496}
@@ -487,24 +514,6 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
487 return invalidate_wr; 514 return invalidate_wr;
488} 515}
489 516
490static void
491__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
492 int rc)
493{
494 struct ib_device *device = r_xprt->rx_ia.ri_device;
495 struct rpcrdma_mw *mw = seg->rl_mw;
496 struct rpcrdma_frmr *f = &mw->frmr;
497
498 seg->rl_mw = NULL;
499
500 ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);
501
502 if (!rc)
503 rpcrdma_put_mw(r_xprt, mw);
504 else
505 __frwr_queue_recovery(mw);
506}
507
508/* Invalidate all memory regions that were registered for "req". 517/* Invalidate all memory regions that were registered for "req".
509 * 518 *
510 * Sleeps until it is safe for the host CPU to access the 519 * Sleeps until it is safe for the host CPU to access the
@@ -518,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
518 struct rpcrdma_mr_seg *seg; 527 struct rpcrdma_mr_seg *seg;
519 unsigned int i, nchunks; 528 unsigned int i, nchunks;
520 struct rpcrdma_frmr *f; 529 struct rpcrdma_frmr *f;
530 struct rpcrdma_mw *mw;
521 int rc; 531 int rc;
522 532
523 dprintk("RPC: %s: req %p\n", __func__, req); 533 dprintk("RPC: %s: req %p\n", __func__, req);
@@ -558,11 +568,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
558 * unless ri_id->qp is a valid pointer. 568 * unless ri_id->qp is a valid pointer.
559 */ 569 */
560 rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); 570 rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
561 if (rc) { 571 if (rc)
562 pr_warn("%s: ib_post_send failed %i\n", __func__, rc); 572 goto reset_mrs;
563 rdma_disconnect(ia->ri_id);
564 goto unmap;
565 }
566 573
567 wait_for_completion(&f->fr_linv_done); 574 wait_for_completion(&f->fr_linv_done);
568 575
@@ -572,56 +579,65 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
572unmap: 579unmap:
573 for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { 580 for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
574 seg = &req->rl_segments[i]; 581 seg = &req->rl_segments[i];
582 mw = seg->rl_mw;
583 seg->rl_mw = NULL;
575 584
576 __frwr_dma_unmap(r_xprt, seg, rc); 585 ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
586 f->fr_dir);
587 rpcrdma_put_mw(r_xprt, mw);
577 588
578 i += seg->mr_nsegs; 589 i += seg->mr_nsegs;
579 seg->mr_nsegs = 0; 590 seg->mr_nsegs = 0;
580 } 591 }
581 592
582 req->rl_nchunks = 0; 593 req->rl_nchunks = 0;
583} 594 return;
584 595
585/* Post a LOCAL_INV Work Request to prevent further remote access 596reset_mrs:
586 * via RDMA READ or RDMA WRITE. 597 pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
587 */
588static int
589frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
590{
591 struct rpcrdma_mr_seg *seg1 = seg;
592 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
593 struct rpcrdma_mw *mw = seg1->rl_mw;
594 struct rpcrdma_frmr *frmr = &mw->frmr;
595 struct ib_send_wr *invalidate_wr, *bad_wr;
596 int rc, nsegs = seg->mr_nsegs;
597 598
598 dprintk("RPC: %s: FRMR %p\n", __func__, mw); 599 /* Find and reset the MRs in the LOCAL_INV WRs that did not
600 * get posted. This is synchronous, and slow.
601 */
602 for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
603 seg = &req->rl_segments[i];
604 mw = seg->rl_mw;
605 f = &mw->frmr;
599 606
600 seg1->rl_mw = NULL; 607 if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
601 frmr->fr_state = FRMR_IS_INVALID; 608 __frwr_reset_mr(ia, mw);
602 invalidate_wr = &mw->frmr.fr_invwr; 609 bad_wr = bad_wr->next;
610 }
603 611
604 memset(invalidate_wr, 0, sizeof(*invalidate_wr)); 612 i += seg->mr_nsegs;
605 frmr->fr_cqe.done = frwr_wc_localinv; 613 }
606 invalidate_wr->wr_cqe = &frmr->fr_cqe; 614 goto unmap;
607 invalidate_wr->opcode = IB_WR_LOCAL_INV; 615}
608 invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
609 DECR_CQCOUNT(&r_xprt->rx_ep);
610 616
611 ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); 617/* Use a slow, safe mechanism to invalidate all memory regions
612 read_lock(&ia->ri_qplock); 618 * that were registered for "req".
613 rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr); 619 */
614 read_unlock(&ia->ri_qplock); 620static void
615 if (rc) 621frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
616 goto out_err; 622 bool sync)
623{
624 struct rpcrdma_mr_seg *seg;
625 struct rpcrdma_mw *mw;
626 unsigned int i;
617 627
618 rpcrdma_put_mw(r_xprt, mw); 628 for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
619 return nsegs; 629 seg = &req->rl_segments[i];
630 mw = seg->rl_mw;
620 631
621out_err: 632 if (sync)
622 dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); 633 __frwr_reset_and_unmap(r_xprt, mw);
623 __frwr_queue_recovery(mw); 634 else
624 return nsegs; 635 __frwr_queue_recovery(mw);
636
637 i += seg->mr_nsegs;
638 seg->mr_nsegs = 0;
639 seg->rl_mw = NULL;
640 }
625} 641}
626 642
627static void 643static void
@@ -643,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
643const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { 659const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
644 .ro_map = frwr_op_map, 660 .ro_map = frwr_op_map,
645 .ro_unmap_sync = frwr_op_unmap_sync, 661 .ro_unmap_sync = frwr_op_unmap_sync,
646 .ro_unmap = frwr_op_unmap, 662 .ro_unmap_safe = frwr_op_unmap_safe,
647 .ro_open = frwr_op_open, 663 .ro_open = frwr_op_open,
648 .ro_maxpages = frwr_op_maxpages, 664 .ro_maxpages = frwr_op_maxpages,
649 .ro_init = frwr_op_init, 665 .ro_init = frwr_op_init,
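
The FRWR side gains __frwr_reset_mr() and __frwr_reset_and_unmap(): a broken MR is recovered by deregistering it and allocating a replacement, which yields a fresh rkey, and only a successfully reset MR is returned to the pool. A sketch of that recovery pattern with placeholder helpers (not the verbs API):

#include <stdio.h>

struct mr { int rkey; };

static int dereg_mr(struct mr *mr)  { (void)mr; return 0; }
static int alloc_mr(struct mr *mr)  { mr->rkey++; return 0; }   /* fresh rkey */
static void put_mr(struct mr *mr)   { printf("recovered, rkey now %d\n", mr->rkey); }

static int reset_mr(struct mr *mr)
{
        int rc;

        rc = dereg_mr(mr);
        if (rc)
                return rc;      /* the MR is orphaned; keep it out of the pool */
        return alloc_mr(mr);
}

static void reset_and_unmap(struct mr *mr)
{
        /* the kernel DMA-unmaps here regardless of the reset result */
        if (reset_mr(mr))
                return;         /* unrecoverable: do not reuse this MR */
        put_mr(mr);
}

int main(void)
{
        struct mr mr = { .rkey = 7 };

        reset_and_unmap(&mr);
        return 0;
}
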
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index 481b9b6f4a15..3750596cc432 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
36 __func__, PTR_ERR(mr)); 36 __func__, PTR_ERR(mr));
37 return -ENOMEM; 37 return -ENOMEM;
38 } 38 }
39
40 ia->ri_dma_mr = mr; 39 ia->ri_dma_mr = mr;
40
41 rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
42 RPCRDMA_MAX_DATA_SEGS,
43 RPCRDMA_MAX_HDR_SEGS));
41 return 0; 44 return 0;
42} 45}
43 46
@@ -47,7 +50,7 @@ static size_t
47physical_op_maxpages(struct rpcrdma_xprt *r_xprt) 50physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
48{ 51{
49 return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, 52 return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
50 rpcrdma_max_segments(r_xprt)); 53 RPCRDMA_MAX_HDR_SEGS);
51} 54}
52 55
53static int 56static int
@@ -71,17 +74,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
71 return 1; 74 return 1;
72} 75}
73 76
74/* Unmap a memory region, but leave it registered.
75 */
76static int
77physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
78{
79 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
80
81 rpcrdma_unmap_one(ia->ri_device, seg);
82 return 1;
83}
84
85/* DMA unmap all memory regions that were mapped for "req". 77/* DMA unmap all memory regions that were mapped for "req".
86 */ 78 */
87static void 79static void
@@ -94,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
94 rpcrdma_unmap_one(device, &req->rl_segments[i++]); 86 rpcrdma_unmap_one(device, &req->rl_segments[i++]);
95} 87}
96 88
89/* Use a slow, safe mechanism to invalidate all memory regions
90 * that were registered for "req".
91 *
92 * For physical memory registration, there is no good way to
93 * fence a single MR that has been advertised to the server. The
94 * client has already handed the server an R_key that cannot be
95 * invalidated and is shared by all MRs on this connection.
96 * Tearing down the PD might be the only safe choice, but it's
97 * not clear that a freshly acquired DMA R_key would be different
98 * than the one used by the PD that was just destroyed.
99 * FIXME.
100 */
101static void
102physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
103 bool sync)
104{
105 physical_op_unmap_sync(r_xprt, req);
106}
107
97static void 108static void
98physical_op_destroy(struct rpcrdma_buffer *buf) 109physical_op_destroy(struct rpcrdma_buffer *buf)
99{ 110{
@@ -102,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf)
102const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { 113const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
103 .ro_map = physical_op_map, 114 .ro_map = physical_op_map,
104 .ro_unmap_sync = physical_op_unmap_sync, 115 .ro_unmap_sync = physical_op_unmap_sync,
105 .ro_unmap = physical_op_unmap, 116 .ro_unmap_safe = physical_op_unmap_safe,
106 .ro_open = physical_op_open, 117 .ro_open = physical_op_open,
107 .ro_maxpages = physical_op_maxpages, 118 .ro_maxpages = physical_op_maxpages,
108 .ro_init = physical_op_init, 119 .ro_init = physical_op_init,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 888823bb6dae..35a81096e83d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -61,26 +61,84 @@ enum rpcrdma_chunktype {
61 rpcrdma_replych 61 rpcrdma_replych
62}; 62};
63 63
64#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
65static const char transfertypes[][12] = { 64static const char transfertypes[][12] = {
66 "pure inline", /* no chunks */ 65 "inline", /* no chunks */
67 " read chunk", /* some argument via rdma read */ 66 "read list", /* some argument via rdma read */
68 "*read chunk", /* entire request via rdma read */ 67 "*read list", /* entire request via rdma read */
69 "write chunk", /* some result via rdma write */ 68 "write list", /* some result via rdma write */
70 "reply chunk" /* entire reply via rdma write */ 69 "reply chunk" /* entire reply via rdma write */
71}; 70};
72#endif 71
72/* Returns size of largest RPC-over-RDMA header in a Call message
73 *
74 * The largest Call header contains a full-size Read list and a
75 * minimal Reply chunk.
76 */
77static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
78{
79 unsigned int size;
80
81 /* Fixed header fields and list discriminators */
82 size = RPCRDMA_HDRLEN_MIN;
83
84 /* Maximum Read list size */
85 maxsegs += 2; /* segment for head and tail buffers */
86 size = maxsegs * sizeof(struct rpcrdma_read_chunk);
87
88 /* Minimal Read chunk size */
89 size += sizeof(__be32); /* segment count */
90 size += sizeof(struct rpcrdma_segment);
91 size += sizeof(__be32); /* list discriminator */
92
93 dprintk("RPC: %s: max call header size = %u\n",
94 __func__, size);
95 return size;
96}
97
98/* Returns size of largest RPC-over-RDMA header in a Reply message
99 *
100 * There is only one Write list or one Reply chunk per Reply
101 * message. The larger list is the Write list.
102 */
103static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
104{
105 unsigned int size;
106
107 /* Fixed header fields and list discriminators */
108 size = RPCRDMA_HDRLEN_MIN;
109
110 /* Maximum Write list size */
111 maxsegs += 2; /* segment for head and tail buffers */
112 size = sizeof(__be32); /* segment count */
113 size += maxsegs * sizeof(struct rpcrdma_segment);
114 size += sizeof(__be32); /* list discriminator */
115
116 dprintk("RPC: %s: max reply header size = %u\n",
117 __func__, size);
118 return size;
119}
120
121void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
122 struct rpcrdma_create_data_internal *cdata,
123 unsigned int maxsegs)
124{
125 ia->ri_max_inline_write = cdata->inline_wsize -
126 rpcrdma_max_call_header_size(maxsegs);
127 ia->ri_max_inline_read = cdata->inline_rsize -
128 rpcrdma_max_reply_header_size(maxsegs);
129}
73 130
74/* The client can send a request inline as long as the RPCRDMA header 131/* The client can send a request inline as long as the RPCRDMA header
75 * plus the RPC call fit under the transport's inline limit. If the 132 * plus the RPC call fit under the transport's inline limit. If the
76 * combined call message size exceeds that limit, the client must use 133 * combined call message size exceeds that limit, the client must use
77 * the read chunk list for this operation. 134 * the read chunk list for this operation.
78 */ 135 */
79static bool rpcrdma_args_inline(struct rpc_rqst *rqst) 136static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
137 struct rpc_rqst *rqst)
80{ 138{
81 unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; 139 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
82 140
83 return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); 141 return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
84} 142}
85 143
86/* The client can't know how large the actual reply will be. Thus it 144/* The client can't know how large the actual reply will be. Thus it
@@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
89 * limit, the client must provide a write list or a reply chunk for 147 * limit, the client must provide a write list or a reply chunk for
90 * this request. 148 * this request.
91 */ 149 */
92static bool rpcrdma_results_inline(struct rpc_rqst *rqst) 150static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
151 struct rpc_rqst *rqst)
93{ 152{
94 unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; 153 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
95 154
96 return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); 155 return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
97} 156}
98 157
99static int 158static int
@@ -226,23 +285,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
226 return n; 285 return n;
227} 286}
228 287
229/* 288static inline __be32 *
230 * Create read/write chunk lists, and reply chunks, for RDMA 289xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
231 * 290{
232 * Assume check against THRESHOLD has been done, and chunks are required. 291 *iptr++ = cpu_to_be32(seg->mr_rkey);
233 * Assume only encoding one list entry for read|write chunks. The NFSv3 292 *iptr++ = cpu_to_be32(seg->mr_len);
234 * protocol is simple enough to allow this as it only has a single "bulk 293 return xdr_encode_hyper(iptr, seg->mr_base);
235 * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The 294}
236 * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) 295
237 * 296/* XDR-encode the Read list. Supports encoding a list of read
238 * When used for a single reply chunk (which is a special write 297 * segments that belong to a single read chunk.
239 * chunk used for the entire reply, rather than just the data), it
240 * is used primarily for READDIR and READLINK which would otherwise
241 * be severely size-limited by a small rdma inline read max. The server
242 * response will come back as an RDMA Write, followed by a message
243 * of type RDMA_NOMSG carrying the xid and length. As a result, reply
244 * chunks do not provide data alignment, however they do not require
245 * "fixup" (moving the response to the upper layer buffer) either.
246 * 298 *
247 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): 299 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
248 * 300 *
@@ -250,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
250 * N elements, position P (same P for all chunks of same arg!): 302 * N elements, position P (same P for all chunks of same arg!):
251 * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 303 * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
252 * 304 *
305 * Returns a pointer to the XDR word in the RDMA header following
306 * the end of the Read list, or an error pointer.
307 */
308static __be32 *
309rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
310 struct rpcrdma_req *req, struct rpc_rqst *rqst,
311 __be32 *iptr, enum rpcrdma_chunktype rtype)
312{
313 struct rpcrdma_mr_seg *seg = req->rl_nextseg;
314 unsigned int pos;
315 int n, nsegs;
316
317 if (rtype == rpcrdma_noch) {
318 *iptr++ = xdr_zero; /* item not present */
319 return iptr;
320 }
321
322 pos = rqst->rq_snd_buf.head[0].iov_len;
323 if (rtype == rpcrdma_areadch)
324 pos = 0;
325 nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
326 RPCRDMA_MAX_SEGS - req->rl_nchunks);
327 if (nsegs < 0)
328 return ERR_PTR(nsegs);
329
330 do {
331 n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
332 if (n <= 0)
333 return ERR_PTR(n);
334
335 *iptr++ = xdr_one; /* item present */
336
337 /* All read segments in this chunk
338 * have the same "position".
339 */
340 *iptr++ = cpu_to_be32(pos);
341 iptr = xdr_encode_rdma_segment(iptr, seg);
342
343 dprintk("RPC: %5u %s: read segment pos %u "
344 "%d@0x%016llx:0x%08x (%s)\n",
345 rqst->rq_task->tk_pid, __func__, pos,
346 seg->mr_len, (unsigned long long)seg->mr_base,
347 seg->mr_rkey, n < nsegs ? "more" : "last");
348
349 r_xprt->rx_stats.read_chunk_count++;
350 req->rl_nchunks++;
351 seg += n;
352 nsegs -= n;
353 } while (nsegs);
354 req->rl_nextseg = seg;
355
356 /* Finish Read list */
357 *iptr++ = xdr_zero; /* Next item not present */
358 return iptr;
359}
360
361/* XDR-encode the Write list. Supports encoding a list containing
362 * one array of plain segments that belong to a single write chunk.
363 *
364 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
365 *
253 * Write chunklist (a list of (one) counted array): 366 * Write chunklist (a list of (one) counted array):
254 * N elements: 367 * N elements:
255 * 1 - N - HLOO - HLOO - ... - HLOO - 0 368 * 1 - N - HLOO - HLOO - ... - HLOO - 0
256 * 369 *
370 * Returns a pointer to the XDR word in the RDMA header following
371 * the end of the Write list, or an error pointer.
372 */
373static __be32 *
374rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
375 struct rpc_rqst *rqst, __be32 *iptr,
376 enum rpcrdma_chunktype wtype)
377{
378 struct rpcrdma_mr_seg *seg = req->rl_nextseg;
379 int n, nsegs, nchunks;
380 __be32 *segcount;
381
382 if (wtype != rpcrdma_writech) {
383 *iptr++ = xdr_zero; /* no Write list present */
384 return iptr;
385 }
386
387 nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
388 rqst->rq_rcv_buf.head[0].iov_len,
389 wtype, seg,
390 RPCRDMA_MAX_SEGS - req->rl_nchunks);
391 if (nsegs < 0)
392 return ERR_PTR(nsegs);
393
394 *iptr++ = xdr_one; /* Write list present */
395 segcount = iptr++; /* save location of segment count */
396
397 nchunks = 0;
398 do {
399 n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
400 if (n <= 0)
401 return ERR_PTR(n);
402
403 iptr = xdr_encode_rdma_segment(iptr, seg);
404
405 dprintk("RPC: %5u %s: write segment "
406 "%d@0x016%llx:0x%08x (%s)\n",
407 rqst->rq_task->tk_pid, __func__,
408 seg->mr_len, (unsigned long long)seg->mr_base,
409 seg->mr_rkey, n < nsegs ? "more" : "last");
410
411 r_xprt->rx_stats.write_chunk_count++;
412 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
413 req->rl_nchunks++;
414 nchunks++;
415 seg += n;
416 nsegs -= n;
417 } while (nsegs);
418 req->rl_nextseg = seg;
419
420 /* Update count of segments in this Write chunk */
421 *segcount = cpu_to_be32(nchunks);
422
423 /* Finish Write list */
424 *iptr++ = xdr_zero; /* Next item not present */
425 return iptr;
426}
427
428/* XDR-encode the Reply chunk. Supports encoding an array of plain
429 * segments that belong to a single write (reply) chunk.
430 *
431 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
432 *
257 * Reply chunk (a counted array): 433 * Reply chunk (a counted array):
258 * N elements: 434 * N elements:
259 * 1 - N - HLOO - HLOO - ... - HLOO 435 * 1 - N - HLOO - HLOO - ... - HLOO
260 * 436 *
261 * Returns positive RPC/RDMA header size, or negative errno. 437 * Returns a pointer to the XDR word in the RDMA header following
438 * the end of the Reply chunk, or an error pointer.
262 */ 439 */
263 440static __be32 *
264static ssize_t 441rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
265rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, 442 struct rpcrdma_req *req, struct rpc_rqst *rqst,
266 struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) 443 __be32 *iptr, enum rpcrdma_chunktype wtype)
267{ 444{
268 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 445 struct rpcrdma_mr_seg *seg = req->rl_nextseg;
269 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); 446 int n, nsegs, nchunks;
270 int n, nsegs, nchunks = 0; 447 __be32 *segcount;
271 unsigned int pos;
272 struct rpcrdma_mr_seg *seg = req->rl_segments;
273 struct rpcrdma_read_chunk *cur_rchunk = NULL;
274 struct rpcrdma_write_array *warray = NULL;
275 struct rpcrdma_write_chunk *cur_wchunk = NULL;
276 __be32 *iptr = headerp->rm_body.rm_chunks;
277 int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool);
278
279 if (type == rpcrdma_readch || type == rpcrdma_areadch) {
280 /* a read chunk - server will RDMA Read our memory */
281 cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
282 } else {
283 /* a write or reply chunk - server will RDMA Write our memory */
284 *iptr++ = xdr_zero; /* encode a NULL read chunk list */
285 if (type == rpcrdma_replych)
286 *iptr++ = xdr_zero; /* a NULL write chunk list */
287 warray = (struct rpcrdma_write_array *) iptr;
288 cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
289 }
290 448
291 if (type == rpcrdma_replych || type == rpcrdma_areadch) 449 if (wtype != rpcrdma_replych) {
292 pos = 0; 450 *iptr++ = xdr_zero; /* no Reply chunk present */
293 else 451 return iptr;
294 pos = target->head[0].iov_len; 452 }
295 453
296 nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); 454 nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
455 RPCRDMA_MAX_SEGS - req->rl_nchunks);
297 if (nsegs < 0) 456 if (nsegs < 0)
298 return nsegs; 457 return ERR_PTR(nsegs);
299 458
300 map = r_xprt->rx_ia.ri_ops->ro_map; 459 *iptr++ = xdr_one; /* Reply chunk present */
460 segcount = iptr++; /* save location of segment count */
461
462 nchunks = 0;
301 do { 463 do {
302 n = map(r_xprt, seg, nsegs, cur_wchunk != NULL); 464 n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
303 if (n <= 0) 465 if (n <= 0)
304 goto out; 466 return ERR_PTR(n);
305 if (cur_rchunk) { /* read */ 467
306 cur_rchunk->rc_discrim = xdr_one; 468 iptr = xdr_encode_rdma_segment(iptr, seg);
307 /* all read chunks have the same "position" */ 469
308 cur_rchunk->rc_position = cpu_to_be32(pos); 470 dprintk("RPC: %5u %s: reply segment "
309 cur_rchunk->rc_target.rs_handle = 471 "%d@0x%016llx:0x%08x (%s)\n",
310 cpu_to_be32(seg->mr_rkey); 472 rqst->rq_task->tk_pid, __func__,
311 cur_rchunk->rc_target.rs_length = 473 seg->mr_len, (unsigned long long)seg->mr_base,
312 cpu_to_be32(seg->mr_len); 474 seg->mr_rkey, n < nsegs ? "more" : "last");
313 xdr_encode_hyper( 475
314 (__be32 *)&cur_rchunk->rc_target.rs_offset, 476 r_xprt->rx_stats.reply_chunk_count++;
315 seg->mr_base); 477 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
316 dprintk("RPC: %s: read chunk " 478 req->rl_nchunks++;
317 "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
318 seg->mr_len, (unsigned long long)seg->mr_base,
319 seg->mr_rkey, pos, n < nsegs ? "more" : "last");
320 cur_rchunk++;
321 r_xprt->rx_stats.read_chunk_count++;
322 } else { /* write/reply */
323 cur_wchunk->wc_target.rs_handle =
324 cpu_to_be32(seg->mr_rkey);
325 cur_wchunk->wc_target.rs_length =
326 cpu_to_be32(seg->mr_len);
327 xdr_encode_hyper(
328 (__be32 *)&cur_wchunk->wc_target.rs_offset,
329 seg->mr_base);
330 dprintk("RPC: %s: %s chunk "
331 "elem %d@0x%llx:0x%x (%s)\n", __func__,
332 (type == rpcrdma_replych) ? "reply" : "write",
333 seg->mr_len, (unsigned long long)seg->mr_base,
334 seg->mr_rkey, n < nsegs ? "more" : "last");
335 cur_wchunk++;
336 if (type == rpcrdma_replych)
337 r_xprt->rx_stats.reply_chunk_count++;
338 else
339 r_xprt->rx_stats.write_chunk_count++;
340 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
341 }
342 nchunks++; 479 nchunks++;
343 seg += n; 480 seg += n;
344 nsegs -= n; 481 nsegs -= n;
345 } while (nsegs); 482 } while (nsegs);
483 req->rl_nextseg = seg;
346 484
347 /* success. all failures return above */ 485 /* Update count of segments in the Reply chunk */
348 req->rl_nchunks = nchunks; 486 *segcount = cpu_to_be32(nchunks);
349
350 /*
351 * finish off header. If write, marshal discrim and nchunks.
352 */
353 if (cur_rchunk) {
354 iptr = (__be32 *) cur_rchunk;
355 *iptr++ = xdr_zero; /* finish the read chunk list */
356 *iptr++ = xdr_zero; /* encode a NULL write chunk list */
357 *iptr++ = xdr_zero; /* encode a NULL reply chunk */
358 } else {
359 warray->wc_discrim = xdr_one;
360 warray->wc_nchunks = cpu_to_be32(nchunks);
361 iptr = (__be32 *) cur_wchunk;
362 if (type == rpcrdma_writech) {
363 *iptr++ = xdr_zero; /* finish the write chunk list */
364 *iptr++ = xdr_zero; /* encode a NULL reply chunk */
365 }
366 }
367
368 /*
369 * Return header size.
370 */
371 return (unsigned char *)iptr - (unsigned char *)headerp;
372 487
373out: 488 return iptr;
374 for (pos = 0; nchunks--;)
375 pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
376 &req->rl_segments[pos]);
377 return n;
378} 489}
379 490
380/* 491/*
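
Each encoder above emits chunk segments through xdr_encode_rdma_segment(): an HLOO quad of Handle32, Length32 and Offset64 in network byte order. A standalone sketch of that encoding, using uint32_t in place of the kernel's __be32:

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

static uint32_t *encode_hyper(uint32_t *p, uint64_t val)
{
        *p++ = htonl(val >> 32);                /* most significant word first */
        *p++ = htonl(val & 0xffffffff);
        return p;
}

static uint32_t *encode_rdma_segment(uint32_t *p, uint32_t rkey,
                                     uint32_t len, uint64_t offset)
{
        *p++ = htonl(rkey);
        *p++ = htonl(len);
        return encode_hyper(p, offset);
}

int main(void)
{
        uint32_t buf[4];

        encode_rdma_segment(buf, 0x1234, 4096, 0xdeadbeefcafeULL);
        printf("%08x %08x %08x %08x\n",
               ntohl(buf[0]), ntohl(buf[1]), ntohl(buf[2]), ntohl(buf[3]));
        return 0;
}
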
@@ -440,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
440 * Marshal a request: the primary job of this routine is to choose 551 * Marshal a request: the primary job of this routine is to choose
441 * the transfer modes. See comments below. 552 * the transfer modes. See comments below.
442 * 553 *
443 * Uses multiple RDMA IOVs for a request: 554 * Prepares up to two IOVs per Call message:
444 * [0] -- RPC RDMA header, which uses memory from the *start* of the 555 *
445 * preregistered buffer that already holds the RPC data in 556 * [0] -- RPC RDMA header
446 * its middle. 557 * [1] -- the RPC header/data
447 * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
448 * [2] -- optional padding.
449 * [3] -- if padded, header only in [1] and data here.
450 * 558 *
451 * Returns zero on success, otherwise a negative errno. 559 * Returns zero on success, otherwise a negative errno.
452 */ 560 */
@@ -457,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
457 struct rpc_xprt *xprt = rqst->rq_xprt; 565 struct rpc_xprt *xprt = rqst->rq_xprt;
458 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 566 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
459 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 567 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
460 char *base;
461 size_t rpclen;
462 ssize_t hdrlen;
463 enum rpcrdma_chunktype rtype, wtype; 568 enum rpcrdma_chunktype rtype, wtype;
464 struct rpcrdma_msg *headerp; 569 struct rpcrdma_msg *headerp;
570 ssize_t hdrlen;
571 size_t rpclen;
572 __be32 *iptr;
465 573
466#if defined(CONFIG_SUNRPC_BACKCHANNEL) 574#if defined(CONFIG_SUNRPC_BACKCHANNEL)
467 if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) 575 if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
468 return rpcrdma_bc_marshal_reply(rqst); 576 return rpcrdma_bc_marshal_reply(rqst);
469#endif 577#endif
470 578
471 /*
472 * rpclen gets amount of data in first buffer, which is the
473 * pre-registered buffer.
474 */
475 base = rqst->rq_svec[0].iov_base;
476 rpclen = rqst->rq_svec[0].iov_len;
477
478 headerp = rdmab_to_msg(req->rl_rdmabuf); 579 headerp = rdmab_to_msg(req->rl_rdmabuf);
479 /* don't byte-swap XID, it's already done in request */ 580 /* don't byte-swap XID, it's already done in request */
480 headerp->rm_xid = rqst->rq_xid; 581 headerp->rm_xid = rqst->rq_xid;
@@ -485,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
485 /* 586 /*
486 * Chunks needed for results? 587 * Chunks needed for results?
487 * 588 *
488 * o Read ops return data as write chunk(s), header as inline.
489 * o If the expected result is under the inline threshold, all ops 589 * o If the expected result is under the inline threshold, all ops
490 * return as inline. 590 * return as inline.
591 * o Large read ops return data as write chunk(s), header as
592 * inline.
491 * o Large non-read ops return as a single reply chunk. 593 * o Large non-read ops return as a single reply chunk.
492 */ 594 */
493 if (rqst->rq_rcv_buf.flags & XDRBUF_READ) 595 if (rpcrdma_results_inline(r_xprt, rqst))
494 wtype = rpcrdma_writech;
495 else if (rpcrdma_results_inline(rqst))
496 wtype = rpcrdma_noch; 596 wtype = rpcrdma_noch;
597 else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
598 wtype = rpcrdma_writech;
497 else 599 else
498 wtype = rpcrdma_replych; 600 wtype = rpcrdma_replych;
499 601
@@ -511,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
511 * that both has a data payload, and whose non-data arguments 613 * that both has a data payload, and whose non-data arguments
512 * by themselves are larger than the inline threshold. 614 * by themselves are larger than the inline threshold.
513 */ 615 */
514 if (rpcrdma_args_inline(rqst)) { 616 if (rpcrdma_args_inline(r_xprt, rqst)) {
515 rtype = rpcrdma_noch; 617 rtype = rpcrdma_noch;
618 rpcrdma_inline_pullup(rqst);
619 rpclen = rqst->rq_svec[0].iov_len;
516 } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { 620 } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
517 rtype = rpcrdma_readch; 621 rtype = rpcrdma_readch;
622 rpclen = rqst->rq_svec[0].iov_len;
623 rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
518 } else { 624 } else {
519 r_xprt->rx_stats.nomsg_call_count++; 625 r_xprt->rx_stats.nomsg_call_count++;
520 headerp->rm_type = htonl(RDMA_NOMSG); 626 headerp->rm_type = htonl(RDMA_NOMSG);
@@ -522,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
522 rpclen = 0; 628 rpclen = 0;
523 } 629 }
524 630
525 /* The following simplification is not true forever */ 631 /* This implementation supports the following combinations
526 if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) 632 * of chunk lists in one RPC-over-RDMA Call message:
527 wtype = rpcrdma_noch; 633 *
528 if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { 634 * - Read list
529 dprintk("RPC: %s: cannot marshal multiple chunk lists\n", 635 * - Write list
530 __func__); 636 * - Reply chunk
531 return -EIO; 637 * - Read list + Reply chunk
532 } 638 *
533 639 * It might not yet support the following combinations:
534 hdrlen = RPCRDMA_HDRLEN_MIN; 640 *
535 641 * - Read list + Write list
536 /* 642 *
537 * Pull up any extra send data into the preregistered buffer. 643 * It does not support the following combinations:
538 * When padding is in use and applies to the transfer, insert 644 *
539 * it and change the message type. 645 * - Write list + Reply chunk
646 * - Read list + Write list + Reply chunk
647 *
648 * This implementation supports only a single chunk in each
649 * Read or Write list. Thus for example the client cannot
650 * send a Call message with a Position Zero Read chunk and a
651 * regular Read chunk at the same time.
540 */ 652 */
541 if (rtype == rpcrdma_noch) { 653 req->rl_nchunks = 0;
542 654 req->rl_nextseg = req->rl_segments;
543 rpcrdma_inline_pullup(rqst); 655 iptr = headerp->rm_body.rm_chunks;
544 656 iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
545 headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; 657 if (IS_ERR(iptr))
546 headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; 658 goto out_unmap;
547 headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; 659 iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
548 /* new length after pullup */ 660 if (IS_ERR(iptr))
549 rpclen = rqst->rq_svec[0].iov_len; 661 goto out_unmap;
550 } else if (rtype == rpcrdma_readch) 662 iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
551 rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); 663 if (IS_ERR(iptr))
552 if (rtype != rpcrdma_noch) { 664 goto out_unmap;
553 hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, 665 hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
554 headerp, rtype); 666
555 wtype = rtype; /* simplify dprintk */ 667 if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
556 668 goto out_overflow;
557 } else if (wtype != rpcrdma_noch) { 669
558 hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, 670 dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
559 headerp, wtype); 671 rqst->rq_task->tk_pid, __func__,
560 } 672 transfertypes[rtype], transfertypes[wtype],
561 if (hdrlen < 0) 673 hdrlen, rpclen);
562 return hdrlen;
563 674
564 dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
565 " headerp 0x%p base 0x%p lkey 0x%x\n",
566 __func__, transfertypes[wtype], hdrlen, rpclen,
567 headerp, base, rdmab_lkey(req->rl_rdmabuf));
568
569 /*
570 * initialize send_iov's - normally only two: rdma chunk header and
571 * single preregistered RPC header buffer, but if padding is present,
572 * then use a preregistered (and zeroed) pad buffer between the RPC
573 * header and any write data. In all non-rdma cases, any following
574 * data has been copied into the RPC header buffer.
575 */
576 req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); 675 req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
577 req->rl_send_iov[0].length = hdrlen; 676 req->rl_send_iov[0].length = hdrlen;
578 req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); 677 req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
@@ -587,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
587 686
588 req->rl_niovs = 2; 687 req->rl_niovs = 2;
589 return 0; 688 return 0;
689
690out_overflow:
691 pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
692 hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
693 /* Terminate this RPC. Chunks registered above will be
694 * released by xprt_release -> xprt_rdma_free.
695 */
696 return -EIO;
697
698out_unmap:
699 r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
700 return PTR_ERR(iptr);
590} 701}
591 702
592/* 703/*
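The hunks above replace the old one-chunk-list limit with a chain of per-list encoders. Below is a compact, standalone model of just that control flow, a sketch under stated assumptions: the stub encoders emit only illustrative word counts, failure is modelled as NULL where the kernel uses ERR_PTR()/IS_ERR(), and INLINE_THRESHOLD stands in for RPCRDMA_INLINE_WRITE_THRESHOLD().

#include <stdint.h>
#include <stdio.h>

enum chunktype { ch_none, ch_read, ch_write, ch_reply };

#define INLINE_THRESHOLD 1024

/* Each encoder appends its chunk list (at minimum one "list is empty" word)
 * and returns the next free header position, or NULL on failure.  Segment
 * fields are left zero here because hdr[] is zero-initialized. */
static uint32_t *encode_read_list(uint32_t *p, enum chunktype rtype)
{
    if (rtype != ch_read) {
        *p++ = 0;   /* xdr_zero: no Read list */
        return p;
    }
    *p++ = 1;       /* one read segment follows */
    p += 5;         /* position, handle, length, 64-bit offset */
    *p++ = 0;       /* end of Read list */
    return p;
}

static uint32_t *encode_write_list(uint32_t *p, enum chunktype wtype)
{
    if (wtype != ch_write) {
        *p++ = 0;
        return p;
    }
    *p++ = 1;       /* one Write chunk */
    *p++ = 1;       /* with one segment */
    p += 4;         /* handle, length, 64-bit offset */
    *p++ = 0;       /* end of Write list */
    return p;
}

static uint32_t *encode_reply_chunk(uint32_t *p, enum chunktype wtype)
{
    if (wtype != ch_reply) {
        *p++ = 0;
        return p;
    }
    *p++ = 1;       /* Reply chunk present */
    *p++ = 1;       /* with one segment */
    p += 4;
    return p;
}

int main(void)
{
    uint32_t hdr[64] = { 0 };
    uint32_t *iptr = hdr + 4;        /* skip xid, vers, credits, proc */
    enum chunktype rtype = ch_read;  /* large argument moved by RDMA Read */
    enum chunktype wtype = ch_reply; /* large non-read-op result */
    size_t rpclen = 128;             /* inline portion of the RPC call */
    size_t hdrlen;

    if (!(iptr = encode_read_list(iptr, rtype)) ||
        !(iptr = encode_write_list(iptr, wtype)) ||
        !(iptr = encode_reply_chunk(iptr, wtype)))
        return 1;                    /* kernel: goto out_unmap */

    hdrlen = (char *)iptr - (char *)hdr;
    if (hdrlen + rpclen > INLINE_THRESHOLD)
        return 1;                    /* kernel: goto out_overflow */

    printf("Read list + Reply chunk header: %zu bytes\n", hdrlen);
    return 0;
}

The pointer difference at the end is why hdrlen can now vary with the chunks actually encoded instead of being pinned to RPCRDMA_HDRLEN_MIN as before.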
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index b1b009f10ea3..99d2e5b72726 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
73 73
74static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; 74static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
75static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; 75static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
76static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
77static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
76static unsigned int zero; 78static unsigned int zero;
77static unsigned int max_padding = PAGE_SIZE; 79static unsigned int max_padding = PAGE_SIZE;
78static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; 80static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
@@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = {
96 .maxlen = sizeof(unsigned int), 98 .maxlen = sizeof(unsigned int),
97 .mode = 0644, 99 .mode = 0644,
98 .proc_handler = proc_dointvec, 100 .proc_handler = proc_dointvec,
101 .extra1 = &min_inline_size,
102 .extra2 = &max_inline_size,
99 }, 103 },
100 { 104 {
101 .procname = "rdma_max_inline_write", 105 .procname = "rdma_max_inline_write",
@@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = {
103 .maxlen = sizeof(unsigned int), 107 .maxlen = sizeof(unsigned int),
104 .mode = 0644, 108 .mode = 0644,
105 .proc_handler = proc_dointvec, 109 .proc_handler = proc_dointvec,
110 .extra1 = &min_inline_size,
111 .extra2 = &max_inline_size,
106 }, 112 },
107 { 113 {
108 .procname = "rdma_inline_write_padding", 114 .procname = "rdma_inline_write_padding",
@@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
508out: 514out:
509 dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); 515 dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
510 req->rl_connect_cookie = 0; /* our reserved value */ 516 req->rl_connect_cookie = 0; /* our reserved value */
517 req->rl_task = task;
511 return req->rl_sendbuf->rg_base; 518 return req->rl_sendbuf->rg_base;
512 519
513out_rdmabuf: 520out_rdmabuf:
@@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer)
564 struct rpcrdma_req *req; 571 struct rpcrdma_req *req;
565 struct rpcrdma_xprt *r_xprt; 572 struct rpcrdma_xprt *r_xprt;
566 struct rpcrdma_regbuf *rb; 573 struct rpcrdma_regbuf *rb;
567 int i;
568 574
569 if (buffer == NULL) 575 if (buffer == NULL)
570 return; 576 return;
@@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer)
578 584
579 dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); 585 dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
580 586
581 for (i = 0; req->rl_nchunks;) { 587 r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
582 --req->rl_nchunks; 588 !RPC_IS_ASYNC(req->rl_task));
583 i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
584 &req->rl_segments[i]);
585 }
586 589
587 rpcrdma_buffer_put(req); 590 rpcrdma_buffer_put(req);
588} 591}
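req->rl_task, saved in xprt_rdma_allocate() above, exists so this free path can tell whether it may sleep: !RPC_IS_ASYNC(req->rl_task) becomes the third argument to ro_unmap_safe. A rough userspace model of that decision follows; unmap_mrs() is an invented stand-in, and whether the real methods wait for the invalidation or defer it depends on the registration mode in use.

#include <stdbool.h>
#include <stdio.h>

struct model_task { bool async; };
struct model_req  { struct model_task *task; };

/* Stand-in for ->ro_unmap_safe(r_xprt, req, may_sleep). */
static void unmap_mrs(struct model_req *req, bool may_sleep)
{
    (void)req;
    printf("invalidate %s\n", may_sleep ? "synchronously" : "without sleeping");
}

static void model_free(struct model_req *req)
{
    unmap_mrs(req, !req->task->async); /* mirrors !RPC_IS_ASYNC(req->rl_task) */
}

int main(void)
{
    struct model_task sync_task = { .async = false };
    struct model_req req = { .task = &sync_task };

    model_free(&req);
    return 0;
}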
@@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
707#if defined(CONFIG_SUNRPC_BACKCHANNEL) 710#if defined(CONFIG_SUNRPC_BACKCHANNEL)
708 .bc_setup = xprt_rdma_bc_setup, 711 .bc_setup = xprt_rdma_bc_setup,
709 .bc_up = xprt_rdma_bc_up, 712 .bc_up = xprt_rdma_bc_up,
713 .bc_maxpayload = xprt_rdma_bc_maxpayload,
710 .bc_free_rqst = xprt_rdma_bc_free_rqst, 714 .bc_free_rqst = xprt_rdma_bc_free_rqst,
711 .bc_destroy = xprt_rdma_bc_destroy, 715 .bc_destroy = xprt_rdma_bc_destroy,
712#endif 716#endif
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f5ed9f982cd7..b044d98a1370 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -203,15 +203,6 @@ out_fail:
203 goto out_schedule; 203 goto out_schedule;
204} 204}
205 205
206static void
207rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
208{
209 struct ib_wc wc;
210
211 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
212 rpcrdma_receive_wc(NULL, &wc);
213}
214
215static int 206static int
216rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) 207rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
217{ 208{
@@ -374,23 +365,6 @@ out:
374} 365}
375 366
376/* 367/*
377 * Drain any cq, prior to teardown.
378 */
379static void
380rpcrdma_clean_cq(struct ib_cq *cq)
381{
382 struct ib_wc wc;
383 int count = 0;
384
385 while (1 == ib_poll_cq(cq, 1, &wc))
386 ++count;
387
388 if (count)
389 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
390 __func__, count, wc.opcode);
391}
392
393/*
394 * Exported functions. 368 * Exported functions.
395 */ 369 */
396 370
@@ -459,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
459 dprintk("RPC: %s: memory registration strategy is '%s'\n", 433 dprintk("RPC: %s: memory registration strategy is '%s'\n",
460 __func__, ia->ri_ops->ro_displayname); 434 __func__, ia->ri_ops->ro_displayname);
461 435
462 rwlock_init(&ia->ri_qplock);
463 return 0; 436 return 0;
464 437
465out3: 438out3:
@@ -515,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
515 __func__); 488 __func__);
516 return -ENOMEM; 489 return -ENOMEM;
517 } 490 }
518 max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; 491 max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
519 492
520 /* check provider's send/recv wr limits */ 493 /* check provider's send/recv wr limits */
521 if (cdata->max_requests > max_qp_wr) 494 if (cdata->max_requests > max_qp_wr)
@@ -526,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
526 ep->rep_attr.srq = NULL; 499 ep->rep_attr.srq = NULL;
527 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 500 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
528 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; 501 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
502 ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */
529 rc = ia->ri_ops->ro_open(ia, ep, cdata); 503 rc = ia->ri_ops->ro_open(ia, ep, cdata);
530 if (rc) 504 if (rc)
531 return rc; 505 return rc;
532 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 506 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
533 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; 507 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
508 ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */
534 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; 509 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
535 ep->rep_attr.cap.max_recv_sge = 1; 510 ep->rep_attr.cap.max_recv_sge = 1;
536 ep->rep_attr.cap.max_inline_data = 0; 511 ep->rep_attr.cap.max_inline_data = 0;
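ib_drain_qp(), which the disconnect path later in this file now relies on, posts one marker work request per queue and waits for its completion, so each queue needs one spare WR and CQE; the same reservation is subtracted from the device's max_qp_wr before clamping max_requests. A small worked example with invented numbers (RPCRDMA_BACKWARD_WRS and max_qp_wr vary by configuration and device):

#include <stdio.h>

/* Illustrative numbers only. */
#define BACKWARD_WRS 8
#define DRAIN_WRS    1   /* room for the drain marker WR/CQE */

int main(void)
{
    unsigned int device_max_qp_wr = 16384; /* hypothetical ib_device_attr.max_qp_wr */
    unsigned int max_requests = 128;       /* cdata->max_requests */
    unsigned int budget = device_max_qp_wr - BACKWARD_WRS - DRAIN_WRS;

    if (max_requests > budget)
        max_requests = budget;

    printf("max_send_wr = max_recv_wr = %u\n",
           max_requests + BACKWARD_WRS + DRAIN_WRS);
    return 0;
}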
@@ -578,6 +553,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
578 ep->rep_attr.recv_cq = recvcq; 553 ep->rep_attr.recv_cq = recvcq;
579 554
580 /* Initialize cma parameters */ 555 /* Initialize cma parameters */
556 memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
581 557
582 /* RPC/RDMA does not use private data */ 558 /* RPC/RDMA does not use private data */
583 ep->rep_remote_cma.private_data = NULL; 559 ep->rep_remote_cma.private_data = NULL;
@@ -591,7 +567,16 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
591 ep->rep_remote_cma.responder_resources = 567 ep->rep_remote_cma.responder_resources =
592 ia->ri_device->attrs.max_qp_rd_atom; 568 ia->ri_device->attrs.max_qp_rd_atom;
593 569
594 ep->rep_remote_cma.retry_count = 7; 570 /* Limit transport retries so client can detect server
571 * GID changes quickly. RPC layer handles re-establishing
572 * transport connection and retransmission.
573 */
574 ep->rep_remote_cma.retry_count = 6;
575
576 /* RPC-over-RDMA handles its own flow control. In addition,
577 * make all RNR NAKs visible so we know that RPC-over-RDMA
578 * flow control is working correctly (no NAKs should be seen).
579 */
595 ep->rep_remote_cma.flow_control = 0; 580 ep->rep_remote_cma.flow_control = 0;
596 ep->rep_remote_cma.rnr_retry_count = 0; 581 ep->rep_remote_cma.rnr_retry_count = 0;
597 582
@@ -622,13 +607,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
622 607
623 cancel_delayed_work_sync(&ep->rep_connect_worker); 608 cancel_delayed_work_sync(&ep->rep_connect_worker);
624 609
625 if (ia->ri_id->qp)
626 rpcrdma_ep_disconnect(ep, ia);
627
628 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
629 rpcrdma_clean_cq(ep->rep_attr.send_cq);
630
631 if (ia->ri_id->qp) { 610 if (ia->ri_id->qp) {
611 rpcrdma_ep_disconnect(ep, ia);
632 rdma_destroy_qp(ia->ri_id); 612 rdma_destroy_qp(ia->ri_id);
633 ia->ri_id->qp = NULL; 613 ia->ri_id->qp = NULL;
634 } 614 }
@@ -659,7 +639,6 @@ retry:
659 dprintk("RPC: %s: reconnecting...\n", __func__); 639 dprintk("RPC: %s: reconnecting...\n", __func__);
660 640
661 rpcrdma_ep_disconnect(ep, ia); 641 rpcrdma_ep_disconnect(ep, ia);
662 rpcrdma_flush_cqs(ep);
663 642
664 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); 643 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
665 id = rpcrdma_create_id(xprt, ia, 644 id = rpcrdma_create_id(xprt, ia,
@@ -692,10 +671,8 @@ retry:
692 goto out; 671 goto out;
693 } 672 }
694 673
695 write_lock(&ia->ri_qplock);
696 old = ia->ri_id; 674 old = ia->ri_id;
697 ia->ri_id = id; 675 ia->ri_id = id;
698 write_unlock(&ia->ri_qplock);
699 676
700 rdma_destroy_qp(old); 677 rdma_destroy_qp(old);
701 rpcrdma_destroy_id(old); 678 rpcrdma_destroy_id(old);
@@ -785,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
785{ 762{
786 int rc; 763 int rc;
787 764
788 rpcrdma_flush_cqs(ep);
789 rc = rdma_disconnect(ia->ri_id); 765 rc = rdma_disconnect(ia->ri_id);
790 if (!rc) { 766 if (!rc) {
791 /* returns without wait if not connected */ 767 /* returns without wait if not connected */
@@ -797,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
797 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 773 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
798 ep->rep_connected = rc; 774 ep->rep_connected = rc;
799 } 775 }
776
777 ib_drain_qp(ia->ri_id->qp);
800} 778}
801 779
802struct rpcrdma_req * 780struct rpcrdma_req *
@@ -1271,25 +1249,3 @@ out_rc:
1271 rpcrdma_recv_buffer_put(rep); 1249 rpcrdma_recv_buffer_put(rep);
1272 return rc; 1250 return rc;
1273} 1251}
1274
1275/* How many chunk list items fit within our inline buffers?
1276 */
1277unsigned int
1278rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1279{
1280 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1281 int bytes, segments;
1282
1283 bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1284 bytes -= RPCRDMA_HDRLEN_MIN;
1285 if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1286 pr_warn("RPC: %s: inline threshold too small\n",
1287 __func__);
1288 return 0;
1289 }
1290
1291 segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1292 dprintk("RPC: %s: max chunk list size = %d segments\n",
1293 __func__, segments);
1294 return segments;
1295}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ebc743cb96f..95cdc66225ee 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -65,7 +65,6 @@
65 */ 65 */
66struct rpcrdma_ia { 66struct rpcrdma_ia {
67 const struct rpcrdma_memreg_ops *ri_ops; 67 const struct rpcrdma_memreg_ops *ri_ops;
68 rwlock_t ri_qplock;
69 struct ib_device *ri_device; 68 struct ib_device *ri_device;
70 struct rdma_cm_id *ri_id; 69 struct rdma_cm_id *ri_id;
71 struct ib_pd *ri_pd; 70 struct ib_pd *ri_pd;
@@ -73,6 +72,8 @@ struct rpcrdma_ia {
73 struct completion ri_done; 72 struct completion ri_done;
74 int ri_async_rc; 73 int ri_async_rc;
75 unsigned int ri_max_frmr_depth; 74 unsigned int ri_max_frmr_depth;
75 unsigned int ri_max_inline_write;
76 unsigned int ri_max_inline_read;
76 struct ib_qp_attr ri_qp_attr; 77 struct ib_qp_attr ri_qp_attr;
77 struct ib_qp_init_attr ri_qp_init_attr; 78 struct ib_qp_init_attr ri_qp_init_attr;
78}; 79};
@@ -144,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
144 145
145#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN) 146#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN)
146 147
148/* To ensure a transport can always make forward progress,
149 * the number of RDMA segments allowed in header chunk lists
150 * is capped at 8. This prevents less-capable devices and
151 * memory registrations from overrunning the Send buffer
152 * while building chunk lists.
153 *
154 * Elements of the Read list take up more room than the
155 * Write list or Reply chunk. 8 read segments means the Read
156 * list (or Write list or Reply chunk) cannot consume more
157 * than
158 *
159 * ((8 + 2) * read segment size) + 1 XDR word, or 244 bytes. 
160 *
161 * And the fixed part of the header is another 24 bytes.
162 *
163 * The smallest inline threshold is 1024 bytes, ensuring that
164 * at least 750 bytes are available for RPC messages.
165 */
166#define RPCRDMA_MAX_HDR_SEGS (8)
167
147/* 168/*
148 * struct rpcrdma_rep -- this structure encapsulates state required to recv 169 * struct rpcrdma_rep -- this structure encapsulates state required to recv
149 * and complete a reply, asynchronously. It needs several pieces of 170 * and complete a reply, asynchronously. It needs several pieces of
@@ -162,7 +183,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
162 */ 183 */
163 184
164#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) 185#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE)
165#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ 186
187/* data segments + head/tail for Call + head/tail for Reply */
188#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4)
166 189
167struct rpcrdma_buffer; 190struct rpcrdma_buffer;
168 191
@@ -198,14 +221,13 @@ enum rpcrdma_frmr_state {
198}; 221};
199 222
200struct rpcrdma_frmr { 223struct rpcrdma_frmr {
201 struct scatterlist *sg; 224 struct scatterlist *fr_sg;
202 int sg_nents; 225 int fr_nents;
226 enum dma_data_direction fr_dir;
203 struct ib_mr *fr_mr; 227 struct ib_mr *fr_mr;
204 struct ib_cqe fr_cqe; 228 struct ib_cqe fr_cqe;
205 enum rpcrdma_frmr_state fr_state; 229 enum rpcrdma_frmr_state fr_state;
206 struct completion fr_linv_done; 230 struct completion fr_linv_done;
207 struct work_struct fr_work;
208 struct rpcrdma_xprt *fr_xprt;
209 union { 231 union {
210 struct ib_reg_wr fr_regwr; 232 struct ib_reg_wr fr_regwr;
211 struct ib_send_wr fr_invwr; 233 struct ib_send_wr fr_invwr;
@@ -222,6 +244,8 @@ struct rpcrdma_mw {
222 struct rpcrdma_fmr fmr; 244 struct rpcrdma_fmr fmr;
223 struct rpcrdma_frmr frmr; 245 struct rpcrdma_frmr frmr;
224 }; 246 };
247 struct work_struct mw_work;
248 struct rpcrdma_xprt *mw_xprt;
225 struct list_head mw_list; 249 struct list_head mw_list;
226 struct list_head mw_all; 250 struct list_head mw_all;
227}; 251};
@@ -270,12 +294,14 @@ struct rpcrdma_req {
270 unsigned int rl_niovs; 294 unsigned int rl_niovs;
271 unsigned int rl_nchunks; 295 unsigned int rl_nchunks;
272 unsigned int rl_connect_cookie; 296 unsigned int rl_connect_cookie;
297 struct rpc_task *rl_task;
273 struct rpcrdma_buffer *rl_buffer; 298 struct rpcrdma_buffer *rl_buffer;
274 struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ 299 struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
275 struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; 300 struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
276 struct rpcrdma_regbuf *rl_rdmabuf; 301 struct rpcrdma_regbuf *rl_rdmabuf;
277 struct rpcrdma_regbuf *rl_sendbuf; 302 struct rpcrdma_regbuf *rl_sendbuf;
278 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; 303 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
304 struct rpcrdma_mr_seg *rl_nextseg;
279 305
280 struct ib_cqe rl_cqe; 306 struct ib_cqe rl_cqe;
281 struct list_head rl_all; 307 struct list_head rl_all;
@@ -372,8 +398,8 @@ struct rpcrdma_memreg_ops {
372 struct rpcrdma_mr_seg *, int, bool); 398 struct rpcrdma_mr_seg *, int, bool);
373 void (*ro_unmap_sync)(struct rpcrdma_xprt *, 399 void (*ro_unmap_sync)(struct rpcrdma_xprt *,
374 struct rpcrdma_req *); 400 struct rpcrdma_req *);
375 int (*ro_unmap)(struct rpcrdma_xprt *, 401 void (*ro_unmap_safe)(struct rpcrdma_xprt *,
376 struct rpcrdma_mr_seg *); 402 struct rpcrdma_req *, bool);
377 int (*ro_open)(struct rpcrdma_ia *, 403 int (*ro_open)(struct rpcrdma_ia *,
378 struct rpcrdma_ep *, 404 struct rpcrdma_ep *,
379 struct rpcrdma_create_data_internal *); 405 struct rpcrdma_create_data_internal *);
@@ -456,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
456void rpcrdma_free_regbuf(struct rpcrdma_ia *, 482void rpcrdma_free_regbuf(struct rpcrdma_ia *,
457 struct rpcrdma_regbuf *); 483 struct rpcrdma_regbuf *);
458 484
459unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
460int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); 485int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
461 486
462int frwr_alloc_recovery_wq(void); 487int frwr_alloc_recovery_wq(void);
@@ -519,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
519 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c 544 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
520 */ 545 */
521int rpcrdma_marshal_req(struct rpc_rqst *); 546int rpcrdma_marshal_req(struct rpc_rqst *);
547void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
548 struct rpcrdma_create_data_internal *,
549 unsigned int);
522 550
523/* RPC/RDMA module init - xprtrdma/transport.c 551/* RPC/RDMA module init - xprtrdma/transport.c
524 */ 552 */
@@ -534,6 +562,7 @@ void xprt_rdma_cleanup(void);
534#if defined(CONFIG_SUNRPC_BACKCHANNEL) 562#if defined(CONFIG_SUNRPC_BACKCHANNEL)
535int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); 563int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
536int xprt_rdma_bc_up(struct svc_serv *, struct net *); 564int xprt_rdma_bc_up(struct svc_serv *, struct net *);
565size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
537int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); 566int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
538void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); 567void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
539int rpcrdma_bc_marshal_reply(struct rpc_rqst *); 568int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index b90c5397b5e1..2d3e0c42361e 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1364,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
1364 return ret; 1364 return ret;
1365 return 0; 1365 return 0;
1366} 1366}
1367
1368static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
1369{
1370 return PAGE_SIZE;
1371}
1367#else 1372#else
1368static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1373static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1369 struct xdr_skb_reader *desc) 1374 struct xdr_skb_reader *desc)
@@ -2661,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
2661#ifdef CONFIG_SUNRPC_BACKCHANNEL 2666#ifdef CONFIG_SUNRPC_BACKCHANNEL
2662 .bc_setup = xprt_setup_bc, 2667 .bc_setup = xprt_setup_bc,
2663 .bc_up = xs_tcp_bc_up, 2668 .bc_up = xs_tcp_bc_up,
2669 .bc_maxpayload = xs_tcp_bc_maxpayload,
2664 .bc_free_rqst = xprt_free_bc_rqst, 2670 .bc_free_rqst = xprt_free_bc_rqst,
2665 .bc_destroy = xprt_destroy_bc, 2671 .bc_destroy = xprt_destroy_bc,
2666#endif 2672#endif
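Both transports now implement the new bc_maxpayload callback, reporting the largest backchannel payload each can accept (PAGE_SIZE for TCP here; the RDMA variant is declared in xprt_rdma.h above). The sketch below only models the shape of such a query; the types and the caller are invented for illustration, not the actual SUNRPC call site.

#include <stddef.h>
#include <stdio.h>

/* Minimal stand-ins for struct rpc_xprt and its ops table. */
struct model_xprt;

struct model_xprt_ops {
    size_t (*bc_maxpayload)(struct model_xprt *xprt);
};

struct model_xprt {
    const struct model_xprt_ops *ops;
};

static size_t tcp_bc_maxpayload(struct model_xprt *xprt)
{
    (void)xprt;
    return 4096;   /* plays the role of PAGE_SIZE in xs_tcp_bc_maxpayload() */
}

static const struct model_xprt_ops tcp_ops = {
    .bc_maxpayload = tcp_bc_maxpayload,
};

int main(void)
{
    struct model_xprt xprt = { .ops = &tcp_ops };
    size_t max = 0;

    /* A backchannel consumer would size its receive buffers from this. */
    if (xprt.ops->bc_maxpayload)
        max = xprt.ops->bc_maxpayload(&xprt);

    printf("backchannel payload limit: %zu bytes\n", max);
    return 0;
}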