-rw-r--r--  fs/lockd/svc.c                              |   6
-rw-r--r--  fs/lockd/svclock.c                          |  18
-rw-r--r--  fs/nfs/callback.c                           |  26
-rw-r--r--  fs/nfsd/nfs3xdr.c                           |  23
-rw-r--r--  fs/nfsd/nfs4proc.c                          |   3
-rw-r--r--  fs/nfsd/nfs4state.c                         |  25
-rw-r--r--  fs/nfsd/nfs4xdr.c                           |  19
-rw-r--r--  fs/nfsd/nfsxdr.c                            |  13
-rw-r--r--  fs/nfsd/vfs.c                               |  24
-rw-r--r--  include/linux/sunrpc/rpc_rdma.h             |   3
-rw-r--r--  include/linux/sunrpc/svc.h                  |   4
-rw-r--r--  include/linux/sunrpc/svc_rdma.h             |  75
-rw-r--r--  include/uapi/linux/nfsd/cld.h               |  14
-rw-r--r--  net/sunrpc/Kconfig                          |   1
-rw-r--r--  net/sunrpc/svc.c                            | 134
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                |   2
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c              |   8
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_backchannel.c  |  71
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_marshal.c      |  89
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c     |  79
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_rw.c           | 512
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_sendto.c       | 978
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c    | 110
23 files changed, 1334 insertions(+), 903 deletions(-)
diff --git a/fs/lockd/svc.c b/fs/lockd/svc.c
index e7c8b9c76e48..5d481e8a1b5d 100644
--- a/fs/lockd/svc.c
+++ b/fs/lockd/svc.c
@@ -132,6 +132,8 @@ lockd(void *vrqstp)
132{ 132{
133 int err = 0; 133 int err = 0;
134 struct svc_rqst *rqstp = vrqstp; 134 struct svc_rqst *rqstp = vrqstp;
135 struct net *net = &init_net;
136 struct lockd_net *ln = net_generic(net, lockd_net_id);
135 137
136 /* try_to_freeze() is called from svc_recv() */ 138 /* try_to_freeze() is called from svc_recv() */
137 set_freezable(); 139 set_freezable();
@@ -176,6 +178,8 @@ lockd(void *vrqstp)
176 if (nlmsvc_ops) 178 if (nlmsvc_ops)
177 nlmsvc_invalidate_all(); 179 nlmsvc_invalidate_all();
178 nlm_shutdown_hosts(); 180 nlm_shutdown_hosts();
181 cancel_delayed_work_sync(&ln->grace_period_end);
182 locks_end_grace(&ln->lockd_manager);
179 return 0; 183 return 0;
180} 184}
181 185
@@ -270,8 +274,6 @@ static void lockd_down_net(struct svc_serv *serv, struct net *net)
270 if (ln->nlmsvc_users) { 274 if (ln->nlmsvc_users) {
271 if (--ln->nlmsvc_users == 0) { 275 if (--ln->nlmsvc_users == 0) {
272 nlm_shutdown_hosts_net(net); 276 nlm_shutdown_hosts_net(net);
273 cancel_delayed_work_sync(&ln->grace_period_end);
274 locks_end_grace(&ln->lockd_manager);
275 svc_shutdown_net(serv, net); 277 svc_shutdown_net(serv, net);
276 dprintk("lockd_down_net: per-net data destroyed; net=%p\n", net); 278 dprintk("lockd_down_net: per-net data destroyed; net=%p\n", net);
277 } 279 }
diff --git a/fs/lockd/svclock.c b/fs/lockd/svclock.c
index 5581e020644b..3507c80d1d4b 100644
--- a/fs/lockd/svclock.c
+++ b/fs/lockd/svclock.c
@@ -870,15 +870,15 @@ nlmsvc_grant_reply(struct nlm_cookie *cookie, __be32 status)
870 if (!(block = nlmsvc_find_block(cookie))) 870 if (!(block = nlmsvc_find_block(cookie)))
871 return; 871 return;
872 872
873 if (block) { 873 if (status == nlm_lck_denied_grace_period) {
874 if (status == nlm_lck_denied_grace_period) { 874 /* Try again in a couple of seconds */
875 /* Try again in a couple of seconds */ 875 nlmsvc_insert_block(block, 10 * HZ);
876 nlmsvc_insert_block(block, 10 * HZ); 876 } else {
877 } else { 877 /*
878 /* Lock is now held by client, or has been rejected. 878 * Lock is now held by client, or has been rejected.
879 * In both cases, the block should be removed. */ 879 * In both cases, the block should be removed.
880 nlmsvc_unlink_block(block); 880 */
881 } 881 nlmsvc_unlink_block(block);
882 } 882 }
883 nlmsvc_release_block(block); 883 nlmsvc_release_block(block);
884} 884}
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 773774531aff..73a1f928226c 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -76,7 +76,10 @@ nfs4_callback_svc(void *vrqstp)
76 76
77 set_freezable(); 77 set_freezable();
78 78
79 while (!kthread_should_stop()) { 79 while (!kthread_freezable_should_stop(NULL)) {
80
81 if (signal_pending(current))
82 flush_signals(current);
80 /* 83 /*
81 * Listen for a request on the socket 84 * Listen for a request on the socket
82 */ 85 */
@@ -85,6 +88,8 @@ nfs4_callback_svc(void *vrqstp)
85 continue; 88 continue;
86 svc_process(rqstp); 89 svc_process(rqstp);
87 } 90 }
91 svc_exit_thread(rqstp);
92 module_put_and_exit(0);
88 return 0; 93 return 0;
89} 94}
90 95
@@ -103,9 +108,10 @@ nfs41_callback_svc(void *vrqstp)
103 108
104 set_freezable(); 109 set_freezable();
105 110
106 while (!kthread_should_stop()) { 111 while (!kthread_freezable_should_stop(NULL)) {
107 if (try_to_freeze()) 112
108 continue; 113 if (signal_pending(current))
114 flush_signals(current);
109 115
110 prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE); 116 prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE);
111 spin_lock_bh(&serv->sv_cb_lock); 117 spin_lock_bh(&serv->sv_cb_lock);
@@ -121,11 +127,13 @@ nfs41_callback_svc(void *vrqstp)
121 error); 127 error);
122 } else { 128 } else {
123 spin_unlock_bh(&serv->sv_cb_lock); 129 spin_unlock_bh(&serv->sv_cb_lock);
124 schedule(); 130 if (!kthread_should_stop())
131 schedule();
125 finish_wait(&serv->sv_cb_waitq, &wq); 132 finish_wait(&serv->sv_cb_waitq, &wq);
126 } 133 }
127 flush_signals(current);
128 } 134 }
135 svc_exit_thread(rqstp);
136 module_put_and_exit(0);
129 return 0; 137 return 0;
130} 138}
131 139
@@ -221,14 +229,14 @@ err_bind:
221static struct svc_serv_ops nfs40_cb_sv_ops = { 229static struct svc_serv_ops nfs40_cb_sv_ops = {
222 .svo_function = nfs4_callback_svc, 230 .svo_function = nfs4_callback_svc,
223 .svo_enqueue_xprt = svc_xprt_do_enqueue, 231 .svo_enqueue_xprt = svc_xprt_do_enqueue,
224 .svo_setup = svc_set_num_threads, 232 .svo_setup = svc_set_num_threads_sync,
225 .svo_module = THIS_MODULE, 233 .svo_module = THIS_MODULE,
226}; 234};
227#if defined(CONFIG_NFS_V4_1) 235#if defined(CONFIG_NFS_V4_1)
228static struct svc_serv_ops nfs41_cb_sv_ops = { 236static struct svc_serv_ops nfs41_cb_sv_ops = {
229 .svo_function = nfs41_callback_svc, 237 .svo_function = nfs41_callback_svc,
230 .svo_enqueue_xprt = svc_xprt_do_enqueue, 238 .svo_enqueue_xprt = svc_xprt_do_enqueue,
231 .svo_setup = svc_set_num_threads, 239 .svo_setup = svc_set_num_threads_sync,
232 .svo_module = THIS_MODULE, 240 .svo_module = THIS_MODULE,
233}; 241};
234 242
@@ -280,7 +288,7 @@ static struct svc_serv *nfs_callback_create_svc(int minorversion)
280 printk(KERN_WARNING "nfs_callback_create_svc: no kthread, %d users??\n", 288 printk(KERN_WARNING "nfs_callback_create_svc: no kthread, %d users??\n",
281 cb_info->users); 289 cb_info->users);
282 290
283 serv = svc_create(&nfs4_callback_program, NFS4_CALLBACK_BUFSIZE, sv_ops); 291 serv = svc_create_pooled(&nfs4_callback_program, NFS4_CALLBACK_BUFSIZE, sv_ops);
284 if (!serv) { 292 if (!serv) {
285 printk(KERN_ERR "nfs_callback_create_svc: create service failed\n"); 293 printk(KERN_ERR "nfs_callback_create_svc: create service failed\n");
286 return ERR_PTR(-ENOMEM); 294 return ERR_PTR(-ENOMEM);
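
Both callback loops above converge on the same shape: kthread_freezable_should_stop() replaces the kthread_should_stop()/try_to_freeze() pair, pending signals are simply flushed, and each thread now releases its own svc_rqst and module reference on the way out, which is what makes switching svo_setup to svc_set_num_threads_sync() safe. A minimal kernel-style sketch of that loop follows; the function name is made up, and the svc_recv()/MAX_SCHEDULE_TIMEOUT call is assumed from the surrounding callback code rather than shown in the hunks above.

/*
 * Illustrative only; condensed from the nfs4_callback_svc() shape above.
 */
static int example_callback_thread(void *vrqstp)
{
	struct svc_rqst *rqstp = vrqstp;
	int err;

	set_freezable();
	while (!kthread_freezable_should_stop(NULL)) {
		if (signal_pending(current))
			flush_signals(current);

		/* wait for and process one request */
		err = svc_recv(rqstp, MAX_SCHEDULE_TIMEOUT);
		if (err == -EAGAIN || err == -EINTR)
			continue;
		svc_process(rqstp);
	}
	svc_exit_thread(rqstp);		/* thread frees its own rqstp ... */
	module_put_and_exit(0);		/* ... and drops the module ref */
	return 0;
}
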
diff --git a/fs/nfsd/nfs3xdr.c b/fs/nfsd/nfs3xdr.c
index 452334694a5d..12feac6ee2fd 100644
--- a/fs/nfsd/nfs3xdr.c
+++ b/fs/nfsd/nfs3xdr.c
@@ -334,8 +334,11 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
334 if (!p) 334 if (!p)
335 return 0; 335 return 0;
336 p = xdr_decode_hyper(p, &args->offset); 336 p = xdr_decode_hyper(p, &args->offset);
337
338 args->count = ntohl(*p++); 337 args->count = ntohl(*p++);
338
339 if (!xdr_argsize_check(rqstp, p))
340 return 0;
341
339 len = min(args->count, max_blocksize); 342 len = min(args->count, max_blocksize);
340 343
341 /* set up the kvec */ 344 /* set up the kvec */
@@ -349,7 +352,7 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
349 v++; 352 v++;
350 } 353 }
351 args->vlen = v; 354 args->vlen = v;
352 return xdr_argsize_check(rqstp, p); 355 return 1;
353} 356}
354 357
355int 358int
@@ -541,9 +544,11 @@ nfs3svc_decode_readlinkargs(struct svc_rqst *rqstp, __be32 *p,
541 p = decode_fh(p, &args->fh); 544 p = decode_fh(p, &args->fh);
542 if (!p) 545 if (!p)
543 return 0; 546 return 0;
547 if (!xdr_argsize_check(rqstp, p))
548 return 0;
544 args->buffer = page_address(*(rqstp->rq_next_page++)); 549 args->buffer = page_address(*(rqstp->rq_next_page++));
545 550
546 return xdr_argsize_check(rqstp, p); 551 return 1;
547} 552}
548 553
549int 554int
@@ -569,10 +574,14 @@ nfs3svc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p,
569 args->verf = p; p += 2; 574 args->verf = p; p += 2;
570 args->dircount = ~0; 575 args->dircount = ~0;
571 args->count = ntohl(*p++); 576 args->count = ntohl(*p++);
577
578 if (!xdr_argsize_check(rqstp, p))
579 return 0;
580
572 args->count = min_t(u32, args->count, PAGE_SIZE); 581 args->count = min_t(u32, args->count, PAGE_SIZE);
573 args->buffer = page_address(*(rqstp->rq_next_page++)); 582 args->buffer = page_address(*(rqstp->rq_next_page++));
574 583
575 return xdr_argsize_check(rqstp, p); 584 return 1;
576} 585}
577 586
578int 587int
@@ -590,6 +599,9 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, __be32 *p,
590 args->dircount = ntohl(*p++); 599 args->dircount = ntohl(*p++);
591 args->count = ntohl(*p++); 600 args->count = ntohl(*p++);
592 601
602 if (!xdr_argsize_check(rqstp, p))
603 return 0;
604
593 len = args->count = min(args->count, max_blocksize); 605 len = args->count = min(args->count, max_blocksize);
594 while (len > 0) { 606 while (len > 0) {
595 struct page *p = *(rqstp->rq_next_page++); 607 struct page *p = *(rqstp->rq_next_page++);
@@ -597,8 +609,7 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, __be32 *p,
597 args->buffer = page_address(p); 609 args->buffer = page_address(p);
598 len -= PAGE_SIZE; 610 len -= PAGE_SIZE;
599 } 611 }
600 612 return 1;
601 return xdr_argsize_check(rqstp, p);
602} 613}
603 614
604int 615int
diff --git a/fs/nfsd/nfs4proc.c b/fs/nfsd/nfs4proc.c
index d86031b6ad79..c453a1998e00 100644
--- a/fs/nfsd/nfs4proc.c
+++ b/fs/nfsd/nfs4proc.c
@@ -1259,7 +1259,8 @@ nfsd4_layout_verify(struct svc_export *exp, unsigned int layout_type)
1259 return NULL; 1259 return NULL;
1260 } 1260 }
1261 1261
1262 if (!(exp->ex_layout_types & (1 << layout_type))) { 1262 if (layout_type >= LAYOUT_TYPE_MAX ||
1263 !(exp->ex_layout_types & (1 << layout_type))) {
1263 dprintk("%s: layout type %d not supported\n", 1264 dprintk("%s: layout type %d not supported\n",
1264 __func__, layout_type); 1265 __func__, layout_type);
1265 return NULL; 1266 return NULL;
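
layout_type comes straight from the client's layout-operation arguments, so without the added bound, 1 << layout_type is undefined behaviour for values of 32 and up, and an unchecked value could later be used to index nfsd4_layout_ops[] (the nfs4xdr.c hunks below defer that table lookup until the value is actually needed, for the same reason). A small userspace model of the validate-before-use rule; the names and the LAYOUT_TYPE_MAX value here are illustrative, not the kernel's:

#include <stdbool.h>
#include <stdio.h>

#define LAYOUT_TYPE_MAX 6	/* assumed size of the ops table, for illustration */

/* stand-in for the exp->ex_layout_types bitmap check in nfsd4_layout_verify() */
static bool layout_type_usable(unsigned int layout_type, unsigned int ex_layout_types)
{
	if (layout_type >= LAYOUT_TYPE_MAX)
		return false;	/* bounds both the shift and any table index */
	return ex_layout_types & (1u << layout_type);
}

int main(void)
{
	printf("%d\n", layout_type_usable(40, 0x2));	/* 0: rejected before 1 << 40 */
	printf("%d\n", layout_type_usable(1, 0x2));	/* 1: supported */
	return 0;
}
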
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index e9ef50addddb..22002fb75a18 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -1912,28 +1912,15 @@ static void copy_clid(struct nfs4_client *target, struct nfs4_client *source)
1912 target->cl_clientid.cl_id = source->cl_clientid.cl_id; 1912 target->cl_clientid.cl_id = source->cl_clientid.cl_id;
1913} 1913}
1914 1914
1915int strdup_if_nonnull(char **target, char *source)
1916{
1917 if (source) {
1918 *target = kstrdup(source, GFP_KERNEL);
1919 if (!*target)
1920 return -ENOMEM;
1921 } else
1922 *target = NULL;
1923 return 0;
1924}
1925
1926static int copy_cred(struct svc_cred *target, struct svc_cred *source) 1915static int copy_cred(struct svc_cred *target, struct svc_cred *source)
1927{ 1916{
1928 int ret; 1917 target->cr_principal = kstrdup(source->cr_principal, GFP_KERNEL);
1918 target->cr_raw_principal = kstrdup(source->cr_raw_principal,
1919 GFP_KERNEL);
1920 if ((source->cr_principal && ! target->cr_principal) ||
1921 (source->cr_raw_principal && ! target->cr_raw_principal))
1922 return -ENOMEM;
1929 1923
1930 ret = strdup_if_nonnull(&target->cr_principal, source->cr_principal);
1931 if (ret)
1932 return ret;
1933 ret = strdup_if_nonnull(&target->cr_raw_principal,
1934 source->cr_raw_principal);
1935 if (ret)
1936 return ret;
1937 target->cr_flavor = source->cr_flavor; 1924 target->cr_flavor = source->cr_flavor;
1938 target->cr_uid = source->cr_uid; 1925 target->cr_uid = source->cr_uid;
1939 target->cr_gid = source->cr_gid; 1926 target->cr_gid = source->cr_gid;
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 33017d652b1d..26780d53a6f9 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -2831,9 +2831,14 @@ out_acl:
2831 } 2831 }
2832#endif /* CONFIG_NFSD_PNFS */ 2832#endif /* CONFIG_NFSD_PNFS */
2833 if (bmval2 & FATTR4_WORD2_SUPPATTR_EXCLCREAT) { 2833 if (bmval2 & FATTR4_WORD2_SUPPATTR_EXCLCREAT) {
2834 status = nfsd4_encode_bitmap(xdr, NFSD_SUPPATTR_EXCLCREAT_WORD0, 2834 u32 supp[3];
2835 NFSD_SUPPATTR_EXCLCREAT_WORD1, 2835
2836 NFSD_SUPPATTR_EXCLCREAT_WORD2); 2836 memcpy(supp, nfsd_suppattrs[minorversion], sizeof(supp));
2837 supp[0] &= NFSD_SUPPATTR_EXCLCREAT_WORD0;
2838 supp[1] &= NFSD_SUPPATTR_EXCLCREAT_WORD1;
2839 supp[2] &= NFSD_SUPPATTR_EXCLCREAT_WORD2;
2840
2841 status = nfsd4_encode_bitmap(xdr, supp[0], supp[1], supp[2]);
2837 if (status) 2842 if (status)
2838 goto out; 2843 goto out;
2839 } 2844 }
@@ -4119,8 +4124,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
4119 struct nfsd4_getdeviceinfo *gdev) 4124 struct nfsd4_getdeviceinfo *gdev)
4120{ 4125{
4121 struct xdr_stream *xdr = &resp->xdr; 4126 struct xdr_stream *xdr = &resp->xdr;
4122 const struct nfsd4_layout_ops *ops = 4127 const struct nfsd4_layout_ops *ops;
4123 nfsd4_layout_ops[gdev->gd_layout_type];
4124 u32 starting_len = xdr->buf->len, needed_len; 4128 u32 starting_len = xdr->buf->len, needed_len;
4125 __be32 *p; 4129 __be32 *p;
4126 4130
@@ -4137,6 +4141,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
4137 4141
4138 /* If maxcount is 0 then just update notifications */ 4142 /* If maxcount is 0 then just update notifications */
4139 if (gdev->gd_maxcount != 0) { 4143 if (gdev->gd_maxcount != 0) {
4144 ops = nfsd4_layout_ops[gdev->gd_layout_type];
4140 nfserr = ops->encode_getdeviceinfo(xdr, gdev); 4145 nfserr = ops->encode_getdeviceinfo(xdr, gdev);
4141 if (nfserr) { 4146 if (nfserr) {
4142 /* 4147 /*
@@ -4189,8 +4194,7 @@ nfsd4_encode_layoutget(struct nfsd4_compoundres *resp, __be32 nfserr,
4189 struct nfsd4_layoutget *lgp) 4194 struct nfsd4_layoutget *lgp)
4190{ 4195{
4191 struct xdr_stream *xdr = &resp->xdr; 4196 struct xdr_stream *xdr = &resp->xdr;
4192 const struct nfsd4_layout_ops *ops = 4197 const struct nfsd4_layout_ops *ops;
4193 nfsd4_layout_ops[lgp->lg_layout_type];
4194 __be32 *p; 4198 __be32 *p;
4195 4199
4196 dprintk("%s: err %d\n", __func__, nfserr); 4200 dprintk("%s: err %d\n", __func__, nfserr);
@@ -4213,6 +4217,7 @@ nfsd4_encode_layoutget(struct nfsd4_compoundres *resp, __be32 nfserr,
4213 *p++ = cpu_to_be32(lgp->lg_seg.iomode); 4217 *p++ = cpu_to_be32(lgp->lg_seg.iomode);
4214 *p++ = cpu_to_be32(lgp->lg_layout_type); 4218 *p++ = cpu_to_be32(lgp->lg_layout_type);
4215 4219
4220 ops = nfsd4_layout_ops[lgp->lg_layout_type];
4216 nfserr = ops->encode_layoutget(xdr, lgp); 4221 nfserr = ops->encode_layoutget(xdr, lgp);
4217out: 4222out:
4218 kfree(lgp->lg_content); 4223 kfree(lgp->lg_content);
diff --git a/fs/nfsd/nfsxdr.c b/fs/nfsd/nfsxdr.c
index de07ff625777..6a4947a3f4fa 100644
--- a/fs/nfsd/nfsxdr.c
+++ b/fs/nfsd/nfsxdr.c
@@ -257,6 +257,9 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
257 len = args->count = ntohl(*p++); 257 len = args->count = ntohl(*p++);
258 p++; /* totalcount - unused */ 258 p++; /* totalcount - unused */
259 259
260 if (!xdr_argsize_check(rqstp, p))
261 return 0;
262
260 len = min_t(unsigned int, len, NFSSVC_MAXBLKSIZE_V2); 263 len = min_t(unsigned int, len, NFSSVC_MAXBLKSIZE_V2);
261 264
262 /* set up somewhere to store response. 265 /* set up somewhere to store response.
@@ -272,7 +275,7 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
272 v++; 275 v++;
273 } 276 }
274 args->vlen = v; 277 args->vlen = v;
275 return xdr_argsize_check(rqstp, p); 278 return 1;
276} 279}
277 280
278int 281int
@@ -362,9 +365,11 @@ nfssvc_decode_readlinkargs(struct svc_rqst *rqstp, __be32 *p, struct nfsd_readli
362 p = decode_fh(p, &args->fh); 365 p = decode_fh(p, &args->fh);
363 if (!p) 366 if (!p)
364 return 0; 367 return 0;
368 if (!xdr_argsize_check(rqstp, p))
369 return 0;
365 args->buffer = page_address(*(rqstp->rq_next_page++)); 370 args->buffer = page_address(*(rqstp->rq_next_page++));
366 371
367 return xdr_argsize_check(rqstp, p); 372 return 1;
368} 373}
369 374
370int 375int
@@ -402,9 +407,11 @@ nfssvc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p,
402 args->cookie = ntohl(*p++); 407 args->cookie = ntohl(*p++);
403 args->count = ntohl(*p++); 408 args->count = ntohl(*p++);
404 args->count = min_t(u32, args->count, PAGE_SIZE); 409 args->count = min_t(u32, args->count, PAGE_SIZE);
410 if (!xdr_argsize_check(rqstp, p))
411 return 0;
405 args->buffer = page_address(*(rqstp->rq_next_page++)); 412 args->buffer = page_address(*(rqstp->rq_next_page++));
406 413
407 return xdr_argsize_check(rqstp, p); 414 return 1;
408} 415}
409 416
410/* 417/*
diff --git a/fs/nfsd/vfs.c b/fs/nfsd/vfs.c
index 9aaf6ca77569..2be32955d7f2 100644
--- a/fs/nfsd/vfs.c
+++ b/fs/nfsd/vfs.c
@@ -94,6 +94,12 @@ nfsd_cross_mnt(struct svc_rqst *rqstp, struct dentry **dpp,
94 err = follow_down(&path); 94 err = follow_down(&path);
95 if (err < 0) 95 if (err < 0)
96 goto out; 96 goto out;
97 if (path.mnt == exp->ex_path.mnt && path.dentry == dentry &&
98 nfsd_mountpoint(dentry, exp) == 2) {
99 /* This is only a mountpoint in some other namespace */
100 path_put(&path);
101 goto out;
102 }
97 103
98 exp2 = rqst_exp_get_by_name(rqstp, &path); 104 exp2 = rqst_exp_get_by_name(rqstp, &path);
99 if (IS_ERR(exp2)) { 105 if (IS_ERR(exp2)) {
@@ -167,16 +173,26 @@ static int nfsd_lookup_parent(struct svc_rqst *rqstp, struct dentry *dparent, st
167/* 173/*
168 * For nfsd purposes, we treat V4ROOT exports as though there was an 174 * For nfsd purposes, we treat V4ROOT exports as though there was an
169 * export at *every* directory. 175 * export at *every* directory.
176 * We return:
177 * '1' if this dentry *must* be an export point,
178 * '2' if it might be, if there is really a mount here, and
179 * '0' if there is no chance of an export point here.
170 */ 180 */
171int nfsd_mountpoint(struct dentry *dentry, struct svc_export *exp) 181int nfsd_mountpoint(struct dentry *dentry, struct svc_export *exp)
172{ 182{
173 if (d_mountpoint(dentry)) 183 if (!d_inode(dentry))
184 return 0;
185 if (exp->ex_flags & NFSEXP_V4ROOT)
174 return 1; 186 return 1;
175 if (nfsd4_is_junction(dentry)) 187 if (nfsd4_is_junction(dentry))
176 return 1; 188 return 1;
177 if (!(exp->ex_flags & NFSEXP_V4ROOT)) 189 if (d_mountpoint(dentry))
178 return 0; 190 /*
179 return d_inode(dentry) != NULL; 191 * Might only be a mountpoint in a different namespace,
192 * but we need to check.
193 */
194 return 2;
195 return 0;
180} 196}
181 197
182__be32 198__be32
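
The nfsd_cross_mnt() hunk above is the consumer of the new tri-state: after follow_down(), if the path did not actually move and nfsd_mountpoint() only answered "maybe" (2), the directory is mounted on solely in some other mount namespace, so nfsd stays on the original dentry instead of crossing. A small userspace model of that decision, with made-up names:

#include <stdbool.h>
#include <stdio.h>

/* mirrors the 0/1/2 return values documented above */
enum mountpoint_hint { MP_NO = 0, MP_YES = 1, MP_MAYBE = 2 };

/*
 * Callers only reach this point when hint is MP_YES or MP_MAYBE;
 * follow_down_moved says whether follow_down() returned a different
 * (mnt, dentry) pair than the one it was given.
 */
static bool should_cross_mount(enum mountpoint_hint hint, bool follow_down_moved)
{
	if (hint == MP_MAYBE && !follow_down_moved)
		return false;	/* a mountpoint only in another namespace */
	return true;		/* a real export boundary in this namespace */
}

int main(void)
{
	printf("%d\n", should_cross_mount(MP_MAYBE, false));	/* 0: stay put */
	printf("%d\n", should_cross_mount(MP_MAYBE, true));	/* 1: cross */
	return 0;
}
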
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index 245fc59b7324..b7e85b341a54 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -143,6 +143,9 @@ enum rpcrdma_proc {
143#define rdma_done cpu_to_be32(RDMA_DONE) 143#define rdma_done cpu_to_be32(RDMA_DONE)
144#define rdma_error cpu_to_be32(RDMA_ERROR) 144#define rdma_error cpu_to_be32(RDMA_ERROR)
145 145
146#define err_vers cpu_to_be32(ERR_VERS)
147#define err_chunk cpu_to_be32(ERR_CHUNK)
148
146/* 149/*
147 * Private extension to RPC-over-RDMA Version One. 150 * Private extension to RPC-over-RDMA Version One.
148 * Message passed during RDMA-CM connection set-up. 151 * Message passed during RDMA-CM connection set-up.
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index e770abeed32d..94631026f79c 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -336,8 +336,7 @@ xdr_argsize_check(struct svc_rqst *rqstp, __be32 *p)
336{ 336{
337 char *cp = (char *)p; 337 char *cp = (char *)p;
338 struct kvec *vec = &rqstp->rq_arg.head[0]; 338 struct kvec *vec = &rqstp->rq_arg.head[0];
339 return cp >= (char*)vec->iov_base 339 return cp == (char *)vec->iov_base + vec->iov_len;
340 && cp <= (char*)vec->iov_base + vec->iov_len;
341} 340}
342 341
343static inline int 342static inline int
@@ -474,6 +473,7 @@ void svc_pool_map_put(void);
474struct svc_serv * svc_create_pooled(struct svc_program *, unsigned int, 473struct svc_serv * svc_create_pooled(struct svc_program *, unsigned int,
475 struct svc_serv_ops *); 474 struct svc_serv_ops *);
476int svc_set_num_threads(struct svc_serv *, struct svc_pool *, int); 475int svc_set_num_threads(struct svc_serv *, struct svc_pool *, int);
476int svc_set_num_threads_sync(struct svc_serv *, struct svc_pool *, int);
477int svc_pool_stats_open(struct svc_serv *serv, struct file *file); 477int svc_pool_stats_open(struct svc_serv *serv, struct file *file);
478void svc_destroy(struct svc_serv *); 478void svc_destroy(struct svc_serv *);
479void svc_shutdown_net(struct svc_serv *, struct net *); 479void svc_shutdown_net(struct svc_serv *, struct net *);
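
The tightened xdr_argsize_check() above is what the nfsxdr.c/nfs3xdr.c hunks earlier in this patch rely on: the old test accepted any pointer still inside the head iovec, so a decoder could trust a client-supplied count (and start walking rqstp->rq_next_page) before anything proved the call was even long enough; the new test insists the decoder consumed exactly what was received, and the READ/READLINK/READDIR decoders now perform it before touching rq_next_page. A kernel-style sketch of that decoder shape, condensed from the NFSv3 READ hunk above and illustrative only:

/*
 * Illustrative only: validate the size of the decoded arguments
 * before the count is used to size anything or to advance
 * rqstp->rq_next_page.
 */
int example_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
			    struct nfsd3_readargs *args)
{
	p = decode_fh(p, &args->fh);
	if (!p)
		return 0;
	p = xdr_decode_hyper(p, &args->offset);
	args->count = ntohl(*p++);

	if (!xdr_argsize_check(rqstp, p))
		return 0;	/* short or over-long call: reject it now */

	/* only now clamp args->count and set up the reply kvec/pages */
	return 1;
}
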
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index b105f73e3ca2..f3787d800ba4 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -48,6 +48,12 @@
48#include <rdma/rdma_cm.h> 48#include <rdma/rdma_cm.h>
49#define SVCRDMA_DEBUG 49#define SVCRDMA_DEBUG
50 50
51/* Default and maximum inline threshold sizes */
52enum {
53 RPCRDMA_DEF_INLINE_THRESH = 4096,
54 RPCRDMA_MAX_INLINE_THRESH = 65536
55};
56
51/* RPC/RDMA parameters and stats */ 57/* RPC/RDMA parameters and stats */
52extern unsigned int svcrdma_ord; 58extern unsigned int svcrdma_ord;
53extern unsigned int svcrdma_max_requests; 59extern unsigned int svcrdma_max_requests;
@@ -85,27 +91,11 @@ struct svc_rdma_op_ctxt {
85 enum dma_data_direction direction; 91 enum dma_data_direction direction;
86 int count; 92 int count;
87 unsigned int mapped_sges; 93 unsigned int mapped_sges;
88 struct ib_sge sge[RPCSVC_MAXPAGES]; 94 struct ib_send_wr send_wr;
95 struct ib_sge sge[1 + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE];
89 struct page *pages[RPCSVC_MAXPAGES]; 96 struct page *pages[RPCSVC_MAXPAGES];
90}; 97};
91 98
92/*
93 * NFS_ requests are mapped on the client side by the chunk lists in
94 * the RPCRDMA header. During the fetching of the RPC from the client
95 * and the writing of the reply to the client, the memory in the
96 * client and the memory in the server must be mapped as contiguous
97 * vaddr/len for access by the hardware. These data strucures keep
98 * these mappings.
99 *
100 * For an RDMA_WRITE, the 'sge' maps the RPC REPLY. For RDMA_READ, the
101 * 'sge' in the svc_rdma_req_map maps the server side RPC reply and the
102 * 'ch' field maps the read-list of the RPCRDMA header to the 'sge'
103 * mapping of the reply.
104 */
105struct svc_rdma_chunk_sge {
106 int start; /* sge no for this chunk */
107 int count; /* sge count for this chunk */
108};
109struct svc_rdma_fastreg_mr { 99struct svc_rdma_fastreg_mr {
110 struct ib_mr *mr; 100 struct ib_mr *mr;
111 struct scatterlist *sg; 101 struct scatterlist *sg;
@@ -114,15 +104,7 @@ struct svc_rdma_fastreg_mr {
114 enum dma_data_direction direction; 104 enum dma_data_direction direction;
115 struct list_head frmr_list; 105 struct list_head frmr_list;
116}; 106};
117struct svc_rdma_req_map { 107
118 struct list_head free;
119 unsigned long count;
120 union {
121 struct kvec sge[RPCSVC_MAXPAGES];
122 struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
123 unsigned long lkey[RPCSVC_MAXPAGES];
124 };
125};
126#define RDMACTXT_F_LAST_CTXT 2 108#define RDMACTXT_F_LAST_CTXT 2
127 109
128#define SVCRDMA_DEVCAP_FAST_REG 1 /* fast mr registration */ 110#define SVCRDMA_DEVCAP_FAST_REG 1 /* fast mr registration */
@@ -144,14 +126,15 @@ struct svcxprt_rdma {
144 u32 sc_max_requests; /* Max requests */ 126 u32 sc_max_requests; /* Max requests */
145 u32 sc_max_bc_requests;/* Backward credits */ 127 u32 sc_max_bc_requests;/* Backward credits */
146 int sc_max_req_size; /* Size of each RQ WR buf */ 128 int sc_max_req_size; /* Size of each RQ WR buf */
129 u8 sc_port_num;
147 130
148 struct ib_pd *sc_pd; 131 struct ib_pd *sc_pd;
149 132
150 spinlock_t sc_ctxt_lock; 133 spinlock_t sc_ctxt_lock;
151 struct list_head sc_ctxts; 134 struct list_head sc_ctxts;
152 int sc_ctxt_used; 135 int sc_ctxt_used;
153 spinlock_t sc_map_lock; 136 spinlock_t sc_rw_ctxt_lock;
154 struct list_head sc_maps; 137 struct list_head sc_rw_ctxts;
155 138
156 struct list_head sc_rq_dto_q; 139 struct list_head sc_rq_dto_q;
157 spinlock_t sc_rq_dto_lock; 140 spinlock_t sc_rq_dto_lock;
@@ -181,9 +164,7 @@ struct svcxprt_rdma {
181/* The default ORD value is based on two outstanding full-size writes with a 164/* The default ORD value is based on two outstanding full-size writes with a
182 * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ. */ 165 * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ. */
183#define RPCRDMA_ORD (64/4) 166#define RPCRDMA_ORD (64/4)
184#define RPCRDMA_SQ_DEPTH_MULT 8
185#define RPCRDMA_MAX_REQUESTS 32 167#define RPCRDMA_MAX_REQUESTS 32
186#define RPCRDMA_MAX_REQ_SIZE 4096
187 168
188/* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our 169/* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our
189 * current NFSv4.1 implementation supports one backchannel slot. 170 * current NFSv4.1 implementation supports one backchannel slot.
@@ -201,19 +182,11 @@ static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma,
201 182
202/* svc_rdma_backchannel.c */ 183/* svc_rdma_backchannel.c */
203extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, 184extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
204 struct rpcrdma_msg *rmsgp, 185 __be32 *rdma_resp,
205 struct xdr_buf *rcvbuf); 186 struct xdr_buf *rcvbuf);
206 187
207/* svc_rdma_marshal.c */ 188/* svc_rdma_marshal.c */
208extern int svc_rdma_xdr_decode_req(struct xdr_buf *); 189extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
209extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
210 struct rpcrdma_msg *,
211 enum rpcrdma_errcode, __be32 *);
212extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
213extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
214extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
215 __be32, __be64, u32);
216extern unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp);
217 190
218/* svc_rdma_recvfrom.c */ 191/* svc_rdma_recvfrom.c */
219extern int svc_rdma_recvfrom(struct svc_rqst *); 192extern int svc_rdma_recvfrom(struct svc_rqst *);
@@ -224,16 +197,25 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
224 struct svc_rdma_op_ctxt *, int *, u32 *, 197 struct svc_rdma_op_ctxt *, int *, u32 *,
225 u32, u32, u64, bool); 198 u32, u32, u64, bool);
226 199
200/* svc_rdma_rw.c */
201extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
202extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
203 __be32 *wr_ch, struct xdr_buf *xdr);
204extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
205 __be32 *rp_ch, bool writelist,
206 struct xdr_buf *xdr);
207
227/* svc_rdma_sendto.c */ 208/* svc_rdma_sendto.c */
228extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *, 209extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
229 struct svc_rdma_req_map *, bool); 210 struct svc_rdma_op_ctxt *ctxt,
211 __be32 *rdma_resp, unsigned int len);
212extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
213 struct svc_rdma_op_ctxt *ctxt,
214 int num_sge, u32 inv_rkey);
230extern int svc_rdma_sendto(struct svc_rqst *); 215extern int svc_rdma_sendto(struct svc_rqst *);
231extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
232 int);
233 216
234/* svc_rdma_transport.c */ 217/* svc_rdma_transport.c */
235extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *); 218extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
236extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
237extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *); 219extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
238extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *); 220extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
239extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *); 221extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
@@ -244,9 +226,6 @@ extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
244extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *); 226extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
245extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int); 227extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
246extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt); 228extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
247extern struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *);
248extern void svc_rdma_put_req_map(struct svcxprt_rdma *,
249 struct svc_rdma_req_map *);
250extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *); 229extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *);
251extern void svc_rdma_put_frmr(struct svcxprt_rdma *, 230extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
252 struct svc_rdma_fastreg_mr *); 231 struct svc_rdma_fastreg_mr *);
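
The new sge[] bound in svc_rdma_op_ctxt is worth spelling out. Assuming a 4 KB PAGE_SIZE, 1 + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE = 1 + 65536 / 4096 = 17 entries, presumably one SGE covering the mapped transport header plus up to sixteen pages of inline RPC message, instead of the RPCSVC_MAXPAGES-sized array it replaces. On a 64 KB-page system the same formula collapses to 1 + 1 = 2.
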
diff --git a/include/uapi/linux/nfsd/cld.h b/include/uapi/linux/nfsd/cld.h
index f14a9ab06f1f..ec260274be0c 100644
--- a/include/uapi/linux/nfsd/cld.h
+++ b/include/uapi/linux/nfsd/cld.h
@@ -22,6 +22,8 @@
22#ifndef _NFSD_CLD_H 22#ifndef _NFSD_CLD_H
23#define _NFSD_CLD_H 23#define _NFSD_CLD_H
24 24
25#include <linux/types.h>
26
25/* latest upcall version available */ 27/* latest upcall version available */
26#define CLD_UPCALL_VERSION 1 28#define CLD_UPCALL_VERSION 1
27 29
@@ -37,18 +39,18 @@ enum cld_command {
37 39
38/* representation of long-form NFSv4 client ID */ 40/* representation of long-form NFSv4 client ID */
39struct cld_name { 41struct cld_name {
40 uint16_t cn_len; /* length of cm_id */ 42 __u16 cn_len; /* length of cm_id */
41 unsigned char cn_id[NFS4_OPAQUE_LIMIT]; /* client-provided */ 43 unsigned char cn_id[NFS4_OPAQUE_LIMIT]; /* client-provided */
42} __attribute__((packed)); 44} __attribute__((packed));
43 45
44/* message struct for communication with userspace */ 46/* message struct for communication with userspace */
45struct cld_msg { 47struct cld_msg {
46 uint8_t cm_vers; /* upcall version */ 48 __u8 cm_vers; /* upcall version */
47 uint8_t cm_cmd; /* upcall command */ 49 __u8 cm_cmd; /* upcall command */
48 int16_t cm_status; /* return code */ 50 __s16 cm_status; /* return code */
49 uint32_t cm_xid; /* transaction id */ 51 __u32 cm_xid; /* transaction id */
50 union { 52 union {
51 int64_t cm_gracetime; /* grace period start time */ 53 __s64 cm_gracetime; /* grace period start time */
52 struct cld_name cm_name; 54 struct cld_name cm_name;
53 } __attribute__((packed)) cm_u; 55 } __attribute__((packed)) cm_u;
54} __attribute__((packed)); 56} __attribute__((packed));
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index 04ce2c0b660e..ac09ca803296 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA
52 tristate "RPC-over-RDMA transport" 52 tristate "RPC-over-RDMA transport"
53 depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS 53 depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS
54 default SUNRPC && INFINIBAND 54 default SUNRPC && INFINIBAND
55 select SG_POOL
55 help 56 help
56 This option allows the NFS client and server to use RDMA 57 This option allows the NFS client and server to use RDMA
57 transports (InfiniBand, iWARP, or RoCE). 58 transports (InfiniBand, iWARP, or RoCE).
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a08aeb56b8e4..bc0f5a0ecbdc 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -702,59 +702,32 @@ found_pool:
702 return task; 702 return task;
703} 703}
704 704
705/* 705/* create new threads */
706 * Create or destroy enough new threads to make the number 706static int
707 * of threads the given number. If `pool' is non-NULL, applies 707svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
708 * only to threads in that pool, otherwise round-robins between
709 * all pools. Caller must ensure that mutual exclusion between this and
710 * server startup or shutdown.
711 *
712 * Destroying threads relies on the service threads filling in
713 * rqstp->rq_task, which only the nfs ones do. Assumes the serv
714 * has been created using svc_create_pooled().
715 *
716 * Based on code that used to be in nfsd_svc() but tweaked
717 * to be pool-aware.
718 */
719int
720svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
721{ 708{
722 struct svc_rqst *rqstp; 709 struct svc_rqst *rqstp;
723 struct task_struct *task; 710 struct task_struct *task;
724 struct svc_pool *chosen_pool; 711 struct svc_pool *chosen_pool;
725 int error = 0;
726 unsigned int state = serv->sv_nrthreads-1; 712 unsigned int state = serv->sv_nrthreads-1;
727 int node; 713 int node;
728 714
729 if (pool == NULL) { 715 do {
730 /* The -1 assumes caller has done a svc_get() */
731 nrservs -= (serv->sv_nrthreads-1);
732 } else {
733 spin_lock_bh(&pool->sp_lock);
734 nrservs -= pool->sp_nrthreads;
735 spin_unlock_bh(&pool->sp_lock);
736 }
737
738 /* create new threads */
739 while (nrservs > 0) {
740 nrservs--; 716 nrservs--;
741 chosen_pool = choose_pool(serv, pool, &state); 717 chosen_pool = choose_pool(serv, pool, &state);
742 718
743 node = svc_pool_map_get_node(chosen_pool->sp_id); 719 node = svc_pool_map_get_node(chosen_pool->sp_id);
744 rqstp = svc_prepare_thread(serv, chosen_pool, node); 720 rqstp = svc_prepare_thread(serv, chosen_pool, node);
745 if (IS_ERR(rqstp)) { 721 if (IS_ERR(rqstp))
746 error = PTR_ERR(rqstp); 722 return PTR_ERR(rqstp);
747 break;
748 }
749 723
750 __module_get(serv->sv_ops->svo_module); 724 __module_get(serv->sv_ops->svo_module);
751 task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp, 725 task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
752 node, "%s", serv->sv_name); 726 node, "%s", serv->sv_name);
753 if (IS_ERR(task)) { 727 if (IS_ERR(task)) {
754 error = PTR_ERR(task);
755 module_put(serv->sv_ops->svo_module); 728 module_put(serv->sv_ops->svo_module);
756 svc_exit_thread(rqstp); 729 svc_exit_thread(rqstp);
757 break; 730 return PTR_ERR(task);
758 } 731 }
759 732
760 rqstp->rq_task = task; 733 rqstp->rq_task = task;
@@ -763,18 +736,103 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
763 736
764 svc_sock_update_bufs(serv); 737 svc_sock_update_bufs(serv);
765 wake_up_process(task); 738 wake_up_process(task);
766 } 739 } while (nrservs > 0);
740
741 return 0;
742}
743
744
745/* destroy old threads */
746static int
747svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
748{
749 struct task_struct *task;
750 unsigned int state = serv->sv_nrthreads-1;
751
767 /* destroy old threads */ 752 /* destroy old threads */
768 while (nrservs < 0 && 753 do {
769 (task = choose_victim(serv, pool, &state)) != NULL) { 754 task = choose_victim(serv, pool, &state);
755 if (task == NULL)
756 break;
770 send_sig(SIGINT, task, 1); 757 send_sig(SIGINT, task, 1);
771 nrservs++; 758 nrservs++;
759 } while (nrservs < 0);
760
761 return 0;
762}
763
764/*
765 * Create or destroy enough new threads to make the number
766 * of threads the given number. If `pool' is non-NULL, applies
767 * only to threads in that pool, otherwise round-robins between
768 * all pools. Caller must ensure that mutual exclusion between this and
769 * server startup or shutdown.
770 *
771 * Destroying threads relies on the service threads filling in
772 * rqstp->rq_task, which only the nfs ones do. Assumes the serv
773 * has been created using svc_create_pooled().
774 *
775 * Based on code that used to be in nfsd_svc() but tweaked
776 * to be pool-aware.
777 */
778int
779svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
780{
781 if (pool == NULL) {
782 /* The -1 assumes caller has done a svc_get() */
783 nrservs -= (serv->sv_nrthreads-1);
784 } else {
785 spin_lock_bh(&pool->sp_lock);
786 nrservs -= pool->sp_nrthreads;
787 spin_unlock_bh(&pool->sp_lock);
772 } 788 }
773 789
774 return error; 790 if (nrservs > 0)
791 return svc_start_kthreads(serv, pool, nrservs);
792 if (nrservs < 0)
793 return svc_signal_kthreads(serv, pool, nrservs);
794 return 0;
775} 795}
776EXPORT_SYMBOL_GPL(svc_set_num_threads); 796EXPORT_SYMBOL_GPL(svc_set_num_threads);
777 797
798/* destroy old threads */
799static int
800svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
801{
802 struct task_struct *task;
803 unsigned int state = serv->sv_nrthreads-1;
804
805 /* destroy old threads */
806 do {
807 task = choose_victim(serv, pool, &state);
808 if (task == NULL)
809 break;
810 kthread_stop(task);
811 nrservs++;
812 } while (nrservs < 0);
813 return 0;
814}
815
816int
817svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
818{
819 if (pool == NULL) {
820 /* The -1 assumes caller has done a svc_get() */
821 nrservs -= (serv->sv_nrthreads-1);
822 } else {
823 spin_lock_bh(&pool->sp_lock);
824 nrservs -= pool->sp_nrthreads;
825 spin_unlock_bh(&pool->sp_lock);
826 }
827
828 if (nrservs > 0)
829 return svc_start_kthreads(serv, pool, nrservs);
830 if (nrservs < 0)
831 return svc_stop_kthreads(serv, pool, nrservs);
832 return 0;
833}
834EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);
835
778/* 836/*
779 * Called from a server thread as it's exiting. Caller must hold the "service 837 * Called from a server thread as it's exiting. Caller must hold the "service
780 * mutex" for the service. 838 * mutex" for the service.
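
The difference between the two entry points is the shutdown contract: svc_set_num_threads() still sends SIGINT and returns without waiting, while svc_set_num_threads_sync() uses kthread_stop(), so every victim has finished svc_exit_thread() by the time it returns, which is what the NFSv4 callback code earlier in this patch needs before it destroys the serv. A hedged sketch of a caller wiring this up through svc_serv_ops; example_callback_thread is the hypothetical thread function sketched after the fs/nfs/callback.c hunks, and example_shutdown is likewise made up:

/* Illustrative only; field names match struct svc_serv_ops as used above. */
static struct svc_serv_ops example_sv_ops = {
	.svo_function		= example_callback_thread,
	.svo_enqueue_xprt	= svc_xprt_do_enqueue,
	.svo_setup		= svc_set_num_threads_sync,
	.svo_module		= THIS_MODULE,
};

static void example_shutdown(struct svc_serv *serv)
{
	/*
	 * Shrinking to zero via the sync variant returns only after
	 * every thread has run svc_exit_thread(), so svc_destroy()
	 * cannot race with a thread that is still exiting.
	 */
	serv->sv_ops->svo_setup(serv, NULL, 0);
	svc_destroy(serv);
}
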
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index ef19fa42c50f..c1ae8142ab73 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
4 fmr_ops.o frwr_ops.o \ 4 fmr_ops.o frwr_ops.o \
5 svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ 5 svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ 6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
7 module.o 7 svc_rdma_rw.o module.o
8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o 8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index c846ca9f1eba..a4a8f6989ee7 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -58,9 +58,9 @@ unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
58unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS; 58unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
59static unsigned int min_max_requests = 4; 59static unsigned int min_max_requests = 4;
60static unsigned int max_max_requests = 16384; 60static unsigned int max_max_requests = 16384;
61unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE; 61unsigned int svcrdma_max_req_size = RPCRDMA_DEF_INLINE_THRESH;
62static unsigned int min_max_inline = 4096; 62static unsigned int min_max_inline = RPCRDMA_DEF_INLINE_THRESH;
63static unsigned int max_max_inline = 65536; 63static unsigned int max_max_inline = RPCRDMA_MAX_INLINE_THRESH;
64 64
65atomic_t rdma_stat_recv; 65atomic_t rdma_stat_recv;
66atomic_t rdma_stat_read; 66atomic_t rdma_stat_read;
@@ -247,8 +247,6 @@ int svc_rdma_init(void)
247 dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); 247 dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
248 dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); 248 dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord);
249 dprintk("\tmax_requests : %u\n", svcrdma_max_requests); 249 dprintk("\tmax_requests : %u\n", svcrdma_max_requests);
250 dprintk("\tsq_depth : %u\n",
251 svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
252 dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); 250 dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests);
253 dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); 251 dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
254 252
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index ff1df40f0d26..c676ed0efb5a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -12,7 +12,17 @@
12 12
13#undef SVCRDMA_BACKCHANNEL_DEBUG 13#undef SVCRDMA_BACKCHANNEL_DEBUG
14 14
15int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, 15/**
16 * svc_rdma_handle_bc_reply - Process incoming backchannel reply
17 * @xprt: controlling backchannel transport
18 * @rdma_resp: pointer to incoming transport header
19 * @rcvbuf: XDR buffer into which to decode the reply
20 *
21 * Returns:
22 * %0 if @rcvbuf is filled in, xprt_complete_rqst called,
23 * %-EAGAIN if server should call ->recvfrom again.
24 */
25int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
16 struct xdr_buf *rcvbuf) 26 struct xdr_buf *rcvbuf)
17{ 27{
18 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 28 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -27,13 +37,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
27 37
28 p = (__be32 *)src->iov_base; 38 p = (__be32 *)src->iov_base;
29 len = src->iov_len; 39 len = src->iov_len;
30 xid = rmsgp->rm_xid; 40 xid = *rdma_resp;
31 41
32#ifdef SVCRDMA_BACKCHANNEL_DEBUG 42#ifdef SVCRDMA_BACKCHANNEL_DEBUG
33 pr_info("%s: xid=%08x, length=%zu\n", 43 pr_info("%s: xid=%08x, length=%zu\n",
34 __func__, be32_to_cpu(xid), len); 44 __func__, be32_to_cpu(xid), len);
35 pr_info("%s: RPC/RDMA: %*ph\n", 45 pr_info("%s: RPC/RDMA: %*ph\n",
36 __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp); 46 __func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
37 pr_info("%s: RPC: %*ph\n", 47 pr_info("%s: RPC: %*ph\n",
38 __func__, (int)len, p); 48 __func__, (int)len, p);
39#endif 49#endif
@@ -53,7 +63,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
53 goto out_unlock; 63 goto out_unlock;
54 memcpy(dst->iov_base, p, len); 64 memcpy(dst->iov_base, p, len);
55 65
56 credits = be32_to_cpu(rmsgp->rm_credit); 66 credits = be32_to_cpup(rdma_resp + 2);
57 if (credits == 0) 67 if (credits == 0)
58 credits = 1; /* don't deadlock */ 68 credits = 1; /* don't deadlock */
59 else if (credits > r_xprt->rx_buf.rb_bc_max_requests) 69 else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
@@ -90,9 +100,9 @@ out_notfound:
90 * Caller holds the connection's mutex and has already marshaled 100 * Caller holds the connection's mutex and has already marshaled
91 * the RPC/RDMA request. 101 * the RPC/RDMA request.
92 * 102 *
93 * This is similar to svc_rdma_reply, but takes an rpc_rqst 103 * This is similar to svc_rdma_send_reply_msg, but takes a struct
94 * instead, does not support chunks, and avoids blocking memory 104 * rpc_rqst instead, does not support chunks, and avoids blocking
95 * allocation. 105 * memory allocation.
96 * 106 *
97 * XXX: There is still an opportunity to block in svc_rdma_send() 107 * XXX: There is still an opportunity to block in svc_rdma_send()
98 * if there are no SQ entries to post the Send. This may occur if 108 * if there are no SQ entries to post the Send. This may occur if
@@ -101,59 +111,36 @@ out_notfound:
101static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, 111static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
102 struct rpc_rqst *rqst) 112 struct rpc_rqst *rqst)
103{ 113{
104 struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
105 struct svc_rdma_op_ctxt *ctxt; 114 struct svc_rdma_op_ctxt *ctxt;
106 struct svc_rdma_req_map *vec;
107 struct ib_send_wr send_wr;
108 int ret; 115 int ret;
109 116
110 vec = svc_rdma_get_req_map(rdma); 117 ctxt = svc_rdma_get_context(rdma);
111 ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false); 118
112 if (ret) 119 /* rpcrdma_bc_send_request builds the transport header and
120 * the backchannel RPC message in the same buffer. Thus only
121 * one SGE is needed to send both.
122 */
123 ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
124 rqst->rq_snd_buf.len);
125 if (ret < 0)
113 goto out_err; 126 goto out_err;
114 127
115 ret = svc_rdma_repost_recv(rdma, GFP_NOIO); 128 ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
116 if (ret) 129 if (ret)
117 goto out_err; 130 goto out_err;
118 131
119 ctxt = svc_rdma_get_context(rdma); 132 ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0);
120 ctxt->pages[0] = virt_to_page(rqst->rq_buffer); 133 if (ret)
121 ctxt->count = 1;
122
123 ctxt->direction = DMA_TO_DEVICE;
124 ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
125 ctxt->sge[0].length = sndbuf->len;
126 ctxt->sge[0].addr =
127 ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
128 sndbuf->len, DMA_TO_DEVICE);
129 if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
130 ret = -EIO;
131 goto out_unmap;
132 }
133 svc_rdma_count_mappings(rdma, ctxt);
134
135 memset(&send_wr, 0, sizeof(send_wr));
136 ctxt->cqe.done = svc_rdma_wc_send;
137 send_wr.wr_cqe = &ctxt->cqe;
138 send_wr.sg_list = ctxt->sge;
139 send_wr.num_sge = 1;
140 send_wr.opcode = IB_WR_SEND;
141 send_wr.send_flags = IB_SEND_SIGNALED;
142
143 ret = svc_rdma_send(rdma, &send_wr);
144 if (ret) {
145 ret = -EIO;
146 goto out_unmap; 134 goto out_unmap;
147 }
148 135
149out_err: 136out_err:
150 svc_rdma_put_req_map(rdma, vec);
151 dprintk("svcrdma: %s returns %d\n", __func__, ret); 137 dprintk("svcrdma: %s returns %d\n", __func__, ret);
152 return ret; 138 return ret;
153 139
154out_unmap: 140out_unmap:
155 svc_rdma_unmap_dma(ctxt); 141 svc_rdma_unmap_dma(ctxt);
156 svc_rdma_put_context(ctxt, 1); 142 svc_rdma_put_context(ctxt, 1);
143 ret = -EIO;
157 goto out_err; 144 goto out_err;
158} 145}
159 146
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index 1c4aabf0f657..bdcf7d85a3dc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -166,92 +166,3 @@ out_inval:
166 dprintk("svcrdma: failed to parse transport header\n"); 166 dprintk("svcrdma: failed to parse transport header\n");
167 return -EINVAL; 167 return -EINVAL;
168} 168}
169
170int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
171 struct rpcrdma_msg *rmsgp,
172 enum rpcrdma_errcode err, __be32 *va)
173{
174 __be32 *startp = va;
175
176 *va++ = rmsgp->rm_xid;
177 *va++ = rmsgp->rm_vers;
178 *va++ = xprt->sc_fc_credits;
179 *va++ = rdma_error;
180 *va++ = cpu_to_be32(err);
181 if (err == ERR_VERS) {
182 *va++ = rpcrdma_version;
183 *va++ = rpcrdma_version;
184 }
185
186 return (int)((unsigned long)va - (unsigned long)startp);
187}
188
189/**
190 * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header
191 * @rdma_resp: buffer containing Reply transport header
192 *
193 * Returns length of transport header, in bytes.
194 */
195unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp)
196{
197 unsigned int nsegs;
198 __be32 *p;
199
200 p = rdma_resp;
201
202 /* RPC-over-RDMA V1 replies never have a Read list. */
203 p += rpcrdma_fixed_maxsz + 1;
204
205 /* Skip Write list. */
206 while (*p++ != xdr_zero) {
207 nsegs = be32_to_cpup(p++);
208 p += nsegs * rpcrdma_segment_maxsz;
209 }
210
211 /* Skip Reply chunk. */
212 if (*p++ != xdr_zero) {
213 nsegs = be32_to_cpup(p++);
214 p += nsegs * rpcrdma_segment_maxsz;
215 }
216
217 return (unsigned long)p - (unsigned long)rdma_resp;
218}
219
220void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
221{
222 struct rpcrdma_write_array *ary;
223
224 /* no read-list */
225 rmsgp->rm_body.rm_chunks[0] = xdr_zero;
226
227 /* write-array discrim */
228 ary = (struct rpcrdma_write_array *)
229 &rmsgp->rm_body.rm_chunks[1];
230 ary->wc_discrim = xdr_one;
231 ary->wc_nchunks = cpu_to_be32(chunks);
232
233 /* write-list terminator */
234 ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
235
236 /* reply-array discriminator */
237 ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
238}
239
240void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
241 int chunks)
242{
243 ary->wc_discrim = xdr_one;
244 ary->wc_nchunks = cpu_to_be32(chunks);
245}
246
247void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
248 int chunk_no,
249 __be32 rs_handle,
250 __be64 rs_offset,
251 u32 write_len)
252{
253 struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
254 seg->rs_handle = rs_handle;
255 seg->rs_offset = rs_offset;
256 seg->rs_length = cpu_to_be32(write_len);
257}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f7b2daf72a86..27a99bf5b1a6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -558,33 +558,85 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
558 rqstp->rq_arg.buflen = head->arg.buflen; 558 rqstp->rq_arg.buflen = head->arg.buflen;
559} 559}
560 560
561static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
562 __be32 *rdma_argp, int status)
563{
564 struct svc_rdma_op_ctxt *ctxt;
565 __be32 *p, *err_msgp;
566 unsigned int length;
567 struct page *page;
568 int ret;
569
570 ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
571 if (ret)
572 return;
573
574 page = alloc_page(GFP_KERNEL);
575 if (!page)
576 return;
577 err_msgp = page_address(page);
578
579 p = err_msgp;
580 *p++ = *rdma_argp;
581 *p++ = *(rdma_argp + 1);
582 *p++ = xprt->sc_fc_credits;
583 *p++ = rdma_error;
584 if (status == -EPROTONOSUPPORT) {
585 *p++ = err_vers;
586 *p++ = rpcrdma_version;
587 *p++ = rpcrdma_version;
588 } else {
589 *p++ = err_chunk;
590 }
591 length = (unsigned long)p - (unsigned long)err_msgp;
592
593 /* Map transport header; no RPC message payload */
594 ctxt = svc_rdma_get_context(xprt);
595 ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
596 if (ret) {
597 dprintk("svcrdma: Error %d mapping send for protocol error\n",
598 ret);
599 return;
600 }
601
602 ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
603 if (ret) {
604 dprintk("svcrdma: Error %d posting send for protocol error\n",
605 ret);
606 svc_rdma_unmap_dma(ctxt);
607 svc_rdma_put_context(ctxt, 1);
608 }
609}
610
561/* By convention, backchannel calls arrive via rdma_msg type 611/* By convention, backchannel calls arrive via rdma_msg type
562 * messages, and never populate the chunk lists. This makes 612 * messages, and never populate the chunk lists. This makes
563 * the RPC/RDMA header small and fixed in size, so it is 613 * the RPC/RDMA header small and fixed in size, so it is
564 * straightforward to check the RPC header's direction field. 614 * straightforward to check the RPC header's direction field.
565 */ 615 */
566static bool 616static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
567svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp) 617 __be32 *rdma_resp)
568{ 618{
569 __be32 *p = (__be32 *)rmsgp; 619 __be32 *p;
570 620
571 if (!xprt->xpt_bc_xprt) 621 if (!xprt->xpt_bc_xprt)
572 return false; 622 return false;
573 623
574 if (rmsgp->rm_type != rdma_msg) 624 p = rdma_resp + 3;
625 if (*p++ != rdma_msg)
575 return false; 626 return false;
576 if (rmsgp->rm_body.rm_chunks[0] != xdr_zero) 627
628 if (*p++ != xdr_zero)
577 return false; 629 return false;
578 if (rmsgp->rm_body.rm_chunks[1] != xdr_zero) 630 if (*p++ != xdr_zero)
579 return false; 631 return false;
580 if (rmsgp->rm_body.rm_chunks[2] != xdr_zero) 632 if (*p++ != xdr_zero)
581 return false; 633 return false;
582 634
583 /* sanity */ 635 /* XID sanity */
584 if (p[7] != rmsgp->rm_xid) 636 if (*p++ != *rdma_resp)
585 return false; 637 return false;
586 /* call direction */ 638 /* call direction */
587 if (p[8] == cpu_to_be32(RPC_CALL)) 639 if (*p == cpu_to_be32(RPC_CALL))
588 return false; 640 return false;
589 641
590 return true; 642 return true;
@@ -650,8 +702,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
650 goto out_drop; 702 goto out_drop;
651 rqstp->rq_xprt_hlen = ret; 703 rqstp->rq_xprt_hlen = ret;
652 704
653 if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) { 705 if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
654 ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp, 706 ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
707 &rmsgp->rm_xid,
655 &rqstp->rq_arg); 708 &rqstp->rq_arg);
656 svc_rdma_put_context(ctxt, 0); 709 svc_rdma_put_context(ctxt, 0);
657 if (ret) 710 if (ret)
@@ -686,7 +739,7 @@ complete:
686 return ret; 739 return ret;
687 740
688out_err: 741out_err:
689 svc_rdma_send_error(rdma_xprt, rmsgp, ret); 742 svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
690 svc_rdma_put_context(ctxt, 0); 743 svc_rdma_put_context(ctxt, 0);
691 return 0; 744 return 0;
692 745
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
new file mode 100644
index 000000000000..0cf620277693
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -0,0 +1,512 @@
1/*
2 * Copyright (c) 2016 Oracle. All rights reserved.
3 *
4 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
5 */
6
7#include <linux/sunrpc/rpc_rdma.h>
8#include <linux/sunrpc/svc_rdma.h>
9#include <linux/sunrpc/debug.h>
10
11#include <rdma/rw.h>
12
13#define RPCDBG_FACILITY RPCDBG_SVCXPRT
14
15/* Each R/W context contains state for one chain of RDMA Read or
16 * Write Work Requests.
17 *
18 * Each WR chain handles a single contiguous server-side buffer,
19 * because scatterlist entries after the first have to start on
20 * page alignment. xdr_buf iovecs cannot guarantee alignment.
21 *
22 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
23 * from a client may contain a unique R_key, so each WR chain moves
24 * up to one segment at a time.
25 *
26 * The scatterlist makes this data structure over 4KB in size. To
27 * make it less likely to fail, and to handle the allocation for
28 * smaller I/O requests without disabling bottom-halves, these
29 * contexts are created on demand, but cached and reused until the
30 * controlling svcxprt_rdma is destroyed.
31 */
32struct svc_rdma_rw_ctxt {
33 struct list_head rw_list;
34 struct rdma_rw_ctx rw_ctx;
35 int rw_nents;
36 struct sg_table rw_sg_table;
37 struct scatterlist rw_first_sgl[0];
38};
39
40static inline struct svc_rdma_rw_ctxt *
41svc_rdma_next_ctxt(struct list_head *list)
42{
43 return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
44 rw_list);
45}
46
47static struct svc_rdma_rw_ctxt *
48svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
49{
50 struct svc_rdma_rw_ctxt *ctxt;
51
52 spin_lock(&rdma->sc_rw_ctxt_lock);
53
54 ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
55 if (ctxt) {
56 list_del(&ctxt->rw_list);
57 spin_unlock(&rdma->sc_rw_ctxt_lock);
58 } else {
59 spin_unlock(&rdma->sc_rw_ctxt_lock);
60 ctxt = kmalloc(sizeof(*ctxt) +
61 SG_CHUNK_SIZE * sizeof(struct scatterlist),
62 GFP_KERNEL);
63 if (!ctxt)
64 goto out;
65 INIT_LIST_HEAD(&ctxt->rw_list);
66 }
67
68 ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
69 if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
70 ctxt->rw_sg_table.sgl)) {
71 kfree(ctxt);
72 ctxt = NULL;
73 }
74out:
75 return ctxt;
76}
77
78static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
79 struct svc_rdma_rw_ctxt *ctxt)
80{
81 sg_free_table_chained(&ctxt->rw_sg_table, true);
82
83 spin_lock(&rdma->sc_rw_ctxt_lock);
84 list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
85 spin_unlock(&rdma->sc_rw_ctxt_lock);
86}
87
88/**
89 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
90 * @rdma: transport about to be destroyed
91 *
92 */
93void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
94{
95 struct svc_rdma_rw_ctxt *ctxt;
96
97 while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
98 list_del(&ctxt->rw_list);
99 kfree(ctxt);
100 }
101}
102
103/* A chunk context tracks all I/O for moving one Read or Write
104 * chunk. This is a a set of rdma_rw's that handle data movement
105 * for all segments of one chunk.
106 *
107 * These are small, acquired with a single allocator call, and
108 * no more than one is needed per chunk. They are allocated on
109 * demand, and not cached.
110 */
111struct svc_rdma_chunk_ctxt {
112 struct ib_cqe cc_cqe;
113 struct svcxprt_rdma *cc_rdma;
114 struct list_head cc_rwctxts;
115 int cc_sqecount;
116 enum dma_data_direction cc_dir;
117};
118
119static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
120 struct svc_rdma_chunk_ctxt *cc,
121 enum dma_data_direction dir)
122{
123 cc->cc_rdma = rdma;
124 svc_xprt_get(&rdma->sc_xprt);
125
126 INIT_LIST_HEAD(&cc->cc_rwctxts);
127 cc->cc_sqecount = 0;
128 cc->cc_dir = dir;
129}
130
131static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
132{
133 struct svcxprt_rdma *rdma = cc->cc_rdma;
134 struct svc_rdma_rw_ctxt *ctxt;
135
136 while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
137 list_del(&ctxt->rw_list);
138
139 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
140 rdma->sc_port_num, ctxt->rw_sg_table.sgl,
141 ctxt->rw_nents, cc->cc_dir);
142 svc_rdma_put_rw_ctxt(rdma, ctxt);
143 }
144 svc_xprt_put(&rdma->sc_xprt);
145}
146
147/* State for sending a Write or Reply chunk.
148 * - Tracks progress of writing one chunk over all its segments
149 * - Stores arguments for the SGL constructor functions
150 */
151struct svc_rdma_write_info {
152 /* write state of this chunk */
153 unsigned int wi_seg_off;
154 unsigned int wi_seg_no;
155 unsigned int wi_nsegs;
156 __be32 *wi_segs;
157
158 /* SGL constructor arguments */
159 struct xdr_buf *wi_xdr;
160 unsigned char *wi_base;
161 unsigned int wi_next_off;
162
163 struct svc_rdma_chunk_ctxt wi_cc;
164};
165
166static struct svc_rdma_write_info *
167svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
168{
169 struct svc_rdma_write_info *info;
170
171 info = kmalloc(sizeof(*info), GFP_KERNEL);
172 if (!info)
173 return info;
174
175 info->wi_seg_off = 0;
176 info->wi_seg_no = 0;
177 info->wi_nsegs = be32_to_cpup(++chunk);
178 info->wi_segs = ++chunk;
179 svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
180 return info;
181}
182
183static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
184{
185 svc_rdma_cc_release(&info->wi_cc);
186 kfree(info);
187}
188
189/**
190 * svc_rdma_write_done - Write chunk completion
191 * @cq: controlling Completion Queue
192 * @wc: Work Completion
193 *
194 * Pages under I/O are freed by a subsequent Send completion.
195 */
196static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
197{
198 struct ib_cqe *cqe = wc->wr_cqe;
199 struct svc_rdma_chunk_ctxt *cc =
200 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
201 struct svcxprt_rdma *rdma = cc->cc_rdma;
202 struct svc_rdma_write_info *info =
203 container_of(cc, struct svc_rdma_write_info, wi_cc);
204
205 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
206 wake_up(&rdma->sc_send_wait);
207
208 if (unlikely(wc->status != IB_WC_SUCCESS)) {
209 set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
210 if (wc->status != IB_WC_WR_FLUSH_ERR)
211 pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
212 ib_wc_status_msg(wc->status),
213 wc->status, wc->vendor_err);
214 }
215
216 svc_rdma_write_info_free(info);
217}
218
219/* This function sleeps when the transport's Send Queue is congested.
220 *
221 * Assumptions:
222 * - If ib_post_send() succeeds, only one completion is expected,
223 * even if one or more WRs are flushed. This is true when posting
224 * an rdma_rw_ctx or when posting a single signaled WR.
225 */
226static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
227{
228 struct svcxprt_rdma *rdma = cc->cc_rdma;
229 struct svc_xprt *xprt = &rdma->sc_xprt;
230 struct ib_send_wr *first_wr, *bad_wr;
231 struct list_head *tmp;
232 struct ib_cqe *cqe;
233 int ret;
234
235 first_wr = NULL;
236 cqe = &cc->cc_cqe;
237 list_for_each(tmp, &cc->cc_rwctxts) {
238 struct svc_rdma_rw_ctxt *ctxt;
239
240 ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
241 first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
242 rdma->sc_port_num, cqe, first_wr);
243 cqe = NULL;
244 }
245
246 do {
247 if (atomic_sub_return(cc->cc_sqecount,
248 &rdma->sc_sq_avail) > 0) {
249 ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
250 if (ret)
251 break;
252 return 0;
253 }
254
255 atomic_inc(&rdma_stat_sq_starve);
256 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
257 wait_event(rdma->sc_send_wait,
258 atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
259 } while (1);
260
261 pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
262 set_bit(XPT_CLOSE, &xprt->xpt_flags);
263
264 /* If even one was posted, there will be a completion. */
265 if (bad_wr != first_wr)
266 return 0;
267
268 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
269 wake_up(&rdma->sc_send_wait);
270 return -ENOTCONN;
271}
272
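The Send Queue accounting in svc_rdma_post_chunk_ctxt() can be looked at in isolation. The userspace sketch below models only the reserve/release arithmetic with C11 atomics; the kernel version posts the WR chain on success and sleeps on sc_send_wait instead of reporting failure, and the queue depth used here is an arbitrary example value.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Counter of available Send Queue entries (sc_sq_avail in the patch). */
static atomic_int sq_avail;

/* Try to reserve 'needed' SQ entries, mirroring the accounting in
 * svc_rdma_post_chunk_ctxt(): subtract first, and if the result goes
 * non-positive, give the credits back.
 */
static bool sq_try_reserve(int needed)
{
	/* fetch_sub returns the old value; old - needed is what
	 * atomic_sub_return() reports in the kernel code.
	 */
	if (atomic_fetch_sub(&sq_avail, needed) - needed > 0)
		return true;

	/* Overcommitted: hand the credits back. The kernel then sleeps
	 * on sc_send_wait until completions return enough credits.
	 */
	atomic_fetch_add(&sq_avail, needed);
	return false;
}

/* Completion path: return the credits (and, in the kernel, wake waiters). */
static void sq_release(int count)
{
	atomic_fetch_add(&sq_avail, count);
}

int main(void)
{
	atomic_init(&sq_avail, 4);			/* pretend sc_sq_depth == 4 */

	printf("reserve 3: %d\n", sq_try_reserve(3));	/* 1: 4 - 3 = 1 > 0 */
	printf("reserve 3: %d\n", sq_try_reserve(3));	/* 0: would overcommit */
	sq_release(3);					/* a chunk completed */
	printf("reserve 3: %d\n", sq_try_reserve(3));	/* 1 again */
	return 0;
}
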
273/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
274 */
275static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
276 unsigned int len,
277 struct svc_rdma_rw_ctxt *ctxt)
278{
279 struct scatterlist *sg = ctxt->rw_sg_table.sgl;
280
281 sg_set_buf(&sg[0], info->wi_base, len);
282 info->wi_base += len;
283
284 ctxt->rw_nents = 1;
285}
286
287/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
288 */
289static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
290 unsigned int remaining,
291 struct svc_rdma_rw_ctxt *ctxt)
292{
293 unsigned int sge_no, sge_bytes, page_off, page_no;
294 struct xdr_buf *xdr = info->wi_xdr;
295 struct scatterlist *sg;
296 struct page **page;
297
298 page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
299 page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
300 page = xdr->pages + page_no;
301 info->wi_next_off += remaining;
302 sg = ctxt->rw_sg_table.sgl;
303 sge_no = 0;
304 do {
305 sge_bytes = min_t(unsigned int, remaining,
306 PAGE_SIZE - page_off);
307 sg_set_page(sg, *page, sge_bytes, page_off);
308
309 remaining -= sge_bytes;
310 sg = sg_next(sg);
311 page_off = 0;
312 sge_no++;
313 page++;
314 } while (remaining);
315
316 ctxt->rw_nents = sge_no;
317}
318
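The page arithmetic above is easier to follow with concrete numbers. This standalone sketch (page size, offsets and lengths are illustrative only) prints the per-page pieces that would each become one scatterlist entry.

#include <stdio.h>

#define PAGE_SHIFT	12
#define PAGE_SIZE	(1u << PAGE_SHIFT)
#define PAGE_MASK	(~(PAGE_SIZE - 1))

int main(void)
{
	unsigned int page_base = 100;	/* xdr->page_base */
	unsigned int next_off = 6000;	/* info->wi_next_off */
	unsigned int remaining = 9000;	/* bytes to describe */
	unsigned int page_off, page_no, sge_bytes, sge_no = 0;

	page_off = (next_off + page_base) & ~PAGE_MASK;
	page_no = (next_off + page_base) >> PAGE_SHIFT;

	while (remaining) {
		sge_bytes = PAGE_SIZE - page_off;
		if (sge_bytes > remaining)
			sge_bytes = remaining;
		printf("sge %u: page %u, offset %u, %u bytes\n",
		       sge_no, page_no, page_off, sge_bytes);
		remaining -= sge_bytes;
		page_off = 0;	/* later pieces start at a page boundary */
		page_no++;
		sge_no++;
	}
	return 0;
}
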
319/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
320 * an RPC Reply.
321 */
322static int
323svc_rdma_build_writes(struct svc_rdma_write_info *info,
324 void (*constructor)(struct svc_rdma_write_info *info,
325 unsigned int len,
326 struct svc_rdma_rw_ctxt *ctxt),
327 unsigned int remaining)
328{
329 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
330 struct svcxprt_rdma *rdma = cc->cc_rdma;
331 struct svc_rdma_rw_ctxt *ctxt;
332 __be32 *seg;
333 int ret;
334
335 cc->cc_cqe.done = svc_rdma_write_done;
336 seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
337 do {
338 unsigned int write_len;
339 u32 seg_length, seg_handle;
340 u64 seg_offset;
341
342 if (info->wi_seg_no >= info->wi_nsegs)
343 goto out_overflow;
344
345 seg_handle = be32_to_cpup(seg);
346 seg_length = be32_to_cpup(seg + 1);
347 xdr_decode_hyper(seg + 2, &seg_offset);
348 seg_offset += info->wi_seg_off;
349
350 write_len = min(remaining, seg_length - info->wi_seg_off);
351 ctxt = svc_rdma_get_rw_ctxt(rdma,
352 (write_len >> PAGE_SHIFT) + 2);
353 if (!ctxt)
354 goto out_noctx;
355
356 constructor(info, write_len, ctxt);
357 ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
358 rdma->sc_port_num, ctxt->rw_sg_table.sgl,
359 ctxt->rw_nents, 0, seg_offset,
360 seg_handle, DMA_TO_DEVICE);
361 if (ret < 0)
362 goto out_initerr;
363
364 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
365 cc->cc_sqecount += ret;
366 if (write_len == seg_length - info->wi_seg_off) {
367 seg += 4;
368 info->wi_seg_no++;
369 info->wi_seg_off = 0;
370 } else {
371 info->wi_seg_off += write_len;
372 }
373 remaining -= write_len;
374 } while (remaining);
375
376 return 0;
377
378out_overflow:
379 dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
380 info->wi_nsegs);
381 return -E2BIG;
382
383out_noctx:
384 dprintk("svcrdma: no R/W ctxs available\n");
385 return -ENOMEM;
386
387out_initerr:
388 svc_rdma_put_rw_ctxt(rdma, ctxt);
389 pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
390 return -EIO;
391}
392
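Here is a worked example of the segment walk in svc_rdma_build_writes(), reduced to plain integers: each iteration consumes min(remaining, space left in the current segment) and advances to the next segment only once the current one fills. The segment sizes and payload length below are invented for illustration.

#include <stdio.h>

int main(void)
{
	unsigned int seg_length[] = { 8192, 4096, 65536 };	/* from the Write chunk */
	unsigned int nsegs = 3;
	unsigned int remaining = 20480;		/* payload bytes to Write */
	unsigned int seg_no = 0, seg_off = 0;

	while (remaining) {
		unsigned int write_len;

		if (seg_no >= nsegs) {
			printf("-E2BIG: payload larger than the chunk\n");
			return 1;
		}

		write_len = seg_length[seg_no] - seg_off;
		if (write_len > remaining)
			write_len = remaining;
		printf("segment %u: write %u bytes at segment offset %u\n",
		       seg_no, write_len, seg_off);

		if (write_len == seg_length[seg_no] - seg_off) {
			seg_no++;		/* segment filled, move on */
			seg_off = 0;
		} else {
			seg_off += write_len;	/* rest of this segment is used later */
		}
		remaining -= write_len;
	}
	return 0;
}
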
393/* Send one of an xdr_buf's kvecs by itself. To send a Reply
394 * chunk, the whole RPC Reply is written back to the client.
395 * This function writes either the head or tail of the xdr_buf
396 * containing the Reply.
397 */
398static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
399 struct kvec *vec)
400{
401 info->wi_base = vec->iov_base;
402 return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
403 vec->iov_len);
404}
405
406/* Send an xdr_buf's page list by itself. A Write chunk is
407 * just the page list. A Reply chunk is the head, page list,
408 * and tail. This function is shared between the two types
409 * of chunk.
410 */
411static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
412 struct xdr_buf *xdr)
413{
414 info->wi_xdr = xdr;
415 info->wi_next_off = 0;
416 return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
417 xdr->page_len);
418}
419
420/**
421 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
422 * @rdma: controlling RDMA transport
423 * @wr_ch: Write chunk provided by client
424 * @xdr: xdr_buf containing the data payload
425 *
426 * Returns a non-negative number of bytes the chunk consumed, or
427 * %-E2BIG if the payload was larger than the Write chunk,
428 * %-ENOMEM if rdma_rw context pool was exhausted,
429 * %-ENOTCONN if posting failed (connection is lost),
430 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
431 */
432int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
433 struct xdr_buf *xdr)
434{
435 struct svc_rdma_write_info *info;
436 int ret;
437
438 if (!xdr->page_len)
439 return 0;
440
441 info = svc_rdma_write_info_alloc(rdma, wr_ch);
442 if (!info)
443 return -ENOMEM;
444
445 ret = svc_rdma_send_xdr_pagelist(info, xdr);
446 if (ret < 0)
447 goto out_err;
448
449 ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
450 if (ret < 0)
451 goto out_err;
452 return xdr->page_len;
453
454out_err:
455 svc_rdma_write_info_free(info);
456 return ret;
457}
458
459/**
460 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
461 * @rdma: controlling RDMA transport
462 * @rp_ch: Reply chunk provided by client
463 * @writelist: true if client provided a Write list
464 * @xdr: xdr_buf containing an RPC Reply
465 *
466 * Returns a non-negative number of bytes the chunk consumed, or
467 * %-E2BIG if the payload was larger than the Reply chunk,
468 * %-ENOMEM if rdma_rw context pool was exhausted,
469 * %-ENOTCONN if posting failed (connection is lost),
470 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
471 */
472int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
473 bool writelist, struct xdr_buf *xdr)
474{
475 struct svc_rdma_write_info *info;
476 int consumed, ret;
477
478 info = svc_rdma_write_info_alloc(rdma, rp_ch);
479 if (!info)
480 return -ENOMEM;
481
482 ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
483 if (ret < 0)
484 goto out_err;
485 consumed = xdr->head[0].iov_len;
486
487 /* Send the page list in the Reply chunk only if the
488 * client did not provide Write chunks.
489 */
490 if (!writelist && xdr->page_len) {
491 ret = svc_rdma_send_xdr_pagelist(info, xdr);
492 if (ret < 0)
493 goto out_err;
494 consumed += xdr->page_len;
495 }
496
497 if (xdr->tail[0].iov_len) {
498 ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
499 if (ret < 0)
500 goto out_err;
501 consumed += xdr->tail[0].iov_len;
502 }
503
504 ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
505 if (ret < 0)
506 goto out_err;
507 return consumed;
508
509out_err:
510 svc_rdma_write_info_free(info);
511 return ret;
512}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 515221b16d09..1736337f3a55 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -1,4 +1,5 @@
1/* 1/*
2 * Copyright (c) 2016 Oracle. All rights reserved.
2 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 3 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
3 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. 4 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
4 * 5 *
@@ -40,6 +41,63 @@
40 * Author: Tom Tucker <tom@opengridcomputing.com> 41 * Author: Tom Tucker <tom@opengridcomputing.com>
41 */ 42 */
42 43
44/* Operation
45 *
46 * The main entry point is svc_rdma_sendto. This is called by the
47 * RPC server when an RPC Reply is ready to be transmitted to a client.
48 *
49 * The passed-in svc_rqst contains a struct xdr_buf which holds an
50 * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
51 * transport header, post all Write WRs needed for this Reply, then post
52 * a Send WR conveying the transport header and the RPC message itself to
53 * the client.
54 *
55 * svc_rdma_sendto must fully transmit the Reply before returning, as
56 * the svc_rqst will be recycled as soon as sendto returns. Remaining
57 * resources referred to by the svc_rqst are also recycled at that time.
58 * Therefore any resources that must remain longer must be detached
59 * from the svc_rqst and released later.
60 *
61 * Page Management
62 *
63 * The I/O that performs Reply transmission is asynchronous, and may
64 * complete well after sendto returns. Thus pages under I/O must be
65 * removed from the svc_rqst before sendto returns.
66 *
67 * The logic here depends on Send Queue and completion ordering. Since
68 * the Send WR is always posted last, it will always complete last. Thus
69 * when it completes, it is guaranteed that all previous Write WRs have
70 * also completed.
71 *
72 * Write WRs are constructed and posted. Each Write segment gets its own
73 * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
74 * DMA-unmap the pages under I/O for that Write segment. The Write
75 * completion handler does not release any pages.
76 *
77 * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
 78 * The ownership of all of the Reply's pages is transferred into that
79 * ctxt, the Send WR is posted, and sendto returns.
80 *
81 * The svc_rdma_op_ctxt is presented when the Send WR completes. The
82 * Send completion handler finally releases the Reply's pages.
83 *
84 * This mechanism also assumes that completions on the transport's Send
85 * Completion Queue do not run in parallel. Otherwise a Write completion
86 * and Send completion running at the same time could release pages that
87 * are still DMA-mapped.
88 *
89 * Error Handling
90 *
91 * - If the Send WR is posted successfully, it will either complete
92 * successfully, or get flushed. Either way, the Send completion
93 * handler releases the Reply's pages.
 94 * - If the Send WR cannot be posted, the forward path releases
95 * the Reply's pages.
96 *
97 * This handles the case, without the use of page reference counting,
98 * where two different Write segments send portions of the same page.
99 */
100
43#include <linux/sunrpc/debug.h> 101#include <linux/sunrpc/debug.h>
44#include <linux/sunrpc/rpc_rdma.h> 102#include <linux/sunrpc/rpc_rdma.h>
45#include <linux/spinlock.h> 103#include <linux/spinlock.h>
@@ -55,113 +113,141 @@ static u32 xdr_padsize(u32 len)
55 return (len & 3) ? (4 - (len & 3)) : 0; 113 return (len & 3) ? (4 - (len & 3)) : 0;
56} 114}
57 115
58int svc_rdma_map_xdr(struct svcxprt_rdma *xprt, 116/* Returns length of transport header, in bytes.
59 struct xdr_buf *xdr, 117 */
60 struct svc_rdma_req_map *vec, 118static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
61 bool write_chunk_present)
62{ 119{
63 int sge_no; 120 unsigned int nsegs;
64 u32 sge_bytes; 121 __be32 *p;
65 u32 page_bytes;
66 u32 page_off;
67 int page_no;
68
69 if (xdr->len !=
70 (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
71 pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
72 return -EIO;
73 }
74 122
75 /* Skip the first sge, this is for the RPCRDMA header */ 123 p = rdma_resp;
76 sge_no = 1; 124
125 /* RPC-over-RDMA V1 replies never have a Read list. */
126 p += rpcrdma_fixed_maxsz + 1;
77 127
78 /* Head SGE */ 128 /* Skip Write list. */
79 vec->sge[sge_no].iov_base = xdr->head[0].iov_base; 129 while (*p++ != xdr_zero) {
80 vec->sge[sge_no].iov_len = xdr->head[0].iov_len; 130 nsegs = be32_to_cpup(p++);
81 sge_no++; 131 p += nsegs * rpcrdma_segment_maxsz;
82
83 /* pages SGE */
84 page_no = 0;
85 page_bytes = xdr->page_len;
86 page_off = xdr->page_base;
87 while (page_bytes) {
88 vec->sge[sge_no].iov_base =
89 page_address(xdr->pages[page_no]) + page_off;
90 sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
91 page_bytes -= sge_bytes;
92 vec->sge[sge_no].iov_len = sge_bytes;
93
94 sge_no++;
95 page_no++;
96 page_off = 0; /* reset for next time through loop */
97 } 132 }
98 133
99 /* Tail SGE */ 134 /* Skip Reply chunk. */
100 if (xdr->tail[0].iov_len) { 135 if (*p++ != xdr_zero) {
101 unsigned char *base = xdr->tail[0].iov_base; 136 nsegs = be32_to_cpup(p++);
102 size_t len = xdr->tail[0].iov_len; 137 p += nsegs * rpcrdma_segment_maxsz;
103 u32 xdr_pad = xdr_padsize(xdr->page_len); 138 }
104 139
105 if (write_chunk_present && xdr_pad) { 140 return (unsigned long)p - (unsigned long)rdma_resp;
106 base += xdr_pad; 141}
107 len -= xdr_pad;
108 }
109 142
110 if (len) { 143/* One Write chunk is copied from Call transport header to Reply
111 vec->sge[sge_no].iov_base = base; 144 * transport header. Each segment's length field is updated to
112 vec->sge[sge_no].iov_len = len; 145 * reflect number of bytes consumed in the segment.
113 sge_no++; 146 *
147 * Returns number of segments in this chunk.
148 */
149static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
150 unsigned int remaining)
151{
152 unsigned int i, nsegs;
153 u32 seg_len;
154
155 /* Write list discriminator */
156 *dst++ = *src++;
157
158 /* number of segments in this chunk */
159 nsegs = be32_to_cpup(src);
160 *dst++ = *src++;
161
162 for (i = nsegs; i; i--) {
163 /* segment's RDMA handle */
164 *dst++ = *src++;
165
166 /* bytes returned in this segment */
167 seg_len = be32_to_cpu(*src);
168 if (remaining >= seg_len) {
169 /* entire segment was consumed */
170 *dst = *src;
171 remaining -= seg_len;
172 } else {
173 /* segment only partly filled */
174 *dst = cpu_to_be32(remaining);
175 remaining = 0;
114 } 176 }
115 } 177 dst++; src++;
116 178
117 dprintk("svcrdma: %s: sge_no %d page_no %d " 179 /* segment's RDMA offset */
118 "page_base %u page_len %u head_len %zu tail_len %zu\n", 180 *dst++ = *src++;
119 __func__, sge_no, page_no, xdr->page_base, xdr->page_len, 181 *dst++ = *src++;
120 xdr->head[0].iov_len, xdr->tail[0].iov_len); 182 }
121 183
122 vec->count = sge_no; 184 return nsegs;
123 return 0;
124} 185}
125 186
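The length rewrite performed by xdr_encode_write_chunk() boils down to clamping each segment, in order, to the bytes actually written, with any leftover segments reduced to zero. A small standalone sketch with made-up sizes (the XDR encoding and handle/offset copying are omitted):

#include <stdio.h>

int main(void)
{
	unsigned int seg_len[] = { 4096, 4096, 4096 };	/* sizes from the Call */
	unsigned int nsegs = 3;
	unsigned int consumed = 6000;	/* bytes the server actually wrote */
	unsigned int i;

	for (i = 0; i < nsegs; i++) {
		if (consumed >= seg_len[i]) {
			consumed -= seg_len[i];	/* segment fully used */
		} else {
			seg_len[i] = consumed;	/* partly filled, or zero */
			consumed = 0;
		}
		printf("segment %u: %u bytes returned\n", i, seg_len[i]);
	}
	return 0;
}
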
126static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt, 187/* The client provided a Write list in the Call message. Fill in
127 struct xdr_buf *xdr, 188 * the segments in the first Write chunk in the Reply's transport
128 u32 xdr_off, size_t len, int dir) 189 * header with the number of bytes consumed in each segment.
190 * Remaining chunks are returned unused.
191 *
192 * Assumptions:
193 * - Client has provided only one Write chunk
194 */
195static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
196 unsigned int consumed)
129{ 197{
130 struct page *page; 198 unsigned int nsegs;
131 dma_addr_t dma_addr; 199 __be32 *p, *q;
132 if (xdr_off < xdr->head[0].iov_len) { 200
133 /* This offset is in the head */ 201 /* RPC-over-RDMA V1 replies never have a Read list. */
134 xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK; 202 p = rdma_resp + rpcrdma_fixed_maxsz + 1;
135 page = virt_to_page(xdr->head[0].iov_base); 203
136 } else { 204 q = wr_ch;
137 xdr_off -= xdr->head[0].iov_len; 205 while (*q != xdr_zero) {
138 if (xdr_off < xdr->page_len) { 206 nsegs = xdr_encode_write_chunk(p, q, consumed);
139 /* This offset is in the page list */ 207 q += 2 + nsegs * rpcrdma_segment_maxsz;
140 xdr_off += xdr->page_base; 208 p += 2 + nsegs * rpcrdma_segment_maxsz;
141 page = xdr->pages[xdr_off >> PAGE_SHIFT]; 209 consumed = 0;
142 xdr_off &= ~PAGE_MASK;
143 } else {
144 /* This offset is in the tail */
145 xdr_off -= xdr->page_len;
146 xdr_off += (unsigned long)
147 xdr->tail[0].iov_base & ~PAGE_MASK;
148 page = virt_to_page(xdr->tail[0].iov_base);
149 }
150 } 210 }
151 dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off, 211
152 min_t(size_t, PAGE_SIZE, len), dir); 212 /* Terminate Write list */
153 return dma_addr; 213 *p++ = xdr_zero;
214
215 /* Reply chunk discriminator; may be replaced later */
216 *p = xdr_zero;
217}
218
219/* The client provided a Reply chunk in the Call message. Fill in
220 * the segments in the Reply chunk in the Reply message with the
221 * number of bytes consumed in each segment.
222 *
223 * Assumptions:
224 * - Reply can always fit in the provided Reply chunk
225 */
226static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
227 unsigned int consumed)
228{
229 __be32 *p;
230
231 /* Find the Reply chunk in the Reply's xprt header.
232 * RPC-over-RDMA V1 replies never have a Read list.
233 */
234 p = rdma_resp + rpcrdma_fixed_maxsz + 1;
235
236 /* Skip past Write list */
237 while (*p++ != xdr_zero)
238 p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
239
240 xdr_encode_write_chunk(p, rp_ch, consumed);
154} 241}
155 242
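The pointer walk used here (and in svc_rdma_reply_hdr_len() above) can be exercised outside the kernel. The sketch below builds a toy header in host-endian words (the real header is big-endian XDR) and skips the fixed words, the empty Read list and the Write list to land on the Reply chunk; the handles, lengths and counts are arbitrary example values.

#include <stdio.h>
#include <stdint.h>

#define RPCRDMA_FIXED_MAXSZ	4	/* xid, vers, credits, proc */
#define RPCRDMA_SEGMENT_MAXSZ	4	/* handle, length, 64-bit offset */

int main(void)
{
	uint32_t hdr[] = {
		1, 1, 32, 2,			/* xid, vers, credits, proc */
		0,				/* Read list: empty */
		1, 2,				/* Write chunk: present, 2 segments */
		7, 4096, 0, 0,			/* segment 1 */
		8, 4096, 0, 0,			/* segment 2 */
		0,				/* Write list terminator */
		1, 1,				/* Reply chunk: present, 1 segment */
		9, 8192, 0, 0,			/* its only segment */
	};
	uint32_t *p = hdr + RPCRDMA_FIXED_MAXSZ + 1;	/* past the Read list */

	/* Skip each Write chunk: discriminator, segment count, then segments. */
	while (*p++ != 0)
		p += 1 + p[0] * RPCRDMA_SEGMENT_MAXSZ;

	printf("Reply chunk at word %ld, %u segment(s)\n",
	       (long)(p - hdr), (unsigned int)p[1]);
	return 0;
}
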
156/* Parse the RPC Call's transport header. 243/* Parse the RPC Call's transport header.
157 */ 244 */
158static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, 245static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
159 struct rpcrdma_write_array **write, 246 __be32 **write, __be32 **reply)
160 struct rpcrdma_write_array **reply)
161{ 247{
162 __be32 *p; 248 __be32 *p;
163 249
164 p = (__be32 *)&rmsgp->rm_body.rm_chunks[0]; 250 p = rdma_argp + rpcrdma_fixed_maxsz;
165 251
166 /* Read list */ 252 /* Read list */
167 while (*p++ != xdr_zero) 253 while (*p++ != xdr_zero)
@@ -169,7 +255,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
169 255
170 /* Write list */ 256 /* Write list */
171 if (*p != xdr_zero) { 257 if (*p != xdr_zero) {
172 *write = (struct rpcrdma_write_array *)p; 258 *write = p;
173 while (*p++ != xdr_zero) 259 while (*p++ != xdr_zero)
174 p += 1 + be32_to_cpu(*p) * 4; 260 p += 1 + be32_to_cpu(*p) * 4;
175 } else { 261 } else {
@@ -179,7 +265,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
179 265
180 /* Reply chunk */ 266 /* Reply chunk */
181 if (*p != xdr_zero) 267 if (*p != xdr_zero)
182 *reply = (struct rpcrdma_write_array *)p; 268 *reply = p;
183 else 269 else
184 *reply = NULL; 270 *reply = NULL;
185} 271}
@@ -189,360 +275,321 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
189 * Invalidate, and responder chooses one rkey to invalidate. 275 * Invalidate, and responder chooses one rkey to invalidate.
190 * 276 *
191 * Find a candidate rkey to invalidate when sending a reply. Picks the 277 * Find a candidate rkey to invalidate when sending a reply. Picks the
192 * first rkey it finds in the chunks lists. 278 * first R_key it finds in the chunk lists.
193 * 279 *
194 * Returns zero if RPC's chunk lists are empty. 280 * Returns zero if RPC's chunk lists are empty.
195 */ 281 */
196static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp, 282static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
197 struct rpcrdma_write_array *wr_ary, 283 __be32 *wr_lst, __be32 *rp_ch)
198 struct rpcrdma_write_array *rp_ary)
199{ 284{
200 struct rpcrdma_read_chunk *rd_ary; 285 __be32 *p;
201 struct rpcrdma_segment *arg_ch;
202 286
203 rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0]; 287 p = rdma_argp + rpcrdma_fixed_maxsz;
204 if (rd_ary->rc_discrim != xdr_zero) 288 if (*p != xdr_zero)
205 return be32_to_cpu(rd_ary->rc_target.rs_handle); 289 p += 2;
290 else if (wr_lst && be32_to_cpup(wr_lst + 1))
291 p = wr_lst + 2;
292 else if (rp_ch && be32_to_cpup(rp_ch + 1))
293 p = rp_ch + 2;
294 else
295 return 0;
296 return be32_to_cpup(p);
297}
206 298
207 if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) { 299/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
208 arg_ch = &wr_ary->wc_array[0].wc_target; 300 * is used during completion to DMA-unmap this memory, and
209 return be32_to_cpu(arg_ch->rs_handle); 301 * it uses ib_dma_unmap_page() exclusively.
210 } 302 */
303static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
304 struct svc_rdma_op_ctxt *ctxt,
305 unsigned int sge_no,
306 unsigned char *base,
307 unsigned int len)
308{
309 unsigned long offset = (unsigned long)base & ~PAGE_MASK;
310 struct ib_device *dev = rdma->sc_cm_id->device;
311 dma_addr_t dma_addr;
211 312
212 if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) { 313 dma_addr = ib_dma_map_page(dev, virt_to_page(base),
213 arg_ch = &rp_ary->wc_array[0].wc_target; 314 offset, len, DMA_TO_DEVICE);
214 return be32_to_cpu(arg_ch->rs_handle); 315 if (ib_dma_mapping_error(dev, dma_addr))
215 } 316 return -EIO;
216 317
318 ctxt->sge[sge_no].addr = dma_addr;
319 ctxt->sge[sge_no].length = len;
320 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
321 svc_rdma_count_mappings(rdma, ctxt);
217 return 0; 322 return 0;
218} 323}
219 324
220/* Assumptions: 325static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
221 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE 326 struct svc_rdma_op_ctxt *ctxt,
222 */ 327 unsigned int sge_no,
223static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, 328 struct page *page,
224 u32 rmr, u64 to, 329 unsigned int offset,
225 u32 xdr_off, int write_len, 330 unsigned int len)
226 struct svc_rdma_req_map *vec)
227{ 331{
228 struct ib_rdma_wr write_wr; 332 struct ib_device *dev = rdma->sc_cm_id->device;
229 struct ib_sge *sge; 333 dma_addr_t dma_addr;
230 int xdr_sge_no;
231 int sge_no;
232 int sge_bytes;
233 int sge_off;
234 int bc;
235 struct svc_rdma_op_ctxt *ctxt;
236 334
237 if (vec->count > RPCSVC_MAXPAGES) { 335 dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
238 pr_err("svcrdma: Too many pages (%lu)\n", vec->count); 336 if (ib_dma_mapping_error(dev, dma_addr))
239 return -EIO; 337 return -EIO;
240 }
241 338
242 dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, " 339 ctxt->sge[sge_no].addr = dma_addr;
243 "write_len=%d, vec->sge=%p, vec->count=%lu\n", 340 ctxt->sge[sge_no].length = len;
244 rmr, (unsigned long long)to, xdr_off, 341 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
245 write_len, vec->sge, vec->count); 342 svc_rdma_count_mappings(rdma, ctxt);
343 return 0;
344}
246 345
247 ctxt = svc_rdma_get_context(xprt); 346/**
347 * svc_rdma_map_reply_hdr - DMA map the transport header buffer
348 * @rdma: controlling transport
349 * @ctxt: op_ctxt for the Send WR
350 * @rdma_resp: buffer containing transport header
351 * @len: length of transport header
352 *
353 * Returns:
354 * %0 if the header is DMA mapped,
355 * %-EIO if DMA mapping failed.
356 */
357int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
358 struct svc_rdma_op_ctxt *ctxt,
359 __be32 *rdma_resp,
360 unsigned int len)
361{
248 ctxt->direction = DMA_TO_DEVICE; 362 ctxt->direction = DMA_TO_DEVICE;
249 sge = ctxt->sge; 363 ctxt->pages[0] = virt_to_page(rdma_resp);
250 364 ctxt->count = 1;
251 /* Find the SGE associated with xdr_off */ 365 return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
252 for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
253 xdr_sge_no++) {
254 if (vec->sge[xdr_sge_no].iov_len > bc)
255 break;
256 bc -= vec->sge[xdr_sge_no].iov_len;
257 }
258
259 sge_off = bc;
260 bc = write_len;
261 sge_no = 0;
262
263 /* Copy the remaining SGE */
264 while (bc != 0) {
265 sge_bytes = min_t(size_t,
266 bc, vec->sge[xdr_sge_no].iov_len-sge_off);
267 sge[sge_no].length = sge_bytes;
268 sge[sge_no].addr =
269 dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
270 sge_bytes, DMA_TO_DEVICE);
271 xdr_off += sge_bytes;
272 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
273 sge[sge_no].addr))
274 goto err;
275 svc_rdma_count_mappings(xprt, ctxt);
276 sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
277 ctxt->count++;
278 sge_off = 0;
279 sge_no++;
280 xdr_sge_no++;
281 if (xdr_sge_no > vec->count) {
282 pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
283 goto err;
284 }
285 bc -= sge_bytes;
286 if (sge_no == xprt->sc_max_sge)
287 break;
288 }
289
290 /* Prepare WRITE WR */
291 memset(&write_wr, 0, sizeof write_wr);
292 ctxt->cqe.done = svc_rdma_wc_write;
293 write_wr.wr.wr_cqe = &ctxt->cqe;
294 write_wr.wr.sg_list = &sge[0];
295 write_wr.wr.num_sge = sge_no;
296 write_wr.wr.opcode = IB_WR_RDMA_WRITE;
297 write_wr.wr.send_flags = IB_SEND_SIGNALED;
298 write_wr.rkey = rmr;
299 write_wr.remote_addr = to;
300
301 /* Post It */
302 atomic_inc(&rdma_stat_write);
303 if (svc_rdma_send(xprt, &write_wr.wr))
304 goto err;
305 return write_len - bc;
306 err:
307 svc_rdma_unmap_dma(ctxt);
308 svc_rdma_put_context(ctxt, 0);
309 return -EIO;
310} 366}
311 367
312noinline 368/* Load the xdr_buf into the ctxt's sge array, and DMA map each
313static int send_write_chunks(struct svcxprt_rdma *xprt, 369 * element as it is added.
314 struct rpcrdma_write_array *wr_ary, 370 *
315 struct rpcrdma_msg *rdma_resp, 371 * Returns the number of sge elements loaded on success, or
316 struct svc_rqst *rqstp, 372 * a negative errno on failure.
317 struct svc_rdma_req_map *vec) 373 */
374static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
375 struct svc_rdma_op_ctxt *ctxt,
376 struct xdr_buf *xdr, __be32 *wr_lst)
318{ 377{
319 u32 xfer_len = rqstp->rq_res.page_len; 378 unsigned int len, sge_no, remaining, page_off;
320 int write_len; 379 struct page **ppages;
321 u32 xdr_off; 380 unsigned char *base;
322 int chunk_off; 381 u32 xdr_pad;
323 int chunk_no;
324 int nchunks;
325 struct rpcrdma_write_array *res_ary;
326 int ret; 382 int ret;
327 383
328 res_ary = (struct rpcrdma_write_array *) 384 sge_no = 1;
329 &rdma_resp->rm_body.rm_chunks[1]; 385
330 386 ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
331 /* Write chunks start at the pagelist */ 387 xdr->head[0].iov_base,
332 nchunks = be32_to_cpu(wr_ary->wc_nchunks); 388 xdr->head[0].iov_len);
333 for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; 389 if (ret < 0)
334 xfer_len && chunk_no < nchunks; 390 return ret;
335 chunk_no++) { 391
336 struct rpcrdma_segment *arg_ch; 392 /* If a Write chunk is present, the xdr_buf's page list
337 u64 rs_offset; 393 * is not included inline. However the Upper Layer may
338 394 * have added XDR padding in the tail buffer, and that
339 arg_ch = &wr_ary->wc_array[chunk_no].wc_target; 395 * should not be included inline.
340 write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length)); 396 */
341 397 if (wr_lst) {
342 /* Prepare the response chunk given the length actually 398 base = xdr->tail[0].iov_base;
343 * written */ 399 len = xdr->tail[0].iov_len;
344 xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset); 400 xdr_pad = xdr_padsize(xdr->page_len);
345 svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, 401
346 arg_ch->rs_handle, 402 if (len && xdr_pad) {
347 arg_ch->rs_offset, 403 base += xdr_pad;
348 write_len); 404 len -= xdr_pad;
349 chunk_off = 0;
350 while (write_len) {
351 ret = send_write(xprt, rqstp,
352 be32_to_cpu(arg_ch->rs_handle),
353 rs_offset + chunk_off,
354 xdr_off,
355 write_len,
356 vec);
357 if (ret <= 0)
358 goto out_err;
359 chunk_off += ret;
360 xdr_off += ret;
361 xfer_len -= ret;
362 write_len -= ret;
363 } 405 }
406
407 goto tail;
408 }
409
410 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
411 page_off = xdr->page_base & ~PAGE_MASK;
412 remaining = xdr->page_len;
413 while (remaining) {
414 len = min_t(u32, PAGE_SIZE - page_off, remaining);
415
416 ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
417 *ppages++, page_off, len);
418 if (ret < 0)
419 return ret;
420
421 remaining -= len;
422 page_off = 0;
364 } 423 }
365 /* Update the req with the number of chunks actually used */
366 svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
367 424
368 return rqstp->rq_res.page_len; 425 base = xdr->tail[0].iov_base;
426 len = xdr->tail[0].iov_len;
427tail:
428 if (len) {
429 ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
430 if (ret < 0)
431 return ret;
432 }
369 433
370out_err: 434 return sge_no - 1;
371 pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
372 return -EIO;
373} 435}
374 436
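The tail-trimming logic in svc_rdma_map_reply_msg() depends only on xdr_padsize() and page_len, so it can be shown with plain numbers. The lengths below are illustrative; the point is that when a Write chunk carries the page data, its XDR pad at the front of the tail kvec is skipped rather than sent inline.

#include <stdio.h>

/* XDR pads the page data out to a 4-byte boundary. */
static unsigned int xdr_padsize(unsigned int len)
{
	return (len & 3) ? (4 - (len & 3)) : 0;
}

int main(void)
{
	unsigned int page_len = 1027;			/* body sent via RDMA Write */
	unsigned int tail_off = 0, tail_len = 14;	/* tail kvec */
	unsigned int pad = xdr_padsize(page_len);	/* 1 byte here */

	if (tail_len && pad) {
		tail_off += pad;			/* skip the pad bytes */
		tail_len -= pad;
	}
	printf("pad=%u, send tail bytes [%u, %u)\n",
	       pad, tail_off, tail_off + tail_len);
	return 0;
}
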
375noinline 437/* The svc_rqst and all resources it owns are released as soon as
376static int send_reply_chunks(struct svcxprt_rdma *xprt, 438 * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
377 struct rpcrdma_write_array *rp_ary, 439 * so they are released by the Send completion handler.
378 struct rpcrdma_msg *rdma_resp, 440 */
379 struct svc_rqst *rqstp, 441static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
380 struct svc_rdma_req_map *vec) 442 struct svc_rdma_op_ctxt *ctxt)
381{ 443{
382 u32 xfer_len = rqstp->rq_res.len; 444 int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
383 int write_len;
384 u32 xdr_off;
385 int chunk_no;
386 int chunk_off;
387 int nchunks;
388 struct rpcrdma_segment *ch;
389 struct rpcrdma_write_array *res_ary;
390 int ret;
391 445
392 /* XXX: need to fix when reply lists occur with read-list and or 446 ctxt->count += pages;
393 * write-list */ 447 for (i = 0; i < pages; i++) {
394 res_ary = (struct rpcrdma_write_array *) 448 ctxt->pages[i + 1] = rqstp->rq_respages[i];
395 &rdma_resp->rm_body.rm_chunks[2]; 449 rqstp->rq_respages[i] = NULL;
396
397 /* xdr offset starts at RPC message */
398 nchunks = be32_to_cpu(rp_ary->wc_nchunks);
399 for (xdr_off = 0, chunk_no = 0;
400 xfer_len && chunk_no < nchunks;
401 chunk_no++) {
402 u64 rs_offset;
403 ch = &rp_ary->wc_array[chunk_no].wc_target;
404 write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
405
406 /* Prepare the reply chunk given the length actually
407 * written */
408 xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
409 svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
410 ch->rs_handle, ch->rs_offset,
411 write_len);
412 chunk_off = 0;
413 while (write_len) {
414 ret = send_write(xprt, rqstp,
415 be32_to_cpu(ch->rs_handle),
416 rs_offset + chunk_off,
417 xdr_off,
418 write_len,
419 vec);
420 if (ret <= 0)
421 goto out_err;
422 chunk_off += ret;
423 xdr_off += ret;
424 xfer_len -= ret;
425 write_len -= ret;
426 }
427 } 450 }
428 /* Update the req with the number of chunks actually used */ 451 rqstp->rq_next_page = rqstp->rq_respages + 1;
429 svc_rdma_xdr_encode_reply_array(res_ary, chunk_no); 452}
430 453
431 return rqstp->rq_res.len; 454/**
455 * svc_rdma_post_send_wr - Set up and post one Send Work Request
456 * @rdma: controlling transport
457 * @ctxt: op_ctxt for transmitting the Send WR
458 * @num_sge: number of SGEs to send
459 * @inv_rkey: R_key argument to Send With Invalidate, or zero
460 *
461 * Returns:
462 * %0 if the Send* was posted successfully,
463 * %-ENOTCONN if the connection was lost or dropped,
464 * %-EINVAL if there was a problem with the Send we built,
465 * %-ENOMEM if ib_post_send failed.
466 */
467int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
468 struct svc_rdma_op_ctxt *ctxt, int num_sge,
469 u32 inv_rkey)
470{
471 struct ib_send_wr *send_wr = &ctxt->send_wr;
432 472
433out_err: 473 dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);
434 pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret); 474
435 return -EIO; 475 send_wr->next = NULL;
476 ctxt->cqe.done = svc_rdma_wc_send;
477 send_wr->wr_cqe = &ctxt->cqe;
478 send_wr->sg_list = ctxt->sge;
479 send_wr->num_sge = num_sge;
480 send_wr->send_flags = IB_SEND_SIGNALED;
481 if (inv_rkey) {
482 send_wr->opcode = IB_WR_SEND_WITH_INV;
483 send_wr->ex.invalidate_rkey = inv_rkey;
484 } else {
485 send_wr->opcode = IB_WR_SEND;
486 }
487
488 return svc_rdma_send(rdma, send_wr);
436} 489}
437 490
438/* This function prepares the portion of the RPCRDMA message to be 491/* Prepare the portion of the RPC Reply that will be transmitted
439 * sent in the RDMA_SEND. This function is called after data sent via 492 * via RDMA Send. The RPC-over-RDMA transport header is prepared
440 * RDMA has already been transmitted. There are three cases: 493 * in sge[0], and the RPC xdr_buf is prepared in following sges.
441 * - The RPCRDMA header, RPC header, and payload are all sent in a 494 *
442 * single RDMA_SEND. This is the "inline" case. 495 * Depending on whether a Write list or Reply chunk is present,
443 * - The RPCRDMA header and some portion of the RPC header and data 496 * the server may send all, a portion of, or none of the xdr_buf.
444 * are sent via this RDMA_SEND and another portion of the data is 497 * In the latter case, only the transport header (sge[0]) is
445 * sent via RDMA. 498 * transmitted.
446 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC 499 *
447 * header and data are all transmitted via RDMA. 500 * RDMA Send is the last step of transmitting an RPC reply. Pages
448 * In all three cases, this function prepares the RPCRDMA header in 501 * involved in the earlier RDMA Writes are here transferred out
449 * sge[0], the 'type' parameter indicates the type to place in the 502 * of the rqstp and into the ctxt's page array. These pages are
450 * RPCRDMA header, and the 'byte_count' field indicates how much of 503 * DMA unmapped by each Write completion, but the subsequent Send
451 * the XDR to include in this RDMA_SEND. NB: The offset of the payload 504 * completion finally releases these pages.
452 * to send is zero in the XDR. 505 *
506 * Assumptions:
507 * - The Reply's transport header will never be larger than a page.
453 */ 508 */
454static int send_reply(struct svcxprt_rdma *rdma, 509static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
455 struct svc_rqst *rqstp, 510 __be32 *rdma_argp, __be32 *rdma_resp,
456 struct page *page, 511 struct svc_rqst *rqstp,
457 struct rpcrdma_msg *rdma_resp, 512 __be32 *wr_lst, __be32 *rp_ch)
458 struct svc_rdma_req_map *vec,
459 int byte_count,
460 u32 inv_rkey)
461{ 513{
462 struct svc_rdma_op_ctxt *ctxt; 514 struct svc_rdma_op_ctxt *ctxt;
463 struct ib_send_wr send_wr; 515 u32 inv_rkey;
464 u32 xdr_off; 516 int ret;
465 int sge_no; 517
466 int sge_bytes; 518 dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
467 int page_no; 519 (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
468 int pages; 520 rqstp->rq_res.head[0].iov_len,
469 int ret = -EIO; 521 rqstp->rq_res.page_len,
470 522 rqstp->rq_res.tail[0].iov_len);
471 /* Prepare the context */ 523
472 ctxt = svc_rdma_get_context(rdma); 524 ctxt = svc_rdma_get_context(rdma);
473 ctxt->direction = DMA_TO_DEVICE;
474 ctxt->pages[0] = page;
475 ctxt->count = 1;
476 525
477 /* Prepare the SGE for the RPCRDMA Header */ 526 ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
478 ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; 527 svc_rdma_reply_hdr_len(rdma_resp));
479 ctxt->sge[0].length = 528 if (ret < 0)
480 svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
481 ctxt->sge[0].addr =
482 ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
483 ctxt->sge[0].length, DMA_TO_DEVICE);
484 if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
485 goto err; 529 goto err;
486 svc_rdma_count_mappings(rdma, ctxt);
487
488 ctxt->direction = DMA_TO_DEVICE;
489 530
490 /* Map the payload indicated by 'byte_count' */ 531 if (!rp_ch) {
491 xdr_off = 0; 532 ret = svc_rdma_map_reply_msg(rdma, ctxt,
492 for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) { 533 &rqstp->rq_res, wr_lst);
493 sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count); 534 if (ret < 0)
494 byte_count -= sge_bytes;
495 ctxt->sge[sge_no].addr =
496 dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
497 sge_bytes, DMA_TO_DEVICE);
498 xdr_off += sge_bytes;
499 if (ib_dma_mapping_error(rdma->sc_cm_id->device,
500 ctxt->sge[sge_no].addr))
501 goto err; 535 goto err;
502 svc_rdma_count_mappings(rdma, ctxt);
503 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
504 ctxt->sge[sge_no].length = sge_bytes;
505 } 536 }
506 if (byte_count != 0) { 537
507 pr_err("svcrdma: Could not map %d bytes\n", byte_count); 538 svc_rdma_save_io_pages(rqstp, ctxt);
539
540 inv_rkey = 0;
541 if (rdma->sc_snd_w_inv)
542 inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
543 ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
544 if (ret)
508 goto err; 545 goto err;
509 }
510 546
511 /* Save all respages in the ctxt and remove them from the 547 return 0;
512 * respages array. They are our pages until the I/O 548
513 * completes. 549err:
550 pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
551 svc_rdma_unmap_dma(ctxt);
552 svc_rdma_put_context(ctxt, 1);
553 return ret;
554}
555
556/* Given the client-provided Write and Reply chunks, the server was not
557 * able to form a complete reply. Return an RDMA_ERROR message so the
558 * client can retire this RPC transaction. As above, the Send completion
559 * routine releases payload pages that were part of a previous RDMA Write.
560 *
561 * Remote Invalidation is skipped for simplicity.
562 */
563static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
564 __be32 *rdma_resp, struct svc_rqst *rqstp)
565{
566 struct svc_rdma_op_ctxt *ctxt;
567 __be32 *p;
568 int ret;
569
570 ctxt = svc_rdma_get_context(rdma);
571
572 /* Replace the original transport header with an
573 * RDMA_ERROR response. XID etc are preserved.
514 */ 574 */
515 pages = rqstp->rq_next_page - rqstp->rq_respages; 575 p = rdma_resp + 3;
516 for (page_no = 0; page_no < pages; page_no++) { 576 *p++ = rdma_error;
517 ctxt->pages[page_no+1] = rqstp->rq_respages[page_no]; 577 *p = err_chunk;
518 ctxt->count++;
519 rqstp->rq_respages[page_no] = NULL;
520 }
521 rqstp->rq_next_page = rqstp->rq_respages + 1;
522 578
523 if (sge_no > rdma->sc_max_sge) { 579 ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
524 pr_err("svcrdma: Too many sges (%d)\n", sge_no); 580 if (ret < 0)
525 goto err; 581 goto err;
526 }
527 memset(&send_wr, 0, sizeof send_wr);
528 ctxt->cqe.done = svc_rdma_wc_send;
529 send_wr.wr_cqe = &ctxt->cqe;
530 send_wr.sg_list = ctxt->sge;
531 send_wr.num_sge = sge_no;
532 if (inv_rkey) {
533 send_wr.opcode = IB_WR_SEND_WITH_INV;
534 send_wr.ex.invalidate_rkey = inv_rkey;
535 } else
536 send_wr.opcode = IB_WR_SEND;
537 send_wr.send_flags = IB_SEND_SIGNALED;
538 582
539 ret = svc_rdma_send(rdma, &send_wr); 583 svc_rdma_save_io_pages(rqstp, ctxt);
584
585 ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0);
540 if (ret) 586 if (ret)
541 goto err; 587 goto err;
542 588
543 return 0; 589 return 0;
544 590
545 err: 591err:
592 pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
546 svc_rdma_unmap_dma(ctxt); 593 svc_rdma_unmap_dma(ctxt);
547 svc_rdma_put_context(ctxt, 1); 594 svc_rdma_put_context(ctxt, 1);
548 return ret; 595 return ret;
@@ -552,39 +599,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
552{ 599{
553} 600}
554 601
602/**
603 * svc_rdma_sendto - Transmit an RPC reply
604 * @rqstp: processed RPC request, reply XDR already in ::rq_res
605 *
606 * Any resources still associated with @rqstp are released upon return.
607 * If no reply message was possible, the connection is closed.
608 *
609 * Returns:
610 * %0 if an RPC reply has been successfully posted,
611 * %-ENOMEM if a resource shortage occurred (connection is lost),
612 * %-ENOTCONN if posting failed (connection is lost).
613 */
555int svc_rdma_sendto(struct svc_rqst *rqstp) 614int svc_rdma_sendto(struct svc_rqst *rqstp)
556{ 615{
557 struct svc_xprt *xprt = rqstp->rq_xprt; 616 struct svc_xprt *xprt = rqstp->rq_xprt;
558 struct svcxprt_rdma *rdma = 617 struct svcxprt_rdma *rdma =
559 container_of(xprt, struct svcxprt_rdma, sc_xprt); 618 container_of(xprt, struct svcxprt_rdma, sc_xprt);
560 struct rpcrdma_msg *rdma_argp; 619 __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
561 struct rpcrdma_msg *rdma_resp; 620 struct xdr_buf *xdr = &rqstp->rq_res;
562 struct rpcrdma_write_array *wr_ary, *rp_ary;
563 int ret;
564 int inline_bytes;
565 struct page *res_page; 621 struct page *res_page;
566 struct svc_rdma_req_map *vec; 622 int ret;
567 u32 inv_rkey;
568 __be32 *p;
569
570 dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
571 623
572 /* Get the RDMA request header. The receive logic always 624 /* Find the call's chunk lists to decide how to send the reply.
573 * places this at the start of page 0. 625 * Receive places the Call's xprt header at the start of page 0.
574 */ 626 */
575 rdma_argp = page_address(rqstp->rq_pages[0]); 627 rdma_argp = page_address(rqstp->rq_pages[0]);
576 svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary); 628 svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
577
578 inv_rkey = 0;
579 if (rdma->sc_snd_w_inv)
580 inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);
581 629
582 /* Build an req vec for the XDR */ 630 dprintk("svcrdma: preparing response for XID 0x%08x\n",
583 vec = svc_rdma_get_req_map(rdma); 631 be32_to_cpup(rdma_argp));
584 ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
585 if (ret)
586 goto err0;
587 inline_bytes = rqstp->rq_res.len;
588 632
589 /* Create the RDMA response header. xprt->xpt_mutex, 633 /* Create the RDMA response header. xprt->xpt_mutex,
590 * acquired in svc_send(), serializes RPC replies. The 634 * acquired in svc_send(), serializes RPC replies. The
@@ -598,115 +642,57 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
598 goto err0; 642 goto err0;
599 rdma_resp = page_address(res_page); 643 rdma_resp = page_address(res_page);
600 644
601 p = &rdma_resp->rm_xid; 645 p = rdma_resp;
602 *p++ = rdma_argp->rm_xid; 646 *p++ = *rdma_argp;
603 *p++ = rdma_argp->rm_vers; 647 *p++ = *(rdma_argp + 1);
604 *p++ = rdma->sc_fc_credits; 648 *p++ = rdma->sc_fc_credits;
605 *p++ = rp_ary ? rdma_nomsg : rdma_msg; 649 *p++ = rp_ch ? rdma_nomsg : rdma_msg;
606 650
607 /* Start with empty chunks */ 651 /* Start with empty chunks */
608 *p++ = xdr_zero; 652 *p++ = xdr_zero;
609 *p++ = xdr_zero; 653 *p++ = xdr_zero;
610 *p = xdr_zero; 654 *p = xdr_zero;
611 655
612 /* Send any write-chunk data and build resp write-list */ 656 if (wr_lst) {
613 if (wr_ary) { 657 /* XXX: Presume the client sent only one Write chunk */
614 ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec); 658 ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
615 if (ret < 0) 659 if (ret < 0)
616 goto err1; 660 goto err2;
617 inline_bytes -= ret + xdr_padsize(ret); 661 svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
618 } 662 }
619 663 if (rp_ch) {
620 /* Send any reply-list data and update resp reply-list */ 664 ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
621 if (rp_ary) {
622 ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
623 if (ret < 0) 665 if (ret < 0)
624 goto err1; 666 goto err2;
625 inline_bytes -= ret; 667 svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
626 } 668 }
627 669
628 /* Post a fresh Receive buffer _before_ sending the reply */
629 ret = svc_rdma_post_recv(rdma, GFP_KERNEL); 670 ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
630 if (ret) 671 if (ret)
631 goto err1; 672 goto err1;
632 673 ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
633 ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec, 674 wr_lst, rp_ch);
634 inline_bytes, inv_rkey);
635 if (ret < 0) 675 if (ret < 0)
636 goto err0; 676 goto err0;
677 return 0;
637 678
638 svc_rdma_put_req_map(rdma, vec); 679 err2:
639 dprintk("svcrdma: send_reply returns %d\n", ret); 680 if (ret != -E2BIG)
640 return ret; 681 goto err1;
682
683 ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
684 if (ret)
685 goto err1;
686 ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
687 if (ret < 0)
688 goto err0;
689 return 0;
641 690
642 err1: 691 err1:
643 put_page(res_page); 692 put_page(res_page);
644 err0: 693 err0:
645 svc_rdma_put_req_map(rdma, vec);
646 pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n", 694 pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
647 ret); 695 ret);
648 set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 696 set_bit(XPT_CLOSE, &xprt->xpt_flags);
649 return -ENOTCONN; 697 return -ENOTCONN;
650} 698}
651
652void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
653 int status)
654{
655 struct ib_send_wr err_wr;
656 struct page *p;
657 struct svc_rdma_op_ctxt *ctxt;
658 enum rpcrdma_errcode err;
659 __be32 *va;
660 int length;
661 int ret;
662
663 ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
664 if (ret)
665 return;
666
667 p = alloc_page(GFP_KERNEL);
668 if (!p)
669 return;
670 va = page_address(p);
671
672 /* XDR encode an error reply */
673 err = ERR_CHUNK;
674 if (status == -EPROTONOSUPPORT)
675 err = ERR_VERS;
676 length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
677
678 ctxt = svc_rdma_get_context(xprt);
679 ctxt->direction = DMA_TO_DEVICE;
680 ctxt->count = 1;
681 ctxt->pages[0] = p;
682
683 /* Prepare SGE for local address */
684 ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
685 ctxt->sge[0].length = length;
686 ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
687 p, 0, length, DMA_TO_DEVICE);
688 if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
689 dprintk("svcrdma: Error mapping buffer for protocol error\n");
690 svc_rdma_put_context(ctxt, 1);
691 return;
692 }
693 svc_rdma_count_mappings(xprt, ctxt);
694
695 /* Prepare SEND WR */
696 memset(&err_wr, 0, sizeof(err_wr));
697 ctxt->cqe.done = svc_rdma_wc_send;
698 err_wr.wr_cqe = &ctxt->cqe;
699 err_wr.sg_list = ctxt->sge;
700 err_wr.num_sge = 1;
701 err_wr.opcode = IB_WR_SEND;
702 err_wr.send_flags = IB_SEND_SIGNALED;
703
704 /* Post It */
705 ret = svc_rdma_send(xprt, &err_wr);
706 if (ret) {
707 dprintk("svcrdma: Error %d posting send for protocol error\n",
708 ret);
709 svc_rdma_unmap_dma(ctxt);
710 svc_rdma_put_context(ctxt, 1);
711 }
712}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fc8f14c7bfec..a9d9cb1ba4c6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -272,85 +272,6 @@ static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
272 } 272 }
273} 273}
274 274
275static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
276{
277 struct svc_rdma_req_map *map;
278
279 map = kmalloc(sizeof(*map), flags);
280 if (map)
281 INIT_LIST_HEAD(&map->free);
282 return map;
283}
284
285static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
286{
287 unsigned int i;
288
289 /* One for each receive buffer on this connection. */
290 i = xprt->sc_max_requests;
291
292 while (i--) {
293 struct svc_rdma_req_map *map;
294
295 map = alloc_req_map(GFP_KERNEL);
296 if (!map) {
297 dprintk("svcrdma: No memory for request map\n");
298 return false;
299 }
300 list_add(&map->free, &xprt->sc_maps);
301 }
302 return true;
303}
304
305struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
306{
307 struct svc_rdma_req_map *map = NULL;
308
309 spin_lock(&xprt->sc_map_lock);
310 if (list_empty(&xprt->sc_maps))
311 goto out_empty;
312
313 map = list_first_entry(&xprt->sc_maps,
314 struct svc_rdma_req_map, free);
315 list_del_init(&map->free);
316 spin_unlock(&xprt->sc_map_lock);
317
318out:
319 map->count = 0;
320 return map;
321
322out_empty:
323 spin_unlock(&xprt->sc_map_lock);
324
325 /* Pre-allocation amount was incorrect */
326 map = alloc_req_map(GFP_NOIO);
327 if (map)
328 goto out;
329
330 WARN_ONCE(1, "svcrdma: empty request map list?\n");
331 return NULL;
332}
333
334void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
335 struct svc_rdma_req_map *map)
336{
337 spin_lock(&xprt->sc_map_lock);
338 list_add(&map->free, &xprt->sc_maps);
339 spin_unlock(&xprt->sc_map_lock);
340}
341
342static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
343{
344 while (!list_empty(&xprt->sc_maps)) {
345 struct svc_rdma_req_map *map;
346
347 map = list_first_entry(&xprt->sc_maps,
348 struct svc_rdma_req_map, free);
349 list_del(&map->free);
350 kfree(map);
351 }
352}
353
354/* QP event handler */ 275/* QP event handler */
355static void qp_event_handler(struct ib_event *event, void *context) 276static void qp_event_handler(struct ib_event *event, void *context)
356{ 277{
@@ -474,24 +395,6 @@ void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
474} 395}
475 396
476/** 397/**
477 * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
478 * @cq: completion queue
479 * @wc: completed WR
480 *
481 */
482void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
483{
484 struct ib_cqe *cqe = wc->wr_cqe;
485 struct svc_rdma_op_ctxt *ctxt;
486
487 svc_rdma_send_wc_common_put(cq, wc, "write");
488
489 ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
490 svc_rdma_unmap_dma(ctxt);
491 svc_rdma_put_context(ctxt, 0);
492}
493
494/**
495 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC 398 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
496 * @cq: completion queue 399 * @cq: completion queue
497 * @wc: completed WR 400 * @wc: completed WR
@@ -561,14 +464,14 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
561 INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); 464 INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
562 INIT_LIST_HEAD(&cma_xprt->sc_frmr_q); 465 INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
563 INIT_LIST_HEAD(&cma_xprt->sc_ctxts); 466 INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
564 INIT_LIST_HEAD(&cma_xprt->sc_maps); 467 INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
565 init_waitqueue_head(&cma_xprt->sc_send_wait); 468 init_waitqueue_head(&cma_xprt->sc_send_wait);
566 469
567 spin_lock_init(&cma_xprt->sc_lock); 470 spin_lock_init(&cma_xprt->sc_lock);
568 spin_lock_init(&cma_xprt->sc_rq_dto_lock); 471 spin_lock_init(&cma_xprt->sc_rq_dto_lock);
569 spin_lock_init(&cma_xprt->sc_frmr_q_lock); 472 spin_lock_init(&cma_xprt->sc_frmr_q_lock);
570 spin_lock_init(&cma_xprt->sc_ctxt_lock); 473 spin_lock_init(&cma_xprt->sc_ctxt_lock);
571 spin_lock_init(&cma_xprt->sc_map_lock); 474 spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
572 475
573 /* 476 /*
574 * Note that this implies that the underlying transport support 477 * Note that this implies that the underlying transport support
@@ -999,6 +902,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
999 newxprt, newxprt->sc_cm_id); 902 newxprt, newxprt->sc_cm_id);
1000 903
1001 dev = newxprt->sc_cm_id->device; 904 dev = newxprt->sc_cm_id->device;
905 newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
1002 906
1003 /* Qualify the transport resource defaults with the 907 /* Qualify the transport resource defaults with the
1004 * capabilities of this particular device */ 908 * capabilities of this particular device */
@@ -1014,13 +918,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
1014 svcrdma_max_bc_requests); 918 svcrdma_max_bc_requests);
1015 newxprt->sc_rq_depth = newxprt->sc_max_requests + 919 newxprt->sc_rq_depth = newxprt->sc_max_requests +
1016 newxprt->sc_max_bc_requests; 920 newxprt->sc_max_bc_requests;
1017 newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth; 921 newxprt->sc_sq_depth = newxprt->sc_rq_depth;
1018 atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); 922 atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
1019 923
1020 if (!svc_rdma_prealloc_ctxts(newxprt)) 924 if (!svc_rdma_prealloc_ctxts(newxprt))
1021 goto errout; 925 goto errout;
1022 if (!svc_rdma_prealloc_maps(newxprt))
1023 goto errout;
1024 926
1025 /* 927 /*
1026 * Limit ORD based on client limit, local device limit, and 928 * Limit ORD based on client limit, local device limit, and
@@ -1050,6 +952,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
1050 memset(&qp_attr, 0, sizeof qp_attr); 952 memset(&qp_attr, 0, sizeof qp_attr);
1051 qp_attr.event_handler = qp_event_handler; 953 qp_attr.event_handler = qp_event_handler;
1052 qp_attr.qp_context = &newxprt->sc_xprt; 954 qp_attr.qp_context = &newxprt->sc_xprt;
955 qp_attr.port_num = newxprt->sc_cm_id->port_num;
956 qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
1053 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; 957 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
1054 qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; 958 qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
1055 qp_attr.cap.max_send_sge = newxprt->sc_max_sge; 959 qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
@@ -1248,8 +1152,8 @@ static void __svc_rdma_free(struct work_struct *work)
1248 } 1152 }
1249 1153
1250 rdma_dealloc_frmr_q(rdma); 1154 rdma_dealloc_frmr_q(rdma);
1155 svc_rdma_destroy_rw_ctxts(rdma);
1251 svc_rdma_destroy_ctxts(rdma); 1156 svc_rdma_destroy_ctxts(rdma);
1252 svc_rdma_destroy_maps(rdma);
1253 1157
1254 /* Destroy the QP if present (not a listener) */ 1158 /* Destroy the QP if present (not a listener) */
1255 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 1159 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))