-rw-r--r--  fs/nfs/blocklayout/blocklayout.c              7
-rw-r--r--  fs/nfs/callback.c                            40
-rw-r--r--  fs/nfs/callback.h                            12
-rw-r--r--  fs/nfs/callback_proc.c                        2
-rw-r--r--  fs/nfs/callback_xdr.c                        39
-rw-r--r--  fs/nfs/client.c                               1
-rw-r--r--  fs/nfs/delegation.c                           6
-rw-r--r--  fs/nfs/dir.c                                  3
-rw-r--r--  fs/nfs/flexfilelayout/flexfilelayout.c       40
-rw-r--r--  fs/nfs/flexfilelayout/flexfilelayout.h        7
-rw-r--r--  fs/nfs/mount_clnt.c                           4
-rw-r--r--  fs/nfs/nfs42.h                                1
-rw-r--r--  fs/nfs/nfs42proc.c                           71
-rw-r--r--  fs/nfs/nfs42xdr.c                            97
-rw-r--r--  fs/nfs/nfs4_fs.h                              6
-rw-r--r--  fs/nfs/nfs4file.c                           136
-rw-r--r--  fs/nfs/nfs4proc.c                           182
-rw-r--r--  fs/nfs/nfs4xdr.c                             53
-rw-r--r--  fs/nfs/nfsroot.c                              2
-rw-r--r--  fs/nfs/pnfs.c                                12
-rw-r--r--  fs/nfs/read.c                                 9
-rw-r--r--  fs/nfs/super.c                                2
-rw-r--r--  fs/nfs/write.c                                7
-rw-r--r--  include/linux/nfs4.h                          3
-rw-r--r--  include/linux/nfs_fs_sb.h                     2
-rw-r--r--  include/linux/nfs_xdr.h                      28
-rw-r--r--  include/linux/sunrpc/bc_xprt.h                5
-rw-r--r--  include/linux/sunrpc/svc_rdma.h               6
-rw-r--r--  include/linux/sunrpc/xprt.h                   9
-rw-r--r--  include/linux/sunrpc/xprtsock.h               2
-rw-r--r--  include/uapi/linux/nfs.h                     13
-rw-r--r--  net/sunrpc/backchannel_rqst.c                24
-rw-r--r--  net/sunrpc/svc.c                              5
-rw-r--r--  net/sunrpc/sysctl.c                          23
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                  1
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c           394
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c                7
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c              148
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c                6
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c     58
-rw-r--r--  net/sunrpc/xprtrdma/transport.c              18
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c                 479
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h              54
-rw-r--r--  net/sunrpc/xprtsock.c                       260
44 files changed, 1687 insertions(+), 597 deletions(-)
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index 9cd4eb3a1e22..ddd0138f410c 100644
--- a/fs/nfs/blocklayout/blocklayout.c
+++ b/fs/nfs/blocklayout/blocklayout.c
@@ -229,7 +229,7 @@ bl_read_pagelist(struct nfs_pgio_header *header)
229 struct parallel_io *par; 229 struct parallel_io *par;
230 loff_t f_offset = header->args.offset; 230 loff_t f_offset = header->args.offset;
231 size_t bytes_left = header->args.count; 231 size_t bytes_left = header->args.count;
232 unsigned int pg_offset, pg_len; 232 unsigned int pg_offset = header->args.pgbase, pg_len;
233 struct page **pages = header->args.pages; 233 struct page **pages = header->args.pages;
234 int pg_index = header->args.pgbase >> PAGE_CACHE_SHIFT; 234 int pg_index = header->args.pgbase >> PAGE_CACHE_SHIFT;
235 const bool is_dio = (header->dreq != NULL); 235 const bool is_dio = (header->dreq != NULL);
@@ -262,7 +262,6 @@ bl_read_pagelist(struct nfs_pgio_header *header)
262 extent_length = be.be_length - (isect - be.be_f_offset); 262 extent_length = be.be_length - (isect - be.be_f_offset);
263 } 263 }
264 264
265 pg_offset = f_offset & ~PAGE_CACHE_MASK;
266 if (is_dio) { 265 if (is_dio) {
267 if (pg_offset + bytes_left > PAGE_CACHE_SIZE) 266 if (pg_offset + bytes_left > PAGE_CACHE_SIZE)
268 pg_len = PAGE_CACHE_SIZE - pg_offset; 267 pg_len = PAGE_CACHE_SIZE - pg_offset;
@@ -273,9 +272,6 @@ bl_read_pagelist(struct nfs_pgio_header *header)
273 pg_len = PAGE_CACHE_SIZE; 272 pg_len = PAGE_CACHE_SIZE;
274 } 273 }
275 274
276 isect += (pg_offset >> SECTOR_SHIFT);
277 extent_length -= (pg_offset >> SECTOR_SHIFT);
278
279 if (is_hole(&be)) { 275 if (is_hole(&be)) {
280 bio = bl_submit_bio(READ, bio); 276 bio = bl_submit_bio(READ, bio);
281 /* Fill hole w/ zeroes w/o accessing device */ 277 /* Fill hole w/ zeroes w/o accessing device */
@@ -301,6 +297,7 @@ bl_read_pagelist(struct nfs_pgio_header *header)
301 extent_length -= (pg_len >> SECTOR_SHIFT); 297 extent_length -= (pg_len >> SECTOR_SHIFT);
302 f_offset += pg_len; 298 f_offset += pg_len;
303 bytes_left -= pg_len; 299 bytes_left -= pg_len;
300 pg_offset = 0;
304 } 301 }
305 if ((isect << SECTOR_SHIFT) >= header->inode->i_size) { 302 if ((isect << SECTOR_SHIFT) >= header->inode->i_size) {
306 header->res.eof = 1; 303 header->res.eof = 1;
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 75f7c0a7538a..a7f2e6e33305 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -99,17 +99,6 @@ nfs4_callback_up(struct svc_serv *serv)
99} 99}
100 100
101#if defined(CONFIG_NFS_V4_1) 101#if defined(CONFIG_NFS_V4_1)
102static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
103{
104 /*
105 * Create an svc_sock for the back channel service that shares the
106 * fore channel connection.
107 * Returns the input port (0) and sets the svc_serv bc_xprt on success
108 */
109 return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
110 SVC_SOCK_ANONYMOUS);
111}
112
113/* 102/*
114 * The callback service for NFSv4.1 callbacks 103 * The callback service for NFSv4.1 callbacks
115 */ 104 */
@@ -184,11 +173,6 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt,
184 xprt->bc_serv = serv; 173 xprt->bc_serv = serv;
185} 174}
186#else 175#else
187static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
188{
189 return 0;
190}
191
192static void nfs_minorversion_callback_svc_setup(struct svc_serv *serv, 176static void nfs_minorversion_callback_svc_setup(struct svc_serv *serv,
193 struct svc_rqst **rqstpp, int (**callback_svc)(void *vrqstp)) 177 struct svc_rqst **rqstpp, int (**callback_svc)(void *vrqstp))
194{ 178{
@@ -259,7 +243,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc
259 svc_shutdown_net(serv, net); 243 svc_shutdown_net(serv, net);
260} 244}
261 245
262static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net) 246static int nfs_callback_up_net(int minorversion, struct svc_serv *serv,
247 struct net *net, struct rpc_xprt *xprt)
263{ 248{
264 struct nfs_net *nn = net_generic(net, nfs_net_id); 249 struct nfs_net *nn = net_generic(net, nfs_net_id);
265 int ret; 250 int ret;
@@ -275,20 +260,11 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n
275 goto err_bind; 260 goto err_bind;
276 } 261 }
277 262
278 switch (minorversion) { 263 ret = -EPROTONOSUPPORT;
279 case 0: 264 if (minorversion == 0)
280 ret = nfs4_callback_up_net(serv, net); 265 ret = nfs4_callback_up_net(serv, net);
281 break; 266 else if (xprt->ops->bc_up)
282 case 1: 267 ret = xprt->ops->bc_up(serv, net);
283 case 2:
284 ret = nfs41_callback_up_net(serv, net);
285 break;
286 default:
287 printk(KERN_ERR "NFS: unknown callback version: %d\n",
288 minorversion);
289 ret = -EINVAL;
290 break;
291 }
292 268
293 if (ret < 0) { 269 if (ret < 0) {
294 printk(KERN_ERR "NFS: callback service start failed\n"); 270 printk(KERN_ERR "NFS: callback service start failed\n");
@@ -364,7 +340,7 @@ int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt)
364 goto err_create; 340 goto err_create;
365 } 341 }
366 342
367 ret = nfs_callback_up_net(minorversion, serv, net); 343 ret = nfs_callback_up_net(minorversion, serv, net, xprt);
368 if (ret < 0) 344 if (ret < 0)
369 goto err_net; 345 goto err_net;
370 346
diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index 84326e9fb47a..ff8195bd75ea 100644
--- a/fs/nfs/callback.h
+++ b/fs/nfs/callback.h
@@ -61,7 +61,6 @@ struct cb_compound_hdr_res {
61}; 61};
62 62
63struct cb_getattrargs { 63struct cb_getattrargs {
64 struct sockaddr *addr;
65 struct nfs_fh fh; 64 struct nfs_fh fh;
66 uint32_t bitmap[2]; 65 uint32_t bitmap[2];
67}; 66};
@@ -76,7 +75,6 @@ struct cb_getattrres {
76}; 75};
77 76
78struct cb_recallargs { 77struct cb_recallargs {
79 struct sockaddr *addr;
80 struct nfs_fh fh; 78 struct nfs_fh fh;
81 nfs4_stateid stateid; 79 nfs4_stateid stateid;
82 uint32_t truncate; 80 uint32_t truncate;
@@ -119,9 +117,6 @@ extern __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
119 struct cb_sequenceres *res, 117 struct cb_sequenceres *res,
120 struct cb_process_state *cps); 118 struct cb_process_state *cps);
121 119
122extern int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation,
123 const nfs4_stateid *stateid);
124
125#define RCA4_TYPE_MASK_RDATA_DLG 0 120#define RCA4_TYPE_MASK_RDATA_DLG 0
126#define RCA4_TYPE_MASK_WDATA_DLG 1 121#define RCA4_TYPE_MASK_WDATA_DLG 1
127#define RCA4_TYPE_MASK_DIR_DLG 2 122#define RCA4_TYPE_MASK_DIR_DLG 2
@@ -134,7 +129,6 @@ extern int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation,
134#define RCA4_TYPE_MASK_ALL 0xf31f 129#define RCA4_TYPE_MASK_ALL 0xf31f
135 130
136struct cb_recallanyargs { 131struct cb_recallanyargs {
137 struct sockaddr *craa_addr;
138 uint32_t craa_objs_to_keep; 132 uint32_t craa_objs_to_keep;
139 uint32_t craa_type_mask; 133 uint32_t craa_type_mask;
140}; 134};
@@ -144,7 +138,6 @@ extern __be32 nfs4_callback_recallany(struct cb_recallanyargs *args,
144 struct cb_process_state *cps); 138 struct cb_process_state *cps);
145 139
146struct cb_recallslotargs { 140struct cb_recallslotargs {
147 struct sockaddr *crsa_addr;
148 uint32_t crsa_target_highest_slotid; 141 uint32_t crsa_target_highest_slotid;
149}; 142};
150extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args, 143extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args,
@@ -152,7 +145,6 @@ extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args,
152 struct cb_process_state *cps); 145 struct cb_process_state *cps);
153 146
154struct cb_layoutrecallargs { 147struct cb_layoutrecallargs {
155 struct sockaddr *cbl_addr;
156 uint32_t cbl_recall_type; 148 uint32_t cbl_recall_type;
157 uint32_t cbl_layout_type; 149 uint32_t cbl_layout_type;
158 uint32_t cbl_layoutchanged; 150 uint32_t cbl_layoutchanged;
@@ -196,9 +188,6 @@ extern __be32 nfs4_callback_recall(struct cb_recallargs *args, void *dummy,
196#if IS_ENABLED(CONFIG_NFS_V4) 188#if IS_ENABLED(CONFIG_NFS_V4)
197extern int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt); 189extern int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt);
198extern void nfs_callback_down(int minorversion, struct net *net); 190extern void nfs_callback_down(int minorversion, struct net *net);
199extern int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation,
200 const nfs4_stateid *stateid);
201extern int nfs4_set_callback_sessionid(struct nfs_client *clp);
202#endif /* CONFIG_NFS_V4 */ 191#endif /* CONFIG_NFS_V4 */
203/* 192/*
204 * nfs41: Callbacks are expected to not cause substantial latency, 193 * nfs41: Callbacks are expected to not cause substantial latency,
@@ -209,6 +198,5 @@ extern int nfs4_set_callback_sessionid(struct nfs_client *clp);
209#define NFS41_BC_MAX_CALLBACKS 1 198#define NFS41_BC_MAX_CALLBACKS 1
210 199
211extern unsigned int nfs_callback_set_tcpport; 200extern unsigned int nfs_callback_set_tcpport;
212extern unsigned short nfs_callback_tcpport;
213 201
214#endif /* __LINUX_FS_NFS_CALLBACK_H */ 202#endif /* __LINUX_FS_NFS_CALLBACK_H */
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index b85cf7a30232..807eb6ef4f91 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -17,9 +17,7 @@
17#include "nfs4session.h" 17#include "nfs4session.h"
18#include "nfs4trace.h" 18#include "nfs4trace.h"
19 19
20#ifdef NFS_DEBUG
21#define NFSDBG_FACILITY NFSDBG_CALLBACK 20#define NFSDBG_FACILITY NFSDBG_CALLBACK
22#endif
23 21
24__be32 nfs4_callback_getattr(struct cb_getattrargs *args, 22__be32 nfs4_callback_getattr(struct cb_getattrargs *args,
25 struct cb_getattrres *res, 23 struct cb_getattrres *res,
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 6b1697a01dde..646cdac73488 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -18,19 +18,21 @@
18#include "internal.h" 18#include "internal.h"
19#include "nfs4session.h" 19#include "nfs4session.h"
20 20
21#define CB_OP_TAGLEN_MAXSZ (512) 21#define CB_OP_TAGLEN_MAXSZ (512)
22#define CB_OP_HDR_RES_MAXSZ (2 + CB_OP_TAGLEN_MAXSZ) 22#define CB_OP_HDR_RES_MAXSZ (2 * 4) // opcode, status
23#define CB_OP_GETATTR_BITMAP_MAXSZ (4) 23#define CB_OP_GETATTR_BITMAP_MAXSZ (4 * 4) // bitmap length, 3 bitmaps
24#define CB_OP_GETATTR_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \ 24#define CB_OP_GETATTR_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \
25 CB_OP_GETATTR_BITMAP_MAXSZ + \ 25 CB_OP_GETATTR_BITMAP_MAXSZ + \
26 2 + 2 + 3 + 3) 26 /* change, size, ctime, mtime */\
27#define CB_OP_RECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) 27 (2 + 2 + 3 + 3) * 4)
28#define CB_OP_RECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ)
28 29
29#if defined(CONFIG_NFS_V4_1) 30#if defined(CONFIG_NFS_V4_1)
30#define CB_OP_LAYOUTRECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) 31#define CB_OP_LAYOUTRECALL_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ)
31#define CB_OP_DEVICENOTIFY_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) 32#define CB_OP_DEVICENOTIFY_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ)
32#define CB_OP_SEQUENCE_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \ 33#define CB_OP_SEQUENCE_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ + \
33 4 + 1 + 3) 34 NFS4_MAX_SESSIONID_LEN + \
35 (1 + 3) * 4) // seqid, 3 slotids
34#define CB_OP_RECALLANY_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) 36#define CB_OP_RECALLANY_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ)
35#define CB_OP_RECALLSLOT_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ) 37#define CB_OP_RECALLSLOT_RES_MAXSZ (CB_OP_HDR_RES_MAXSZ)
36#endif /* CONFIG_NFS_V4_1 */ 38#endif /* CONFIG_NFS_V4_1 */
@@ -157,7 +159,7 @@ static __be32 decode_compound_hdr_arg(struct xdr_stream *xdr, struct cb_compound
157 if (unlikely(status != 0)) 159 if (unlikely(status != 0))
158 return status; 160 return status;
159 /* We do not like overly long tags! */ 161 /* We do not like overly long tags! */
160 if (hdr->taglen > CB_OP_TAGLEN_MAXSZ - 12) { 162 if (hdr->taglen > CB_OP_TAGLEN_MAXSZ) {
161 printk("NFS: NFSv4 CALLBACK %s: client sent tag of length %u\n", 163 printk("NFS: NFSv4 CALLBACK %s: client sent tag of length %u\n",
162 __func__, hdr->taglen); 164 __func__, hdr->taglen);
163 return htonl(NFS4ERR_RESOURCE); 165 return htonl(NFS4ERR_RESOURCE);
@@ -198,7 +200,6 @@ static __be32 decode_getattr_args(struct svc_rqst *rqstp, struct xdr_stream *xdr
198 status = decode_fh(xdr, &args->fh); 200 status = decode_fh(xdr, &args->fh);
199 if (unlikely(status != 0)) 201 if (unlikely(status != 0))
200 goto out; 202 goto out;
201 args->addr = svc_addr(rqstp);
202 status = decode_bitmap(xdr, args->bitmap); 203 status = decode_bitmap(xdr, args->bitmap);
203out: 204out:
204 dprintk("%s: exit with status = %d\n", __func__, ntohl(status)); 205 dprintk("%s: exit with status = %d\n", __func__, ntohl(status));
@@ -210,7 +211,6 @@ static __be32 decode_recall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr,
210 __be32 *p; 211 __be32 *p;
211 __be32 status; 212 __be32 status;
212 213
213 args->addr = svc_addr(rqstp);
214 status = decode_stateid(xdr, &args->stateid); 214 status = decode_stateid(xdr, &args->stateid);
215 if (unlikely(status != 0)) 215 if (unlikely(status != 0))
216 goto out; 216 goto out;
@@ -236,7 +236,6 @@ static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp,
236 __be32 status = 0; 236 __be32 status = 0;
237 uint32_t iomode; 237 uint32_t iomode;
238 238
239 args->cbl_addr = svc_addr(rqstp);
240 p = read_buf(xdr, 4 * sizeof(uint32_t)); 239 p = read_buf(xdr, 4 * sizeof(uint32_t));
241 if (unlikely(p == NULL)) { 240 if (unlikely(p == NULL)) {
242 status = htonl(NFS4ERR_BADXDR); 241 status = htonl(NFS4ERR_BADXDR);
@@ -383,13 +382,12 @@ static __be32 decode_sessionid(struct xdr_stream *xdr,
383 struct nfs4_sessionid *sid) 382 struct nfs4_sessionid *sid)
384{ 383{
385 __be32 *p; 384 __be32 *p;
386 int len = NFS4_MAX_SESSIONID_LEN;
387 385
388 p = read_buf(xdr, len); 386 p = read_buf(xdr, NFS4_MAX_SESSIONID_LEN);
389 if (unlikely(p == NULL)) 387 if (unlikely(p == NULL))
390 return htonl(NFS4ERR_RESOURCE); 388 return htonl(NFS4ERR_RESOURCE);
391 389
392 memcpy(sid->data, p, len); 390 memcpy(sid->data, p, NFS4_MAX_SESSIONID_LEN);
393 return 0; 391 return 0;
394} 392}
395 393
@@ -500,7 +498,6 @@ static __be32 decode_recallany_args(struct svc_rqst *rqstp,
500 uint32_t bitmap[2]; 498 uint32_t bitmap[2];
501 __be32 *p, status; 499 __be32 *p, status;
502 500
503 args->craa_addr = svc_addr(rqstp);
504 p = read_buf(xdr, 4); 501 p = read_buf(xdr, 4);
505 if (unlikely(p == NULL)) 502 if (unlikely(p == NULL))
506 return htonl(NFS4ERR_BADXDR); 503 return htonl(NFS4ERR_BADXDR);
@@ -519,7 +516,6 @@ static __be32 decode_recallslot_args(struct svc_rqst *rqstp,
519{ 516{
520 __be32 *p; 517 __be32 *p;
521 518
522 args->crsa_addr = svc_addr(rqstp);
523 p = read_buf(xdr, 4); 519 p = read_buf(xdr, 4);
524 if (unlikely(p == NULL)) 520 if (unlikely(p == NULL))
525 return htonl(NFS4ERR_BADXDR); 521 return htonl(NFS4ERR_BADXDR);
@@ -684,13 +680,12 @@ static __be32 encode_sessionid(struct xdr_stream *xdr,
684 const struct nfs4_sessionid *sid) 680 const struct nfs4_sessionid *sid)
685{ 681{
686 __be32 *p; 682 __be32 *p;
687 int len = NFS4_MAX_SESSIONID_LEN;
688 683
689 p = xdr_reserve_space(xdr, len); 684 p = xdr_reserve_space(xdr, NFS4_MAX_SESSIONID_LEN);
690 if (unlikely(p == NULL)) 685 if (unlikely(p == NULL))
691 return htonl(NFS4ERR_RESOURCE); 686 return htonl(NFS4ERR_RESOURCE);
692 687
693 memcpy(p, sid, len); 688 memcpy(p, sid, NFS4_MAX_SESSIONID_LEN);
694 return 0; 689 return 0;
695} 690}
696 691
@@ -704,7 +699,9 @@ static __be32 encode_cb_sequence_res(struct svc_rqst *rqstp,
704 if (unlikely(status != 0)) 699 if (unlikely(status != 0))
705 goto out; 700 goto out;
706 701
707 encode_sessionid(xdr, &res->csr_sessionid); 702 status = encode_sessionid(xdr, &res->csr_sessionid);
703 if (status)
704 goto out;
708 705
709 p = xdr_reserve_space(xdr, 4 * sizeof(uint32_t)); 706 p = xdr_reserve_space(xdr, 4 * sizeof(uint32_t));
710 if (unlikely(p == NULL)) 707 if (unlikely(p == NULL))
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 57c5a02f6213..d6d5d2a48e83 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -764,6 +764,7 @@ static void nfs_server_set_fsinfo(struct nfs_server *server,
764 764
765 server->time_delta = fsinfo->time_delta; 765 server->time_delta = fsinfo->time_delta;
766 766
767 server->clone_blksize = fsinfo->clone_blksize;
767 /* We're airborne Set socket buffersize */ 768 /* We're airborne Set socket buffersize */
768 rpc_setbufsize(server->client, server->wsize + 100, server->rsize + 100); 769 rpc_setbufsize(server->client, server->wsize + 100, server->rsize + 100);
769} 770}
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index be806ead7f4d..5166adcfc0fb 100644
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -721,14 +721,12 @@ int nfs_async_inode_return_delegation(struct inode *inode,
721 struct nfs_client *clp = server->nfs_client; 721 struct nfs_client *clp = server->nfs_client;
722 struct nfs_delegation *delegation; 722 struct nfs_delegation *delegation;
723 723
724 filemap_flush(inode->i_mapping);
725
726 rcu_read_lock(); 724 rcu_read_lock();
727 delegation = rcu_dereference(NFS_I(inode)->delegation); 725 delegation = rcu_dereference(NFS_I(inode)->delegation);
728 if (delegation == NULL) 726 if (delegation == NULL)
729 goto out_enoent; 727 goto out_enoent;
730 728 if (stateid != NULL &&
731 if (!clp->cl_mvops->match_stateid(&delegation->stateid, stateid)) 729 !clp->cl_mvops->match_stateid(&delegation->stateid, stateid))
732 goto out_enoent; 730 goto out_enoent;
733 nfs_mark_return_delegation(server, delegation); 731 nfs_mark_return_delegation(server, delegation);
734 rcu_read_unlock(); 732 rcu_read_unlock();
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index 3d8e4ffa0a33..ce5a21861074 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -1714,9 +1714,6 @@ nfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, dev_t rdev)
1714 dfprintk(VFS, "NFS: mknod(%s/%lu), %pd\n", 1714 dfprintk(VFS, "NFS: mknod(%s/%lu), %pd\n",
1715 dir->i_sb->s_id, dir->i_ino, dentry); 1715 dir->i_sb->s_id, dir->i_ino, dentry);
1716 1716
1717 if (!new_valid_dev(rdev))
1718 return -EINVAL;
1719
1720 attr.ia_mode = mode; 1717 attr.ia_mode = mode;
1721 attr.ia_valid = ATTR_MODE; 1718 attr.ia_valid = ATTR_MODE;
1722 1719
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index fbc5a56de875..03516c80855a 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -339,6 +339,19 @@ static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls)
339 } 339 }
340} 340}
341 341
342static void ff_layout_mark_devices_valid(struct nfs4_ff_layout_segment *fls)
343{
344 struct nfs4_deviceid_node *node;
345 int i;
346
347 if (!(fls->flags & FF_FLAGS_NO_IO_THRU_MDS))
348 return;
349 for (i = 0; i < fls->mirror_array_cnt; i++) {
350 node = &fls->mirror_array[i]->mirror_ds->id_node;
351 clear_bit(NFS_DEVICEID_UNAVAILABLE, &node->flags);
352 }
353}
354
342static struct pnfs_layout_segment * 355static struct pnfs_layout_segment *
343ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh, 356ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
344 struct nfs4_layoutget_res *lgr, 357 struct nfs4_layoutget_res *lgr,
@@ -499,6 +512,7 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
499 rc = ff_layout_check_layout(lgr); 512 rc = ff_layout_check_layout(lgr);
500 if (rc) 513 if (rc)
501 goto out_err_free; 514 goto out_err_free;
515 ff_layout_mark_devices_valid(fls);
502 516
503 ret = &fls->generic_hdr; 517 ret = &fls->generic_hdr;
504 dprintk("<-- %s (success)\n", __func__); 518 dprintk("<-- %s (success)\n", __func__);
@@ -741,17 +755,17 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
741} 755}
742 756
743static struct nfs4_pnfs_ds * 757static struct nfs4_pnfs_ds *
744ff_layout_choose_best_ds_for_read(struct nfs_pageio_descriptor *pgio, 758ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg,
759 int start_idx,
745 int *best_idx) 760 int *best_idx)
746{ 761{
747 struct nfs4_ff_layout_segment *fls; 762 struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
748 struct nfs4_pnfs_ds *ds; 763 struct nfs4_pnfs_ds *ds;
749 int idx; 764 int idx;
750 765
751 fls = FF_LAYOUT_LSEG(pgio->pg_lseg);
752 /* mirrors are sorted by efficiency */ 766 /* mirrors are sorted by efficiency */
753 for (idx = 0; idx < fls->mirror_array_cnt; idx++) { 767 for (idx = start_idx; idx < fls->mirror_array_cnt; idx++) {
754 ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, idx, false); 768 ds = nfs4_ff_layout_prepare_ds(lseg, idx, false);
755 if (ds) { 769 if (ds) {
756 *best_idx = idx; 770 *best_idx = idx;
757 return ds; 771 return ds;
@@ -782,7 +796,7 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
782 if (pgio->pg_lseg == NULL) 796 if (pgio->pg_lseg == NULL)
783 goto out_mds; 797 goto out_mds;
784 798
785 ds = ff_layout_choose_best_ds_for_read(pgio, &ds_idx); 799 ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx);
786 if (!ds) 800 if (!ds)
787 goto out_mds; 801 goto out_mds;
788 mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx); 802 mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx);
@@ -1035,7 +1049,8 @@ static int ff_layout_async_handle_error_v4(struct rpc_task *task,
1035 rpc_wake_up(&tbl->slot_tbl_waitq); 1049 rpc_wake_up(&tbl->slot_tbl_waitq);
1036 /* fall through */ 1050 /* fall through */
1037 default: 1051 default:
1038 if (ff_layout_has_available_ds(lseg)) 1052 if (ff_layout_no_fallback_to_mds(lseg) ||
1053 ff_layout_has_available_ds(lseg))
1039 return -NFS4ERR_RESET_TO_PNFS; 1054 return -NFS4ERR_RESET_TO_PNFS;
1040reset: 1055reset:
1041 dprintk("%s Retry through MDS. Error %d\n", __func__, 1056 dprintk("%s Retry through MDS. Error %d\n", __func__,
@@ -1153,7 +1168,6 @@ static void ff_layout_io_track_ds_error(struct pnfs_layout_segment *lseg,
1153} 1168}
1154 1169
1155/* NFS_PROTO call done callback routines */ 1170/* NFS_PROTO call done callback routines */
1156
1157static int ff_layout_read_done_cb(struct rpc_task *task, 1171static int ff_layout_read_done_cb(struct rpc_task *task,
1158 struct nfs_pgio_header *hdr) 1172 struct nfs_pgio_header *hdr)
1159{ 1173{
@@ -1171,6 +1185,10 @@ static int ff_layout_read_done_cb(struct rpc_task *task,
1171 1185
1172 switch (err) { 1186 switch (err) {
1173 case -NFS4ERR_RESET_TO_PNFS: 1187 case -NFS4ERR_RESET_TO_PNFS:
1188 if (ff_layout_choose_best_ds_for_read(hdr->lseg,
1189 hdr->pgio_mirror_idx + 1,
1190 &hdr->pgio_mirror_idx))
1191 goto out_eagain;
1174 set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE, 1192 set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
1175 &hdr->lseg->pls_layout->plh_flags); 1193 &hdr->lseg->pls_layout->plh_flags);
1176 pnfs_read_resend_pnfs(hdr); 1194 pnfs_read_resend_pnfs(hdr);
@@ -1179,11 +1197,13 @@ static int ff_layout_read_done_cb(struct rpc_task *task,
1179 ff_layout_reset_read(hdr); 1197 ff_layout_reset_read(hdr);
1180 return task->tk_status; 1198 return task->tk_status;
1181 case -EAGAIN: 1199 case -EAGAIN:
1182 rpc_restart_call_prepare(task); 1200 goto out_eagain;
1183 return -EAGAIN;
1184 } 1201 }
1185 1202
1186 return 0; 1203 return 0;
1204out_eagain:
1205 rpc_restart_call_prepare(task);
1206 return -EAGAIN;
1187} 1207}
1188 1208
1189static bool 1209static bool
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h
index 68cc0d9828f9..2bb08bc6aaf0 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.h
+++ b/fs/nfs/flexfilelayout/flexfilelayout.h
@@ -10,6 +10,7 @@
10#define FS_NFS_NFS4FLEXFILELAYOUT_H 10#define FS_NFS_NFS4FLEXFILELAYOUT_H
11 11
12#define FF_FLAGS_NO_LAYOUTCOMMIT 1 12#define FF_FLAGS_NO_LAYOUTCOMMIT 1
13#define FF_FLAGS_NO_IO_THRU_MDS 2
13 14
14#include "../pnfs.h" 15#include "../pnfs.h"
15 16
@@ -146,6 +147,12 @@ FF_LAYOUT_MIRROR_COUNT(struct pnfs_layout_segment *lseg)
146} 147}
147 148
148static inline bool 149static inline bool
150ff_layout_no_fallback_to_mds(struct pnfs_layout_segment *lseg)
151{
152 return FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_IO_THRU_MDS;
153}
154
155static inline bool
149ff_layout_test_devid_unavailable(struct nfs4_deviceid_node *node) 156ff_layout_test_devid_unavailable(struct nfs4_deviceid_node *node)
150{ 157{
151 return nfs4_test_deviceid_unavailable(node); 158 return nfs4_test_deviceid_unavailable(node);
diff --git a/fs/nfs/mount_clnt.c b/fs/nfs/mount_clnt.c
index 99a45283b9ee..09b190015df4 100644
--- a/fs/nfs/mount_clnt.c
+++ b/fs/nfs/mount_clnt.c
@@ -16,9 +16,7 @@
16#include <linux/nfs_fs.h> 16#include <linux/nfs_fs.h>
17#include "internal.h" 17#include "internal.h"
18 18
19#ifdef NFS_DEBUG 19#define NFSDBG_FACILITY NFSDBG_MOUNT
20# define NFSDBG_FACILITY NFSDBG_MOUNT
21#endif
22 20
23/* 21/*
24 * Defined by RFC 1094, section A.3; and RFC 1813, section 5.1.4 22 * Defined by RFC 1094, section A.3; and RFC 1813, section 5.1.4
diff --git a/fs/nfs/nfs42.h b/fs/nfs/nfs42.h
index 814c1255f1d2..b587ccd31083 100644
--- a/fs/nfs/nfs42.h
+++ b/fs/nfs/nfs42.h
@@ -17,5 +17,6 @@ int nfs42_proc_deallocate(struct file *, loff_t, loff_t);
17loff_t nfs42_proc_llseek(struct file *, loff_t, int); 17loff_t nfs42_proc_llseek(struct file *, loff_t, int);
18int nfs42_proc_layoutstats_generic(struct nfs_server *, 18int nfs42_proc_layoutstats_generic(struct nfs_server *,
19 struct nfs42_layoutstat_data *); 19 struct nfs42_layoutstat_data *);
20int nfs42_proc_clone(struct file *, struct file *, loff_t, loff_t, loff_t);
20 21
21#endif /* __LINUX_FS_NFS_NFS4_2_H */ 22#endif /* __LINUX_FS_NFS_NFS4_2_H */
diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c
index 0f020e4d8421..3e92a3cde15d 100644
--- a/fs/nfs/nfs42proc.c
+++ b/fs/nfs/nfs42proc.c
@@ -271,3 +271,74 @@ int nfs42_proc_layoutstats_generic(struct nfs_server *server,
271 return PTR_ERR(task); 271 return PTR_ERR(task);
272 return 0; 272 return 0;
273} 273}
274
275static int _nfs42_proc_clone(struct rpc_message *msg, struct file *src_f,
276 struct file *dst_f, loff_t src_offset,
277 loff_t dst_offset, loff_t count)
278{
279 struct inode *src_inode = file_inode(src_f);
280 struct inode *dst_inode = file_inode(dst_f);
281 struct nfs_server *server = NFS_SERVER(dst_inode);
282 struct nfs42_clone_args args = {
283 .src_fh = NFS_FH(src_inode),
284 .dst_fh = NFS_FH(dst_inode),
285 .src_offset = src_offset,
286 .dst_offset = dst_offset,
287 .dst_bitmask = server->cache_consistency_bitmask,
288 };
289 struct nfs42_clone_res res = {
290 .server = server,
291 };
292 int status;
293
294 msg->rpc_argp = &args;
295 msg->rpc_resp = &res;
296
297 status = nfs42_set_rw_stateid(&args.src_stateid, src_f, FMODE_READ);
298 if (status)
299 return status;
300
301 status = nfs42_set_rw_stateid(&args.dst_stateid, dst_f, FMODE_WRITE);
302 if (status)
303 return status;
304
305 res.dst_fattr = nfs_alloc_fattr();
306 if (!res.dst_fattr)
307 return -ENOMEM;
308
309 status = nfs4_call_sync(server->client, server, msg,
310 &args.seq_args, &res.seq_res, 0);
311 if (status == 0)
312 status = nfs_post_op_update_inode(dst_inode, res.dst_fattr);
313
314 kfree(res.dst_fattr);
315 return status;
316}
317
318int nfs42_proc_clone(struct file *src_f, struct file *dst_f,
319 loff_t src_offset, loff_t dst_offset, loff_t count)
320{
321 struct rpc_message msg = {
322 .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_CLONE],
323 };
324 struct inode *inode = file_inode(src_f);
325 struct nfs_server *server = NFS_SERVER(file_inode(src_f));
326 struct nfs4_exception exception = { };
327 int err;
328
329 if (!nfs_server_capable(inode, NFS_CAP_CLONE))
330 return -EOPNOTSUPP;
331
332 do {
333 err = _nfs42_proc_clone(&msg, src_f, dst_f, src_offset,
334 dst_offset, count);
335 if (err == -ENOTSUPP || err == -EOPNOTSUPP) {
336 NFS_SERVER(inode)->caps &= ~NFS_CAP_CLONE;
337 return -EOPNOTSUPP;
338 }
339 err = nfs4_handle_exception(server, err, &exception);
340 } while (exception.retry);
341
342 return err;
343
344}
diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c
index 0eb29e14070d..0ca482a51e53 100644
--- a/fs/nfs/nfs42xdr.c
+++ b/fs/nfs/nfs42xdr.c
@@ -34,6 +34,12 @@
34 1 /* opaque devaddr4 length */ + \ 34 1 /* opaque devaddr4 length */ + \
35 XDR_QUADLEN(PNFS_LAYOUTSTATS_MAXSIZE)) 35 XDR_QUADLEN(PNFS_LAYOUTSTATS_MAXSIZE))
36#define decode_layoutstats_maxsz (op_decode_hdr_maxsz) 36#define decode_layoutstats_maxsz (op_decode_hdr_maxsz)
37#define encode_clone_maxsz (encode_stateid_maxsz + \
38 encode_stateid_maxsz + \
39 2 /* src offset */ + \
40 2 /* dst offset */ + \
41 2 /* count */)
42#define decode_clone_maxsz (op_decode_hdr_maxsz)
37 43
38#define NFS4_enc_allocate_sz (compound_encode_hdr_maxsz + \ 44#define NFS4_enc_allocate_sz (compound_encode_hdr_maxsz + \
39 encode_putfh_maxsz + \ 45 encode_putfh_maxsz + \
@@ -65,7 +71,20 @@
65 decode_sequence_maxsz + \ 71 decode_sequence_maxsz + \
66 decode_putfh_maxsz + \ 72 decode_putfh_maxsz + \
67 PNFS_LAYOUTSTATS_MAXDEV * decode_layoutstats_maxsz) 73 PNFS_LAYOUTSTATS_MAXDEV * decode_layoutstats_maxsz)
68 74#define NFS4_enc_clone_sz (compound_encode_hdr_maxsz + \
75 encode_sequence_maxsz + \
76 encode_putfh_maxsz + \
77 encode_savefh_maxsz + \
78 encode_putfh_maxsz + \
79 encode_clone_maxsz + \
80 encode_getattr_maxsz)
81#define NFS4_dec_clone_sz (compound_decode_hdr_maxsz + \
82 decode_sequence_maxsz + \
83 decode_putfh_maxsz + \
84 decode_savefh_maxsz + \
85 decode_putfh_maxsz + \
86 decode_clone_maxsz + \
87 decode_getattr_maxsz)
69 88
70static void encode_fallocate(struct xdr_stream *xdr, 89static void encode_fallocate(struct xdr_stream *xdr,
71 struct nfs42_falloc_args *args) 90 struct nfs42_falloc_args *args)
@@ -128,6 +147,21 @@ static void encode_layoutstats(struct xdr_stream *xdr,
128 encode_uint32(xdr, 0); 147 encode_uint32(xdr, 0);
129} 148}
130 149
150static void encode_clone(struct xdr_stream *xdr,
151 struct nfs42_clone_args *args,
152 struct compound_hdr *hdr)
153{
154 __be32 *p;
155
156 encode_op_hdr(xdr, OP_CLONE, decode_clone_maxsz, hdr);
157 encode_nfs4_stateid(xdr, &args->src_stateid);
158 encode_nfs4_stateid(xdr, &args->dst_stateid);
159 p = reserve_space(xdr, 3*8);
160 p = xdr_encode_hyper(p, args->src_offset);
161 p = xdr_encode_hyper(p, args->dst_offset);
162 xdr_encode_hyper(p, args->count);
163}
164
131/* 165/*
132 * Encode ALLOCATE request 166 * Encode ALLOCATE request
133 */ 167 */
@@ -206,6 +240,27 @@ static void nfs4_xdr_enc_layoutstats(struct rpc_rqst *req,
206 encode_nops(&hdr); 240 encode_nops(&hdr);
207} 241}
208 242
243/*
244 * Encode CLONE request
245 */
246static void nfs4_xdr_enc_clone(struct rpc_rqst *req,
247 struct xdr_stream *xdr,
248 struct nfs42_clone_args *args)
249{
250 struct compound_hdr hdr = {
251 .minorversion = nfs4_xdr_minorversion(&args->seq_args),
252 };
253
254 encode_compound_hdr(xdr, req, &hdr);
255 encode_sequence(xdr, &args->seq_args, &hdr);
256 encode_putfh(xdr, args->src_fh, &hdr);
257 encode_savefh(xdr, &hdr);
258 encode_putfh(xdr, args->dst_fh, &hdr);
259 encode_clone(xdr, args, &hdr);
260 encode_getfattr(xdr, args->dst_bitmask, &hdr);
261 encode_nops(&hdr);
262}
263
209static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res) 264static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res)
210{ 265{
211 return decode_op_hdr(xdr, OP_ALLOCATE); 266 return decode_op_hdr(xdr, OP_ALLOCATE);
@@ -243,6 +298,11 @@ static int decode_layoutstats(struct xdr_stream *xdr)
243 return decode_op_hdr(xdr, OP_LAYOUTSTATS); 298 return decode_op_hdr(xdr, OP_LAYOUTSTATS);
244} 299}
245 300
301static int decode_clone(struct xdr_stream *xdr)
302{
303 return decode_op_hdr(xdr, OP_CLONE);
304}
305
246/* 306/*
247 * Decode ALLOCATE request 307 * Decode ALLOCATE request
248 */ 308 */
@@ -351,4 +411,39 @@ out:
351 return status; 411 return status;
352} 412}
353 413
414/*
415 * Decode CLONE request
416 */
417static int nfs4_xdr_dec_clone(struct rpc_rqst *rqstp,
418 struct xdr_stream *xdr,
419 struct nfs42_clone_res *res)
420{
421 struct compound_hdr hdr;
422 int status;
423
424 status = decode_compound_hdr(xdr, &hdr);
425 if (status)
426 goto out;
427 status = decode_sequence(xdr, &res->seq_res, rqstp);
428 if (status)
429 goto out;
430 status = decode_putfh(xdr);
431 if (status)
432 goto out;
433 status = decode_savefh(xdr);
434 if (status)
435 goto out;
436 status = decode_putfh(xdr);
437 if (status)
438 goto out;
439 status = decode_clone(xdr);
440 if (status)
441 goto out;
442 status = decode_getfattr(xdr, res->dst_fattr, res->server);
443
444out:
445 res->rpc_status = status;
446 return status;
447}
448
354#endif /* __LINUX_FS_NFS_NFS4_2XDR_H */ 449#endif /* __LINUX_FS_NFS_NFS4_2XDR_H */
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 50cfc4ca7a02..4afdee420d25 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -183,10 +183,12 @@ struct nfs4_state {
183 183
184 184
185struct nfs4_exception { 185struct nfs4_exception {
186 long timeout;
187 int retry;
188 struct nfs4_state *state; 186 struct nfs4_state *state;
189 struct inode *inode; 187 struct inode *inode;
188 long timeout;
189 unsigned char delay : 1,
190 recovering : 1,
191 retry : 1;
190}; 192};
191 193
192struct nfs4_state_recovery_ops { 194struct nfs4_state_recovery_ops {
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index b0dbe0abed53..4aa571956cd6 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -4,6 +4,7 @@
4 * Copyright (C) 1992 Rick Sladkey 4 * Copyright (C) 1992 Rick Sladkey
5 */ 5 */
6#include <linux/fs.h> 6#include <linux/fs.h>
7#include <linux/file.h>
7#include <linux/falloc.h> 8#include <linux/falloc.h>
8#include <linux/nfs_fs.h> 9#include <linux/nfs_fs.h>
9#include "delegation.h" 10#include "delegation.h"
@@ -192,8 +193,138 @@ static long nfs42_fallocate(struct file *filep, int mode, loff_t offset, loff_t
192 return nfs42_proc_deallocate(filep, offset, len); 193 return nfs42_proc_deallocate(filep, offset, len);
193 return nfs42_proc_allocate(filep, offset, len); 194 return nfs42_proc_allocate(filep, offset, len);
194} 195}
196
197static noinline long
198nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd,
199 u64 src_off, u64 dst_off, u64 count)
200{
201 struct inode *dst_inode = file_inode(dst_file);
202 struct nfs_server *server = NFS_SERVER(dst_inode);
203 struct fd src_file;
204 struct inode *src_inode;
205 unsigned int bs = server->clone_blksize;
206 int ret;
207
208 /* dst file must be opened for writing */
209 if (!(dst_file->f_mode & FMODE_WRITE))
210 return -EINVAL;
211
212 ret = mnt_want_write_file(dst_file);
213 if (ret)
214 return ret;
215
216 src_file = fdget(srcfd);
217 if (!src_file.file) {
218 ret = -EBADF;
219 goto out_drop_write;
220 }
221
222 src_inode = file_inode(src_file.file);
223
224 /* src and dst must be different files */
225 ret = -EINVAL;
226 if (src_inode == dst_inode)
227 goto out_fput;
228
229 /* src file must be opened for reading */
230 if (!(src_file.file->f_mode & FMODE_READ))
231 goto out_fput;
232
233 /* src and dst must be regular files */
234 ret = -EISDIR;
235 if (!S_ISREG(src_inode->i_mode) || !S_ISREG(dst_inode->i_mode))
236 goto out_fput;
237
238 ret = -EXDEV;
239 if (src_file.file->f_path.mnt != dst_file->f_path.mnt ||
240 src_inode->i_sb != dst_inode->i_sb)
241 goto out_fput;
242
243 /* check alignment w.r.t. clone_blksize */
244 ret = -EINVAL;
245 if (bs) {
246 if (!IS_ALIGNED(src_off, bs) || !IS_ALIGNED(dst_off, bs))
247 goto out_fput;
248 if (!IS_ALIGNED(count, bs) && i_size_read(src_inode) != (src_off + count))
249 goto out_fput;
250 }
251
252 /* XXX: do we lock at all? what if server needs CB_RECALL_LAYOUT? */
253 if (dst_inode < src_inode) {
254 mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_PARENT);
255 mutex_lock_nested(&src_inode->i_mutex, I_MUTEX_CHILD);
256 } else {
257 mutex_lock_nested(&src_inode->i_mutex, I_MUTEX_PARENT);
258 mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_CHILD);
259 }
260
261 /* flush all pending writes on both src and dst so that server
262 * has the latest data */
263 ret = nfs_sync_inode(src_inode);
264 if (ret)
265 goto out_unlock;
266 ret = nfs_sync_inode(dst_inode);
267 if (ret)
268 goto out_unlock;
269
270 ret = nfs42_proc_clone(src_file.file, dst_file, src_off, dst_off, count);
271
272 /* truncate inode page cache of the dst range so that future reads can fetch
273 * new data from server */
274 if (!ret)
275 truncate_inode_pages_range(&dst_inode->i_data, dst_off, dst_off + count - 1);
276
277out_unlock:
278 if (dst_inode < src_inode) {
279 mutex_unlock(&src_inode->i_mutex);
280 mutex_unlock(&dst_inode->i_mutex);
281 } else {
282 mutex_unlock(&dst_inode->i_mutex);
283 mutex_unlock(&src_inode->i_mutex);
284 }
285out_fput:
286 fdput(src_file);
287out_drop_write:
288 mnt_drop_write_file(dst_file);
289 return ret;
290}
291
292static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp)
293{
294 struct nfs_ioctl_clone_range_args args;
295
296 if (copy_from_user(&args, argp, sizeof(args)))
297 return -EFAULT;
298
299 return nfs42_ioctl_clone(dst_file, args.src_fd, args.src_off, args.dst_off, args.count);
300}
301#else
302static long nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd,
303 u64 src_off, u64 dst_off, u64 count)
304{
305 return -ENOTTY;
306}
307
308static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp)
309{
310 return -ENOTTY;
311}
195#endif /* CONFIG_NFS_V4_2 */ 312#endif /* CONFIG_NFS_V4_2 */
196 313
314long nfs4_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
315{
316 void __user *argp = (void __user *)arg;
317
318 switch (cmd) {
319 case NFS_IOC_CLONE:
320 return nfs42_ioctl_clone(file, arg, 0, 0, 0);
321 case NFS_IOC_CLONE_RANGE:
322 return nfs42_ioctl_clone_range(file, argp);
323 }
324
325 return -ENOTTY;
326}
327
197const struct file_operations nfs4_file_operations = { 328const struct file_operations nfs4_file_operations = {
198#ifdef CONFIG_NFS_V4_2 329#ifdef CONFIG_NFS_V4_2
199 .llseek = nfs4_file_llseek, 330 .llseek = nfs4_file_llseek,
@@ -216,4 +347,9 @@ const struct file_operations nfs4_file_operations = {
216#endif /* CONFIG_NFS_V4_2 */ 347#endif /* CONFIG_NFS_V4_2 */
217 .check_flags = nfs_check_flags, 348 .check_flags = nfs_check_flags,
218 .setlease = simple_nosetlease, 349 .setlease = simple_nosetlease,
350#ifdef CONFIG_COMPAT
351 .unlocked_ioctl = nfs4_ioctl,
352#else
353 .compat_ioctl = nfs4_ioctl,
354#endif /* CONFIG_COMPAT */
219}; 355};
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 0e5ff69455c7..ff5bddc49a2a 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -78,7 +78,6 @@ struct nfs4_opendata;
78static int _nfs4_proc_open(struct nfs4_opendata *data); 78static int _nfs4_proc_open(struct nfs4_opendata *data);
79static int _nfs4_recover_proc_open(struct nfs4_opendata *data); 79static int _nfs4_recover_proc_open(struct nfs4_opendata *data);
80static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *); 80static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *);
81static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *, struct nfs4_state *, long *);
82static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr); 81static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr);
83static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label); 82static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label);
84static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label); 83static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label);
@@ -239,6 +238,7 @@ const u32 nfs4_fsinfo_bitmap[3] = { FATTR4_WORD0_MAXFILESIZE
239 FATTR4_WORD1_TIME_DELTA 238 FATTR4_WORD1_TIME_DELTA
240 | FATTR4_WORD1_FS_LAYOUT_TYPES, 239 | FATTR4_WORD1_FS_LAYOUT_TYPES,
241 FATTR4_WORD2_LAYOUT_BLKSIZE 240 FATTR4_WORD2_LAYOUT_BLKSIZE
241 | FATTR4_WORD2_CLONE_BLKSIZE
242}; 242};
243 243
244const u32 nfs4_fs_locations_bitmap[3] = { 244const u32 nfs4_fs_locations_bitmap[3] = {
@@ -344,13 +344,16 @@ static int nfs4_delay(struct rpc_clnt *clnt, long *timeout)
344/* This is the error handling routine for processes that are allowed 344/* This is the error handling routine for processes that are allowed
345 * to sleep. 345 * to sleep.
346 */ 346 */
347int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception) 347static int nfs4_do_handle_exception(struct nfs_server *server,
348 int errorcode, struct nfs4_exception *exception)
348{ 349{
349 struct nfs_client *clp = server->nfs_client; 350 struct nfs_client *clp = server->nfs_client;
350 struct nfs4_state *state = exception->state; 351 struct nfs4_state *state = exception->state;
351 struct inode *inode = exception->inode; 352 struct inode *inode = exception->inode;
352 int ret = errorcode; 353 int ret = errorcode;
353 354
355 exception->delay = 0;
356 exception->recovering = 0;
354 exception->retry = 0; 357 exception->retry = 0;
355 switch(errorcode) { 358 switch(errorcode) {
356 case 0: 359 case 0:
@@ -359,11 +362,9 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_
359 case -NFS4ERR_DELEG_REVOKED: 362 case -NFS4ERR_DELEG_REVOKED:
360 case -NFS4ERR_ADMIN_REVOKED: 363 case -NFS4ERR_ADMIN_REVOKED:
361 case -NFS4ERR_BAD_STATEID: 364 case -NFS4ERR_BAD_STATEID:
362 if (inode && nfs4_have_delegation(inode, FMODE_READ)) { 365 if (inode && nfs_async_inode_return_delegation(inode,
363 nfs4_inode_return_delegation(inode); 366 NULL) == 0)
364 exception->retry = 1; 367 goto wait_on_recovery;
365 return 0;
366 }
367 if (state == NULL) 368 if (state == NULL)
368 break; 369 break;
369 ret = nfs4_schedule_stateid_recovery(server, state); 370 ret = nfs4_schedule_stateid_recovery(server, state);
@@ -409,11 +410,12 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_
409 ret = -EBUSY; 410 ret = -EBUSY;
410 break; 411 break;
411 } 412 }
412 case -NFS4ERR_GRACE:
413 case -NFS4ERR_DELAY: 413 case -NFS4ERR_DELAY:
414 ret = nfs4_delay(server->client, &exception->timeout); 414 nfs_inc_server_stats(server, NFSIOS_DELAY);
415 if (ret != 0) 415 case -NFS4ERR_GRACE:
416 break; 416 exception->delay = 1;
417 return 0;
418
417 case -NFS4ERR_RETRY_UNCACHED_REP: 419 case -NFS4ERR_RETRY_UNCACHED_REP:
418 case -NFS4ERR_OLD_STATEID: 420 case -NFS4ERR_OLD_STATEID:
419 exception->retry = 1; 421 exception->retry = 1;
@@ -434,14 +436,85 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_
434 /* We failed to handle the error */ 436 /* We failed to handle the error */
435 return nfs4_map_errors(ret); 437 return nfs4_map_errors(ret);
436wait_on_recovery: 438wait_on_recovery:
437 ret = nfs4_wait_clnt_recover(clp); 439 exception->recovering = 1;
440 return 0;
441}
442
443/* This is the error handling routine for processes that are allowed
444 * to sleep.
445 */
446int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception)
447{
448 struct nfs_client *clp = server->nfs_client;
449 int ret;
450
451 ret = nfs4_do_handle_exception(server, errorcode, exception);
452 if (exception->delay) {
453 ret = nfs4_delay(server->client, &exception->timeout);
454 goto out_retry;
455 }
456 if (exception->recovering) {
457 ret = nfs4_wait_clnt_recover(clp);
458 if (test_bit(NFS_MIG_FAILED, &server->mig_status))
459 return -EIO;
460 goto out_retry;
461 }
462 return ret;
463out_retry:
464 if (ret == 0)
465 exception->retry = 1;
466 return ret;
467}
468
469static int
470nfs4_async_handle_exception(struct rpc_task *task, struct nfs_server *server,
471 int errorcode, struct nfs4_exception *exception)
472{
473 struct nfs_client *clp = server->nfs_client;
474 int ret;
475
476 ret = nfs4_do_handle_exception(server, errorcode, exception);
477 if (exception->delay) {
478 rpc_delay(task, nfs4_update_delay(&exception->timeout));
479 goto out_retry;
480 }
481 if (exception->recovering) {
482 rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
483 if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0)
484 rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
485 goto out_retry;
486 }
438 if (test_bit(NFS_MIG_FAILED, &server->mig_status)) 487 if (test_bit(NFS_MIG_FAILED, &server->mig_status))
439 return -EIO; 488 ret = -EIO;
489 return ret;
490out_retry:
440 if (ret == 0) 491 if (ret == 0)
441 exception->retry = 1; 492 exception->retry = 1;
442 return ret; 493 return ret;
443} 494}
444 495
496static int
497nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server,
498 struct nfs4_state *state, long *timeout)
499{
500 struct nfs4_exception exception = {
501 .state = state,
502 };
503
504 if (task->tk_status >= 0)
505 return 0;
506 if (timeout)
507 exception.timeout = *timeout;
508 task->tk_status = nfs4_async_handle_exception(task, server,
509 task->tk_status,
510 &exception);
511 if (exception.delay && timeout)
512 *timeout = exception.timeout;
513 if (exception.retry)
514 return -EAGAIN;
515 return 0;
516}
517
445/* 518/*
446 * Return 'true' if 'clp' is using an rpc_client that is integrity protected 519 * Return 'true' if 'clp' is using an rpc_client that is integrity protected
447 * or 'false' otherwise. 520 * or 'false' otherwise.
@@ -4530,7 +4603,7 @@ static inline int nfs4_server_supports_acls(struct nfs_server *server)
4530#define NFS4ACL_MAXPAGES DIV_ROUND_UP(XATTR_SIZE_MAX, PAGE_SIZE) 4603#define NFS4ACL_MAXPAGES DIV_ROUND_UP(XATTR_SIZE_MAX, PAGE_SIZE)
4531 4604
4532static int buf_to_pages_noslab(const void *buf, size_t buflen, 4605static int buf_to_pages_noslab(const void *buf, size_t buflen,
4533 struct page **pages, unsigned int *pgbase) 4606 struct page **pages)
4534{ 4607{
4535 struct page *newpage, **spages; 4608 struct page *newpage, **spages;
4536 int rc = 0; 4609 int rc = 0;
@@ -4674,7 +4747,6 @@ static ssize_t __nfs4_get_acl_uncached(struct inode *inode, void *buf, size_t bu
4674 goto out_free; 4747 goto out_free;
4675 4748
4676 args.acl_len = npages * PAGE_SIZE; 4749 args.acl_len = npages * PAGE_SIZE;
4677 args.acl_pgbase = 0;
4678 4750
4679 dprintk("%s buf %p buflen %zu npages %d args.acl_len %zu\n", 4751 dprintk("%s buf %p buflen %zu npages %d args.acl_len %zu\n",
4680 __func__, buf, buflen, npages, args.acl_len); 4752 __func__, buf, buflen, npages, args.acl_len);
@@ -4766,7 +4838,7 @@ static int __nfs4_proc_set_acl(struct inode *inode, const void *buf, size_t bufl
4766 return -EOPNOTSUPP; 4838 return -EOPNOTSUPP;
4767 if (npages > ARRAY_SIZE(pages)) 4839 if (npages > ARRAY_SIZE(pages))
4768 return -ERANGE; 4840 return -ERANGE;
4769 i = buf_to_pages_noslab(buf, buflen, arg.acl_pages, &arg.acl_pgbase); 4841 i = buf_to_pages_noslab(buf, buflen, arg.acl_pages);
4770 if (i < 0) 4842 if (i < 0)
4771 return i; 4843 return i;
4772 nfs4_inode_return_delegation(inode); 4844 nfs4_inode_return_delegation(inode);
@@ -4955,79 +5027,6 @@ out:
4955#endif /* CONFIG_NFS_V4_SECURITY_LABEL */ 5027#endif /* CONFIG_NFS_V4_SECURITY_LABEL */
4956 5028
4957 5029
4958static int
4959nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server,
4960 struct nfs4_state *state, long *timeout)
4961{
4962 struct nfs_client *clp = server->nfs_client;
4963
4964 if (task->tk_status >= 0)
4965 return 0;
4966 switch(task->tk_status) {
4967 case -NFS4ERR_DELEG_REVOKED:
4968 case -NFS4ERR_ADMIN_REVOKED:
4969 case -NFS4ERR_BAD_STATEID:
4970 case -NFS4ERR_OPENMODE:
4971 if (state == NULL)
4972 break;
4973 if (nfs4_schedule_stateid_recovery(server, state) < 0)
4974 goto recovery_failed;
4975 goto wait_on_recovery;
4976 case -NFS4ERR_EXPIRED:
4977 if (state != NULL) {
4978 if (nfs4_schedule_stateid_recovery(server, state) < 0)
4979 goto recovery_failed;
4980 }
4981 case -NFS4ERR_STALE_STATEID:
4982 case -NFS4ERR_STALE_CLIENTID:
4983 nfs4_schedule_lease_recovery(clp);
4984 goto wait_on_recovery;
4985 case -NFS4ERR_MOVED:
4986 if (nfs4_schedule_migration_recovery(server) < 0)
4987 goto recovery_failed;
4988 goto wait_on_recovery;
4989 case -NFS4ERR_LEASE_MOVED:
4990 nfs4_schedule_lease_moved_recovery(clp);
4991 goto wait_on_recovery;
4992#if defined(CONFIG_NFS_V4_1)
4993 case -NFS4ERR_BADSESSION:
4994 case -NFS4ERR_BADSLOT:
4995 case -NFS4ERR_BAD_HIGH_SLOT:
4996 case -NFS4ERR_DEADSESSION:
4997 case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION:
4998 case -NFS4ERR_SEQ_FALSE_RETRY:
4999 case -NFS4ERR_SEQ_MISORDERED:
5000 dprintk("%s ERROR %d, Reset session\n", __func__,
5001 task->tk_status);
5002 nfs4_schedule_session_recovery(clp->cl_session, task->tk_status);
5003 goto wait_on_recovery;
5004#endif /* CONFIG_NFS_V4_1 */
5005 case -NFS4ERR_DELAY:
5006 nfs_inc_server_stats(server, NFSIOS_DELAY);
5007 rpc_delay(task, nfs4_update_delay(timeout));
5008 goto restart_call;
5009 case -NFS4ERR_GRACE:
5010 rpc_delay(task, NFS4_POLL_RETRY_MAX);
5011 case -NFS4ERR_RETRY_UNCACHED_REP:
5012 case -NFS4ERR_OLD_STATEID:
5013 goto restart_call;
5014 }
5015 task->tk_status = nfs4_map_errors(task->tk_status);
5016 return 0;
5017recovery_failed:
5018 task->tk_status = -EIO;
5019 return 0;
5020wait_on_recovery:
5021 rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
5022 if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0)
5023 rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
5024 if (test_bit(NFS_MIG_FAILED, &server->mig_status))
5025 goto recovery_failed;
5026restart_call:
5027 task->tk_status = 0;
5028 return -EAGAIN;
5029}
5030
5031static void nfs4_init_boot_verifier(const struct nfs_client *clp, 5030static void nfs4_init_boot_verifier(const struct nfs_client *clp,
5032 nfs4_verifier *bootverf) 5031 nfs4_verifier *bootverf)
5033{ 5032{
@@ -5522,7 +5521,7 @@ struct nfs4_unlockdata {
5522 struct nfs4_lock_state *lsp; 5521 struct nfs4_lock_state *lsp;
5523 struct nfs_open_context *ctx; 5522 struct nfs_open_context *ctx;
5524 struct file_lock fl; 5523 struct file_lock fl;
5525 const struct nfs_server *server; 5524 struct nfs_server *server;
5526 unsigned long timestamp; 5525 unsigned long timestamp;
5527}; 5526};
5528 5527
@@ -8718,7 +8717,8 @@ static const struct nfs4_minor_version_ops nfs_v4_2_minor_ops = {
8718 | NFS_CAP_ALLOCATE 8717 | NFS_CAP_ALLOCATE
8719 | NFS_CAP_DEALLOCATE 8718 | NFS_CAP_DEALLOCATE
8720 | NFS_CAP_SEEK 8719 | NFS_CAP_SEEK
8721 | NFS_CAP_LAYOUTSTATS, 8720 | NFS_CAP_LAYOUTSTATS
8721 | NFS_CAP_CLONE,
8722 .init_client = nfs41_init_client, 8722 .init_client = nfs41_init_client,
8723 .shutdown_client = nfs41_shutdown_client, 8723 .shutdown_client = nfs41_shutdown_client,
8724 .match_stateid = nfs41_match_stateid, 8724 .match_stateid = nfs41_match_stateid,
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 788adf3897c7..dfed4f5c8fcc 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -1659,7 +1659,7 @@ encode_setacl(struct xdr_stream *xdr, struct nfs_setaclargs *arg, struct compoun
1659 *p = cpu_to_be32(FATTR4_WORD0_ACL); 1659 *p = cpu_to_be32(FATTR4_WORD0_ACL);
1660 p = reserve_space(xdr, 4); 1660 p = reserve_space(xdr, 4);
1661 *p = cpu_to_be32(arg->acl_len); 1661 *p = cpu_to_be32(arg->acl_len);
1662 xdr_write_pages(xdr, arg->acl_pages, arg->acl_pgbase, arg->acl_len); 1662 xdr_write_pages(xdr, arg->acl_pages, 0, arg->acl_len);
1663} 1663}
1664 1664
1665static void 1665static void
@@ -2491,7 +2491,7 @@ static void nfs4_xdr_enc_getacl(struct rpc_rqst *req, struct xdr_stream *xdr,
2491 encode_getattr_two(xdr, FATTR4_WORD0_ACL, 0, &hdr); 2491 encode_getattr_two(xdr, FATTR4_WORD0_ACL, 0, &hdr);
2492 2492
2493 xdr_inline_pages(&req->rq_rcv_buf, replen << 2, 2493 xdr_inline_pages(&req->rq_rcv_buf, replen << 2,
2494 args->acl_pages, args->acl_pgbase, args->acl_len); 2494 args->acl_pages, 0, args->acl_len);
2495 2495
2496 encode_nops(&hdr); 2496 encode_nops(&hdr);
2497} 2497}
@@ -4375,6 +4375,11 @@ static int decode_statfs(struct xdr_stream *xdr, struct nfs_fsstat *fsstat)
4375 goto xdr_error; 4375 goto xdr_error;
4376 if ((status = decode_attr_files_total(xdr, bitmap, &fsstat->tfiles)) != 0) 4376 if ((status = decode_attr_files_total(xdr, bitmap, &fsstat->tfiles)) != 0)
4377 goto xdr_error; 4377 goto xdr_error;
4378
4379 status = -EIO;
4380 if (unlikely(bitmap[0]))
4381 goto xdr_error;
4382
4378 if ((status = decode_attr_space_avail(xdr, bitmap, &fsstat->abytes)) != 0) 4383 if ((status = decode_attr_space_avail(xdr, bitmap, &fsstat->abytes)) != 0)
4379 goto xdr_error; 4384 goto xdr_error;
4380 if ((status = decode_attr_space_free(xdr, bitmap, &fsstat->fbytes)) != 0) 4385 if ((status = decode_attr_space_free(xdr, bitmap, &fsstat->fbytes)) != 0)
@@ -4574,6 +4579,10 @@ static int decode_getfattr_attrs(struct xdr_stream *xdr, uint32_t *bitmap,
4574 goto xdr_error; 4579 goto xdr_error;
4575 fattr->valid |= status; 4580 fattr->valid |= status;
4576 4581
4582 status = -EIO;
4583 if (unlikely(bitmap[0]))
4584 goto xdr_error;
4585
4577 status = decode_attr_mode(xdr, bitmap, &fmode); 4586 status = decode_attr_mode(xdr, bitmap, &fmode);
4578 if (status < 0) 4587 if (status < 0)
4579 goto xdr_error; 4588 goto xdr_error;
@@ -4627,6 +4636,10 @@ static int decode_getfattr_attrs(struct xdr_stream *xdr, uint32_t *bitmap,
4627 goto xdr_error; 4636 goto xdr_error;
4628 fattr->valid |= status; 4637 fattr->valid |= status;
4629 4638
4639 status = -EIO;
4640 if (unlikely(bitmap[1]))
4641 goto xdr_error;
4642
4630 status = decode_attr_mdsthreshold(xdr, bitmap, fattr->mdsthreshold); 4643 status = decode_attr_mdsthreshold(xdr, bitmap, fattr->mdsthreshold);
4631 if (status < 0) 4644 if (status < 0)
4632 goto xdr_error; 4645 goto xdr_error;
@@ -4764,6 +4777,28 @@ static int decode_attr_layout_blksize(struct xdr_stream *xdr, uint32_t *bitmap,
4764 return 0; 4777 return 0;
4765} 4778}
4766 4779
4780/*
4781 * The granularity of a CLONE operation.
4782 */
4783static int decode_attr_clone_blksize(struct xdr_stream *xdr, uint32_t *bitmap,
4784 uint32_t *res)
4785{
4786 __be32 *p;
4787
4788 dprintk("%s: bitmap is %x\n", __func__, bitmap[2]);
4789 *res = 0;
4790 if (bitmap[2] & FATTR4_WORD2_CLONE_BLKSIZE) {
4791 p = xdr_inline_decode(xdr, 4);
4792 if (unlikely(!p)) {
4793 print_overflow_msg(__func__, xdr);
4794 return -EIO;
4795 }
4796 *res = be32_to_cpup(p);
4797 bitmap[2] &= ~FATTR4_WORD2_CLONE_BLKSIZE;
4798 }
4799 return 0;
4800}
4801
4767static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo) 4802static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo)
4768{ 4803{
4769 unsigned int savep; 4804 unsigned int savep;
@@ -4789,15 +4824,28 @@ static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo)
4789 if ((status = decode_attr_maxwrite(xdr, bitmap, &fsinfo->wtmax)) != 0) 4824 if ((status = decode_attr_maxwrite(xdr, bitmap, &fsinfo->wtmax)) != 0)
4790 goto xdr_error; 4825 goto xdr_error;
4791 fsinfo->wtpref = fsinfo->wtmax; 4826 fsinfo->wtpref = fsinfo->wtmax;
4827
4828 status = -EIO;
4829 if (unlikely(bitmap[0]))
4830 goto xdr_error;
4831
4792 status = decode_attr_time_delta(xdr, bitmap, &fsinfo->time_delta); 4832 status = decode_attr_time_delta(xdr, bitmap, &fsinfo->time_delta);
4793 if (status != 0) 4833 if (status != 0)
4794 goto xdr_error; 4834 goto xdr_error;
4795 status = decode_attr_pnfstype(xdr, bitmap, &fsinfo->layouttype); 4835 status = decode_attr_pnfstype(xdr, bitmap, &fsinfo->layouttype);
4796 if (status != 0) 4836 if (status != 0)
4797 goto xdr_error; 4837 goto xdr_error;
4838
4839 status = -EIO;
4840 if (unlikely(bitmap[1]))
4841 goto xdr_error;
4842
4798 status = decode_attr_layout_blksize(xdr, bitmap, &fsinfo->blksize); 4843 status = decode_attr_layout_blksize(xdr, bitmap, &fsinfo->blksize);
4799 if (status) 4844 if (status)
4800 goto xdr_error; 4845 goto xdr_error;
4846 status = decode_attr_clone_blksize(xdr, bitmap, &fsinfo->clone_blksize);
4847 if (status)
4848 goto xdr_error;
4801 4849
4802 status = verify_attr_len(xdr, savep, attrlen); 4850 status = verify_attr_len(xdr, savep, attrlen);
4803xdr_error: 4851xdr_error:
@@ -7465,6 +7513,7 @@ struct rpc_procinfo nfs4_procedures[] = {
7465 PROC(ALLOCATE, enc_allocate, dec_allocate), 7513 PROC(ALLOCATE, enc_allocate, dec_allocate),
7466 PROC(DEALLOCATE, enc_deallocate, dec_deallocate), 7514 PROC(DEALLOCATE, enc_deallocate, dec_deallocate),
7467 PROC(LAYOUTSTATS, enc_layoutstats, dec_layoutstats), 7515 PROC(LAYOUTSTATS, enc_layoutstats, dec_layoutstats),
7516 PROC(CLONE, enc_clone, dec_clone),
7468#endif /* CONFIG_NFS_V4_2 */ 7517#endif /* CONFIG_NFS_V4_2 */
7469}; 7518};
7470 7519
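For reference, the new clone_blksize attribute decoded above is cached in the nfs_server structure (see the nfs_fs_sb.h hunk further down) and describes the granularity a CLONE range is expected to honour. The helper below is a hypothetical sketch, not part of this patch: the function name is invented, the real range checks live in the NFSv4.2 clone path, and IS_ALIGNED assumes the server reports a power-of-two granularity.

#include <linux/kernel.h>
#include <linux/types.h>

/* Sketch: a clone_blksize of 0 means the server states no granularity. */
static bool nfs_clone_range_aligned(u64 src_off, u64 dst_off, u64 count,
				    u32 clone_blksize)
{
	if (!clone_blksize)
		return true;
	return IS_ALIGNED(src_off, clone_blksize) &&
	       IS_ALIGNED(dst_off, clone_blksize) &&
	       IS_ALIGNED(count, clone_blksize);
}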
diff --git a/fs/nfs/nfsroot.c b/fs/nfs/nfsroot.c
index 9bc9f04fb7f6..89a15dbe5efc 100644
--- a/fs/nfs/nfsroot.c
+++ b/fs/nfs/nfsroot.c
@@ -90,7 +90,7 @@
90#define NFS_DEF_OPTIONS "vers=2,udp,rsize=4096,wsize=4096" 90#define NFS_DEF_OPTIONS "vers=2,udp,rsize=4096,wsize=4096"
91 91
92/* Parameters passed from the kernel command line */ 92/* Parameters passed from the kernel command line */
93static char nfs_root_parms[256] __initdata = ""; 93static char nfs_root_parms[NFS_MAXPATHLEN + 1] __initdata = "";
94 94
95/* Text-based mount options passed to super.c */ 95/* Text-based mount options passed to super.c */
96static char nfs_root_options[256] __initdata = NFS_DEF_OPTIONS; 96static char nfs_root_options[256] __initdata = NFS_DEF_OPTIONS;
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 8abe27165ad0..93496c059837 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -1912,12 +1912,13 @@ static void pnfs_ld_handle_write_error(struct nfs_pgio_header *hdr)
1912 */ 1912 */
1913void pnfs_ld_write_done(struct nfs_pgio_header *hdr) 1913void pnfs_ld_write_done(struct nfs_pgio_header *hdr)
1914{ 1914{
1915 trace_nfs4_pnfs_write(hdr, hdr->pnfs_error); 1915 if (likely(!hdr->pnfs_error)) {
1916 if (!hdr->pnfs_error) {
1917 pnfs_set_layoutcommit(hdr->inode, hdr->lseg, 1916 pnfs_set_layoutcommit(hdr->inode, hdr->lseg,
1918 hdr->mds_offset + hdr->res.count); 1917 hdr->mds_offset + hdr->res.count);
1919 hdr->mds_ops->rpc_call_done(&hdr->task, hdr); 1918 hdr->mds_ops->rpc_call_done(&hdr->task, hdr);
1920 } else 1919 }
1920 trace_nfs4_pnfs_write(hdr, hdr->pnfs_error);
1921 if (unlikely(hdr->pnfs_error))
1921 pnfs_ld_handle_write_error(hdr); 1922 pnfs_ld_handle_write_error(hdr);
1922 hdr->mds_ops->rpc_release(hdr); 1923 hdr->mds_ops->rpc_release(hdr);
1923} 1924}
@@ -2028,11 +2029,12 @@ static void pnfs_ld_handle_read_error(struct nfs_pgio_header *hdr)
2028 */ 2029 */
2029void pnfs_ld_read_done(struct nfs_pgio_header *hdr) 2030void pnfs_ld_read_done(struct nfs_pgio_header *hdr)
2030{ 2031{
2031 trace_nfs4_pnfs_read(hdr, hdr->pnfs_error);
2032 if (likely(!hdr->pnfs_error)) { 2032 if (likely(!hdr->pnfs_error)) {
2033 __nfs4_read_done_cb(hdr); 2033 __nfs4_read_done_cb(hdr);
2034 hdr->mds_ops->rpc_call_done(&hdr->task, hdr); 2034 hdr->mds_ops->rpc_call_done(&hdr->task, hdr);
2035 } else 2035 }
2036 trace_nfs4_pnfs_read(hdr, hdr->pnfs_error);
2037 if (unlikely(hdr->pnfs_error))
2036 pnfs_ld_handle_read_error(hdr); 2038 pnfs_ld_handle_read_error(hdr);
2037 hdr->mds_ops->rpc_release(hdr); 2039 hdr->mds_ops->rpc_release(hdr);
2038} 2040}
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index 01b8cc8e8cfc..0a5e33f33b5c 100644
--- a/fs/nfs/read.c
+++ b/fs/nfs/read.c
@@ -246,6 +246,13 @@ static void nfs_readpage_retry(struct rpc_task *task,
246 nfs_set_pgio_error(hdr, -EIO, argp->offset); 246 nfs_set_pgio_error(hdr, -EIO, argp->offset);
247 return; 247 return;
248 } 248 }
249
250 /* For non rpc-based layout drivers, retry-through-MDS */
251 if (!task->tk_ops) {
252 hdr->pnfs_error = -EAGAIN;
253 return;
254 }
255
249 /* Yes, so retry the read at the end of the hdr */ 256 /* Yes, so retry the read at the end of the hdr */
250 hdr->mds_offset += resp->count; 257 hdr->mds_offset += resp->count;
251 argp->offset += resp->count; 258 argp->offset += resp->count;
@@ -268,7 +275,7 @@ static void nfs_readpage_result(struct rpc_task *task,
268 hdr->good_bytes = bound - hdr->io_start; 275 hdr->good_bytes = bound - hdr->io_start;
269 } 276 }
270 spin_unlock(&hdr->lock); 277 spin_unlock(&hdr->lock);
271 } else if (hdr->res.count != hdr->args.count) 278 } else if (hdr->res.count < hdr->args.count)
272 nfs_readpage_retry(task, hdr); 279 nfs_readpage_retry(task, hdr);
273} 280}
274 281
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index 383a027de452..f1268280244e 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -2816,7 +2816,6 @@ out_invalid_transport_udp:
2816 * NFS client for backwards compatibility 2816 * NFS client for backwards compatibility
2817 */ 2817 */
2818unsigned int nfs_callback_set_tcpport; 2818unsigned int nfs_callback_set_tcpport;
2819unsigned short nfs_callback_tcpport;
2820/* Default cache timeout is 10 minutes */ 2819/* Default cache timeout is 10 minutes */
2821unsigned int nfs_idmap_cache_timeout = 600; 2820unsigned int nfs_idmap_cache_timeout = 600;
2822/* Turn off NFSv4 uid/gid mapping when using AUTH_SYS */ 2821/* Turn off NFSv4 uid/gid mapping when using AUTH_SYS */
@@ -2827,7 +2826,6 @@ char nfs4_client_id_uniquifier[NFS4_CLIENT_ID_UNIQ_LEN] = "";
2827bool recover_lost_locks = false; 2826bool recover_lost_locks = false;
2828 2827
2829EXPORT_SYMBOL_GPL(nfs_callback_set_tcpport); 2828EXPORT_SYMBOL_GPL(nfs_callback_set_tcpport);
2830EXPORT_SYMBOL_GPL(nfs_callback_tcpport);
2831EXPORT_SYMBOL_GPL(nfs_idmap_cache_timeout); 2829EXPORT_SYMBOL_GPL(nfs_idmap_cache_timeout);
2832EXPORT_SYMBOL_GPL(nfs4_disable_idmapping); 2830EXPORT_SYMBOL_GPL(nfs4_disable_idmapping);
2833EXPORT_SYMBOL_GPL(max_session_slots); 2831EXPORT_SYMBOL_GPL(max_session_slots);
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 75ab7622e0cc..7b9316406930 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1505,6 +1505,13 @@ static void nfs_writeback_result(struct rpc_task *task,
1505 task->tk_status = -EIO; 1505 task->tk_status = -EIO;
1506 return; 1506 return;
1507 } 1507 }
1508
1509 /* For non rpc-based layout drivers, retry-through-MDS */
1510 if (!task->tk_ops) {
1511 hdr->pnfs_error = -EAGAIN;
1512 return;
1513 }
1514
1508 /* Was this an NFSv2 write or an NFSv3 stable write? */ 1515 /* Was this an NFSv2 write or an NFSv3 stable write? */
1509 if (resp->verf->committed != NFS_UNSTABLE) { 1516 if (resp->verf->committed != NFS_UNSTABLE) {
1510 /* Resend from where the server left off */ 1517 /* Resend from where the server left off */
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 00121f298269..e7e78537aea2 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -130,6 +130,7 @@ enum nfs_opnum4 {
130 OP_READ_PLUS = 68, 130 OP_READ_PLUS = 68,
131 OP_SEEK = 69, 131 OP_SEEK = 69,
132 OP_WRITE_SAME = 70, 132 OP_WRITE_SAME = 70,
133 OP_CLONE = 71,
133 134
134 OP_ILLEGAL = 10044, 135 OP_ILLEGAL = 10044,
135}; 136};
@@ -421,6 +422,7 @@ enum lock_type4 {
421#define FATTR4_WORD2_LAYOUT_TYPES (1UL << 0) 422#define FATTR4_WORD2_LAYOUT_TYPES (1UL << 0)
422#define FATTR4_WORD2_LAYOUT_BLKSIZE (1UL << 1) 423#define FATTR4_WORD2_LAYOUT_BLKSIZE (1UL << 1)
423#define FATTR4_WORD2_MDSTHRESHOLD (1UL << 4) 424#define FATTR4_WORD2_MDSTHRESHOLD (1UL << 4)
425#define FATTR4_WORD2_CLONE_BLKSIZE (1UL << 13)
424#define FATTR4_WORD2_SECURITY_LABEL (1UL << 16) 426#define FATTR4_WORD2_SECURITY_LABEL (1UL << 16)
425 427
426/* MDS threshold bitmap bits */ 428/* MDS threshold bitmap bits */
@@ -501,6 +503,7 @@ enum {
501 NFSPROC4_CLNT_ALLOCATE, 503 NFSPROC4_CLNT_ALLOCATE,
502 NFSPROC4_CLNT_DEALLOCATE, 504 NFSPROC4_CLNT_DEALLOCATE,
503 NFSPROC4_CLNT_LAYOUTSTATS, 505 NFSPROC4_CLNT_LAYOUTSTATS,
506 NFSPROC4_CLNT_CLONE,
504}; 507};
505 508
506/* nfs41 types */ 509/* nfs41 types */
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 570a7df2775b..2469ab0bb3a1 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -147,6 +147,7 @@ struct nfs_server {
147 unsigned int acdirmax; 147 unsigned int acdirmax;
148 unsigned int namelen; 148 unsigned int namelen;
149 unsigned int options; /* extra options enabled by mount */ 149 unsigned int options; /* extra options enabled by mount */
150 unsigned int clone_blksize; /* granularity of a CLONE operation */
150#define NFS_OPTION_FSCACHE 0x00000001 /* - local caching enabled */ 151#define NFS_OPTION_FSCACHE 0x00000001 /* - local caching enabled */
151#define NFS_OPTION_MIGRATION 0x00000002 /* - NFSv4 migration enabled */ 152#define NFS_OPTION_MIGRATION 0x00000002 /* - NFSv4 migration enabled */
152 153
@@ -243,5 +244,6 @@ struct nfs_server {
243#define NFS_CAP_ALLOCATE (1U << 20) 244#define NFS_CAP_ALLOCATE (1U << 20)
244#define NFS_CAP_DEALLOCATE (1U << 21) 245#define NFS_CAP_DEALLOCATE (1U << 21)
245#define NFS_CAP_LAYOUTSTATS (1U << 22) 246#define NFS_CAP_LAYOUTSTATS (1U << 22)
247#define NFS_CAP_CLONE (1U << 23)
246 248
247#endif 249#endif
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 52faf7e96c65..570d630f98ae 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -141,6 +141,7 @@ struct nfs_fsinfo {
141 __u32 lease_time; /* in seconds */ 141 __u32 lease_time; /* in seconds */
142 __u32 layouttype; /* supported pnfs layout driver */ 142 __u32 layouttype; /* supported pnfs layout driver */
143 __u32 blksize; /* preferred pnfs io block size */ 143 __u32 blksize; /* preferred pnfs io block size */
144 __u32 clone_blksize; /* granularity of a CLONE operation */
144}; 145};
145 146
146struct nfs_fsstat { 147struct nfs_fsstat {
@@ -359,6 +360,25 @@ struct nfs42_layoutstat_data {
359 struct nfs42_layoutstat_res res; 360 struct nfs42_layoutstat_res res;
360}; 361};
361 362
363struct nfs42_clone_args {
364 struct nfs4_sequence_args seq_args;
365 struct nfs_fh *src_fh;
366 struct nfs_fh *dst_fh;
367 nfs4_stateid src_stateid;
368 nfs4_stateid dst_stateid;
369 __u64 src_offset;
370 __u64 dst_offset;
371 __u64 count;
372 const u32 *dst_bitmask;
373};
374
375struct nfs42_clone_res {
376 struct nfs4_sequence_res seq_res;
377 unsigned int rpc_status;
378 struct nfs_fattr *dst_fattr;
379 const struct nfs_server *server;
380};
381
362struct stateowner_id { 382struct stateowner_id {
363 __u64 create_time; 383 __u64 create_time;
364 __u32 uniquifier; 384 __u32 uniquifier;
@@ -528,7 +548,7 @@ struct nfs4_delegreturnargs {
528struct nfs4_delegreturnres { 548struct nfs4_delegreturnres {
529 struct nfs4_sequence_res seq_res; 549 struct nfs4_sequence_res seq_res;
530 struct nfs_fattr * fattr; 550 struct nfs_fattr * fattr;
531 const struct nfs_server *server; 551 struct nfs_server *server;
532}; 552};
533 553
534/* 554/*
@@ -601,7 +621,7 @@ struct nfs_removeargs {
601 621
602struct nfs_removeres { 622struct nfs_removeres {
603 struct nfs4_sequence_res seq_res; 623 struct nfs4_sequence_res seq_res;
604 const struct nfs_server *server; 624 struct nfs_server *server;
605 struct nfs_fattr *dir_attr; 625 struct nfs_fattr *dir_attr;
606 struct nfs4_change_info cinfo; 626 struct nfs4_change_info cinfo;
607}; 627};
@@ -619,7 +639,7 @@ struct nfs_renameargs {
619 639
620struct nfs_renameres { 640struct nfs_renameres {
621 struct nfs4_sequence_res seq_res; 641 struct nfs4_sequence_res seq_res;
622 const struct nfs_server *server; 642 struct nfs_server *server;
623 struct nfs4_change_info old_cinfo; 643 struct nfs4_change_info old_cinfo;
624 struct nfs_fattr *old_fattr; 644 struct nfs_fattr *old_fattr;
625 struct nfs4_change_info new_cinfo; 645 struct nfs4_change_info new_cinfo;
@@ -685,7 +705,6 @@ struct nfs_setaclargs {
685 struct nfs4_sequence_args seq_args; 705 struct nfs4_sequence_args seq_args;
686 struct nfs_fh * fh; 706 struct nfs_fh * fh;
687 size_t acl_len; 707 size_t acl_len;
688 unsigned int acl_pgbase;
689 struct page ** acl_pages; 708 struct page ** acl_pages;
690}; 709};
691 710
@@ -697,7 +716,6 @@ struct nfs_getaclargs {
697 struct nfs4_sequence_args seq_args; 716 struct nfs4_sequence_args seq_args;
698 struct nfs_fh * fh; 717 struct nfs_fh * fh;
699 size_t acl_len; 718 size_t acl_len;
700 unsigned int acl_pgbase;
701 struct page ** acl_pages; 719 struct page ** acl_pages;
702}; 720};
703 721
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 8df43c9f11dc..4397a4824c81 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -38,6 +38,11 @@ void xprt_free_bc_request(struct rpc_rqst *req);
38int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs); 38int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
39void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs); 39void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
40 40
41/* Socket backchannel transport methods */
42int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
43void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
44void xprt_free_bc_rqst(struct rpc_rqst *req);
45
41/* 46/*
42 * Determine if a shared backchannel is in use 47 * Determine if a shared backchannel is in use
43 */ 48 */
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 1e4438ea2380..f869807a0d0e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -226,9 +226,13 @@ extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
226 struct svc_rdma_fastreg_mr *); 226 struct svc_rdma_fastreg_mr *);
227extern void svc_sq_reap(struct svcxprt_rdma *); 227extern void svc_sq_reap(struct svcxprt_rdma *);
228extern void svc_rq_reap(struct svcxprt_rdma *); 228extern void svc_rq_reap(struct svcxprt_rdma *);
229extern struct svc_xprt_class svc_rdma_class;
230extern void svc_rdma_prep_reply_hdr(struct svc_rqst *); 229extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
231 230
231extern struct svc_xprt_class svc_rdma_class;
232#ifdef CONFIG_SUNRPC_BACKCHANNEL
233extern struct svc_xprt_class svc_rdma_bc_class;
234#endif
235
232/* svc_rdma.c */ 236/* svc_rdma.c */
233extern int svc_rdma_init(void); 237extern int svc_rdma_init(void);
234extern void svc_rdma_cleanup(void); 238extern void svc_rdma_cleanup(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 0fb9acbb4780..69ef5b3ab038 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -54,6 +54,8 @@ enum rpc_display_format_t {
54struct rpc_task; 54struct rpc_task;
55struct rpc_xprt; 55struct rpc_xprt;
56struct seq_file; 56struct seq_file;
57struct svc_serv;
58struct net;
57 59
58/* 60/*
59 * This describes a complete RPC request 61 * This describes a complete RPC request
@@ -136,6 +138,12 @@ struct rpc_xprt_ops {
136 int (*enable_swap)(struct rpc_xprt *xprt); 138 int (*enable_swap)(struct rpc_xprt *xprt);
137 void (*disable_swap)(struct rpc_xprt *xprt); 139 void (*disable_swap)(struct rpc_xprt *xprt);
138 void (*inject_disconnect)(struct rpc_xprt *xprt); 140 void (*inject_disconnect)(struct rpc_xprt *xprt);
141 int (*bc_setup)(struct rpc_xprt *xprt,
142 unsigned int min_reqs);
143 int (*bc_up)(struct svc_serv *serv, struct net *net);
144 void (*bc_free_rqst)(struct rpc_rqst *rqst);
145 void (*bc_destroy)(struct rpc_xprt *xprt,
146 unsigned int max_reqs);
139}; 147};
140 148
141/* 149/*
@@ -153,6 +161,7 @@ enum xprt_transports {
153 XPRT_TRANSPORT_TCP = IPPROTO_TCP, 161 XPRT_TRANSPORT_TCP = IPPROTO_TCP,
154 XPRT_TRANSPORT_BC_TCP = IPPROTO_TCP | XPRT_TRANSPORT_BC, 162 XPRT_TRANSPORT_BC_TCP = IPPROTO_TCP | XPRT_TRANSPORT_BC,
155 XPRT_TRANSPORT_RDMA = 256, 163 XPRT_TRANSPORT_RDMA = 256,
164 XPRT_TRANSPORT_BC_RDMA = XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC,
156 XPRT_TRANSPORT_LOCAL = 257, 165 XPRT_TRANSPORT_LOCAL = 257,
157}; 166};
158 167
diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 357e44c1a46b..0ece4ba06f06 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -44,6 +44,8 @@ struct sock_xprt {
44 */ 44 */
45 unsigned long sock_state; 45 unsigned long sock_state;
46 struct delayed_work connect_worker; 46 struct delayed_work connect_worker;
47 struct work_struct recv_worker;
48 struct mutex recv_mutex;
47 struct sockaddr_storage srcaddr; 49 struct sockaddr_storage srcaddr;
48 unsigned short srcport; 50 unsigned short srcport;
49 51
diff --git a/include/uapi/linux/nfs.h b/include/uapi/linux/nfs.h
index 5199a36dd574..654bae3f1a38 100644
--- a/include/uapi/linux/nfs.h
+++ b/include/uapi/linux/nfs.h
@@ -7,6 +7,8 @@
7#ifndef _UAPI_LINUX_NFS_H 7#ifndef _UAPI_LINUX_NFS_H
8#define _UAPI_LINUX_NFS_H 8#define _UAPI_LINUX_NFS_H
9 9
10#include <linux/types.h>
11
10#define NFS_PROGRAM 100003 12#define NFS_PROGRAM 100003
11#define NFS_PORT 2049 13#define NFS_PORT 2049
12#define NFS_MAXDATA 8192 14#define NFS_MAXDATA 8192
@@ -31,6 +33,17 @@
31 33
32#define NFS_PIPE_DIRNAME "nfs" 34#define NFS_PIPE_DIRNAME "nfs"
33 35
36/* NFS ioctls */
 37/* Follow btrfs's lead on the CLONE ioctl numbers to avoid confusing userspace */
38#define NFS_IOC_CLONE _IOW(0x94, 9, int)
39#define NFS_IOC_CLONE_RANGE _IOW(0x94, 13, int)
40
41struct nfs_ioctl_clone_range_args {
42 __s64 src_fd;
43 __u64 src_off, count;
44 __u64 dst_off;
45};
46
34/* 47/*
35 * NFS stats. The good thing with these values is that NFSv3 errors are 48 * NFS stats. The good thing with these values is that NFSv3 errors are
36 * a superset of NFSv2 errors (with the exception of NFSERR_WFLUSH which 49 * a superset of NFSv2 errors (with the exception of NFSERR_WFLUSH which
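Since the new ioctls deliberately reuse the btrfs clone numbers, userspace drives them the same way: fill in a struct nfs_ioctl_clone_range_args and issue NFS_IOC_CLONE_RANGE on the destination file descriptor, passing the source in src_fd. A minimal userspace sketch, assuming both files live on the same NFSv4.2 mount; the paths, sizes and (absent) alignment handling are illustrative only:

#include <fcntl.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <linux/nfs.h>	/* NFS_IOC_CLONE_RANGE, struct nfs_ioctl_clone_range_args */

int main(void)
{
	int src = open("/mnt/nfs/golden.img", O_RDONLY);
	int dst = open("/mnt/nfs/copy.img", O_WRONLY | O_CREAT, 0644);
	struct nfs_ioctl_clone_range_args args = {
		.src_fd  = src,		/* file to clone from */
		.src_off = 0,		/* start of the source range */
		.count   = 1 << 20,	/* bytes to clone (1 MiB here) */
		.dst_off = 0,		/* placement in the destination */
	};

	if (src < 0 || dst < 0)
		return 1;
	/* Like the btrfs ioctl, this is issued on the destination fd. */
	if (ioctl(dst, NFS_IOC_CLONE_RANGE, &args) < 0)
		perror("NFS_IOC_CLONE_RANGE");
	close(src);
	close(dst);
	return 0;
}

Offsets and the byte count generally need to respect the server's advertised clone_blksize granularity, and the whole-file NFS_IOC_CLONE variant follows BTRFS_IOC_CLONE in taking the source fd directly as the ioctl argument.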
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d141133b..229956bf8457 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
138 */ 138 */
139int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 139int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
140{ 140{
141 if (!xprt->ops->bc_setup)
142 return 0;
143 return xprt->ops->bc_setup(xprt, min_reqs);
144}
145EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
146
147int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
148{
141 struct rpc_rqst *req; 149 struct rpc_rqst *req;
142 struct list_head tmp_list; 150 struct list_head tmp_list;
143 int i; 151 int i;
@@ -192,7 +200,6 @@ out_free:
192 dprintk("RPC: setup backchannel transport failed\n"); 200 dprintk("RPC: setup backchannel transport failed\n");
193 return -ENOMEM; 201 return -ENOMEM;
194} 202}
195EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
196 203
197/** 204/**
198 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 205 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
205 */ 212 */
206void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 213void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
207{ 214{
215 if (xprt->ops->bc_destroy)
216 xprt->ops->bc_destroy(xprt, max_reqs);
217}
218EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
219
220void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
221{
208 struct rpc_rqst *req = NULL, *tmp = NULL; 222 struct rpc_rqst *req = NULL, *tmp = NULL;
209 223
210 dprintk("RPC: destroy backchannel transport\n"); 224 dprintk("RPC: destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
227 dprintk("RPC: backchannel list empty= %s\n", 241 dprintk("RPC: backchannel list empty= %s\n",
228 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 242 list_empty(&xprt->bc_pa_list) ? "true" : "false");
229} 243}
230EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
231 244
232static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) 245static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
233{ 246{
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
264{ 277{
265 struct rpc_xprt *xprt = req->rq_xprt; 278 struct rpc_xprt *xprt = req->rq_xprt;
266 279
280 xprt->ops->bc_free_rqst(req);
281}
282
283void xprt_free_bc_rqst(struct rpc_rqst *req)
284{
285 struct rpc_xprt *xprt = req->rq_xprt;
286
267 dprintk("RPC: free backchannel req=%p\n", req); 287 dprintk("RPC: free backchannel req=%p\n", req);
268 288
269 req->rq_connect_cookie = xprt->connect_cookie - 1; 289 req->rq_connect_cookie = xprt->connect_cookie - 1;
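With this split, xprt_setup_backchannel(), xprt_destroy_backchannel() and xprt_free_bc_request() become thin dispatchers, while the old socket-oriented bodies live on as xprt_setup_bc(), xprt_destroy_bc() and xprt_free_bc_rqst() for any transport that wants the generic rpc_rqst pool. A sketch of how a transport might wire the new hooks; the ops-table name is hypothetical and only the backchannel members are shown:

#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/bc_xprt.h>

/* Hypothetical: a socket-style transport reusing the generic helpers.
 * A real transport also supplies .bc_up and all of its forechannel methods.
 */
static struct rpc_xprt_ops example_sock_ops = {
	.bc_setup	= xprt_setup_bc,	/* preallocate rpc_rqsts onto bc_pa_list */
	.bc_free_rqst	= xprt_free_bc_rqst,	/* return a finished rqst to the pool */
	.bc_destroy	= xprt_destroy_bc,	/* tear the preallocated pool down */
};

The RPC/RDMA side supplies its own implementations instead, as the transport.c hunk later in this diff shows.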
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579df14d8..bc5b7b5032ca 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1367 /* reset result send buffer "put" position */ 1367 /* reset result send buffer "put" position */
1368 resv->iov_len = 0; 1368 resv->iov_len = 0;
1369 1369
1370 if (rqstp->rq_prot != IPPROTO_TCP) {
1371 printk(KERN_ERR "No support for Non-TCP transports!\n");
1372 BUG();
1373 }
1374
1375 /* 1370 /*
1376 * Skip the next two words because they've already been 1371 * Skip the next two words because they've already been
1377 * processed in the transport 1372 * processed in the transport
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c
index 887f0183b4c6..c88d9bc06f5c 100644
--- a/net/sunrpc/sysctl.c
+++ b/net/sunrpc/sysctl.c
@@ -76,7 +76,7 @@ static int
76proc_dodebug(struct ctl_table *table, int write, 76proc_dodebug(struct ctl_table *table, int write,
77 void __user *buffer, size_t *lenp, loff_t *ppos) 77 void __user *buffer, size_t *lenp, loff_t *ppos)
78{ 78{
79 char tmpbuf[20], c, *s; 79 char tmpbuf[20], c, *s = NULL;
80 char __user *p; 80 char __user *p;
81 unsigned int value; 81 unsigned int value;
82 size_t left, len; 82 size_t left, len;
@@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write,
103 return -EFAULT; 103 return -EFAULT;
104 tmpbuf[left] = '\0'; 104 tmpbuf[left] = '\0';
105 105
106 for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--) 106 value = simple_strtol(tmpbuf, &s, 0);
107 value = 10 * value + (*s - '0'); 107 if (s) {
108 if (*s && !isspace(*s)) 108 left -= (s - tmpbuf);
109 return -EINVAL; 109 if (left && !isspace(*s))
110 while (left && isspace(*s)) 110 return -EINVAL;
111 left--, s++; 111 while (left && isspace(*s))
112 left--, s++;
113 } else
114 left = 0;
112 *(unsigned int *) table->data = value; 115 *(unsigned int *) table->data = value;
113 /* Display the RPC tasks on writing to rpc_debug */ 116 /* Display the RPC tasks on writing to rpc_debug */
114 if (strcmp(table->procname, "rpc_debug") == 0) 117 if (strcmp(table->procname, "rpc_debug") == 0)
115 rpc_show_tasks(&init_net); 118 rpc_show_tasks(&init_net);
116 } else { 119 } else {
117 if (!access_ok(VERIFY_WRITE, buffer, left)) 120 len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data);
118 return -EFAULT;
119 len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data);
120 if (len > left) 121 if (len > left)
121 len = left; 122 len = left;
122 if (__copy_to_user(buffer, tmpbuf, len)) 123 if (copy_to_user(buffer, tmpbuf, len))
123 return -EFAULT; 124 return -EFAULT;
124 if ((left -= len) > 0) { 125 if ((left -= len) > 0) {
125 if (put_user('\n', (char __user *)buffer + len)) 126 if (put_user('\n', (char __user *)buffer + len))
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de240bd..33f99d3004f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
5 svc_rdma.o svc_rdma_transport.o \ 5 svc_rdma.o svc_rdma_transport.o \
6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ 6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
7 module.o 7 module.o
8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 000000000000..2dcb44f69e53
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,394 @@
1/*
2 * Copyright (c) 2015 Oracle. All rights reserved.
3 *
4 * Support for backward direction RPCs on RPC/RDMA.
5 */
6
7#include <linux/module.h>
8#include <linux/sunrpc/xprt.h>
9#include <linux/sunrpc/svc.h>
10#include <linux/sunrpc/svc_xprt.h>
11
12#include "xprt_rdma.h"
13
14#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
15# define RPCDBG_FACILITY RPCDBG_TRANS
16#endif
17
18#define RPCRDMA_BACKCHANNEL_DEBUG
19
20static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
21 struct rpc_rqst *rqst)
22{
23 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
24 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
25
26 spin_lock(&buf->rb_reqslock);
27 list_del(&req->rl_all);
28 spin_unlock(&buf->rb_reqslock);
29
30 rpcrdma_destroy_req(&r_xprt->rx_ia, req);
31
32 kfree(rqst);
33}
34
35static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
36 struct rpc_rqst *rqst)
37{
38 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
39 struct rpcrdma_regbuf *rb;
40 struct rpcrdma_req *req;
41 struct xdr_buf *buf;
42 size_t size;
43
44 req = rpcrdma_create_req(r_xprt);
45 if (!req)
46 return -ENOMEM;
47 req->rl_backchannel = true;
48
49 size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
50 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
51 if (IS_ERR(rb))
52 goto out_fail;
53 req->rl_rdmabuf = rb;
54
55 size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
56 rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
57 if (IS_ERR(rb))
58 goto out_fail;
59 rb->rg_owner = req;
60 req->rl_sendbuf = rb;
61 /* so that rpcr_to_rdmar works when receiving a request */
62 rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
63
64 buf = &rqst->rq_snd_buf;
65 buf->head[0].iov_base = rqst->rq_buffer;
66 buf->head[0].iov_len = 0;
67 buf->tail[0].iov_base = NULL;
68 buf->tail[0].iov_len = 0;
69 buf->page_len = 0;
70 buf->len = 0;
71 buf->buflen = size;
72
73 return 0;
74
75out_fail:
76 rpcrdma_bc_free_rqst(r_xprt, rqst);
77 return -ENOMEM;
78}
79
80/* Allocate and add receive buffers to the rpcrdma_buffer's
 81 * existing list of reps. These are released when the
82 * transport is destroyed.
83 */
84static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
85 unsigned int count)
86{
87 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
88 struct rpcrdma_rep *rep;
89 unsigned long flags;
90 int rc = 0;
91
92 while (count--) {
93 rep = rpcrdma_create_rep(r_xprt);
94 if (IS_ERR(rep)) {
95 pr_err("RPC: %s: reply buffer alloc failed\n",
96 __func__);
97 rc = PTR_ERR(rep);
98 break;
99 }
100
101 spin_lock_irqsave(&buffers->rb_lock, flags);
102 list_add(&rep->rr_list, &buffers->rb_recv_bufs);
103 spin_unlock_irqrestore(&buffers->rb_lock, flags);
104 }
105
106 return rc;
107}
108
109/**
110 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
111 * @xprt: transport associated with these backchannel resources
112 * @reqs: number of concurrent incoming requests to expect
113 *
114 * Returns 0 on success; otherwise a negative errno
115 */
116int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
117{
118 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
119 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
120 struct rpc_rqst *rqst;
121 unsigned int i;
122 int rc;
123
124 /* The backchannel reply path returns each rpc_rqst to the
125 * bc_pa_list _after_ the reply is sent. If the server is
126 * faster than the client, it can send another backward
127 * direction request before the rpc_rqst is returned to the
128 * list. The client rejects the request in this case.
129 *
130 * Twice as many rpc_rqsts are prepared to ensure there is
131 * always an rpc_rqst available as soon as a reply is sent.
132 */
133 if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
134 goto out_err;
135
136 for (i = 0; i < (reqs << 1); i++) {
137 rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
138 if (!rqst) {
139 pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
140 __func__);
141 goto out_free;
142 }
143
144 rqst->rq_xprt = &r_xprt->rx_xprt;
145 INIT_LIST_HEAD(&rqst->rq_list);
146 INIT_LIST_HEAD(&rqst->rq_bc_list);
147
148 if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
149 goto out_free;
150
151 spin_lock_bh(&xprt->bc_pa_lock);
152 list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
153 spin_unlock_bh(&xprt->bc_pa_lock);
154 }
155
156 rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
157 if (rc)
158 goto out_free;
159
160 rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
161 if (rc)
162 goto out_free;
163
164 buffer->rb_bc_srv_max_requests = reqs;
165 request_module("svcrdma");
166
167 return 0;
168
169out_free:
170 xprt_rdma_bc_destroy(xprt, reqs);
171
172out_err:
173 pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
174 return -ENOMEM;
175}
176
177/**
178 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
179 * @serv: server endpoint
180 * @net: network namespace
181 *
182 * The "xprt" is an implied argument: it supplies the name of the
183 * backchannel transport class.
184 *
185 * Returns zero on success, negative errno on failure
186 */
187int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
188{
189 int ret;
190
191 ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
192 if (ret < 0)
193 return ret;
194 return 0;
195}
196
197/**
198 * rpcrdma_bc_marshal_reply - Send backwards direction reply
199 * @rqst: buffer containing RPC reply data
200 *
201 * Returns zero on success.
202 */
203int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
204{
205 struct rpc_xprt *xprt = rqst->rq_xprt;
206 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
207 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
208 struct rpcrdma_msg *headerp;
209 size_t rpclen;
210
211 headerp = rdmab_to_msg(req->rl_rdmabuf);
212 headerp->rm_xid = rqst->rq_xid;
213 headerp->rm_vers = rpcrdma_version;
214 headerp->rm_credit =
215 cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
216 headerp->rm_type = rdma_msg;
217 headerp->rm_body.rm_chunks[0] = xdr_zero;
218 headerp->rm_body.rm_chunks[1] = xdr_zero;
219 headerp->rm_body.rm_chunks[2] = xdr_zero;
220
221 rpclen = rqst->rq_svec[0].iov_len;
222
223 pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
224 __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
225 pr_info("RPC: %s: RPC/RDMA: %*ph\n",
226 __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
227 pr_info("RPC: %s: RPC: %*ph\n",
228 __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
229
230 req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
231 req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
232 req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
233
234 req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
235 req->rl_send_iov[1].length = rpclen;
236 req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
237
238 req->rl_niovs = 2;
239 return 0;
240}
241
242/**
243 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
244 * @xprt: transport associated with these backchannel resources
245 * @reqs: number of incoming requests to destroy; ignored
246 */
247void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
248{
249 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
250 struct rpc_rqst *rqst, *tmp;
251
252 spin_lock_bh(&xprt->bc_pa_lock);
253 list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
254 list_del(&rqst->rq_bc_pa_list);
255 spin_unlock_bh(&xprt->bc_pa_lock);
256
257 rpcrdma_bc_free_rqst(r_xprt, rqst);
258
259 spin_lock_bh(&xprt->bc_pa_lock);
260 }
261 spin_unlock_bh(&xprt->bc_pa_lock);
262}
263
264/**
265 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
266 * @rqst: request to release
267 */
268void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
269{
270 struct rpc_xprt *xprt = rqst->rq_xprt;
271
272 smp_mb__before_atomic();
273 WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
274 clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
275 smp_mb__after_atomic();
276
277 spin_lock_bh(&xprt->bc_pa_lock);
278 list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
279 spin_unlock_bh(&xprt->bc_pa_lock);
280}
281
282/**
283 * rpcrdma_bc_receive_call - Handle a backward direction call
284 * @xprt: transport receiving the call
285 * @rep: receive buffer containing the call
286 *
287 * Called in the RPC reply handler, which runs in a tasklet.
288 * Be quick about it.
289 *
290 * Operational assumptions:
291 * o Backchannel credits are ignored, just as the NFS server
292 * forechannel currently does
293 * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
294 * No replay detection is done at the transport level
295 */
296void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
297 struct rpcrdma_rep *rep)
298{
299 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
300 struct rpcrdma_msg *headerp;
301 struct svc_serv *bc_serv;
302 struct rpcrdma_req *req;
303 struct rpc_rqst *rqst;
304 struct xdr_buf *buf;
305 size_t size;
306 __be32 *p;
307
308 headerp = rdmab_to_msg(rep->rr_rdmabuf);
309#ifdef RPCRDMA_BACKCHANNEL_DEBUG
310 pr_info("RPC: %s: callback XID %08x, length=%u\n",
311 __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
312 pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
313#endif
314
315 /* Sanity check:
316 * Need at least enough bytes for RPC/RDMA header, as code
317 * here references the header fields by array offset. Also,
318 * backward calls are always inline, so ensure there
319 * are some bytes beyond the RPC/RDMA header.
320 */
321 if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
322 goto out_short;
323 p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
324 size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
325
326 /* Grab a free bc rqst */
327 spin_lock(&xprt->bc_pa_lock);
328 if (list_empty(&xprt->bc_pa_list)) {
329 spin_unlock(&xprt->bc_pa_lock);
330 goto out_overflow;
331 }
332 rqst = list_first_entry(&xprt->bc_pa_list,
333 struct rpc_rqst, rq_bc_pa_list);
334 list_del(&rqst->rq_bc_pa_list);
335 spin_unlock(&xprt->bc_pa_lock);
336#ifdef RPCRDMA_BACKCHANNEL_DEBUG
337 pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
338#endif
339
340 /* Prepare rqst */
341 rqst->rq_reply_bytes_recvd = 0;
342 rqst->rq_bytes_sent = 0;
343 rqst->rq_xid = headerp->rm_xid;
344 set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
345
346 buf = &rqst->rq_rcv_buf;
347 memset(buf, 0, sizeof(*buf));
348 buf->head[0].iov_base = p;
349 buf->head[0].iov_len = size;
350 buf->len = size;
351
352 /* The receive buffer has to be hooked to the rpcrdma_req
353 * so that it can be reposted after the server is done
354 * parsing it but just before sending the backward
355 * direction reply.
356 */
357 req = rpcr_to_rdmar(rqst);
358#ifdef RPCRDMA_BACKCHANNEL_DEBUG
359 pr_info("RPC: %s: attaching rep %p to req %p\n",
360 __func__, rep, req);
361#endif
362 req->rl_reply = rep;
363
364 /* Defeat the retransmit detection logic in send_request */
365 req->rl_connect_cookie = 0;
366
367 /* Queue rqst for ULP's callback service */
368 bc_serv = xprt->bc_serv;
369 spin_lock(&bc_serv->sv_cb_lock);
370 list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
371 spin_unlock(&bc_serv->sv_cb_lock);
372
373 wake_up(&bc_serv->sv_cb_waitq);
374
375 r_xprt->rx_stats.bcall_count++;
376 return;
377
378out_overflow:
379 pr_warn("RPC/RDMA backchannel overflow\n");
380 xprt_disconnect_done(xprt);
381 /* This receive buffer gets reposted automatically
382 * when the connection is re-established.
383 */
384 return;
385
386out_short:
387 pr_warn("RPC/RDMA short backward direction call\n");
388
389 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
390 xprt_disconnect_done(xprt);
391 else
392 pr_warn("RPC: %s: reposting rep %p\n",
393 __func__, rep);
394}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a1434447b0d6..88cf9e7269c2 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -256,8 +256,11 @@ frwr_sendcompletion(struct ib_wc *wc)
256 256
257 /* WARNING: Only wr_id and status are reliable at this point */ 257 /* WARNING: Only wr_id and status are reliable at this point */
258 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; 258 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
259 pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", 259 if (wc->status == IB_WC_WR_FLUSH_ERR)
260 __func__, r, ib_wc_status_msg(wc->status), wc->status); 260 dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
261 else
262 pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
263 __func__, r, ib_wc_status_msg(wc->status), wc->status);
261 r->r.frmr.fr_state = FRMR_IS_STALE; 264 r->r.frmr.fr_state = FRMR_IS_STALE;
262} 265}
263 266
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd6577467..c10d9699441c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
441 enum rpcrdma_chunktype rtype, wtype; 441 enum rpcrdma_chunktype rtype, wtype;
442 struct rpcrdma_msg *headerp; 442 struct rpcrdma_msg *headerp;
443 443
444#if defined(CONFIG_SUNRPC_BACKCHANNEL)
445 if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
446 return rpcrdma_bc_marshal_reply(rqst);
447#endif
448
444 /* 449 /*
445 * rpclen gets amount of data in first buffer, which is the 450 * rpclen gets amount of data in first buffer, which is the
446 * pre-registered buffer. 451 * pre-registered buffer.
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
711 spin_unlock_bh(&xprt->transport_lock); 716 spin_unlock_bh(&xprt->transport_lock);
712} 717}
713 718
719#if defined(CONFIG_SUNRPC_BACKCHANNEL)
720/* By convention, backchannel calls arrive via rdma_msg type
721 * messages, and never populate the chunk lists. This makes
722 * the RPC/RDMA header small and fixed in size, so it is
723 * straightforward to check the RPC header's direction field.
724 */
725static bool
726rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
727{
728 __be32 *p = (__be32 *)headerp;
729
730 if (headerp->rm_type != rdma_msg)
731 return false;
732 if (headerp->rm_body.rm_chunks[0] != xdr_zero)
733 return false;
734 if (headerp->rm_body.rm_chunks[1] != xdr_zero)
735 return false;
736 if (headerp->rm_body.rm_chunks[2] != xdr_zero)
737 return false;
738
739 /* sanity */
740 if (p[7] != headerp->rm_xid)
741 return false;
742 /* call direction */
743 if (p[8] != cpu_to_be32(RPC_CALL))
744 return false;
745
746 return true;
747}
748#endif /* CONFIG_SUNRPC_BACKCHANNEL */
749
714/* 750/*
715 * This function is called when an async event is posted to 751 * This function is called when an async event is posted to
716 * the connection which changes the connection state. All it 752 * the connection which changes the connection state. All it
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
723 schedule_delayed_work(&ep->rep_connect_worker, 0); 759 schedule_delayed_work(&ep->rep_connect_worker, 0);
724} 760}
725 761
726/* 762/* Process received RPC/RDMA messages.
727 * Called as a tasklet to do req/reply match and complete a request 763 *
728 * Errors must result in the RPC task either being awakened, or 764 * Errors must result in the RPC task either being awakened, or
729 * allowed to timeout, to discover the errors at that time. 765 * allowed to timeout, to discover the errors at that time.
730 */ 766 */
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
741 unsigned long cwnd; 777 unsigned long cwnd;
742 u32 credits; 778 u32 credits;
743 779
744 /* Check status. If bad, signal disconnect and return rep to pool */ 780 dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
745 if (rep->rr_len == ~0U) { 781
746 rpcrdma_recv_buffer_put(rep); 782 if (rep->rr_len == RPCRDMA_BAD_LEN)
747 if (r_xprt->rx_ep.rep_connected == 1) { 783 goto out_badstatus;
748 r_xprt->rx_ep.rep_connected = -EIO; 784 if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
749 rpcrdma_conn_func(&r_xprt->rx_ep); 785 goto out_shortreply;
750 } 786
751 return;
752 }
753 if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
754 dprintk("RPC: %s: short/invalid reply\n", __func__);
755 goto repost;
756 }
757 headerp = rdmab_to_msg(rep->rr_rdmabuf); 787 headerp = rdmab_to_msg(rep->rr_rdmabuf);
758 if (headerp->rm_vers != rpcrdma_version) { 788 if (headerp->rm_vers != rpcrdma_version)
759 dprintk("RPC: %s: invalid version %d\n", 789 goto out_badversion;
760 __func__, be32_to_cpu(headerp->rm_vers)); 790#if defined(CONFIG_SUNRPC_BACKCHANNEL)
761 goto repost; 791 if (rpcrdma_is_bcall(headerp))
762 } 792 goto out_bcall;
793#endif
763 794
764 /* Get XID and try for a match. */ 795 /* Match incoming rpcrdma_rep to an rpcrdma_req to
765 spin_lock(&xprt->transport_lock); 796 * get context for handling any incoming chunks.
797 */
798 spin_lock_bh(&xprt->transport_lock);
766 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); 799 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
767 if (rqst == NULL) { 800 if (!rqst)
768 spin_unlock(&xprt->transport_lock); 801 goto out_nomatch;
769 dprintk("RPC: %s: reply 0x%p failed "
770 "to match any request xid 0x%08x len %d\n",
771 __func__, rep, be32_to_cpu(headerp->rm_xid),
772 rep->rr_len);
773repost:
774 r_xprt->rx_stats.bad_reply_count++;
775 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
776 rpcrdma_recv_buffer_put(rep);
777 802
778 return;
779 }
780
781 /* get request object */
782 req = rpcr_to_rdmar(rqst); 803 req = rpcr_to_rdmar(rqst);
783 if (req->rl_reply) { 804 if (req->rl_reply)
784 spin_unlock(&xprt->transport_lock); 805 goto out_duplicate;
785 dprintk("RPC: %s: duplicate reply 0x%p to RPC "
786 "request 0x%p: xid 0x%08x\n", __func__, rep, req,
787 be32_to_cpu(headerp->rm_xid));
788 goto repost;
789 }
790 806
791 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" 807 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
792 " RPC request 0x%p xid 0x%08x\n", 808 " RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +899,50 @@ badheader:
883 if (xprt->cwnd > cwnd) 899 if (xprt->cwnd > cwnd)
884 xprt_release_rqst_cong(rqst->rq_task); 900 xprt_release_rqst_cong(rqst->rq_task);
885 901
902 xprt_complete_rqst(rqst->rq_task, status);
903 spin_unlock_bh(&xprt->transport_lock);
886 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", 904 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
887 __func__, xprt, rqst, status); 905 __func__, xprt, rqst, status);
888 xprt_complete_rqst(rqst->rq_task, status); 906 return;
889 spin_unlock(&xprt->transport_lock); 907
908out_badstatus:
909 rpcrdma_recv_buffer_put(rep);
910 if (r_xprt->rx_ep.rep_connected == 1) {
911 r_xprt->rx_ep.rep_connected = -EIO;
912 rpcrdma_conn_func(&r_xprt->rx_ep);
913 }
914 return;
915
916#if defined(CONFIG_SUNRPC_BACKCHANNEL)
917out_bcall:
918 rpcrdma_bc_receive_call(r_xprt, rep);
919 return;
920#endif
921
922out_shortreply:
923 dprintk("RPC: %s: short/invalid reply\n", __func__);
924 goto repost;
925
926out_badversion:
927 dprintk("RPC: %s: invalid version %d\n",
928 __func__, be32_to_cpu(headerp->rm_vers));
929 goto repost;
930
931out_nomatch:
932 spin_unlock_bh(&xprt->transport_lock);
933 dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
934 __func__, be32_to_cpu(headerp->rm_xid),
935 rep->rr_len);
936 goto repost;
937
938out_duplicate:
939 spin_unlock_bh(&xprt->transport_lock);
940 dprintk("RPC: %s: "
941 "duplicate reply %p to RPC request %p: xid 0x%08x\n",
942 __func__, rep, req, be32_to_cpu(headerp->rm_xid));
943
944repost:
945 r_xprt->rx_stats.bad_reply_count++;
946 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
947 rpcrdma_recv_buffer_put(rep);
890} 948}
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f023a5..1b7051bdbdc8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
239 unregister_sysctl_table(svcrdma_table_header); 239 unregister_sysctl_table(svcrdma_table_header);
240 svcrdma_table_header = NULL; 240 svcrdma_table_header = NULL;
241 } 241 }
242#if defined(CONFIG_SUNRPC_BACKCHANNEL)
243 svc_unreg_xprt_class(&svc_rdma_bc_class);
244#endif
242 svc_unreg_xprt_class(&svc_rdma_class); 245 svc_unreg_xprt_class(&svc_rdma_class);
243 kmem_cache_destroy(svc_rdma_map_cachep); 246 kmem_cache_destroy(svc_rdma_map_cachep);
244 kmem_cache_destroy(svc_rdma_ctxt_cachep); 247 kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
286 289
287 /* Register RDMA with the SVC transport switch */ 290 /* Register RDMA with the SVC transport switch */
288 svc_reg_xprt_class(&svc_rdma_class); 291 svc_reg_xprt_class(&svc_rdma_class);
292#if defined(CONFIG_SUNRPC_BACKCHANNEL)
293 svc_reg_xprt_class(&svc_rdma_bc_class);
294#endif
289 return 0; 295 return 0;
290 err1: 296 err1:
291 kmem_cache_destroy(svc_rdma_map_cachep); 297 kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a266e870d870..b348b4adef29 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
56 56
57#define RPCDBG_FACILITY RPCDBG_SVCXPRT 57#define RPCDBG_FACILITY RPCDBG_SVCXPRT
58 58
59static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
59static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 60static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
60 struct net *net, 61 struct net *net,
61 struct sockaddr *sa, int salen, 62 struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
95 .xcl_ident = XPRT_TRANSPORT_RDMA, 96 .xcl_ident = XPRT_TRANSPORT_RDMA,
96}; 97};
97 98
99#if defined(CONFIG_SUNRPC_BACKCHANNEL)
100static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
101 struct sockaddr *, int, int);
102static void svc_rdma_bc_detach(struct svc_xprt *);
103static void svc_rdma_bc_free(struct svc_xprt *);
104
105static struct svc_xprt_ops svc_rdma_bc_ops = {
106 .xpo_create = svc_rdma_bc_create,
107 .xpo_detach = svc_rdma_bc_detach,
108 .xpo_free = svc_rdma_bc_free,
109 .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
110 .xpo_secure_port = svc_rdma_secure_port,
111};
112
113struct svc_xprt_class svc_rdma_bc_class = {
114 .xcl_name = "rdma-bc",
115 .xcl_owner = THIS_MODULE,
116 .xcl_ops = &svc_rdma_bc_ops,
117 .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
118};
119
120static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
121 struct net *net,
122 struct sockaddr *sa, int salen,
123 int flags)
124{
125 struct svcxprt_rdma *cma_xprt;
126 struct svc_xprt *xprt;
127
128 cma_xprt = rdma_create_xprt(serv, 0);
129 if (!cma_xprt)
130 return ERR_PTR(-ENOMEM);
131 xprt = &cma_xprt->sc_xprt;
132
133 svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
134 serv->sv_bc_xprt = xprt;
135
136 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
137 return xprt;
138}
139
140static void svc_rdma_bc_detach(struct svc_xprt *xprt)
141{
142 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
143}
144
145static void svc_rdma_bc_free(struct svc_xprt *xprt)
146{
147 struct svcxprt_rdma *rdma =
148 container_of(xprt, struct svcxprt_rdma, sc_xprt);
149
150 dprintk("svcrdma: %s(%p)\n", __func__, xprt);
151 if (xprt)
152 kfree(rdma);
153}
154#endif /* CONFIG_SUNRPC_BACKCHANNEL */
155
98struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) 156struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
99{ 157{
100 struct svc_rdma_op_ctxt *ctxt; 158 struct svc_rdma_op_ctxt *ctxt;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452bc580c..8c545f7d7525 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
676static int 676static int
677xprt_rdma_enable_swap(struct rpc_xprt *xprt) 677xprt_rdma_enable_swap(struct rpc_xprt *xprt)
678{ 678{
679 return -EINVAL; 679 return 0;
680} 680}
681 681
682static void 682static void
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
705 .print_stats = xprt_rdma_print_stats, 705 .print_stats = xprt_rdma_print_stats,
706 .enable_swap = xprt_rdma_enable_swap, 706 .enable_swap = xprt_rdma_enable_swap,
707 .disable_swap = xprt_rdma_disable_swap, 707 .disable_swap = xprt_rdma_disable_swap,
708 .inject_disconnect = xprt_rdma_inject_disconnect 708 .inject_disconnect = xprt_rdma_inject_disconnect,
709#if defined(CONFIG_SUNRPC_BACKCHANNEL)
710 .bc_setup = xprt_rdma_bc_setup,
711 .bc_up = xprt_rdma_bc_up,
712 .bc_free_rqst = xprt_rdma_bc_free_rqst,
713 .bc_destroy = xprt_rdma_bc_destroy,
714#endif
709}; 715};
710 716
711static struct xprt_class xprt_rdma = { 717static struct xprt_class xprt_rdma = {
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
732 dprintk("RPC: %s: xprt_unregister returned %i\n", 738 dprintk("RPC: %s: xprt_unregister returned %i\n",
733 __func__, rc); 739 __func__, rc);
734 740
741 rpcrdma_destroy_wq();
735 frwr_destroy_recovery_wq(); 742 frwr_destroy_recovery_wq();
736} 743}
737 744
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
743 if (rc) 750 if (rc)
744 return rc; 751 return rc;
745 752
753 rc = rpcrdma_alloc_wq();
754 if (rc) {
755 frwr_destroy_recovery_wq();
756 return rc;
757 }
758
746 rc = xprt_register_transport(&xprt_rdma); 759 rc = xprt_register_transport(&xprt_rdma);
747 if (rc) { 760 if (rc) {
761 rpcrdma_destroy_wq();
748 frwr_destroy_recovery_wq(); 762 frwr_destroy_recovery_wq();
749 return rc; 763 return rc;
750 } 764 }
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f63369bd01c5..eadd1655145a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,47 +68,33 @@
68 * internal functions 68 * internal functions
69 */ 69 */
70 70
71/* 71static struct workqueue_struct *rpcrdma_receive_wq;
72 * handle replies in tasklet context, using a single, global list
73 * rdma tasklet function -- just turn around and call the func
74 * for all replies on the list
75 */
76
77static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
78static LIST_HEAD(rpcrdma_tasklets_g);
79 72
80static void 73int
81rpcrdma_run_tasklet(unsigned long data) 74rpcrdma_alloc_wq(void)
82{ 75{
83 struct rpcrdma_rep *rep; 76 struct workqueue_struct *recv_wq;
84 unsigned long flags;
85
86 data = data;
87 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88 while (!list_empty(&rpcrdma_tasklets_g)) {
89 rep = list_entry(rpcrdma_tasklets_g.next,
90 struct rpcrdma_rep, rr_list);
91 list_del(&rep->rr_list);
92 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 77
94 rpcrdma_reply_handler(rep); 78 recv_wq = alloc_workqueue("xprtrdma_receive",
79 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
80 0);
81 if (!recv_wq)
82 return -ENOMEM;
95 83
96 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); 84 rpcrdma_receive_wq = recv_wq;
97 } 85 return 0;
98 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
99} 86}
100 87
101static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); 88void
102 89rpcrdma_destroy_wq(void)
103static void
104rpcrdma_schedule_tasklet(struct list_head *sched_list)
105{ 90{
106 unsigned long flags; 91 struct workqueue_struct *wq;
107 92
108 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); 93 if (rpcrdma_receive_wq) {
109 list_splice_tail(sched_list, &rpcrdma_tasklets_g); 94 wq = rpcrdma_receive_wq;
110 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); 95 rpcrdma_receive_wq = NULL;
111 tasklet_schedule(&rpcrdma_tasklet_g); 96 destroy_workqueue(wq);
97 }
112} 98}
113 99
114static void 100static void
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
158 } 144 }
159} 145}
160 146
161static int 147/* The common case is a single send completion is waiting. By
162rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 148 * passing two WC entries to ib_poll_cq, a return code of 1
149 * means there is exactly one WC waiting and no more. We don't
150 * have to invoke ib_poll_cq again to know that the CQ has been
151 * properly drained.
152 */
153static void
154rpcrdma_sendcq_poll(struct ib_cq *cq)
163{ 155{
164 struct ib_wc *wcs; 156 struct ib_wc *pos, wcs[2];
165 int budget, count, rc; 157 int count, rc;
166 158
167 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
168 do { 159 do {
169 wcs = ep->rep_send_wcs; 160 pos = wcs;
170 161
171 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 162 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
172 if (rc <= 0) 163 if (rc < 0)
173 return rc; 164 break;
174 165
175 count = rc; 166 count = rc;
176 while (count-- > 0) 167 while (count-- > 0)
177 rpcrdma_sendcq_process_wc(wcs++); 168 rpcrdma_sendcq_process_wc(pos++);
178 } while (rc == RPCRDMA_POLLSIZE && --budget); 169 } while (rc == ARRAY_SIZE(wcs));
179 return 0; 170 return;
180} 171}
181 172
182/* 173/* Handle provider send completion upcalls.
183 * Handle send, fast_reg_mr, and local_inv completions.
184 *
185 * Send events are typically suppressed and thus do not result
186 * in an upcall. Occasionally one is signaled, however. This
187 * prevents the provider's completion queue from wrapping and
188 * losing a completion.
189 */ 174 */
190static void 175static void
191rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) 176rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
192{ 177{
193 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; 178 do {
194 int rc; 179 rpcrdma_sendcq_poll(cq);
195 180 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
196 rc = rpcrdma_sendcq_poll(cq, ep); 181 IB_CQ_REPORT_MISSED_EVENTS) > 0);
197 if (rc) { 182}
198 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
199 __func__, rc);
200 return;
201 }
202 183
203 rc = ib_req_notify_cq(cq, 184static void
204 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); 185rpcrdma_receive_worker(struct work_struct *work)
205 if (rc == 0) 186{
206 return; 187 struct rpcrdma_rep *rep =
207 if (rc < 0) { 188 container_of(work, struct rpcrdma_rep, rr_work);
208 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
209 __func__, rc);
210 return;
211 }
212 189
213 rpcrdma_sendcq_poll(cq, ep); 190 rpcrdma_reply_handler(rep);
214} 191}
215 192
216static void 193static void
217rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) 194rpcrdma_recvcq_process_wc(struct ib_wc *wc)
218{ 195{
219 struct rpcrdma_rep *rep = 196 struct rpcrdma_rep *rep =
220 (struct rpcrdma_rep *)(unsigned long)wc->wr_id; 197 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
237 prefetch(rdmab_to_msg(rep->rr_rdmabuf)); 214 prefetch(rdmab_to_msg(rep->rr_rdmabuf));
238 215
239out_schedule: 216out_schedule:
240 list_add_tail(&rep->rr_list, sched_list); 217 queue_work(rpcrdma_receive_wq, &rep->rr_work);
241 return; 218 return;
219
242out_fail: 220out_fail:
243 if (wc->status != IB_WC_WR_FLUSH_ERR) 221 if (wc->status != IB_WC_WR_FLUSH_ERR)
244 pr_err("RPC: %s: rep %p: %s\n", 222 pr_err("RPC: %s: rep %p: %s\n",
245 __func__, rep, ib_wc_status_msg(wc->status)); 223 __func__, rep, ib_wc_status_msg(wc->status));
246 rep->rr_len = ~0U; 224 rep->rr_len = RPCRDMA_BAD_LEN;
247 goto out_schedule; 225 goto out_schedule;
248} 226}
249 227
 250static int 228/* The wc array is on the stack: automatic memory is always CPU-local.
251rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 229 *
230 * struct ib_wc is 64 bytes, making the poll array potentially
231 * large. But this is at the bottom of the call chain. Further
232 * substantial work is done in another thread.
233 */
234static void
235rpcrdma_recvcq_poll(struct ib_cq *cq)
252{ 236{
253 struct list_head sched_list; 237 struct ib_wc *pos, wcs[4];
254 struct ib_wc *wcs; 238 int count, rc;
255 int budget, count, rc;
256 239
257 INIT_LIST_HEAD(&sched_list);
258 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
259 do { 240 do {
260 wcs = ep->rep_recv_wcs; 241 pos = wcs;
261 242
262 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 243 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
263 if (rc <= 0) 244 if (rc < 0)
264 goto out_schedule; 245 break;
265 246
266 count = rc; 247 count = rc;
267 while (count-- > 0) 248 while (count-- > 0)
268 rpcrdma_recvcq_process_wc(wcs++, &sched_list); 249 rpcrdma_recvcq_process_wc(pos++);
269 } while (rc == RPCRDMA_POLLSIZE && --budget); 250 } while (rc == ARRAY_SIZE(wcs));
270 rc = 0;
271
272out_schedule:
273 rpcrdma_schedule_tasklet(&sched_list);
274 return rc;
275} 251}
276 252
277/* 253/* Handle provider receive completion upcalls.
278 * Handle receive completions.
279 *
280 * It is reentrant but processes single events in order to maintain
281 * ordering of receives to keep server credits.
282 *
283 * It is the responsibility of the scheduled tasklet to return
284 * recv buffers to the pool. NOTE: this affects synchronization of
285 * connection shutdown. That is, the structures required for
286 * the completion of the reply handler must remain intact until
287 * all memory has been reclaimed.
288 */ 254 */
289static void 255static void
290rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) 256rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
291{ 257{
292 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; 258 do {
293 int rc; 259 rpcrdma_recvcq_poll(cq);
294 260 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
295 rc = rpcrdma_recvcq_poll(cq, ep); 261 IB_CQ_REPORT_MISSED_EVENTS) > 0);
296 if (rc) {
297 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
298 __func__, rc);
299 return;
300 }
301
302 rc = ib_req_notify_cq(cq,
303 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
304 if (rc == 0)
305 return;
306 if (rc < 0) {
307 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
308 __func__, rc);
309 return;
310 }
311
312 rpcrdma_recvcq_poll(cq, ep);
313} 262}
314 263
315static void 264static void
316rpcrdma_flush_cqs(struct rpcrdma_ep *ep) 265rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
317{ 266{
318 struct ib_wc wc; 267 struct ib_wc wc;
319 LIST_HEAD(sched_list);
320 268
321 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) 269 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
322 rpcrdma_recvcq_process_wc(&wc, &sched_list); 270 rpcrdma_recvcq_process_wc(&wc);
323 if (!list_empty(&sched_list))
324 rpcrdma_schedule_tasklet(&sched_list);
325 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) 271 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
326 rpcrdma_sendcq_process_wc(&wc); 272 rpcrdma_sendcq_process_wc(&wc);
327} 273}
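
The rewritten upcalls above drop the budgeted tasklet scheme: each upcall polls the CQ to empty with a small on-stack wc array, re-arms it with IB_CQ_REPORT_MISSED_EVENTS, and polls again while ib_req_notify_cq() returns a positive value (completions may have slipped in after the last poll). A minimal sketch of that pattern follows; it is not part of the patch, and process_wc()/example_* are illustrative stand-ins for the rpcrdma handlers.

    /* Illustrative sketch only -- not part of the patch. */
    #include <linux/kernel.h>
    #include <rdma/ib_verbs.h>

    static void process_wc(struct ib_wc *wc)
    {
            /* handle one completion (elided) */
    }

    static void example_cq_poll(struct ib_cq *cq)
    {
            struct ib_wc wcs[4];
            int i, rc;

            do {
                    rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), wcs);
                    if (rc < 0)
                            break;
                    for (i = 0; i < rc; i++)
                            process_wc(&wcs[i]);
            } while (rc == ARRAY_SIZE(wcs));
    }

    static void example_cq_upcall(struct ib_cq *cq, void *cq_context)
    {
            /* ib_req_notify_cq() returns > 0 when IB_CQ_REPORT_MISSED_EVENTS
             * is set and completions may have arrived after the last poll,
             * so keep polling until the CQ is re-armed with nothing pending.
             */
            do {
                    example_cq_poll(cq);
            } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
                                      IB_CQ_REPORT_MISSED_EVENTS) > 0);
    }

Keeping the poll array on the stack keeps the hot path CPU-local, and the heavy lifting is deferred to the new rr_work handler shown above, so the upcall itself stays short.
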
@@ -623,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
623 struct ib_device_attr *devattr = &ia->ri_devattr; 569 struct ib_device_attr *devattr = &ia->ri_devattr;
624 struct ib_cq *sendcq, *recvcq; 570 struct ib_cq *sendcq, *recvcq;
625 struct ib_cq_init_attr cq_attr = {}; 571 struct ib_cq_init_attr cq_attr = {};
572 unsigned int max_qp_wr;
626 int rc, err; 573 int rc, err;
627 574
628 if (devattr->max_sge < RPCRDMA_MAX_IOVS) { 575 if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -631,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
631 return -ENOMEM; 578 return -ENOMEM;
632 } 579 }
633 580
581 if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
582 dprintk("RPC: %s: insufficient wqe's available\n",
583 __func__);
584 return -ENOMEM;
585 }
586 max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
587
634 /* check provider's send/recv wr limits */ 588 /* check provider's send/recv wr limits */
635 if (cdata->max_requests > devattr->max_qp_wr) 589 if (cdata->max_requests > max_qp_wr)
636 cdata->max_requests = devattr->max_qp_wr; 590 cdata->max_requests = max_qp_wr;
637 591
638 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; 592 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
639 ep->rep_attr.qp_context = ep; 593 ep->rep_attr.qp_context = ep;
640 ep->rep_attr.srq = NULL; 594 ep->rep_attr.srq = NULL;
641 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 595 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
596 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
642 rc = ia->ri_ops->ro_open(ia, ep, cdata); 597 rc = ia->ri_ops->ro_open(ia, ep, cdata);
643 if (rc) 598 if (rc)
644 return rc; 599 return rc;
645 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 600 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
601 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
646 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; 602 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
647 ep->rep_attr.cap.max_recv_sge = 1; 603 ep->rep_attr.cap.max_recv_sge = 1;
648 ep->rep_attr.cap.max_inline_data = 0; 604 ep->rep_attr.cap.max_inline_data = 0;
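
The queue sizing above now sets aside RPCRDMA_BACKWARD_WRS Work Requests on both the send and receive queues for backchannel traffic, and caps the forward-channel request count against the device limit minus that reserve (the constant is defined in the xprt_rdma.h hunk below: 8 with CONFIG_SUNRPC_BACKCHANNEL, otherwise 0). A sketch of just that arithmetic, with an invented helper name and the ro_open() adjustment of max_send_wr omitted:

    /* Illustrative sketch only -- not part of the patch. */
    #include <linux/errno.h>

    #ifndef RPCRDMA_BACKWARD_WRS
    #define RPCRDMA_BACKWARD_WRS 8      /* xprt_rdma.h value with backchannel enabled */
    #endif

    static int example_size_queues(unsigned int device_max_qp_wr,
                                   unsigned int *max_requests,
                                   unsigned int *max_send_wr,
                                   unsigned int *max_recv_wr)
    {
            unsigned int max_qp_wr;

            if (device_max_qp_wr <= RPCRDMA_BACKWARD_WRS)
                    return -ENOMEM;     /* no room for the reserve */

            /* leave room for the backchannel reserve ... */
            max_qp_wr = device_max_qp_wr - RPCRDMA_BACKWARD_WRS;
            if (*max_requests > max_qp_wr)
                    *max_requests = max_qp_wr;

            /* ... then add it on top of the forward-channel credits
             * (the ro_open() adjustment for registration WRs is omitted)
             */
            *max_send_wr = *max_requests + RPCRDMA_BACKWARD_WRS;
            *max_recv_wr = *max_requests + RPCRDMA_BACKWARD_WRS;
            return 0;
    }
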
@@ -670,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
670 626
671 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; 627 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
672 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, 628 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
673 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 629 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
674 if (IS_ERR(sendcq)) { 630 if (IS_ERR(sendcq)) {
675 rc = PTR_ERR(sendcq); 631 rc = PTR_ERR(sendcq);
676 dprintk("RPC: %s: failed to create send CQ: %i\n", 632 dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -687,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
687 643
688 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; 644 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
689 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, 645 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
690 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 646 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
691 if (IS_ERR(recvcq)) { 647 if (IS_ERR(recvcq)) {
692 rc = PTR_ERR(recvcq); 648 rc = PTR_ERR(recvcq);
693 dprintk("RPC: %s: failed to create recv CQ: %i\n", 649 dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -886,7 +842,21 @@ retry:
886 } 842 }
887 rc = ep->rep_connected; 843 rc = ep->rep_connected;
888 } else { 844 } else {
845 struct rpcrdma_xprt *r_xprt;
846 unsigned int extras;
847
889 dprintk("RPC: %s: connected\n", __func__); 848 dprintk("RPC: %s: connected\n", __func__);
849
850 r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
851 extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
852
853 if (extras) {
854 rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
855 if (rc)
856 pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
857 __func__, rc);
858 rc = 0;
859 }
890 } 860 }
891 861
892out: 862out:
@@ -923,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
923 } 893 }
924} 894}
925 895
926static struct rpcrdma_req * 896struct rpcrdma_req *
927rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 897rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
928{ 898{
899 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
929 struct rpcrdma_req *req; 900 struct rpcrdma_req *req;
930 901
931 req = kzalloc(sizeof(*req), GFP_KERNEL); 902 req = kzalloc(sizeof(*req), GFP_KERNEL);
932 if (req == NULL) 903 if (req == NULL)
933 return ERR_PTR(-ENOMEM); 904 return ERR_PTR(-ENOMEM);
934 905
906 INIT_LIST_HEAD(&req->rl_free);
907 spin_lock(&buffer->rb_reqslock);
908 list_add(&req->rl_all, &buffer->rb_allreqs);
909 spin_unlock(&buffer->rb_reqslock);
935 req->rl_buffer = &r_xprt->rx_buf; 910 req->rl_buffer = &r_xprt->rx_buf;
936 return req; 911 return req;
937} 912}
938 913
939static struct rpcrdma_rep * 914struct rpcrdma_rep *
940rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 915rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
941{ 916{
942 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 917 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -958,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
958 933
959 rep->rr_device = ia->ri_device; 934 rep->rr_device = ia->ri_device;
960 rep->rr_rxprt = r_xprt; 935 rep->rr_rxprt = r_xprt;
936 INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
961 return rep; 937 return rep;
962 938
963out_free: 939out_free:
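
Reply processing now runs in process context instead of a tasklet: each rpcrdma_rep gains a work_struct initialized when the rep is created, the receive completion queues it on rpcrdma_receive_wq, and the worker recovers the rep with container_of() before calling rpcrdma_reply_handler(). (The queue itself is managed by the new rpcrdma_alloc_wq()/rpcrdma_destroy_wq() declared in the xprt_rdma.h hunk.) A generic sketch of the pattern, with made-up example_* names:

    /* Illustrative sketch only -- not part of the patch. */
    #include <linux/slab.h>
    #include <linux/workqueue.h>

    struct example_rep {
            struct work_struct rr_work;
            /* ... decoded reply state ... */
    };

    static struct workqueue_struct *example_receive_wq;  /* allocated elsewhere */

    static void example_reply_handler(struct example_rep *rep)
    {
            /* parse the reply and complete the matching RPC (elided) */
    }

    static void example_receive_worker(struct work_struct *work)
    {
            struct example_rep *rep =
                    container_of(work, struct example_rep, rr_work);

            example_reply_handler(rep);     /* runs in process context */
    }

    static struct example_rep *example_create_rep(void)
    {
            struct example_rep *rep = kzalloc(sizeof(*rep), GFP_KERNEL);

            if (rep)
                    INIT_WORK(&rep->rr_work, example_receive_worker);
            return rep;
    }

    static void example_receive_completion(struct example_rep *rep)
    {
            /* called from the CQ upcall: defer the heavy lifting */
            queue_work(example_receive_wq, &rep->rr_work);
    }
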
@@ -971,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
971{ 947{
972 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 948 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
973 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 949 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
974 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
975 char *p;
976 size_t len;
977 int i, rc; 950 int i, rc;
978 951
979 buf->rb_max_requests = cdata->max_requests; 952 buf->rb_max_requests = r_xprt->rx_data.max_requests;
953 buf->rb_bc_srv_max_requests = 0;
980 spin_lock_init(&buf->rb_lock); 954 spin_lock_init(&buf->rb_lock);
981 955
982 /* Need to allocate:
983 * 1. arrays for send and recv pointers
984 * 2. arrays of struct rpcrdma_req to fill in pointers
985 * 3. array of struct rpcrdma_rep for replies
986 * Send/recv buffers in req/rep need to be registered
987 */
988 len = buf->rb_max_requests *
989 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
990
991 p = kzalloc(len, GFP_KERNEL);
992 if (p == NULL) {
993 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
994 __func__, len);
995 rc = -ENOMEM;
996 goto out;
997 }
998 buf->rb_pool = p; /* for freeing it later */
999
1000 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1001 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1002 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1003 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1004
1005 rc = ia->ri_ops->ro_init(r_xprt); 956 rc = ia->ri_ops->ro_init(r_xprt);
1006 if (rc) 957 if (rc)
1007 goto out; 958 goto out;
1008 959
960 INIT_LIST_HEAD(&buf->rb_send_bufs);
961 INIT_LIST_HEAD(&buf->rb_allreqs);
962 spin_lock_init(&buf->rb_reqslock);
1009 for (i = 0; i < buf->rb_max_requests; i++) { 963 for (i = 0; i < buf->rb_max_requests; i++) {
1010 struct rpcrdma_req *req; 964 struct rpcrdma_req *req;
1011 struct rpcrdma_rep *rep;
1012 965
1013 req = rpcrdma_create_req(r_xprt); 966 req = rpcrdma_create_req(r_xprt);
1014 if (IS_ERR(req)) { 967 if (IS_ERR(req)) {
@@ -1017,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1017 rc = PTR_ERR(req); 970 rc = PTR_ERR(req);
1018 goto out; 971 goto out;
1019 } 972 }
1020 buf->rb_send_bufs[i] = req; 973 req->rl_backchannel = false;
974 list_add(&req->rl_free, &buf->rb_send_bufs);
975 }
976
977 INIT_LIST_HEAD(&buf->rb_recv_bufs);
978 for (i = 0; i < buf->rb_max_requests + 2; i++) {
979 struct rpcrdma_rep *rep;
1021 980
1022 rep = rpcrdma_create_rep(r_xprt); 981 rep = rpcrdma_create_rep(r_xprt);
1023 if (IS_ERR(rep)) { 982 if (IS_ERR(rep)) {
@@ -1026,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1026 rc = PTR_ERR(rep); 985 rc = PTR_ERR(rep);
1027 goto out; 986 goto out;
1028 } 987 }
1029 buf->rb_recv_bufs[i] = rep; 988 list_add(&rep->rr_list, &buf->rb_recv_bufs);
1030 } 989 }
1031 990
1032 return 0; 991 return 0;
@@ -1035,22 +994,38 @@ out:
1035 return rc; 994 return rc;
1036} 995}
1037 996
997static struct rpcrdma_req *
998rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
999{
1000 struct rpcrdma_req *req;
1001
1002 req = list_first_entry(&buf->rb_send_bufs,
1003 struct rpcrdma_req, rl_free);
1004 list_del(&req->rl_free);
1005 return req;
1006}
1007
1008static struct rpcrdma_rep *
1009rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
1010{
1011 struct rpcrdma_rep *rep;
1012
1013 rep = list_first_entry(&buf->rb_recv_bufs,
1014 struct rpcrdma_rep, rr_list);
1015 list_del(&rep->rr_list);
1016 return rep;
1017}
1018
1038static void 1019static void
1039rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) 1020rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1040{ 1021{
1041 if (!rep)
1042 return;
1043
1044 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); 1022 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1045 kfree(rep); 1023 kfree(rep);
1046} 1024}
1047 1025
1048static void 1026void
1049rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) 1027rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1050{ 1028{
1051 if (!req)
1052 return;
1053
1054 rpcrdma_free_regbuf(ia, req->rl_sendbuf); 1029 rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1055 rpcrdma_free_regbuf(ia, req->rl_rdmabuf); 1030 rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1056 kfree(req); 1031 kfree(req);
@@ -1060,25 +1035,29 @@ void
1060rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) 1035rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1061{ 1036{
1062 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 1037 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1063 int i;
1064 1038
1065 /* clean up in reverse order from create 1039 while (!list_empty(&buf->rb_recv_bufs)) {
1066 * 1. recv mr memory (mr free, then kfree) 1040 struct rpcrdma_rep *rep;
1067 * 2. send mr memory (mr free, then kfree)
1068 * 3. MWs
1069 */
1070 dprintk("RPC: %s: entering\n", __func__);
1071 1041
1072 for (i = 0; i < buf->rb_max_requests; i++) { 1042 rep = rpcrdma_buffer_get_rep_locked(buf);
1073 if (buf->rb_recv_bufs) 1043 rpcrdma_destroy_rep(ia, rep);
1074 rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1075 if (buf->rb_send_bufs)
1076 rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1077 } 1044 }
1078 1045
1079 ia->ri_ops->ro_destroy(buf); 1046 spin_lock(&buf->rb_reqslock);
1047 while (!list_empty(&buf->rb_allreqs)) {
1048 struct rpcrdma_req *req;
1049
1050 req = list_first_entry(&buf->rb_allreqs,
1051 struct rpcrdma_req, rl_all);
1052 list_del(&req->rl_all);
1053
1054 spin_unlock(&buf->rb_reqslock);
1055 rpcrdma_destroy_req(ia, req);
1056 spin_lock(&buf->rb_reqslock);
1057 }
1058 spin_unlock(&buf->rb_reqslock);
1080 1059
1081 kfree(buf->rb_pool); 1060 ia->ri_ops->ro_destroy(buf);
1082} 1061}
1083 1062
1084struct rpcrdma_mw * 1063struct rpcrdma_mw *
@@ -1110,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1110 spin_unlock(&buf->rb_mwlock); 1089 spin_unlock(&buf->rb_mwlock);
1111} 1090}
1112 1091
1113static void
1114rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1115{
1116 buf->rb_send_bufs[--buf->rb_send_index] = req;
1117 req->rl_niovs = 0;
1118 if (req->rl_reply) {
1119 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1120 req->rl_reply = NULL;
1121 }
1122}
1123
1124/* 1092/*
1125 * Get a set of request/reply buffers. 1093 * Get a set of request/reply buffers.
1126 * 1094 *
1127 * Reply buffer (if needed) is attached to send buffer upon return. 1095 * Reply buffer (if available) is attached to send buffer upon return.
1128 * Rule:
1129 * rb_send_index and rb_recv_index MUST always be pointing to the
1130 * *next* available buffer (non-NULL). They are incremented after
1131 * removing buffers, and decremented *before* returning them.
1132 */ 1096 */
1133struct rpcrdma_req * 1097struct rpcrdma_req *
1134rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) 1098rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1135{ 1099{
1136 struct rpcrdma_req *req; 1100 struct rpcrdma_req *req;
1137 unsigned long flags;
1138
1139 spin_lock_irqsave(&buffers->rb_lock, flags);
1140 1101
1141 if (buffers->rb_send_index == buffers->rb_max_requests) { 1102 spin_lock(&buffers->rb_lock);
1142 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1103 if (list_empty(&buffers->rb_send_bufs))
1143 dprintk("RPC: %s: out of request buffers\n", __func__); 1104 goto out_reqbuf;
1144 return ((struct rpcrdma_req *)NULL); 1105 req = rpcrdma_buffer_get_req_locked(buffers);
1145 } 1106 if (list_empty(&buffers->rb_recv_bufs))
1146 1107 goto out_repbuf;
1147 req = buffers->rb_send_bufs[buffers->rb_send_index]; 1108 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1148 if (buffers->rb_send_index < buffers->rb_recv_index) { 1109 spin_unlock(&buffers->rb_lock);
1149 dprintk("RPC: %s: %d extra receives outstanding (ok)\n", 1110 return req;
1150 __func__,
1151 buffers->rb_recv_index - buffers->rb_send_index);
1152 req->rl_reply = NULL;
1153 } else {
1154 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1155 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1156 }
1157 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1158 1111
1159 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1112out_reqbuf:
1113 spin_unlock(&buffers->rb_lock);
1114 pr_warn("RPC: %s: out of request buffers\n", __func__);
1115 return NULL;
1116out_repbuf:
1117 spin_unlock(&buffers->rb_lock);
1118 pr_warn("RPC: %s: out of reply buffers\n", __func__);
1119 req->rl_reply = NULL;
1160 return req; 1120 return req;
1161} 1121}
1162 1122
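
The request and reply pools are now plain lists protected by rb_lock rather than indexed arrays, so get and put reduce to list_first_entry()/list_del() and list_add_tail() under the spinlock; with reply handling moved out of softirq context the plain spin_lock() variants apparently suffice, although rpcrdma_ep_post_extra_recv() further down still uses spin_lock_irqsave(). A stripped-down sketch of the same pool pattern, with made-up names:

    /* Illustrative sketch only -- not part of the patch. */
    #include <linux/list.h>
    #include <linux/spinlock.h>

    struct example_buf {
            struct list_head list;
            /* ... buffer state ... */
    };

    struct example_pool {
            spinlock_t lock;
            struct list_head free;          /* of example_buf.list */
    };

    static struct example_buf *example_pool_get(struct example_pool *pool)
    {
            struct example_buf *buf = NULL;

            spin_lock(&pool->lock);
            if (!list_empty(&pool->free)) {
                    buf = list_first_entry(&pool->free,
                                           struct example_buf, list);
                    list_del(&buf->list);
            }
            spin_unlock(&pool->lock);
            return buf;
    }

    static void example_pool_put(struct example_pool *pool,
                                 struct example_buf *buf)
    {
            spin_lock(&pool->lock);
            list_add_tail(&buf->list, &pool->free);
            spin_unlock(&pool->lock);
    }
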
@@ -1168,30 +1128,31 @@ void
1168rpcrdma_buffer_put(struct rpcrdma_req *req) 1128rpcrdma_buffer_put(struct rpcrdma_req *req)
1169{ 1129{
1170 struct rpcrdma_buffer *buffers = req->rl_buffer; 1130 struct rpcrdma_buffer *buffers = req->rl_buffer;
1171 unsigned long flags; 1131 struct rpcrdma_rep *rep = req->rl_reply;
1172 1132
1173 spin_lock_irqsave(&buffers->rb_lock, flags); 1133 req->rl_niovs = 0;
1174 rpcrdma_buffer_put_sendbuf(req, buffers); 1134 req->rl_reply = NULL;
1175 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1135
1136 spin_lock(&buffers->rb_lock);
1137 list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1138 if (rep)
1139 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1140 spin_unlock(&buffers->rb_lock);
1176} 1141}
1177 1142
1178/* 1143/*
1179 * Recover reply buffers from pool. 1144 * Recover reply buffers from pool.
1180 * This happens when recovering from error conditions. 1145 * This happens when recovering from disconnect.
1181 * Post-increment counter/array index.
1182 */ 1146 */
1183void 1147void
1184rpcrdma_recv_buffer_get(struct rpcrdma_req *req) 1148rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1185{ 1149{
1186 struct rpcrdma_buffer *buffers = req->rl_buffer; 1150 struct rpcrdma_buffer *buffers = req->rl_buffer;
1187 unsigned long flags;
1188 1151
1189 spin_lock_irqsave(&buffers->rb_lock, flags); 1152 spin_lock(&buffers->rb_lock);
1190 if (buffers->rb_recv_index < buffers->rb_max_requests) { 1153 if (!list_empty(&buffers->rb_recv_bufs))
1191 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; 1154 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1192 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; 1155 spin_unlock(&buffers->rb_lock);
1193 }
1194 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1195} 1156}
1196 1157
1197/* 1158/*
@@ -1202,11 +1163,10 @@ void
1202rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 1163rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1203{ 1164{
1204 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; 1165 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1205 unsigned long flags;
1206 1166
1207 spin_lock_irqsave(&buffers->rb_lock, flags); 1167 spin_lock(&buffers->rb_lock);
1208 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; 1168 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1209 spin_unlock_irqrestore(&buffers->rb_lock, flags); 1169 spin_unlock(&buffers->rb_lock);
1210} 1170}
1211 1171
1212/* 1172/*
@@ -1363,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1363 return rc; 1323 return rc;
1364} 1324}
1365 1325
1326/**
1327 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1328 * @r_xprt: transport associated with these backchannel resources
 1329 * @count: minimum number of incoming requests expected
1330 *
1331 * Returns zero if all requested buffers were posted, or a negative errno.
1332 */
1333int
1334rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1335{
1336 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1337 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1338 struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1339 struct rpcrdma_rep *rep;
1340 unsigned long flags;
1341 int rc;
1342
1343 while (count--) {
1344 spin_lock_irqsave(&buffers->rb_lock, flags);
1345 if (list_empty(&buffers->rb_recv_bufs))
1346 goto out_reqbuf;
1347 rep = rpcrdma_buffer_get_rep_locked(buffers);
1348 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1349
1350 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1351 if (rc)
1352 goto out_rc;
1353 }
1354
1355 return 0;
1356
1357out_reqbuf:
1358 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1359 pr_warn("%s: no extra receive buffers\n", __func__);
1360 return -ENOMEM;
1361
1362out_rc:
1363 rpcrdma_recv_buffer_put(rep);
1364 return rc;
1365}
1366
1366/* How many chunk list items fit within our inline buffers? 1367/* How many chunk list items fit within our inline buffers?
1367 */ 1368 */
1368unsigned int 1369unsigned int
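
rpcrdma_ep_post_extra_recv() pulls spare reps off rb_recv_bufs and posts them, returning -ENOMEM once the pool runs dry; the connect path shown earlier treats that as non-fatal. A minimal caller sketch using only names that appear in this patch:

    /* Illustrative sketch only -- mirrors the call already made from
     * the connect path earlier in this patch.
     */
    static void example_post_backchannel_recvs(struct rpcrdma_xprt *r_xprt)
    {
            unsigned int extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
            int rc;

            if (!extras)
                    return;

            rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
            if (rc)
                    pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
                            __func__, rc);
            /* a failure is not fatal; the forward channel still works */
    }
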
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c82abf44e39d..ac7f8d4f632a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
77 * RDMA Endpoint -- one per transport instance 77 * RDMA Endpoint -- one per transport instance
78 */ 78 */
79 79
80#define RPCRDMA_WC_BUDGET (128)
81#define RPCRDMA_POLLSIZE (16)
82
83struct rpcrdma_ep { 80struct rpcrdma_ep {
84 atomic_t rep_cqcount; 81 atomic_t rep_cqcount;
85 int rep_cqinit; 82 int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
89 struct rdma_conn_param rep_remote_cma; 86 struct rdma_conn_param rep_remote_cma;
90 struct sockaddr_storage rep_remote_addr; 87 struct sockaddr_storage rep_remote_addr;
91 struct delayed_work rep_connect_worker; 88 struct delayed_work rep_connect_worker;
92 struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
93 struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
94}; 89};
95 90
96/* 91/*
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
106 */ 101 */
107#define RPCRDMA_IGNORE_COMPLETION (0ULL) 102#define RPCRDMA_IGNORE_COMPLETION (0ULL)
108 103
104/* Pre-allocate extra Work Requests for handling backward receives
105 * and sends. This is a fixed value because the Work Queues are
106 * allocated when the forward channel is set up.
107 */
108#if defined(CONFIG_SUNRPC_BACKCHANNEL)
109#define RPCRDMA_BACKWARD_WRS (8)
110#else
111#define RPCRDMA_BACKWARD_WRS (0)
112#endif
113
109/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV 114/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
110 * 115 *
111 * The below structure appears at the front of a large region of kmalloc'd 116 * The below structure appears at the front of a large region of kmalloc'd
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
169 unsigned int rr_len; 174 unsigned int rr_len;
170 struct ib_device *rr_device; 175 struct ib_device *rr_device;
171 struct rpcrdma_xprt *rr_rxprt; 176 struct rpcrdma_xprt *rr_rxprt;
177 struct work_struct rr_work;
172 struct list_head rr_list; 178 struct list_head rr_list;
173 struct rpcrdma_regbuf *rr_rdmabuf; 179 struct rpcrdma_regbuf *rr_rdmabuf;
174}; 180};
175 181
182#define RPCRDMA_BAD_LEN (~0U)
183
176/* 184/*
177 * struct rpcrdma_mw - external memory region metadata 185 * struct rpcrdma_mw - external memory region metadata
178 * 186 *
@@ -256,6 +264,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
256#define RPCRDMA_MAX_IOVS (2) 264#define RPCRDMA_MAX_IOVS (2)
257 265
258struct rpcrdma_req { 266struct rpcrdma_req {
267 struct list_head rl_free;
259 unsigned int rl_niovs; 268 unsigned int rl_niovs;
260 unsigned int rl_nchunks; 269 unsigned int rl_nchunks;
261 unsigned int rl_connect_cookie; 270 unsigned int rl_connect_cookie;
@@ -265,6 +274,9 @@ struct rpcrdma_req {
265 struct rpcrdma_regbuf *rl_rdmabuf; 274 struct rpcrdma_regbuf *rl_rdmabuf;
266 struct rpcrdma_regbuf *rl_sendbuf; 275 struct rpcrdma_regbuf *rl_sendbuf;
267 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; 276 struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
277
278 struct list_head rl_all;
279 bool rl_backchannel;
268}; 280};
269 281
270static inline struct rpcrdma_req * 282static inline struct rpcrdma_req *
@@ -289,12 +301,14 @@ struct rpcrdma_buffer {
289 struct list_head rb_all; 301 struct list_head rb_all;
290 char *rb_pool; 302 char *rb_pool;
291 303
292 spinlock_t rb_lock; /* protect buf arrays */ 304 spinlock_t rb_lock; /* protect buf lists */
305 struct list_head rb_send_bufs;
306 struct list_head rb_recv_bufs;
293 u32 rb_max_requests; 307 u32 rb_max_requests;
294 int rb_send_index; 308
295 int rb_recv_index; 309 u32 rb_bc_srv_max_requests;
296 struct rpcrdma_req **rb_send_bufs; 310 spinlock_t rb_reqslock; /* protect rb_allreqs */
297 struct rpcrdma_rep **rb_recv_bufs; 311 struct list_head rb_allreqs;
298}; 312};
299#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) 313#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
300 314
@@ -340,6 +354,7 @@ struct rpcrdma_stats {
340 unsigned long failed_marshal_count; 354 unsigned long failed_marshal_count;
341 unsigned long bad_reply_count; 355 unsigned long bad_reply_count;
342 unsigned long nomsg_call_count; 356 unsigned long nomsg_call_count;
357 unsigned long bcall_count;
343}; 358};
344 359
345/* 360/*
@@ -415,6 +430,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
415/* 430/*
416 * Buffer calls - xprtrdma/verbs.c 431 * Buffer calls - xprtrdma/verbs.c
417 */ 432 */
433struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
434struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
435void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
418int rpcrdma_buffer_create(struct rpcrdma_xprt *); 436int rpcrdma_buffer_create(struct rpcrdma_xprt *);
419void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); 437void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
420 438
@@ -431,10 +449,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
431 struct rpcrdma_regbuf *); 449 struct rpcrdma_regbuf *);
432 450
433unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); 451unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
452int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
434 453
435int frwr_alloc_recovery_wq(void); 454int frwr_alloc_recovery_wq(void);
436void frwr_destroy_recovery_wq(void); 455void frwr_destroy_recovery_wq(void);
437 456
457int rpcrdma_alloc_wq(void);
458void rpcrdma_destroy_wq(void);
459
438/* 460/*
439 * Wrappers for chunk registration, shared by read/write chunk code. 461 * Wrappers for chunk registration, shared by read/write chunk code.
440 */ 462 */
@@ -495,6 +517,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
495int xprt_rdma_init(void); 517int xprt_rdma_init(void);
496void xprt_rdma_cleanup(void); 518void xprt_rdma_cleanup(void);
497 519
520/* Backchannel calls - xprtrdma/backchannel.c
521 */
522#if defined(CONFIG_SUNRPC_BACKCHANNEL)
523int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
524int xprt_rdma_bc_up(struct svc_serv *, struct net *);
525int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
526void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
527int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
528void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
529void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
530#endif /* CONFIG_SUNRPC_BACKCHANNEL */
531
498/* Temporary NFS request map cache. Created in svc_rdma.c */ 532/* Temporary NFS request map cache. Created in svc_rdma.c */
499extern struct kmem_cache *svc_rdma_map_cachep; 533extern struct kmem_cache *svc_rdma_map_cachep;
500/* WR context cache. Created in svc_rdma.c */ 534/* WR context cache. Created in svc_rdma.c */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1a85e0ed0b48..1d1a70498910 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
360 int flags = XS_SENDMSG_FLAGS; 360 int flags = XS_SENDMSG_FLAGS;
361 361
362 remainder -= len; 362 remainder -= len;
363 if (remainder != 0 || more) 363 if (more)
364 flags |= MSG_MORE; 364 flags |= MSG_MORE;
365 if (remainder != 0)
366 flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
365 err = do_sendpage(sock, *ppage, base, len, flags); 367 err = do_sendpage(sock, *ppage, base, len, flags);
366 if (remainder == 0 || err != len) 368 if (remainder == 0 || err != len)
367 break; 369 break;
@@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
823 825
824 kernel_sock_shutdown(sock, SHUT_RDWR); 826 kernel_sock_shutdown(sock, SHUT_RDWR);
825 827
828 mutex_lock(&transport->recv_mutex);
826 write_lock_bh(&sk->sk_callback_lock); 829 write_lock_bh(&sk->sk_callback_lock);
827 transport->inet = NULL; 830 transport->inet = NULL;
828 transport->sock = NULL; 831 transport->sock = NULL;
@@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
833 xprt_clear_connected(xprt); 836 xprt_clear_connected(xprt);
834 write_unlock_bh(&sk->sk_callback_lock); 837 write_unlock_bh(&sk->sk_callback_lock);
835 xs_sock_reset_connection_flags(xprt); 838 xs_sock_reset_connection_flags(xprt);
839 mutex_unlock(&transport->recv_mutex);
836 840
837 trace_rpc_socket_close(xprt, sock); 841 trace_rpc_socket_close(xprt, sock);
838 sock_release(sock); 842 sock_release(sock);
@@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
886 890
887 cancel_delayed_work_sync(&transport->connect_worker); 891 cancel_delayed_work_sync(&transport->connect_worker);
888 xs_close(xprt); 892 xs_close(xprt);
893 cancel_work_sync(&transport->recv_worker);
889 xs_xprt_free(xprt); 894 xs_xprt_free(xprt);
890 module_put(THIS_MODULE); 895 module_put(THIS_MODULE);
891} 896}
@@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
906} 911}
907 912
908/** 913/**
909 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets 914 * xs_local_data_read_skb
910 * @sk: socket with data to read 915 * @xprt: transport
916 * @sk: socket
917 * @skb: skbuff
911 * 918 *
912 * Currently this assumes we can read the whole reply in a single gulp. 919 * Currently this assumes we can read the whole reply in a single gulp.
913 */ 920 */
914static void xs_local_data_ready(struct sock *sk) 921static void xs_local_data_read_skb(struct rpc_xprt *xprt,
922 struct sock *sk,
923 struct sk_buff *skb)
915{ 924{
916 struct rpc_task *task; 925 struct rpc_task *task;
917 struct rpc_xprt *xprt;
918 struct rpc_rqst *rovr; 926 struct rpc_rqst *rovr;
919 struct sk_buff *skb; 927 int repsize, copied;
920 int err, repsize, copied;
921 u32 _xid; 928 u32 _xid;
922 __be32 *xp; 929 __be32 *xp;
923 930
924 read_lock_bh(&sk->sk_callback_lock);
925 dprintk("RPC: %s...\n", __func__);
926 xprt = xprt_from_sock(sk);
927 if (xprt == NULL)
928 goto out;
929
930 skb = skb_recv_datagram(sk, 0, 1, &err);
931 if (skb == NULL)
932 goto out;
933
934 repsize = skb->len - sizeof(rpc_fraghdr); 931 repsize = skb->len - sizeof(rpc_fraghdr);
935 if (repsize < 4) { 932 if (repsize < 4) {
936 dprintk("RPC: impossible RPC reply size %d\n", repsize); 933 dprintk("RPC: impossible RPC reply size %d\n", repsize);
937 goto dropit; 934 return;
938 } 935 }
939 936
940 /* Copy the XID from the skb... */ 937 /* Copy the XID from the skb... */
941 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); 938 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
942 if (xp == NULL) 939 if (xp == NULL)
943 goto dropit; 940 return;
944 941
945 /* Look up and lock the request corresponding to the given XID */ 942 /* Look up and lock the request corresponding to the given XID */
946 spin_lock(&xprt->transport_lock); 943 spin_lock_bh(&xprt->transport_lock);
947 rovr = xprt_lookup_rqst(xprt, *xp); 944 rovr = xprt_lookup_rqst(xprt, *xp);
948 if (!rovr) 945 if (!rovr)
949 goto out_unlock; 946 goto out_unlock;
@@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk)
961 xprt_complete_rqst(task, copied); 958 xprt_complete_rqst(task, copied);
962 959
963 out_unlock: 960 out_unlock:
964 spin_unlock(&xprt->transport_lock); 961 spin_unlock_bh(&xprt->transport_lock);
965 dropit: 962}
966 skb_free_datagram(sk, skb); 963
967 out: 964static void xs_local_data_receive(struct sock_xprt *transport)
968 read_unlock_bh(&sk->sk_callback_lock); 965{
966 struct sk_buff *skb;
967 struct sock *sk;
968 int err;
969
970 mutex_lock(&transport->recv_mutex);
971 sk = transport->inet;
972 if (sk == NULL)
973 goto out;
974 for (;;) {
975 skb = skb_recv_datagram(sk, 0, 1, &err);
976 if (skb == NULL)
977 break;
978 xs_local_data_read_skb(&transport->xprt, sk, skb);
979 skb_free_datagram(sk, skb);
980 }
981out:
982 mutex_unlock(&transport->recv_mutex);
983}
984
985static void xs_local_data_receive_workfn(struct work_struct *work)
986{
987 struct sock_xprt *transport =
988 container_of(work, struct sock_xprt, recv_worker);
989 xs_local_data_receive(transport);
969} 990}
970 991
971/** 992/**
972 * xs_udp_data_ready - "data ready" callback for UDP sockets 993 * xs_udp_data_read_skb - receive callback for UDP sockets
973 * @sk: socket with data to read 994 * @xprt: transport
995 * @sk: socket
996 * @skb: skbuff
974 * 997 *
975 */ 998 */
976static void xs_udp_data_ready(struct sock *sk) 999static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
1000 struct sock *sk,
1001 struct sk_buff *skb)
977{ 1002{
978 struct rpc_task *task; 1003 struct rpc_task *task;
979 struct rpc_xprt *xprt;
980 struct rpc_rqst *rovr; 1004 struct rpc_rqst *rovr;
981 struct sk_buff *skb; 1005 int repsize, copied;
982 int err, repsize, copied;
983 u32 _xid; 1006 u32 _xid;
984 __be32 *xp; 1007 __be32 *xp;
985 1008
986 read_lock_bh(&sk->sk_callback_lock);
987 dprintk("RPC: xs_udp_data_ready...\n");
988 if (!(xprt = xprt_from_sock(sk)))
989 goto out;
990
991 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
992 goto out;
993
994 repsize = skb->len - sizeof(struct udphdr); 1009 repsize = skb->len - sizeof(struct udphdr);
995 if (repsize < 4) { 1010 if (repsize < 4) {
996 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 1011 dprintk("RPC: impossible RPC reply size %d!\n", repsize);
997 goto dropit; 1012 return;
998 } 1013 }
999 1014
1000 /* Copy the XID from the skb... */ 1015 /* Copy the XID from the skb... */
1001 xp = skb_header_pointer(skb, sizeof(struct udphdr), 1016 xp = skb_header_pointer(skb, sizeof(struct udphdr),
1002 sizeof(_xid), &_xid); 1017 sizeof(_xid), &_xid);
1003 if (xp == NULL) 1018 if (xp == NULL)
1004 goto dropit; 1019 return;
1005 1020
1006 /* Look up and lock the request corresponding to the given XID */ 1021 /* Look up and lock the request corresponding to the given XID */
1007 spin_lock(&xprt->transport_lock); 1022 spin_lock_bh(&xprt->transport_lock);
1008 rovr = xprt_lookup_rqst(xprt, *xp); 1023 rovr = xprt_lookup_rqst(xprt, *xp);
1009 if (!rovr) 1024 if (!rovr)
1010 goto out_unlock; 1025 goto out_unlock;
@@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk)
1025 xprt_complete_rqst(task, copied); 1040 xprt_complete_rqst(task, copied);
1026 1041
1027 out_unlock: 1042 out_unlock:
1028 spin_unlock(&xprt->transport_lock); 1043 spin_unlock_bh(&xprt->transport_lock);
1029 dropit: 1044}
1030 skb_free_datagram(sk, skb); 1045
1031 out: 1046static void xs_udp_data_receive(struct sock_xprt *transport)
1047{
1048 struct sk_buff *skb;
1049 struct sock *sk;
1050 int err;
1051
1052 mutex_lock(&transport->recv_mutex);
1053 sk = transport->inet;
1054 if (sk == NULL)
1055 goto out;
1056 for (;;) {
1057 skb = skb_recv_datagram(sk, 0, 1, &err);
1058 if (skb == NULL)
1059 break;
1060 xs_udp_data_read_skb(&transport->xprt, sk, skb);
1061 skb_free_datagram(sk, skb);
1062 }
1063out:
1064 mutex_unlock(&transport->recv_mutex);
1065}
1066
1067static void xs_udp_data_receive_workfn(struct work_struct *work)
1068{
1069 struct sock_xprt *transport =
1070 container_of(work, struct sock_xprt, recv_worker);
1071 xs_udp_data_receive(transport);
1072}
1073
1074/**
 1075 * xs_data_ready - "data ready" callback for AF_LOCAL and UDP sockets
1076 * @sk: socket with data to read
1077 *
1078 */
1079static void xs_data_ready(struct sock *sk)
1080{
1081 struct rpc_xprt *xprt;
1082
1083 read_lock_bh(&sk->sk_callback_lock);
1084 dprintk("RPC: xs_data_ready...\n");
1085 xprt = xprt_from_sock(sk);
1086 if (xprt != NULL) {
1087 struct sock_xprt *transport = container_of(xprt,
1088 struct sock_xprt, xprt);
1089 queue_work(rpciod_workqueue, &transport->recv_worker);
1090 }
1032 read_unlock_bh(&sk->sk_callback_lock); 1091 read_unlock_bh(&sk->sk_callback_lock);
1033} 1092}
1034 1093
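
The xprtsock changes follow the same shape as the RDMA side: the sk_data_ready callbacks no longer parse data in softirq context, they only queue the transport's recv_worker on rpciod_workqueue, and the worker drains the socket under the new recv_mutex (xs_reset_transport() takes the same mutex and xs_destroy() pairs it with cancel_work_sync(), so teardown cannot race with a running worker). A generic sketch of the datagram flavour of this split; the example_* names are made up, and the real code locates the transport via xprt_from_sock() and container_of() against struct sock_xprt:

    /* Illustrative sketch only -- not part of the patch. */
    #include <linux/mutex.h>
    #include <linux/skbuff.h>
    #include <linux/workqueue.h>
    #include <net/sock.h>

    struct example_transport {
            struct sock             *inet;
            struct mutex            recv_mutex;
            struct work_struct      recv_worker;
    };

    static struct workqueue_struct *example_wq;   /* e.g. rpciod_workqueue */

    static void example_read_skb(struct example_transport *transport,
                                 struct sk_buff *skb)
    {
            /* decode one datagram and complete the matching request (elided) */
    }

    /* Runs in softirq context: do nothing but kick the worker. */
    static void example_data_ready(struct sock *sk)
    {
            struct example_transport *transport;

            read_lock_bh(&sk->sk_callback_lock);
            transport = sk->sk_user_data;   /* simplified lookup */
            if (transport)
                    queue_work(example_wq, &transport->recv_worker);
            read_unlock_bh(&sk->sk_callback_lock);
    }

    /* Runs in process context: drain the socket under recv_mutex. */
    static void example_receive_workfn(struct work_struct *work)
    {
            struct example_transport *transport =
                    container_of(work, struct example_transport, recv_worker);
            struct sk_buff *skb;
            int err;

            mutex_lock(&transport->recv_mutex);
            if (transport->inet == NULL)
                    goto out;
            while ((skb = skb_recv_datagram(transport->inet, 0, 1, &err)) != NULL) {
                    example_read_skb(transport, skb);
                    skb_free_datagram(transport->inet, skb);
            }
    out:
            mutex_unlock(&transport->recv_mutex);
    }
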
@@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1243 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); 1302 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
1244 1303
1245 /* Find and lock the request corresponding to this xid */ 1304 /* Find and lock the request corresponding to this xid */
1246 spin_lock(&xprt->transport_lock); 1305 spin_lock_bh(&xprt->transport_lock);
1247 req = xprt_lookup_rqst(xprt, transport->tcp_xid); 1306 req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1248 if (!req) { 1307 if (!req) {
1249 dprintk("RPC: XID %08x request not found!\n", 1308 dprintk("RPC: XID %08x request not found!\n",
1250 ntohl(transport->tcp_xid)); 1309 ntohl(transport->tcp_xid));
1251 spin_unlock(&xprt->transport_lock); 1310 spin_unlock_bh(&xprt->transport_lock);
1252 return -1; 1311 return -1;
1253 } 1312 }
1254 1313
@@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1257 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1316 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1258 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1317 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1259 1318
1260 spin_unlock(&xprt->transport_lock); 1319 spin_unlock_bh(&xprt->transport_lock);
1261 return 0; 1320 return 0;
1262} 1321}
1263 1322
@@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1277 struct rpc_rqst *req; 1336 struct rpc_rqst *req;
1278 1337
1279 /* Look up and lock the request corresponding to the given XID */ 1338 /* Look up and lock the request corresponding to the given XID */
1280 spin_lock(&xprt->transport_lock); 1339 spin_lock_bh(&xprt->transport_lock);
1281 req = xprt_lookup_bc_request(xprt, transport->tcp_xid); 1340 req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
1282 if (req == NULL) { 1341 if (req == NULL) {
1283 spin_unlock(&xprt->transport_lock); 1342 spin_unlock_bh(&xprt->transport_lock);
1284 printk(KERN_WARNING "Callback slot table overflowed\n"); 1343 printk(KERN_WARNING "Callback slot table overflowed\n");
1285 xprt_force_disconnect(xprt); 1344 xprt_force_disconnect(xprt);
1286 return -1; 1345 return -1;
@@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1291 1350
1292 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1351 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1293 xprt_complete_bc_request(req, transport->tcp_copied); 1352 xprt_complete_bc_request(req, transport->tcp_copied);
1294 spin_unlock(&xprt->transport_lock); 1353 spin_unlock_bh(&xprt->transport_lock);
1295 1354
1296 return 0; 1355 return 0;
1297} 1356}
@@ -1306,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1306 xs_tcp_read_reply(xprt, desc) : 1365 xs_tcp_read_reply(xprt, desc) :
1307 xs_tcp_read_callback(xprt, desc); 1366 xs_tcp_read_callback(xprt, desc);
1308} 1367}
1368
1369static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
1370{
1371 int ret;
1372
1373 ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
1374 SVC_SOCK_ANONYMOUS);
1375 if (ret < 0)
1376 return ret;
1377 return 0;
1378}
1309#else 1379#else
1310static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1380static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1311 struct xdr_skb_reader *desc) 1381 struct xdr_skb_reader *desc)
@@ -1391,6 +1461,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1391 return len - desc.count; 1461 return len - desc.count;
1392} 1462}
1393 1463
1464static void xs_tcp_data_receive(struct sock_xprt *transport)
1465{
1466 struct rpc_xprt *xprt = &transport->xprt;
1467 struct sock *sk;
1468 read_descriptor_t rd_desc = {
1469 .count = 2*1024*1024,
1470 .arg.data = xprt,
1471 };
1472 unsigned long total = 0;
1473 int read = 0;
1474
1475 mutex_lock(&transport->recv_mutex);
1476 sk = transport->inet;
1477 if (sk == NULL)
1478 goto out;
1479
1480 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1481 for (;;) {
1482 lock_sock(sk);
1483 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1484 release_sock(sk);
1485 if (read <= 0)
1486 break;
1487 total += read;
1488 rd_desc.count = 65536;
1489 }
1490out:
1491 mutex_unlock(&transport->recv_mutex);
1492 trace_xs_tcp_data_ready(xprt, read, total);
1493}
1494
1495static void xs_tcp_data_receive_workfn(struct work_struct *work)
1496{
1497 struct sock_xprt *transport =
1498 container_of(work, struct sock_xprt, recv_worker);
1499 xs_tcp_data_receive(transport);
1500}
1501
1394/** 1502/**
1395 * xs_tcp_data_ready - "data ready" callback for TCP sockets 1503 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1396 * @sk: socket with data to read 1504 * @sk: socket with data to read
@@ -1398,34 +1506,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1398 */ 1506 */
1399static void xs_tcp_data_ready(struct sock *sk) 1507static void xs_tcp_data_ready(struct sock *sk)
1400{ 1508{
1509 struct sock_xprt *transport;
1401 struct rpc_xprt *xprt; 1510 struct rpc_xprt *xprt;
1402 read_descriptor_t rd_desc;
1403 int read;
1404 unsigned long total = 0;
1405 1511
1406 dprintk("RPC: xs_tcp_data_ready...\n"); 1512 dprintk("RPC: xs_tcp_data_ready...\n");
1407 1513
1408 read_lock_bh(&sk->sk_callback_lock); 1514 read_lock_bh(&sk->sk_callback_lock);
1409 if (!(xprt = xprt_from_sock(sk))) { 1515 if (!(xprt = xprt_from_sock(sk)))
1410 read = 0;
1411 goto out; 1516 goto out;
1412 } 1517 transport = container_of(xprt, struct sock_xprt, xprt);
1518
1413 /* Any data means we had a useful conversation, so 1519 /* Any data means we had a useful conversation, so
 1414 * we don't need to delay the next reconnect 1520 * we don't need to delay the next reconnect
1415 */ 1521 */
1416 if (xprt->reestablish_timeout) 1522 if (xprt->reestablish_timeout)
1417 xprt->reestablish_timeout = 0; 1523 xprt->reestablish_timeout = 0;
1524 queue_work(rpciod_workqueue, &transport->recv_worker);
1418 1525
1419 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1420 rd_desc.arg.data = xprt;
1421 do {
1422 rd_desc.count = 65536;
1423 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1424 if (read > 0)
1425 total += read;
1426 } while (read > 0);
1427out: 1526out:
1428 trace_xs_tcp_data_ready(xprt, read, total);
1429 read_unlock_bh(&sk->sk_callback_lock); 1527 read_unlock_bh(&sk->sk_callback_lock);
1430} 1528}
1431 1529
@@ -1873,7 +1971,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1873 xs_save_old_callbacks(transport, sk); 1971 xs_save_old_callbacks(transport, sk);
1874 1972
1875 sk->sk_user_data = xprt; 1973 sk->sk_user_data = xprt;
1876 sk->sk_data_ready = xs_local_data_ready; 1974 sk->sk_data_ready = xs_data_ready;
1877 sk->sk_write_space = xs_udp_write_space; 1975 sk->sk_write_space = xs_udp_write_space;
1878 sk->sk_error_report = xs_error_report; 1976 sk->sk_error_report = xs_error_report;
1879 sk->sk_allocation = GFP_NOIO; 1977 sk->sk_allocation = GFP_NOIO;
@@ -2059,7 +2157,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2059 xs_save_old_callbacks(transport, sk); 2157 xs_save_old_callbacks(transport, sk);
2060 2158
2061 sk->sk_user_data = xprt; 2159 sk->sk_user_data = xprt;
2062 sk->sk_data_ready = xs_udp_data_ready; 2160 sk->sk_data_ready = xs_data_ready;
2063 sk->sk_write_space = xs_udp_write_space; 2161 sk->sk_write_space = xs_udp_write_space;
2064 sk->sk_allocation = GFP_NOIO; 2162 sk->sk_allocation = GFP_NOIO;
2065 2163
@@ -2472,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task)
2472{ 2570{
2473 struct rpc_rqst *req = task->tk_rqstp; 2571 struct rpc_rqst *req = task->tk_rqstp;
2474 struct svc_xprt *xprt; 2572 struct svc_xprt *xprt;
2475 u32 len; 2573 int len;
2476 2574
2477 dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); 2575 dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2478 /* 2576 /*
@@ -2580,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = {
2580 .enable_swap = xs_enable_swap, 2678 .enable_swap = xs_enable_swap,
2581 .disable_swap = xs_disable_swap, 2679 .disable_swap = xs_disable_swap,
2582 .inject_disconnect = xs_inject_disconnect, 2680 .inject_disconnect = xs_inject_disconnect,
2681#ifdef CONFIG_SUNRPC_BACKCHANNEL
2682 .bc_setup = xprt_setup_bc,
2683 .bc_up = xs_tcp_bc_up,
2684 .bc_free_rqst = xprt_free_bc_rqst,
2685 .bc_destroy = xprt_destroy_bc,
2686#endif
2583}; 2687};
2584 2688
2585/* 2689/*
@@ -2650,6 +2754,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2650 } 2754 }
2651 2755
2652 new = container_of(xprt, struct sock_xprt, xprt); 2756 new = container_of(xprt, struct sock_xprt, xprt);
2757 mutex_init(&new->recv_mutex);
2653 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 2758 memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2654 xprt->addrlen = args->addrlen; 2759 xprt->addrlen = args->addrlen;
2655 if (args->srcaddr) 2760 if (args->srcaddr)
@@ -2703,6 +2808,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2703 xprt->ops = &xs_local_ops; 2808 xprt->ops = &xs_local_ops;
2704 xprt->timeout = &xs_local_default_timeout; 2809 xprt->timeout = &xs_local_default_timeout;
2705 2810
2811 INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
2706 INIT_DELAYED_WORK(&transport->connect_worker, 2812 INIT_DELAYED_WORK(&transport->connect_worker,
2707 xs_dummy_setup_socket); 2813 xs_dummy_setup_socket);
2708 2814
@@ -2774,21 +2880,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2774 2880
2775 xprt->timeout = &xs_udp_default_timeout; 2881 xprt->timeout = &xs_udp_default_timeout;
2776 2882
2883 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
2884 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
2885
2777 switch (addr->sa_family) { 2886 switch (addr->sa_family) {
2778 case AF_INET: 2887 case AF_INET:
2779 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2888 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2780 xprt_set_bound(xprt); 2889 xprt_set_bound(xprt);
2781 2890
2782 INIT_DELAYED_WORK(&transport->connect_worker,
2783 xs_udp_setup_socket);
2784 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 2891 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2785 break; 2892 break;
2786 case AF_INET6: 2893 case AF_INET6:
2787 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2894 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2788 xprt_set_bound(xprt); 2895 xprt_set_bound(xprt);
2789 2896
2790 INIT_DELAYED_WORK(&transport->connect_worker,
2791 xs_udp_setup_socket);
2792 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 2897 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2793 break; 2898 break;
2794 default: 2899 default:
@@ -2853,21 +2958,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2853 xprt->ops = &xs_tcp_ops; 2958 xprt->ops = &xs_tcp_ops;
2854 xprt->timeout = &xs_tcp_default_timeout; 2959 xprt->timeout = &xs_tcp_default_timeout;
2855 2960
2961 INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
2962 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
2963
2856 switch (addr->sa_family) { 2964 switch (addr->sa_family) {
2857 case AF_INET: 2965 case AF_INET:
2858 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2966 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2859 xprt_set_bound(xprt); 2967 xprt_set_bound(xprt);
2860 2968
2861 INIT_DELAYED_WORK(&transport->connect_worker,
2862 xs_tcp_setup_socket);
2863 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 2969 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2864 break; 2970 break;
2865 case AF_INET6: 2971 case AF_INET6:
2866 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2972 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2867 xprt_set_bound(xprt); 2973 xprt_set_bound(xprt);
2868 2974
2869 INIT_DELAYED_WORK(&transport->connect_worker,
2870 xs_tcp_setup_socket);
2871 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 2975 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2872 break; 2976 break;
2873 default: 2977 default: