Diffstat (limited to 'net')
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_upcall.c      |   9
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_xdr.c         |  14
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_xdr.h         |   4
-rw-r--r--  net/sunrpc/auth_gss/svcauth_gss.c         |   8
-rw-r--r--  net/sunrpc/clnt.c                         |  16
-rw-r--r--  net/sunrpc/rpcb_clnt.c                    |  82
-rw-r--r--  net/sunrpc/stats.c                        |  16
-rw-r--r--  net/sunrpc/svc.c                          |  35
-rw-r--r--  net/sunrpc/svc_xprt.c                     |  10
-rw-r--r--  net/sunrpc/xprtrdma/Makefile              |   4
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_marshal.c    | 168
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c   | 734
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_rw.c         | 449
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_sendto.c     |  15
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c  | 250
15 files changed, 869 insertions(+), 945 deletions(-)
diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c
index f0c6a8c78a56..46b295e4f2b8 100644
--- a/net/sunrpc/auth_gss/gss_rpc_upcall.c
+++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c
@@ -55,15 +55,15 @@ enum {
55#define PROC(proc, name) \ 55#define PROC(proc, name) \
56[GSSX_##proc] = { \ 56[GSSX_##proc] = { \
57 .p_proc = GSSX_##proc, \ 57 .p_proc = GSSX_##proc, \
58 .p_encode = (kxdreproc_t)gssx_enc_##name, \ 58 .p_encode = gssx_enc_##name, \
59 .p_decode = (kxdrdproc_t)gssx_dec_##name, \ 59 .p_decode = gssx_dec_##name, \
60 .p_arglen = GSSX_ARG_##name##_sz, \ 60 .p_arglen = GSSX_ARG_##name##_sz, \
61 .p_replen = GSSX_RES_##name##_sz, \ 61 .p_replen = GSSX_RES_##name##_sz, \
62 .p_statidx = GSSX_##proc, \ 62 .p_statidx = GSSX_##proc, \
63 .p_name = #proc, \ 63 .p_name = #proc, \
64} 64}
65 65
66static struct rpc_procinfo gssp_procedures[] = { 66static const struct rpc_procinfo gssp_procedures[] = {
67 PROC(INDICATE_MECHS, indicate_mechs), 67 PROC(INDICATE_MECHS, indicate_mechs),
68 PROC(GET_CALL_CONTEXT, get_call_context), 68 PROC(GET_CALL_CONTEXT, get_call_context),
69 PROC(IMPORT_AND_CANON_NAME, import_and_canon_name), 69 PROC(IMPORT_AND_CANON_NAME, import_and_canon_name),
@@ -364,11 +364,12 @@ void gssp_free_upcall_data(struct gssp_upcall_data *data)
364/* 364/*
365 * Initialization stuff 365 * Initialization stuff
366 */ 366 */
367 367static unsigned int gssp_version1_counts[ARRAY_SIZE(gssp_procedures)];
368static const struct rpc_version gssp_version1 = { 368static const struct rpc_version gssp_version1 = {
369 .number = GSSPROXY_VERS_1, 369 .number = GSSPROXY_VERS_1,
370 .nrprocs = ARRAY_SIZE(gssp_procedures), 370 .nrprocs = ARRAY_SIZE(gssp_procedures),
371 .procs = gssp_procedures, 371 .procs = gssp_procedures,
372 .counts = gssp_version1_counts,
372}; 373};
373 374
374static const struct rpc_version *gssp_version[] = { 375static const struct rpc_version *gssp_version[] = {
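A recurring pattern in this series: the procedure table becomes const, and the mutable per-procedure call counters move into a separate array that the rpc_version points at through its counts member. A minimal sketch of that pattern, using invented example_* names and only the rpc_procinfo/rpc_version fields visible in this diff:

#include <linux/kernel.h>
#include <linux/sunrpc/clnt.h>

/* Trivial NULL-procedure handlers with the new, typed XDR signatures
 * (mirroring rpcproc_encode_null/rpcproc_decode_null in the clnt.c
 * hunk below), so no function-pointer casts are needed in the table.
 */
static void example_enc_null(struct rpc_rqst *req, struct xdr_stream *xdr,
			     const void *obj)
{
}

static int example_dec_null(struct rpc_rqst *req, struct xdr_stream *xdr,
			    void *obj)
{
	return 0;
}

/* The table itself can now live in read-only memory ... */
static const struct rpc_procinfo example_procedures[] = {
	[0] = {
		.p_proc    = 0,
		.p_encode  = example_enc_null,
		.p_decode  = example_dec_null,
		.p_statidx = 0,
		.p_name    = "NULL",
	},
};

/* ... while the call counters stay writable in a parallel array. */
static unsigned int example_version1_counts[ARRAY_SIZE(example_procedures)];

static const struct rpc_version example_version1 = {
	.number  = 1,
	.nrprocs = ARRAY_SIZE(example_procedures),
	.procs   = example_procedures,
	.counts  = example_version1_counts,
};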
diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.c b/net/sunrpc/auth_gss/gss_rpc_xdr.c
index 25d9a9cf7b66..c4778cae58ef 100644
--- a/net/sunrpc/auth_gss/gss_rpc_xdr.c
+++ b/net/sunrpc/auth_gss/gss_rpc_xdr.c
@@ -44,7 +44,7 @@ static int gssx_dec_bool(struct xdr_stream *xdr, u32 *v)
44} 44}
45 45
46static int gssx_enc_buffer(struct xdr_stream *xdr, 46static int gssx_enc_buffer(struct xdr_stream *xdr,
47 gssx_buffer *buf) 47 const gssx_buffer *buf)
48{ 48{
49 __be32 *p; 49 __be32 *p;
50 50
@@ -56,7 +56,7 @@ static int gssx_enc_buffer(struct xdr_stream *xdr,
56} 56}
57 57
58static int gssx_enc_in_token(struct xdr_stream *xdr, 58static int gssx_enc_in_token(struct xdr_stream *xdr,
59 struct gssp_in_token *in) 59 const struct gssp_in_token *in)
60{ 60{
61 __be32 *p; 61 __be32 *p;
62 62
@@ -130,7 +130,7 @@ static int gssx_dec_option(struct xdr_stream *xdr,
130} 130}
131 131
132static int dummy_enc_opt_array(struct xdr_stream *xdr, 132static int dummy_enc_opt_array(struct xdr_stream *xdr,
133 struct gssx_option_array *oa) 133 const struct gssx_option_array *oa)
134{ 134{
135 __be32 *p; 135 __be32 *p;
136 136
@@ -348,7 +348,7 @@ static int gssx_dec_status(struct xdr_stream *xdr,
348} 348}
349 349
350static int gssx_enc_call_ctx(struct xdr_stream *xdr, 350static int gssx_enc_call_ctx(struct xdr_stream *xdr,
351 struct gssx_call_ctx *ctx) 351 const struct gssx_call_ctx *ctx)
352{ 352{
353 struct gssx_option opt; 353 struct gssx_option opt;
354 __be32 *p; 354 __be32 *p;
@@ -733,8 +733,9 @@ static int gssx_enc_cb(struct xdr_stream *xdr, struct gssx_cb *cb)
733 733
734void gssx_enc_accept_sec_context(struct rpc_rqst *req, 734void gssx_enc_accept_sec_context(struct rpc_rqst *req,
735 struct xdr_stream *xdr, 735 struct xdr_stream *xdr,
736 struct gssx_arg_accept_sec_context *arg) 736 const void *data)
737{ 737{
738 const struct gssx_arg_accept_sec_context *arg = data;
738 int err; 739 int err;
739 740
740 err = gssx_enc_call_ctx(xdr, &arg->call_ctx); 741 err = gssx_enc_call_ctx(xdr, &arg->call_ctx);
@@ -789,8 +790,9 @@ done:
789 790
790int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp, 791int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp,
791 struct xdr_stream *xdr, 792 struct xdr_stream *xdr,
792 struct gssx_res_accept_sec_context *res) 793 void *data)
793{ 794{
795 struct gssx_res_accept_sec_context *res = data;
794 u32 value_follows; 796 u32 value_follows;
795 int err; 797 int err;
796 struct page *scratch; 798 struct page *scratch;
diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.h b/net/sunrpc/auth_gss/gss_rpc_xdr.h
index 9d88c6239f01..146c31032917 100644
--- a/net/sunrpc/auth_gss/gss_rpc_xdr.h
+++ b/net/sunrpc/auth_gss/gss_rpc_xdr.h
@@ -179,10 +179,10 @@ struct gssx_res_accept_sec_context {
179#define gssx_dec_init_sec_context NULL 179#define gssx_dec_init_sec_context NULL
180void gssx_enc_accept_sec_context(struct rpc_rqst *req, 180void gssx_enc_accept_sec_context(struct rpc_rqst *req,
181 struct xdr_stream *xdr, 181 struct xdr_stream *xdr,
182 struct gssx_arg_accept_sec_context *args); 182 const void *data);
183int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp, 183int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp,
184 struct xdr_stream *xdr, 184 struct xdr_stream *xdr,
185 struct gssx_res_accept_sec_context *res); 185 void *data);
186#define gssx_enc_release_handle NULL 186#define gssx_enc_release_handle NULL
187#define gssx_dec_release_handle NULL 187#define gssx_dec_release_handle NULL
188#define gssx_enc_get_mic NULL 188#define gssx_enc_get_mic NULL
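The prototype change above is what removes the kxdreproc_t/kxdrdproc_t function-pointer casts from the tables: encoders now take a const void * and decoders a void *, and each handler recovers its concrete argument type internally. A small illustrative handler pair, with a made-up example_args type that is not part of this patch:

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xdr.h>

struct example_args {
	u32 value;
};

static void example_enc_args(struct rpc_rqst *req, struct xdr_stream *xdr,
			     const void *data)
{
	const struct example_args *args = data;	/* recover concrete type */
	__be32 *p;

	p = xdr_reserve_space(xdr, 4);
	if (!p)
		return;
	*p = cpu_to_be32(args->value);
}

static int example_dec_args(struct rpc_rqst *req, struct xdr_stream *xdr,
			    void *data)
{
	struct example_args *args = data;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4);
	if (!p)
		return -EIO;
	args->value = be32_to_cpup(p);
	return 0;
}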
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index a54a7a3d28f5..7b1ee5a0b03c 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -838,6 +838,14 @@ unwrap_integ_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct g
838 struct xdr_netobj mic; 838 struct xdr_netobj mic;
839 struct xdr_buf integ_buf; 839 struct xdr_buf integ_buf;
840 840
841 /* NFS READ normally uses splice to send data in-place. However
842 * the data in cache can change after the reply's MIC is computed
843 * but before the RPC reply is sent. To prevent the client from
844 * rejecting the server-computed MIC in this somewhat rare case,
845 * do not use splice with the GSS integrity service.
846 */
847 clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
848
841 /* Did we already verify the signature on the original pass through? */ 849 /* Did we already verify the signature on the original pass through? */
842 if (rqstp->rq_deferred) 850 if (rqstp->rq_deferred)
843 return 0; 851 return 0;
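The comment block added above explains why splice is off for the GSS integrity service: a MIC computed over pagecache data becomes stale if that data changes before the reply is transmitted. As a rough, hypothetical sketch (not the actual nfsd code), a consumer of RQ_SPLICE_OK typically decides between zero-copy and ordinary reads along these lines, which is exactly the zero-copy path that the clear_bit() above disables; the example_* helpers are placeholders:

#include <linux/fs.h>
#include <linux/sunrpc/svc.h>

static int example_splice_read(struct svc_rqst *rqstp, struct file *file,
			       loff_t offset, unsigned long count);
static int example_copy_read(struct svc_rqst *rqstp, struct file *file,
			     loff_t offset, unsigned long count);

static int example_send_file_data(struct svc_rqst *rqstp, struct file *file,
				  loff_t offset, unsigned long count)
{
	/* Zero-copy is only safe when the transport/auth path allows it. */
	if (file->f_op->splice_read &&
	    test_bit(RQ_SPLICE_OK, &rqstp->rq_flags))
		return example_splice_read(rqstp, file, offset, count);

	return example_copy_read(rqstp, file, offset, count);
}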
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b5cb921775a0..2e49d1f892b7 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1517,14 +1517,16 @@ static void
1517call_start(struct rpc_task *task) 1517call_start(struct rpc_task *task)
1518{ 1518{
1519 struct rpc_clnt *clnt = task->tk_client; 1519 struct rpc_clnt *clnt = task->tk_client;
1520 int idx = task->tk_msg.rpc_proc->p_statidx;
1520 1521
1521 dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid, 1522 dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1522 clnt->cl_program->name, clnt->cl_vers, 1523 clnt->cl_program->name, clnt->cl_vers,
1523 rpc_proc_name(task), 1524 rpc_proc_name(task),
1524 (RPC_IS_ASYNC(task) ? "async" : "sync")); 1525 (RPC_IS_ASYNC(task) ? "async" : "sync"));
1525 1526
1526 /* Increment call count */ 1527 /* Increment call count (version might not be valid for ping) */
1527 task->tk_msg.rpc_proc->p_count++; 1528 if (clnt->cl_program->version[clnt->cl_vers])
1529 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1528 clnt->cl_stats->rpccnt++; 1530 clnt->cl_stats->rpccnt++;
1529 task->tk_action = call_reserve; 1531 task->tk_action = call_reserve;
1530} 1532}
@@ -1672,7 +1674,7 @@ call_allocate(struct rpc_task *task)
1672 unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack; 1674 unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1673 struct rpc_rqst *req = task->tk_rqstp; 1675 struct rpc_rqst *req = task->tk_rqstp;
1674 struct rpc_xprt *xprt = req->rq_xprt; 1676 struct rpc_xprt *xprt = req->rq_xprt;
1675 struct rpc_procinfo *proc = task->tk_msg.rpc_proc; 1677 const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1676 int status; 1678 int status;
1677 1679
1678 dprint_status(task); 1680 dprint_status(task);
@@ -2476,16 +2478,18 @@ out_overflow:
2476 goto out_garbage; 2478 goto out_garbage;
2477} 2479}
2478 2480
2479static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj) 2481static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2482 const void *obj)
2480{ 2483{
2481} 2484}
2482 2485
2483static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj) 2486static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2487 void *obj)
2484{ 2488{
2485 return 0; 2489 return 0;
2486} 2490}
2487 2491
2488static struct rpc_procinfo rpcproc_null = { 2492static const struct rpc_procinfo rpcproc_null = {
2489 .p_encode = rpcproc_encode_null, 2493 .p_encode = rpcproc_encode_null,
2490 .p_decode = rpcproc_decode_null, 2494 .p_decode = rpcproc_decode_null,
2491}; 2495};
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 5b30603596d0..ea0676f199c8 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -128,13 +128,13 @@ struct rpcbind_args {
128 int r_status; 128 int r_status;
129}; 129};
130 130
131static struct rpc_procinfo rpcb_procedures2[]; 131static const struct rpc_procinfo rpcb_procedures2[];
132static struct rpc_procinfo rpcb_procedures3[]; 132static const struct rpc_procinfo rpcb_procedures3[];
133static struct rpc_procinfo rpcb_procedures4[]; 133static const struct rpc_procinfo rpcb_procedures4[];
134 134
135struct rpcb_info { 135struct rpcb_info {
136 u32 rpc_vers; 136 u32 rpc_vers;
137 struct rpc_procinfo * rpc_proc; 137 const struct rpc_procinfo *rpc_proc;
138}; 138};
139 139
140static const struct rpcb_info rpcb_next_version[]; 140static const struct rpcb_info rpcb_next_version[];
@@ -620,7 +620,8 @@ int rpcb_v4_register(struct net *net, const u32 program, const u32 version,
620 return -EAFNOSUPPORT; 620 return -EAFNOSUPPORT;
621} 621}
622 622
623static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbind_args *map, struct rpc_procinfo *proc) 623static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt,
624 struct rpcbind_args *map, const struct rpc_procinfo *proc)
624{ 625{
625 struct rpc_message msg = { 626 struct rpc_message msg = {
626 .rpc_proc = proc, 627 .rpc_proc = proc,
@@ -671,7 +672,7 @@ static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
671void rpcb_getport_async(struct rpc_task *task) 672void rpcb_getport_async(struct rpc_task *task)
672{ 673{
673 struct rpc_clnt *clnt; 674 struct rpc_clnt *clnt;
674 struct rpc_procinfo *proc; 675 const struct rpc_procinfo *proc;
675 u32 bind_version; 676 u32 bind_version;
676 struct rpc_xprt *xprt; 677 struct rpc_xprt *xprt;
677 struct rpc_clnt *rpcb_clnt; 678 struct rpc_clnt *rpcb_clnt;
@@ -843,8 +844,9 @@ static void rpcb_getport_done(struct rpc_task *child, void *data)
843 */ 844 */
844 845
845static void rpcb_enc_mapping(struct rpc_rqst *req, struct xdr_stream *xdr, 846static void rpcb_enc_mapping(struct rpc_rqst *req, struct xdr_stream *xdr,
846 const struct rpcbind_args *rpcb) 847 const void *data)
847{ 848{
849 const struct rpcbind_args *rpcb = data;
848 __be32 *p; 850 __be32 *p;
849 851
850 dprintk("RPC: %5u encoding PMAP_%s call (%u, %u, %d, %u)\n", 852 dprintk("RPC: %5u encoding PMAP_%s call (%u, %u, %d, %u)\n",
@@ -860,8 +862,9 @@ static void rpcb_enc_mapping(struct rpc_rqst *req, struct xdr_stream *xdr,
860} 862}
861 863
862static int rpcb_dec_getport(struct rpc_rqst *req, struct xdr_stream *xdr, 864static int rpcb_dec_getport(struct rpc_rqst *req, struct xdr_stream *xdr,
863 struct rpcbind_args *rpcb) 865 void *data)
864{ 866{
867 struct rpcbind_args *rpcb = data;
865 unsigned long port; 868 unsigned long port;
866 __be32 *p; 869 __be32 *p;
867 870
@@ -882,8 +885,9 @@ static int rpcb_dec_getport(struct rpc_rqst *req, struct xdr_stream *xdr,
882} 885}
883 886
884static int rpcb_dec_set(struct rpc_rqst *req, struct xdr_stream *xdr, 887static int rpcb_dec_set(struct rpc_rqst *req, struct xdr_stream *xdr,
885 unsigned int *boolp) 888 void *data)
886{ 889{
890 unsigned int *boolp = data;
887 __be32 *p; 891 __be32 *p;
888 892
889 p = xdr_inline_decode(xdr, 4); 893 p = xdr_inline_decode(xdr, 4);
@@ -917,8 +921,9 @@ static void encode_rpcb_string(struct xdr_stream *xdr, const char *string,
917} 921}
918 922
919static void rpcb_enc_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr, 923static void rpcb_enc_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr,
920 const struct rpcbind_args *rpcb) 924 const void *data)
921{ 925{
926 const struct rpcbind_args *rpcb = data;
922 __be32 *p; 927 __be32 *p;
923 928
924 dprintk("RPC: %5u encoding RPCB_%s call (%u, %u, '%s', '%s')\n", 929 dprintk("RPC: %5u encoding RPCB_%s call (%u, %u, '%s', '%s')\n",
@@ -937,8 +942,9 @@ static void rpcb_enc_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr,
937} 942}
938 943
939static int rpcb_dec_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr, 944static int rpcb_dec_getaddr(struct rpc_rqst *req, struct xdr_stream *xdr,
940 struct rpcbind_args *rpcb) 945 void *data)
941{ 946{
947 struct rpcbind_args *rpcb = data;
942 struct sockaddr_storage address; 948 struct sockaddr_storage address;
943 struct sockaddr *sap = (struct sockaddr *)&address; 949 struct sockaddr *sap = (struct sockaddr *)&address;
944 __be32 *p; 950 __be32 *p;
@@ -989,11 +995,11 @@ out_fail:
989 * since the Linux kernel RPC code requires only these. 995 * since the Linux kernel RPC code requires only these.
990 */ 996 */
991 997
992static struct rpc_procinfo rpcb_procedures2[] = { 998static const struct rpc_procinfo rpcb_procedures2[] = {
993 [RPCBPROC_SET] = { 999 [RPCBPROC_SET] = {
994 .p_proc = RPCBPROC_SET, 1000 .p_proc = RPCBPROC_SET,
995 .p_encode = (kxdreproc_t)rpcb_enc_mapping, 1001 .p_encode = rpcb_enc_mapping,
996 .p_decode = (kxdrdproc_t)rpcb_dec_set, 1002 .p_decode = rpcb_dec_set,
997 .p_arglen = RPCB_mappingargs_sz, 1003 .p_arglen = RPCB_mappingargs_sz,
998 .p_replen = RPCB_setres_sz, 1004 .p_replen = RPCB_setres_sz,
999 .p_statidx = RPCBPROC_SET, 1005 .p_statidx = RPCBPROC_SET,
@@ -1002,8 +1008,8 @@ static struct rpc_procinfo rpcb_procedures2[] = {
1002 }, 1008 },
1003 [RPCBPROC_UNSET] = { 1009 [RPCBPROC_UNSET] = {
1004 .p_proc = RPCBPROC_UNSET, 1010 .p_proc = RPCBPROC_UNSET,
1005 .p_encode = (kxdreproc_t)rpcb_enc_mapping, 1011 .p_encode = rpcb_enc_mapping,
1006 .p_decode = (kxdrdproc_t)rpcb_dec_set, 1012 .p_decode = rpcb_dec_set,
1007 .p_arglen = RPCB_mappingargs_sz, 1013 .p_arglen = RPCB_mappingargs_sz,
1008 .p_replen = RPCB_setres_sz, 1014 .p_replen = RPCB_setres_sz,
1009 .p_statidx = RPCBPROC_UNSET, 1015 .p_statidx = RPCBPROC_UNSET,
@@ -1012,8 +1018,8 @@ static struct rpc_procinfo rpcb_procedures2[] = {
1012 }, 1018 },
1013 [RPCBPROC_GETPORT] = { 1019 [RPCBPROC_GETPORT] = {
1014 .p_proc = RPCBPROC_GETPORT, 1020 .p_proc = RPCBPROC_GETPORT,
1015 .p_encode = (kxdreproc_t)rpcb_enc_mapping, 1021 .p_encode = rpcb_enc_mapping,
1016 .p_decode = (kxdrdproc_t)rpcb_dec_getport, 1022 .p_decode = rpcb_dec_getport,
1017 .p_arglen = RPCB_mappingargs_sz, 1023 .p_arglen = RPCB_mappingargs_sz,
1018 .p_replen = RPCB_getportres_sz, 1024 .p_replen = RPCB_getportres_sz,
1019 .p_statidx = RPCBPROC_GETPORT, 1025 .p_statidx = RPCBPROC_GETPORT,
@@ -1022,11 +1028,11 @@ static struct rpc_procinfo rpcb_procedures2[] = {
1022 }, 1028 },
1023}; 1029};
1024 1030
1025static struct rpc_procinfo rpcb_procedures3[] = { 1031static const struct rpc_procinfo rpcb_procedures3[] = {
1026 [RPCBPROC_SET] = { 1032 [RPCBPROC_SET] = {
1027 .p_proc = RPCBPROC_SET, 1033 .p_proc = RPCBPROC_SET,
1028 .p_encode = (kxdreproc_t)rpcb_enc_getaddr, 1034 .p_encode = rpcb_enc_getaddr,
1029 .p_decode = (kxdrdproc_t)rpcb_dec_set, 1035 .p_decode = rpcb_dec_set,
1030 .p_arglen = RPCB_getaddrargs_sz, 1036 .p_arglen = RPCB_getaddrargs_sz,
1031 .p_replen = RPCB_setres_sz, 1037 .p_replen = RPCB_setres_sz,
1032 .p_statidx = RPCBPROC_SET, 1038 .p_statidx = RPCBPROC_SET,
@@ -1035,8 +1041,8 @@ static struct rpc_procinfo rpcb_procedures3[] = {
1035 }, 1041 },
1036 [RPCBPROC_UNSET] = { 1042 [RPCBPROC_UNSET] = {
1037 .p_proc = RPCBPROC_UNSET, 1043 .p_proc = RPCBPROC_UNSET,
1038 .p_encode = (kxdreproc_t)rpcb_enc_getaddr, 1044 .p_encode = rpcb_enc_getaddr,
1039 .p_decode = (kxdrdproc_t)rpcb_dec_set, 1045 .p_decode = rpcb_dec_set,
1040 .p_arglen = RPCB_getaddrargs_sz, 1046 .p_arglen = RPCB_getaddrargs_sz,
1041 .p_replen = RPCB_setres_sz, 1047 .p_replen = RPCB_setres_sz,
1042 .p_statidx = RPCBPROC_UNSET, 1048 .p_statidx = RPCBPROC_UNSET,
@@ -1045,8 +1051,8 @@ static struct rpc_procinfo rpcb_procedures3[] = {
1045 }, 1051 },
1046 [RPCBPROC_GETADDR] = { 1052 [RPCBPROC_GETADDR] = {
1047 .p_proc = RPCBPROC_GETADDR, 1053 .p_proc = RPCBPROC_GETADDR,
1048 .p_encode = (kxdreproc_t)rpcb_enc_getaddr, 1054 .p_encode = rpcb_enc_getaddr,
1049 .p_decode = (kxdrdproc_t)rpcb_dec_getaddr, 1055 .p_decode = rpcb_dec_getaddr,
1050 .p_arglen = RPCB_getaddrargs_sz, 1056 .p_arglen = RPCB_getaddrargs_sz,
1051 .p_replen = RPCB_getaddrres_sz, 1057 .p_replen = RPCB_getaddrres_sz,
1052 .p_statidx = RPCBPROC_GETADDR, 1058 .p_statidx = RPCBPROC_GETADDR,
@@ -1055,11 +1061,11 @@ static struct rpc_procinfo rpcb_procedures3[] = {
1055 }, 1061 },
1056}; 1062};
1057 1063
1058static struct rpc_procinfo rpcb_procedures4[] = { 1064static const struct rpc_procinfo rpcb_procedures4[] = {
1059 [RPCBPROC_SET] = { 1065 [RPCBPROC_SET] = {
1060 .p_proc = RPCBPROC_SET, 1066 .p_proc = RPCBPROC_SET,
1061 .p_encode = (kxdreproc_t)rpcb_enc_getaddr, 1067 .p_encode = rpcb_enc_getaddr,
1062 .p_decode = (kxdrdproc_t)rpcb_dec_set, 1068 .p_decode = rpcb_dec_set,
1063 .p_arglen = RPCB_getaddrargs_sz, 1069 .p_arglen = RPCB_getaddrargs_sz,
1064 .p_replen = RPCB_setres_sz, 1070 .p_replen = RPCB_setres_sz,
1065 .p_statidx = RPCBPROC_SET, 1071 .p_statidx = RPCBPROC_SET,
@@ -1068,8 +1074,8 @@ static struct rpc_procinfo rpcb_procedures4[] = {
1068 }, 1074 },
1069 [RPCBPROC_UNSET] = { 1075 [RPCBPROC_UNSET] = {
1070 .p_proc = RPCBPROC_UNSET, 1076 .p_proc = RPCBPROC_UNSET,
1071 .p_encode = (kxdreproc_t)rpcb_enc_getaddr, 1077 .p_encode = rpcb_enc_getaddr,
1072 .p_decode = (kxdrdproc_t)rpcb_dec_set, 1078 .p_decode = rpcb_dec_set,
1073 .p_arglen = RPCB_getaddrargs_sz, 1079 .p_arglen = RPCB_getaddrargs_sz,
1074 .p_replen = RPCB_setres_sz, 1080 .p_replen = RPCB_setres_sz,
1075 .p_statidx = RPCBPROC_UNSET, 1081 .p_statidx = RPCBPROC_UNSET,
@@ -1078,8 +1084,8 @@ static struct rpc_procinfo rpcb_procedures4[] = {
1078 }, 1084 },
1079 [RPCBPROC_GETADDR] = { 1085 [RPCBPROC_GETADDR] = {
1080 .p_proc = RPCBPROC_GETADDR, 1086 .p_proc = RPCBPROC_GETADDR,
1081 .p_encode = (kxdreproc_t)rpcb_enc_getaddr, 1087 .p_encode = rpcb_enc_getaddr,
1082 .p_decode = (kxdrdproc_t)rpcb_dec_getaddr, 1088 .p_decode = rpcb_dec_getaddr,
1083 .p_arglen = RPCB_getaddrargs_sz, 1089 .p_arglen = RPCB_getaddrargs_sz,
1084 .p_replen = RPCB_getaddrres_sz, 1090 .p_replen = RPCB_getaddrres_sz,
1085 .p_statidx = RPCBPROC_GETADDR, 1091 .p_statidx = RPCBPROC_GETADDR,
@@ -1112,22 +1118,28 @@ static const struct rpcb_info rpcb_next_version6[] = {
1112 }, 1118 },
1113}; 1119};
1114 1120
1121static unsigned int rpcb_version2_counts[ARRAY_SIZE(rpcb_procedures2)];
1115static const struct rpc_version rpcb_version2 = { 1122static const struct rpc_version rpcb_version2 = {
1116 .number = RPCBVERS_2, 1123 .number = RPCBVERS_2,
1117 .nrprocs = ARRAY_SIZE(rpcb_procedures2), 1124 .nrprocs = ARRAY_SIZE(rpcb_procedures2),
1118 .procs = rpcb_procedures2 1125 .procs = rpcb_procedures2,
1126 .counts = rpcb_version2_counts,
1119}; 1127};
1120 1128
1129static unsigned int rpcb_version3_counts[ARRAY_SIZE(rpcb_procedures3)];
1121static const struct rpc_version rpcb_version3 = { 1130static const struct rpc_version rpcb_version3 = {
1122 .number = RPCBVERS_3, 1131 .number = RPCBVERS_3,
1123 .nrprocs = ARRAY_SIZE(rpcb_procedures3), 1132 .nrprocs = ARRAY_SIZE(rpcb_procedures3),
1124 .procs = rpcb_procedures3 1133 .procs = rpcb_procedures3,
1134 .counts = rpcb_version3_counts,
1125}; 1135};
1126 1136
1137static unsigned int rpcb_version4_counts[ARRAY_SIZE(rpcb_procedures4)];
1127static const struct rpc_version rpcb_version4 = { 1138static const struct rpc_version rpcb_version4 = {
1128 .number = RPCBVERS_4, 1139 .number = RPCBVERS_4,
1129 .nrprocs = ARRAY_SIZE(rpcb_procedures4), 1140 .nrprocs = ARRAY_SIZE(rpcb_procedures4),
1130 .procs = rpcb_procedures4 1141 .procs = rpcb_procedures4,
1142 .counts = rpcb_version4_counts,
1131}; 1143};
1132 1144
1133static const struct rpc_version *rpcb_version[] = { 1145static const struct rpc_version *rpcb_version[] = {
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index caeb01ad2b5a..1e671333c3d5 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -55,8 +55,7 @@ static int rpc_proc_show(struct seq_file *seq, void *v) {
55 seq_printf(seq, "proc%u %u", 55 seq_printf(seq, "proc%u %u",
56 vers->number, vers->nrprocs); 56 vers->number, vers->nrprocs);
57 for (j = 0; j < vers->nrprocs; j++) 57 for (j = 0; j < vers->nrprocs; j++)
58 seq_printf(seq, " %u", 58 seq_printf(seq, " %u", vers->counts[j]);
59 vers->procs[j].p_count);
60 seq_putc(seq, '\n'); 59 seq_putc(seq, '\n');
61 } 60 }
62 return 0; 61 return 0;
@@ -78,9 +77,9 @@ static const struct file_operations rpc_proc_fops = {
78/* 77/*
79 * Get RPC server stats 78 * Get RPC server stats
80 */ 79 */
81void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) { 80void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp)
81{
82 const struct svc_program *prog = statp->program; 82 const struct svc_program *prog = statp->program;
83 const struct svc_procedure *proc;
84 const struct svc_version *vers; 83 const struct svc_version *vers;
85 unsigned int i, j; 84 unsigned int i, j;
86 85
@@ -99,11 +98,12 @@ void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
99 statp->rpcbadclnt); 98 statp->rpcbadclnt);
100 99
101 for (i = 0; i < prog->pg_nvers; i++) { 100 for (i = 0; i < prog->pg_nvers; i++) {
102 if (!(vers = prog->pg_vers[i]) || !(proc = vers->vs_proc)) 101 vers = prog->pg_vers[i];
102 if (!vers)
103 continue; 103 continue;
104 seq_printf(seq, "proc%d %u", i, vers->vs_nproc); 104 seq_printf(seq, "proc%d %u", i, vers->vs_nproc);
105 for (j = 0; j < vers->vs_nproc; j++, proc++) 105 for (j = 0; j < vers->vs_nproc; j++)
106 seq_printf(seq, " %u", proc->pc_count); 106 seq_printf(seq, " %u", vers->vs_count[j]);
107 seq_putc(seq, '\n'); 107 seq_putc(seq, '\n');
108 } 108 }
109} 109}
@@ -192,7 +192,7 @@ void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats)
192EXPORT_SYMBOL_GPL(rpc_count_iostats); 192EXPORT_SYMBOL_GPL(rpc_count_iostats);
193 193
194static void _print_name(struct seq_file *seq, unsigned int op, 194static void _print_name(struct seq_file *seq, unsigned int op,
195 struct rpc_procinfo *procs) 195 const struct rpc_procinfo *procs)
196{ 196{
197 if (procs[op].p_name) 197 if (procs[op].p_name)
198 seq_printf(seq, "\t%12s: ", procs[op].p_name); 198 seq_printf(seq, "\t%12s: ", procs[op].p_name);
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index bc0f5a0ecbdc..85ce0db5b0a6 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1008,7 +1008,7 @@ int svc_register(const struct svc_serv *serv, struct net *net,
1008 const unsigned short port) 1008 const unsigned short port)
1009{ 1009{
1010 struct svc_program *progp; 1010 struct svc_program *progp;
1011 struct svc_version *vers; 1011 const struct svc_version *vers;
1012 unsigned int i; 1012 unsigned int i;
1013 int error = 0; 1013 int error = 0;
1014 1014
@@ -1151,10 +1151,9 @@ static int
1151svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv) 1151svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1152{ 1152{
1153 struct svc_program *progp; 1153 struct svc_program *progp;
1154 struct svc_version *versp = NULL; /* compiler food */ 1154 const struct svc_version *versp = NULL; /* compiler food */
1155 struct svc_procedure *procp = NULL; 1155 const struct svc_procedure *procp = NULL;
1156 struct svc_serv *serv = rqstp->rq_server; 1156 struct svc_serv *serv = rqstp->rq_server;
1157 kxdrproc_t xdr;
1158 __be32 *statp; 1157 __be32 *statp;
1159 u32 prog, vers, proc; 1158 u32 prog, vers, proc;
1160 __be32 auth_stat, rpc_stat; 1159 __be32 auth_stat, rpc_stat;
@@ -1166,7 +1165,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1166 if (argv->iov_len < 6*4) 1165 if (argv->iov_len < 6*4)
1167 goto err_short_len; 1166 goto err_short_len;
1168 1167
1169 /* Will be turned off only in gss privacy case: */ 1168 /* Will be turned off by GSS integrity and privacy services */
1170 set_bit(RQ_SPLICE_OK, &rqstp->rq_flags); 1169 set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
1171 /* Will be turned off only when NFSv4 Sessions are used */ 1170 /* Will be turned off only when NFSv4 Sessions are used */
1172 set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags); 1171 set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
@@ -1262,7 +1261,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1262 svc_putnl(resv, RPC_SUCCESS); 1261 svc_putnl(resv, RPC_SUCCESS);
1263 1262
1264 /* Bump per-procedure stats counter */ 1263 /* Bump per-procedure stats counter */
1265 procp->pc_count++; 1264 versp->vs_count[proc]++;
1266 1265
1267 /* Initialize storage for argp and resp */ 1266 /* Initialize storage for argp and resp */
1268 memset(rqstp->rq_argp, 0, procp->pc_argsize); 1267 memset(rqstp->rq_argp, 0, procp->pc_argsize);
@@ -1276,28 +1275,30 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1276 1275
1277 /* Call the function that processes the request. */ 1276 /* Call the function that processes the request. */
1278 if (!versp->vs_dispatch) { 1277 if (!versp->vs_dispatch) {
1279 /* Decode arguments */ 1278 /*
1280 xdr = procp->pc_decode; 1279 * Decode arguments
1281 if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp)) 1280 * XXX: why do we ignore the return value?
1281 */
1282 if (procp->pc_decode &&
1283 !procp->pc_decode(rqstp, argv->iov_base))
1282 goto err_garbage; 1284 goto err_garbage;
1283 1285
1284 *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp); 1286 *statp = procp->pc_func(rqstp);
1285 1287
1286 /* Encode reply */ 1288 /* Encode reply */
1287 if (*statp == rpc_drop_reply || 1289 if (*statp == rpc_drop_reply ||
1288 test_bit(RQ_DROPME, &rqstp->rq_flags)) { 1290 test_bit(RQ_DROPME, &rqstp->rq_flags)) {
1289 if (procp->pc_release) 1291 if (procp->pc_release)
1290 procp->pc_release(rqstp, NULL, rqstp->rq_resp); 1292 procp->pc_release(rqstp);
1291 goto dropit; 1293 goto dropit;
1292 } 1294 }
1293 if (*statp == rpc_autherr_badcred) { 1295 if (*statp == rpc_autherr_badcred) {
1294 if (procp->pc_release) 1296 if (procp->pc_release)
1295 procp->pc_release(rqstp, NULL, rqstp->rq_resp); 1297 procp->pc_release(rqstp);
1296 goto err_bad_auth; 1298 goto err_bad_auth;
1297 } 1299 }
1298 if (*statp == rpc_success && 1300 if (*statp == rpc_success && procp->pc_encode &&
1299 (xdr = procp->pc_encode) && 1301 !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
1300 !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
1301 dprintk("svc: failed to encode reply\n"); 1302 dprintk("svc: failed to encode reply\n");
1302 /* serv->sv_stats->rpcsystemerr++; */ 1303 /* serv->sv_stats->rpcsystemerr++; */
1303 *statp = rpc_system_err; 1304 *statp = rpc_system_err;
@@ -1307,7 +1308,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1307 if (!versp->vs_dispatch(rqstp, statp)) { 1308 if (!versp->vs_dispatch(rqstp, statp)) {
1308 /* Release reply info */ 1309 /* Release reply info */
1309 if (procp->pc_release) 1310 if (procp->pc_release)
1310 procp->pc_release(rqstp, NULL, rqstp->rq_resp); 1311 procp->pc_release(rqstp);
1311 goto dropit; 1312 goto dropit;
1312 } 1313 }
1313 } 1314 }
@@ -1318,7 +1319,7 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1318 1319
1319 /* Release reply info */ 1320 /* Release reply info */
1320 if (procp->pc_release) 1321 if (procp->pc_release)
1321 procp->pc_release(rqstp, NULL, rqstp->rq_resp); 1322 procp->pc_release(rqstp);
1322 1323
1323 if (procp->pc_encode == NULL) 1324 if (procp->pc_encode == NULL)
1324 goto dropit; 1325 goto dropit;
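The dispatch changes above show the new server-side callback shapes: pc_func takes only the svc_rqst, pc_decode and pc_encode take the rqstp plus a buffer pointer, pc_release takes just the rqstp, and the per-procedure counter is bumped through the version's vs_count[] array rather than a field in the now-const procedure entry. A hypothetical minimal table following that shape (example_* names are invented, and the exact svc_procedure/svc_version field lists are inferred from this diff rather than checked against the headers):

#include <linux/kernel.h>
#include <linux/sunrpc/svc.h>

static __be32 example_proc_null(struct svc_rqst *rqstp)
{
	return rpc_success;
}

static int example_decode_void(struct svc_rqst *rqstp, __be32 *p)
{
	return 1;		/* non-zero means "decoded OK" */
}

static int example_encode_void(struct svc_rqst *rqstp, __be32 *p)
{
	return 1;
}

static const struct svc_procedure example_svc_procedures[] = {
	[0] = {
		.pc_func    = example_proc_null,
		.pc_decode  = example_decode_void,
		.pc_encode  = example_encode_void,
		.pc_argsize = 0,
		.pc_ressize = 0,
	},
};

/* Per-procedure counters live beside the const table, as in svc.c above. */
static unsigned int example_svc_count[ARRAY_SIZE(example_svc_procedures)];

static const struct svc_version example_svc_version = {
	.vs_vers  = 1,
	.vs_nproc = ARRAY_SIZE(example_svc_procedures),
	.vs_proc  = example_svc_procedures,
	.vs_count = example_svc_count,
};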
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 7bfe1fb42add..d16a8b423c20 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -659,11 +659,13 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
659 int i; 659 int i;
660 660
661 /* now allocate needed pages. If we get a failure, sleep briefly */ 661 /* now allocate needed pages. If we get a failure, sleep briefly */
662 pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE; 662 pages = (serv->sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT;
663 WARN_ON_ONCE(pages >= RPCSVC_MAXPAGES); 663 if (pages > RPCSVC_MAXPAGES) {
664 if (pages >= RPCSVC_MAXPAGES) 664 pr_warn_once("svc: warning: pages=%u > RPCSVC_MAXPAGES=%lu\n",
665 pages, RPCSVC_MAXPAGES);
665 /* use as many pages as possible */ 666 /* use as many pages as possible */
666 pages = RPCSVC_MAXPAGES - 1; 667 pages = RPCSVC_MAXPAGES;
668 }
667 for (i = 0; i < pages ; i++) 669 for (i = 0; i < pages ; i++)
668 while (rqstp->rq_pages[i] == NULL) { 670 while (rqstp->rq_pages[i] == NULL) {
669 struct page *p = alloc_page(GFP_KERNEL); 671 struct page *p = alloc_page(GFP_KERNEL);
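A quick worked example for the new page-count calculation above, assuming 4 KB pages and a 1 MB sv_max_mesg (illustrative values only):

/* pages = (sv_max_mesg + 2 * PAGE_SIZE) >> PAGE_SHIFT
 *       = (1048576 + 2 * 4096) >> 12
 *       = 1056768 >> 12
 *       = 258
 *
 * i.e. room for the message body plus extra pages for the head and
 * tail iovecs; a result beyond RPCSVC_MAXPAGES is now clamped with a
 * one-time warning instead of tripping the old WARN_ON_ONCE().
 */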
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index c1ae8142ab73..b8213ddce2f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -3,6 +3,6 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
3rpcrdma-y := transport.o rpc_rdma.o verbs.o \ 3rpcrdma-y := transport.o rpc_rdma.o verbs.o \
4 fmr_ops.o frwr_ops.o \ 4 fmr_ops.o frwr_ops.o \
5 svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ 5 svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
6 svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ 6 svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
7 svc_rdma_rw.o module.o 7 module.o
8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o 8rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
deleted file mode 100644
index bdcf7d85a3dc..000000000000
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ /dev/null
@@ -1,168 +0,0 @@
1/*
2 * Copyright (c) 2016 Oracle. All rights reserved.
3 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the BSD-type
9 * license below:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions
13 * are met:
14 *
15 * Redistributions of source code must retain the above copyright
16 * notice, this list of conditions and the following disclaimer.
17 *
18 * Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials provided
21 * with the distribution.
22 *
23 * Neither the name of the Network Appliance, Inc. nor the names of
24 * its contributors may be used to endorse or promote products
25 * derived from this software without specific prior written
26 * permission.
27 *
28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39 *
40 * Author: Tom Tucker <tom@opengridcomputing.com>
41 */
42
43#include <linux/sunrpc/xdr.h>
44#include <linux/sunrpc/debug.h>
45#include <asm/unaligned.h>
46#include <linux/sunrpc/rpc_rdma.h>
47#include <linux/sunrpc/svc_rdma.h>
48
49#define RPCDBG_FACILITY RPCDBG_SVCXPRT
50
51static __be32 *xdr_check_read_list(__be32 *p, __be32 *end)
52{
53 __be32 *next;
54
55 while (*p++ != xdr_zero) {
56 next = p + rpcrdma_readchunk_maxsz - 1;
57 if (next > end)
58 return NULL;
59 p = next;
60 }
61 return p;
62}
63
64static __be32 *xdr_check_write_list(__be32 *p, __be32 *end)
65{
66 __be32 *next;
67
68 while (*p++ != xdr_zero) {
69 next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
70 if (next > end)
71 return NULL;
72 p = next;
73 }
74 return p;
75}
76
77static __be32 *xdr_check_reply_chunk(__be32 *p, __be32 *end)
78{
79 __be32 *next;
80
81 if (*p++ != xdr_zero) {
82 next = p + 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
83 if (next > end)
84 return NULL;
85 p = next;
86 }
87 return p;
88}
89
90/**
91 * svc_rdma_xdr_decode_req - Parse incoming RPC-over-RDMA header
92 * @rq_arg: Receive buffer
93 *
94 * On entry, xdr->head[0].iov_base points to first byte in the
95 * RPC-over-RDMA header.
96 *
97 * On successful exit, head[0] points to first byte past the
98 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
99 * The length of the RPC-over-RDMA header is returned.
100 */
101int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
102{
103 __be32 *p, *end, *rdma_argp;
104 unsigned int hdr_len;
105
106 /* Verify that there's enough bytes for header + something */
107 if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
108 goto out_short;
109
110 rdma_argp = rq_arg->head[0].iov_base;
111 if (*(rdma_argp + 1) != rpcrdma_version)
112 goto out_version;
113
114 switch (*(rdma_argp + 3)) {
115 case rdma_msg:
116 case rdma_nomsg:
117 break;
118
119 case rdma_done:
120 goto out_drop;
121
122 case rdma_error:
123 goto out_drop;
124
125 default:
126 goto out_proc;
127 }
128
129 end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
130 p = xdr_check_read_list(rdma_argp + 4, end);
131 if (!p)
132 goto out_inval;
133 p = xdr_check_write_list(p, end);
134 if (!p)
135 goto out_inval;
136 p = xdr_check_reply_chunk(p, end);
137 if (!p)
138 goto out_inval;
139 if (p > end)
140 goto out_inval;
141
142 rq_arg->head[0].iov_base = p;
143 hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
144 rq_arg->head[0].iov_len -= hdr_len;
145 return hdr_len;
146
147out_short:
148 dprintk("svcrdma: header too short = %d\n", rq_arg->len);
149 return -EINVAL;
150
151out_version:
152 dprintk("svcrdma: bad xprt version: %u\n",
153 be32_to_cpup(rdma_argp + 1));
154 return -EPROTONOSUPPORT;
155
156out_drop:
157 dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
158 return 0;
159
160out_proc:
161 dprintk("svcrdma: bad rdma procedure (%u)\n",
162 be32_to_cpup(rdma_argp + 3));
163 return -EINVAL;
164
165out_inval:
166 dprintk("svcrdma: failed to parse transport header\n");
167 return -EINVAL;
168}
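The parsing that lived in svc_rdma_marshal.c reappears below in svc_rdma_recvfrom.c, where the xdr_check_* helpers gain per-segment size limits. As a reference for what xdr_check_read_list() walks, this is the wire layout of one Read-list entry, reconstructed from the decode steps visible in this diff:

/* One entry in the Read list, as walked by xdr_check_read_list():
 *
 *   +-------------+----------+--------+--------+------------------+
 *   | discrim (1) | position | handle | length | offset (64 bits) |
 *   +-------------+----------+--------+--------+------------------+
 *      1 XDR word    1 word    1 word   1 word       2 words
 *
 * The list is terminated by a discriminator word equal to xdr_zero,
 * and the new checks reject any segment whose length exceeds the
 * largest supported payload.
 */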
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 27a99bf5b1a6..ad4bd62eebf1 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,4 +1,5 @@
1/* 1/*
2 * Copyright (c) 2016, 2017 Oracle. All rights reserved.
2 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 3 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
3 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. 4 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
4 * 5 *
@@ -40,12 +41,66 @@
40 * Author: Tom Tucker <tom@opengridcomputing.com> 41 * Author: Tom Tucker <tom@opengridcomputing.com>
41 */ 42 */
42 43
43#include <linux/sunrpc/debug.h> 44/* Operation
44#include <linux/sunrpc/rpc_rdma.h> 45 *
45#include <linux/spinlock.h> 46 * The main entry point is svc_rdma_recvfrom. This is called from
47 * svc_recv when the transport indicates there is incoming data to
48 * be read. "Data Ready" is signaled when an RDMA Receive completes,
49 * or when a set of RDMA Reads complete.
50 *
51 * An svc_rqst is passed in. This structure contains an array of
52 * free pages (rq_pages) that will contain the incoming RPC message.
53 *
54 * Short messages are moved directly into svc_rqst::rq_arg, and
55 * the RPC Call is ready to be processed by the Upper Layer.
56 * svc_rdma_recvfrom returns the length of the RPC Call message,
57 * completing the reception of the RPC Call.
58 *
59 * However, when an incoming message has Read chunks,
60 * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
61 * data payload from the client. svc_rdma_recvfrom sets up the
62 * RDMA Reads using pages in svc_rqst::rq_pages, which are
63 * transferred to an svc_rdma_op_ctxt for the duration of the
64 * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
65 * is still not yet ready.
66 *
67 * When the Read chunk payloads have become available on the
68 * server, "Data Ready" is raised again, and svc_recv calls
69 * svc_rdma_recvfrom again. This second call may use a different
70 * svc_rqst than the first one, thus any information that needs
71 * to be preserved across these two calls is kept in an
72 * svc_rdma_op_ctxt.
73 *
74 * The second call to svc_rdma_recvfrom performs final assembly
75 * of the RPC Call message, using the RDMA Read sink pages kept in
76 * the svc_rdma_op_ctxt. The xdr_buf is copied from the
77 * svc_rdma_op_ctxt to the second svc_rqst. The second call returns
78 * the length of the completed RPC Call message.
79 *
80 * Page Management
81 *
82 * Pages under I/O must be transferred from the first svc_rqst to an
83 * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns.
84 *
85 * The first svc_rqst supplies pages for RDMA Reads. These are moved
86 * from rqstp::rq_pages into ctxt::pages. The consumed elements of
87 * the rq_pages array are set to NULL and refilled with the first
88 * svc_rdma_recvfrom call returns.
89 *
90 * During the second svc_rdma_recvfrom call, RDMA Read sink pages
91 * are transferred from the svc_rdma_op_ctxt to the second svc_rqst
92 * (see rdma_read_complete() below).
93 */
94
46#include <asm/unaligned.h> 95#include <asm/unaligned.h>
47#include <rdma/ib_verbs.h> 96#include <rdma/ib_verbs.h>
48#include <rdma/rdma_cm.h> 97#include <rdma/rdma_cm.h>
98
99#include <linux/spinlock.h>
100
101#include <linux/sunrpc/xdr.h>
102#include <linux/sunrpc/debug.h>
103#include <linux/sunrpc/rpc_rdma.h>
49#include <linux/sunrpc/svc_rdma.h> 104#include <linux/sunrpc/svc_rdma.h>
50 105
51#define RPCDBG_FACILITY RPCDBG_SVCXPRT 106#define RPCDBG_FACILITY RPCDBG_SVCXPRT
@@ -59,7 +114,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
59 struct svc_rdma_op_ctxt *ctxt, 114 struct svc_rdma_op_ctxt *ctxt,
60 u32 byte_count) 115 u32 byte_count)
61{ 116{
62 struct rpcrdma_msg *rmsgp;
63 struct page *page; 117 struct page *page;
64 u32 bc; 118 u32 bc;
65 int sge_no; 119 int sge_no;
@@ -83,20 +137,12 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
83 rqstp->rq_arg.page_len = bc; 137 rqstp->rq_arg.page_len = bc;
84 rqstp->rq_arg.page_base = 0; 138 rqstp->rq_arg.page_base = 0;
85 139
86 /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
87 rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
88 if (rmsgp->rm_type == rdma_nomsg)
89 rqstp->rq_arg.pages = &rqstp->rq_pages[0];
90 else
91 rqstp->rq_arg.pages = &rqstp->rq_pages[1];
92
93 sge_no = 1; 140 sge_no = 1;
94 while (bc && sge_no < ctxt->count) { 141 while (bc && sge_no < ctxt->count) {
95 page = ctxt->pages[sge_no]; 142 page = ctxt->pages[sge_no];
96 put_page(rqstp->rq_pages[sge_no]); 143 put_page(rqstp->rq_pages[sge_no]);
97 rqstp->rq_pages[sge_no] = page; 144 rqstp->rq_pages[sge_no] = page;
98 bc -= min_t(u32, bc, ctxt->sge[sge_no].length); 145 bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
99 rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
100 sge_no++; 146 sge_no++;
101 } 147 }
102 rqstp->rq_respages = &rqstp->rq_pages[sge_no]; 148 rqstp->rq_respages = &rqstp->rq_pages[sge_no];
@@ -115,406 +161,208 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
115 rqstp->rq_arg.tail[0].iov_len = 0; 161 rqstp->rq_arg.tail[0].iov_len = 0;
116} 162}
117 163
118/* Issue an RDMA_READ using the local lkey to map the data sink */ 164/* This accommodates the largest possible Write chunk,
119int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, 165 * in one segment.
120 struct svc_rqst *rqstp, 166 */
121 struct svc_rdma_op_ctxt *head, 167#define MAX_BYTES_WRITE_SEG ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
122 int *page_no,
123 u32 *page_offset,
124 u32 rs_handle,
125 u32 rs_length,
126 u64 rs_offset,
127 bool last)
128{
129 struct ib_rdma_wr read_wr;
130 int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
131 struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
132 int ret, read, pno;
133 u32 pg_off = *page_offset;
134 u32 pg_no = *page_no;
135
136 ctxt->direction = DMA_FROM_DEVICE;
137 ctxt->read_hdr = head;
138 pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd);
139 read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
140 rs_length);
141
142 for (pno = 0; pno < pages_needed; pno++) {
143 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
144
145 head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
146 head->arg.page_len += len;
147
148 head->arg.len += len;
149 if (!pg_off)
150 head->count++;
151 rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
152 rqstp->rq_next_page = rqstp->rq_respages + 1;
153 ctxt->sge[pno].addr =
154 ib_dma_map_page(xprt->sc_cm_id->device,
155 head->arg.pages[pg_no], pg_off,
156 PAGE_SIZE - pg_off,
157 DMA_FROM_DEVICE);
158 ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
159 ctxt->sge[pno].addr);
160 if (ret)
161 goto err;
162 svc_rdma_count_mappings(xprt, ctxt);
163
164 ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey;
165 ctxt->sge[pno].length = len;
166 ctxt->count++;
167
168 /* adjust offset and wrap to next page if needed */
169 pg_off += len;
170 if (pg_off == PAGE_SIZE) {
171 pg_off = 0;
172 pg_no++;
173 }
174 rs_length -= len;
175 }
176
177 if (last && rs_length == 0)
178 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
179 else
180 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
181
182 memset(&read_wr, 0, sizeof(read_wr));
183 ctxt->cqe.done = svc_rdma_wc_read;
184 read_wr.wr.wr_cqe = &ctxt->cqe;
185 read_wr.wr.opcode = IB_WR_RDMA_READ;
186 read_wr.wr.send_flags = IB_SEND_SIGNALED;
187 read_wr.rkey = rs_handle;
188 read_wr.remote_addr = rs_offset;
189 read_wr.wr.sg_list = ctxt->sge;
190 read_wr.wr.num_sge = pages_needed;
191
192 ret = svc_rdma_send(xprt, &read_wr.wr);
193 if (ret) {
194 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
195 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
196 goto err;
197 }
198 168
199 /* return current location in page array */ 169/* This accommodates the largest possible Position-Zero
200 *page_no = pg_no; 170 * Read chunk or Reply chunk, in one segment.
201 *page_offset = pg_off; 171 */
202 ret = read; 172#define MAX_BYTES_SPECIAL_SEG ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
203 atomic_inc(&rdma_stat_read);
204 return ret;
205 err:
206 svc_rdma_unmap_dma(ctxt);
207 svc_rdma_put_context(ctxt, 0);
208 return ret;
209}
210 173
211/* Issue an RDMA_READ using an FRMR to map the data sink */ 174/* Sanity check the Read list.
212int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, 175 *
213 struct svc_rqst *rqstp, 176 * Implementation limits:
214 struct svc_rdma_op_ctxt *head, 177 * - This implementation supports only one Read chunk.
215 int *page_no, 178 *
216 u32 *page_offset, 179 * Sanity checks:
217 u32 rs_handle, 180 * - Read list does not overflow buffer.
218 u32 rs_length, 181 * - Segment size limited by largest NFS data payload.
219 u64 rs_offset, 182 *
220 bool last) 183 * The segment count is limited to how many segments can
184 * fit in the transport header without overflowing the
185 * buffer. That's about 40 Read segments for a 1KB inline
186 * threshold.
187 *
188 * Returns pointer to the following Write list.
189 */
190static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
221{ 191{
222 struct ib_rdma_wr read_wr; 192 u32 position;
223 struct ib_send_wr inv_wr; 193 bool first;
224 struct ib_reg_wr reg_wr; 194
225 u8 key; 195 first = true;
226 int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; 196 while (*p++ != xdr_zero) {
227 struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); 197 if (first) {
228 struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt); 198 position = be32_to_cpup(p++);
229 int ret, read, pno, dma_nents, n; 199 first = false;
230 u32 pg_off = *page_offset; 200 } else if (be32_to_cpup(p++) != position) {
231 u32 pg_no = *page_no; 201 return NULL;
232
233 if (IS_ERR(frmr))
234 return -ENOMEM;
235
236 ctxt->direction = DMA_FROM_DEVICE;
237 ctxt->frmr = frmr;
238 nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len);
239 read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length);
240
241 frmr->direction = DMA_FROM_DEVICE;
242 frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
243 frmr->sg_nents = nents;
244
245 for (pno = 0; pno < nents; pno++) {
246 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
247
248 head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
249 head->arg.page_len += len;
250 head->arg.len += len;
251 if (!pg_off)
252 head->count++;
253
254 sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no],
255 len, pg_off);
256
257 rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
258 rqstp->rq_next_page = rqstp->rq_respages + 1;
259
260 /* adjust offset and wrap to next page if needed */
261 pg_off += len;
262 if (pg_off == PAGE_SIZE) {
263 pg_off = 0;
264 pg_no++;
265 } 202 }
266 rs_length -= len; 203 p++; /* handle */
267 } 204 if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
205 return NULL;
206 p += 2; /* offset */
268 207
269 if (last && rs_length == 0) 208 if (p > end)
270 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 209 return NULL;
271 else
272 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
273
274 dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device,
275 frmr->sg, frmr->sg_nents,
276 frmr->direction);
277 if (!dma_nents) {
278 pr_err("svcrdma: failed to dma map sg %p\n",
279 frmr->sg);
280 return -ENOMEM;
281 } 210 }
211 return p;
212}
282 213
283 n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); 214/* The segment count is limited to how many segments can
284 if (unlikely(n != frmr->sg_nents)) { 215 * fit in the transport header without overflowing the
285 pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n", 216 * buffer. That's about 60 Write segments for a 1KB inline
286 frmr->mr, n, frmr->sg_nents); 217 * threshold.
287 return n < 0 ? n : -EINVAL; 218 */
288 } 219static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
220 u32 maxlen)
221{
222 u32 i, segcount;
289 223
290 /* Bump the key */ 224 segcount = be32_to_cpup(p++);
291 key = (u8)(frmr->mr->lkey & 0x000000FF); 225 for (i = 0; i < segcount; i++) {
292 ib_update_fast_reg_key(frmr->mr, ++key); 226 p++; /* handle */
293 227 if (be32_to_cpup(p++) > maxlen)
294 ctxt->sge[0].addr = frmr->mr->iova; 228 return NULL;
295 ctxt->sge[0].lkey = frmr->mr->lkey; 229 p += 2; /* offset */
296 ctxt->sge[0].length = frmr->mr->length;
297 ctxt->count = 1;
298 ctxt->read_hdr = head;
299
300 /* Prepare REG WR */
301 ctxt->reg_cqe.done = svc_rdma_wc_reg;
302 reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
303 reg_wr.wr.opcode = IB_WR_REG_MR;
304 reg_wr.wr.send_flags = IB_SEND_SIGNALED;
305 reg_wr.wr.num_sge = 0;
306 reg_wr.mr = frmr->mr;
307 reg_wr.key = frmr->mr->lkey;
308 reg_wr.access = frmr->access_flags;
309 reg_wr.wr.next = &read_wr.wr;
310
311 /* Prepare RDMA_READ */
312 memset(&read_wr, 0, sizeof(read_wr));
313 ctxt->cqe.done = svc_rdma_wc_read;
314 read_wr.wr.wr_cqe = &ctxt->cqe;
315 read_wr.wr.send_flags = IB_SEND_SIGNALED;
316 read_wr.rkey = rs_handle;
317 read_wr.remote_addr = rs_offset;
318 read_wr.wr.sg_list = ctxt->sge;
319 read_wr.wr.num_sge = 1;
320 if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
321 read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
322 read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
323 } else {
324 read_wr.wr.opcode = IB_WR_RDMA_READ;
325 read_wr.wr.next = &inv_wr;
326 /* Prepare invalidate */
327 memset(&inv_wr, 0, sizeof(inv_wr));
328 ctxt->inv_cqe.done = svc_rdma_wc_inv;
329 inv_wr.wr_cqe = &ctxt->inv_cqe;
330 inv_wr.opcode = IB_WR_LOCAL_INV;
331 inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
332 inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
333 }
334 230
335 /* Post the chain */ 231 if (p > end)
336 ret = svc_rdma_send(xprt, &reg_wr.wr); 232 return NULL;
337 if (ret) {
338 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
339 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
340 goto err;
341 } 233 }
342 234
343 /* return current location in page array */ 235 return p;
344 *page_no = pg_no;
345 *page_offset = pg_off;
346 ret = read;
347 atomic_inc(&rdma_stat_read);
348 return ret;
349 err:
350 svc_rdma_put_context(ctxt, 0);
351 svc_rdma_put_frmr(xprt, frmr);
352 return ret;
353}
354
355static unsigned int
356rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
357{
358 unsigned int count;
359
360 for (count = 0; ch->rc_discrim != xdr_zero; ch++)
361 count++;
362 return count;
363} 236}
364 237
365/* If there was additional inline content, append it to the end of arg.pages. 238/* Sanity check the Write list.
366 * Tail copy has to be done after the reader function has determined how many 239 *
367 * pages are needed for RDMA READ. 240 * Implementation limits:
241 * - This implementation supports only one Write chunk.
242 *
243 * Sanity checks:
244 * - Write list does not overflow buffer.
245 * - Segment size limited by largest NFS data payload.
246 *
247 * Returns pointer to the following Reply chunk.
368 */ 248 */
369static int 249static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
370rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
371 u32 position, u32 byte_count, u32 page_offset, int page_no)
372{ 250{
373 char *srcp, *destp; 251 u32 chcount;
374 252
375 srcp = head->arg.head[0].iov_base + position; 253 chcount = 0;
376 byte_count = head->arg.head[0].iov_len - position; 254 while (*p++ != xdr_zero) {
377 if (byte_count > PAGE_SIZE) { 255 p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
378 dprintk("svcrdma: large tail unsupported\n"); 256 if (!p)
379 return 0; 257 return NULL;
380 } 258 if (chcount++ > 1)
381 259 return NULL;
382 /* Fit as much of the tail on the current page as possible */
383 if (page_offset != PAGE_SIZE) {
384 destp = page_address(rqstp->rq_arg.pages[page_no]);
385 destp += page_offset;
386 while (byte_count--) {
387 *destp++ = *srcp++;
388 page_offset++;
389 if (page_offset == PAGE_SIZE && byte_count)
390 goto more;
391 }
392 goto done;
393 } 260 }
394 261 return p;
395more:
396 /* Fit the rest on the next page */
397 page_no++;
398 destp = page_address(rqstp->rq_arg.pages[page_no]);
399 while (byte_count--)
400 *destp++ = *srcp++;
401
402 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
403 rqstp->rq_next_page = rqstp->rq_respages + 1;
404
405done:
406 byte_count = head->arg.head[0].iov_len - position;
407 head->arg.page_len += byte_count;
408 head->arg.len += byte_count;
409 head->arg.buflen += byte_count;
410 return 1;
411} 262}
412 263
413/* Returns the address of the first read chunk or <nul> if no read chunk 264/* Sanity check the Reply chunk.
414 * is present 265 *
266 * Sanity checks:
267 * - Reply chunk does not overflow buffer.
268 * - Segment size limited by largest NFS data payload.
269 *
270 * Returns pointer to the following RPC header.
415 */ 271 */
416static struct rpcrdma_read_chunk * 272static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
417svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp)
418{ 273{
419 struct rpcrdma_read_chunk *ch = 274 if (*p++ != xdr_zero) {
420 (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; 275 p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
421 276 if (!p)
422 if (ch->rc_discrim == xdr_zero) 277 return NULL;
423 return NULL; 278 }
424 return ch; 279 return p;
425} 280}
426 281
427static int rdma_read_chunks(struct svcxprt_rdma *xprt, 282/* On entry, xdr->head[0].iov_base points to first byte in the
428 struct rpcrdma_msg *rmsgp, 283 * RPC-over-RDMA header.
429 struct svc_rqst *rqstp, 284 *
430 struct svc_rdma_op_ctxt *head) 285 * On successful exit, head[0] points to first byte past the
286 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
287 * The length of the RPC-over-RDMA header is returned.
288 *
289 * Assumptions:
290 * - The transport header is entirely contained in the head iovec.
291 */
292static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
431{ 293{
432 int page_no, ret; 294 __be32 *p, *end, *rdma_argp;
433 struct rpcrdma_read_chunk *ch; 295 unsigned int hdr_len;
434 u32 handle, page_offset, byte_count; 296 char *proc;
435 u32 position; 297
436 u64 rs_offset; 298 /* Verify that there's enough bytes for header + something */
437 bool last; 299 if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
438 300 goto out_short;
439 /* If no read list is present, return 0 */ 301
440 ch = svc_rdma_get_read_chunk(rmsgp); 302 rdma_argp = rq_arg->head[0].iov_base;
441 if (!ch) 303 if (*(rdma_argp + 1) != rpcrdma_version)
442 return 0; 304 goto out_version;
305
306 switch (*(rdma_argp + 3)) {
307 case rdma_msg:
308 proc = "RDMA_MSG";
309 break;
310 case rdma_nomsg:
311 proc = "RDMA_NOMSG";
312 break;
313
314 case rdma_done:
315 goto out_drop;
443 316
444 if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES) 317 case rdma_error:
445 return -EINVAL; 318 goto out_drop;
446
447 /* The request is completed when the RDMA_READs complete. The
448 * head context keeps all the pages that comprise the
449 * request.
450 */
451 head->arg.head[0] = rqstp->rq_arg.head[0];
452 head->arg.tail[0] = rqstp->rq_arg.tail[0];
453 head->hdr_count = head->count;
454 head->arg.page_base = 0;
455 head->arg.page_len = 0;
456 head->arg.len = rqstp->rq_arg.len;
457 head->arg.buflen = rqstp->rq_arg.buflen;
458
459 /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
460 position = be32_to_cpu(ch->rc_position);
461 if (position == 0) {
462 head->arg.pages = &head->pages[0];
463 page_offset = head->byte_len;
464 } else {
465 head->arg.pages = &head->pages[head->count];
466 page_offset = 0;
467 }
468 319
469 ret = 0; 320 default:
470 page_no = 0; 321 goto out_proc;
471 for (; ch->rc_discrim != xdr_zero; ch++) {
472 if (be32_to_cpu(ch->rc_position) != position)
473 goto err;
474
475 handle = be32_to_cpu(ch->rc_target.rs_handle),
476 byte_count = be32_to_cpu(ch->rc_target.rs_length);
477 xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
478 &rs_offset);
479
480 while (byte_count > 0) {
481 last = (ch + 1)->rc_discrim == xdr_zero;
482 ret = xprt->sc_reader(xprt, rqstp, head,
483 &page_no, &page_offset,
484 handle, byte_count,
485 rs_offset, last);
486 if (ret < 0)
487 goto err;
488 byte_count -= ret;
489 rs_offset += ret;
490 head->arg.buflen += ret;
491 }
492 } 322 }
493 323
494 /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */ 324 end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
495 if (page_offset & 3) { 325 p = xdr_check_read_list(rdma_argp + 4, end);
496 u32 pad = 4 - (page_offset & 3); 326 if (!p)
497 327 goto out_inval;
498 head->arg.tail[0].iov_len += pad; 328 p = xdr_check_write_list(p, end);
499 head->arg.len += pad; 329 if (!p)
500 head->arg.buflen += pad; 330 goto out_inval;
501 page_offset += pad; 331 p = xdr_check_reply_chunk(p, end);
502 } 332 if (!p)
333 goto out_inval;
334 if (p > end)
335 goto out_inval;
336
337 rq_arg->head[0].iov_base = p;
338 hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
339 rq_arg->head[0].iov_len -= hdr_len;
340 rq_arg->len -= hdr_len;
341 dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n",
342 proc, be32_to_cpup(rdma_argp), hdr_len);
343 return hdr_len;
344
345out_short:
346 dprintk("svcrdma: header too short = %d\n", rq_arg->len);
347 return -EINVAL;
348
349out_version:
350 dprintk("svcrdma: bad xprt version: %u\n",
351 be32_to_cpup(rdma_argp + 1));
352 return -EPROTONOSUPPORT;
503 353
504 ret = 1; 354out_drop:
505 if (position && position < head->arg.head[0].iov_len) 355 dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
506 ret = rdma_copy_tail(rqstp, head, position, 356 return 0;
507 byte_count, page_offset, page_no);
508 head->arg.head[0].iov_len = position;
509 head->position = position;
510 357
511 err: 358out_proc:
512 /* Detach arg pages. svc_recv will replenish them */ 359 dprintk("svcrdma: bad rdma procedure (%u)\n",
513 for (page_no = 0; 360 be32_to_cpup(rdma_argp + 3));
514 &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++) 361 return -EINVAL;
515 rqstp->rq_pages[page_no] = NULL;
516 362
517 return ret; 363out_inval:
364 dprintk("svcrdma: failed to parse transport header\n");
365 return -EINVAL;
518} 366}
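A simplified sketch, for illustration only, of the cursor pattern the new xdr_check_* helpers above share (sketch_check_write_chunk is a hypothetical name; the kernel helpers also enforce per-segment byte limits such as MAX_BYTES_WRITE_SEG): advance past one chunk, and return NULL if the encoding would run past "end".

	static __be32 *sketch_check_write_chunk(__be32 *p, const __be32 *end)
	{
		/* Assumes p < end on entry; the caller has already checked
		 * that the Receive held at least a minimal header.
		 */
		u32 segcount = be32_to_cpup(p++);

		/* Each RDMA segment is 4 XDR words: handle, length,
		 * and a 64-bit offset.
		 */
		if (segcount > (end - p) / rpcrdma_segment_maxsz)
			return NULL;
		return p + segcount * rpcrdma_segment_maxsz;
	}

The real checks compose the same way: Read list, then Write list, then Reply chunk, each resuming where the previous one stopped, and the final "p > end" test catches a header that claims more than the Receive actually delivered.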
519 367
520static void rdma_read_complete(struct svc_rqst *rqstp, 368static void rdma_read_complete(struct svc_rqst *rqstp,
@@ -528,24 +376,9 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
528 rqstp->rq_pages[page_no] = head->pages[page_no]; 376 rqstp->rq_pages[page_no] = head->pages[page_no];
529 } 377 }
530 378
531 /* Adjustments made for RDMA_NOMSG type requests */
532 if (head->position == 0) {
533 if (head->arg.len <= head->sge[0].length) {
534 head->arg.head[0].iov_len = head->arg.len -
535 head->byte_len;
536 head->arg.page_len = 0;
537 } else {
538 head->arg.head[0].iov_len = head->sge[0].length -
539 head->byte_len;
540 head->arg.page_len = head->arg.len -
541 head->sge[0].length;
542 }
543 }
544
545 /* Point rq_arg.pages past header */ 379 /* Point rq_arg.pages past header */
546 rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count]; 380 rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
547 rqstp->rq_arg.page_len = head->arg.page_len; 381 rqstp->rq_arg.page_len = head->arg.page_len;
548 rqstp->rq_arg.page_base = head->arg.page_base;
549 382
550 /* rq_respages starts after the last arg page */ 383 /* rq_respages starts after the last arg page */
551 rqstp->rq_respages = &rqstp->rq_pages[page_no]; 384 rqstp->rq_respages = &rqstp->rq_pages[page_no];
@@ -642,21 +475,44 @@ static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
642 return true; 475 return true;
643} 476}
644 477
645/* 478/**
646 * Set up the rqstp thread context to point to the RQ buffer. If 479 * svc_rdma_recvfrom - Receive an RPC call
647 * necessary, pull additional data from the client with an RDMA_READ 480 * @rqstp: request structure into which to receive an RPC Call
648 * request. 481 *
482 * Returns:
483 * The positive number of bytes in the RPC Call message,
484 * %0 if there were no Calls ready to return,
485 * %-EINVAL if the Read chunk data is too large,
486 * %-ENOMEM if rdma_rw context pool was exhausted,
487 * %-ENOTCONN if posting failed (connection is lost),
488 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
489 *
490 * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
491 * when there are no remaining ctxts to process.
492 *
493 * The next ctxt is removed from the "receive" lists.
494 *
495 * - If the ctxt completes a Read, then finish assembling the Call
496 * message and return the number of bytes in the message.
497 *
498 * - If the ctxt completes a Receive, then construct the Call
499 * message from the contents of the Receive buffer.
500 *
501 * - If there are no Read chunks in this message, then finish
502 * assembling the Call message and return the number of bytes
503 * in the message.
504 *
505 * - If there are Read chunks in this message, post Read WRs to
506 * pull that payload and return 0.
649 */ 507 */
650int svc_rdma_recvfrom(struct svc_rqst *rqstp) 508int svc_rdma_recvfrom(struct svc_rqst *rqstp)
651{ 509{
652 struct svc_xprt *xprt = rqstp->rq_xprt; 510 struct svc_xprt *xprt = rqstp->rq_xprt;
653 struct svcxprt_rdma *rdma_xprt = 511 struct svcxprt_rdma *rdma_xprt =
654 container_of(xprt, struct svcxprt_rdma, sc_xprt); 512 container_of(xprt, struct svcxprt_rdma, sc_xprt);
655 struct svc_rdma_op_ctxt *ctxt = NULL; 513 struct svc_rdma_op_ctxt *ctxt;
656 struct rpcrdma_msg *rmsgp; 514 __be32 *p;
657 int ret = 0; 515 int ret;
658
659 dprintk("svcrdma: rqstp=%p\n", rqstp);
660 516
661 spin_lock(&rdma_xprt->sc_rq_dto_lock); 517 spin_lock(&rdma_xprt->sc_rq_dto_lock);
662 if (!list_empty(&rdma_xprt->sc_read_complete_q)) { 518 if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
@@ -671,22 +527,14 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
671 struct svc_rdma_op_ctxt, list); 527 struct svc_rdma_op_ctxt, list);
672 list_del(&ctxt->list); 528 list_del(&ctxt->list);
673 } else { 529 } else {
674 atomic_inc(&rdma_stat_rq_starve); 530 /* No new incoming requests, terminate the loop */
675 clear_bit(XPT_DATA, &xprt->xpt_flags); 531 clear_bit(XPT_DATA, &xprt->xpt_flags);
676 ctxt = NULL; 532 spin_unlock(&rdma_xprt->sc_rq_dto_lock);
533 return 0;
677 } 534 }
678 spin_unlock(&rdma_xprt->sc_rq_dto_lock); 535 spin_unlock(&rdma_xprt->sc_rq_dto_lock);
679 if (!ctxt) { 536
680 /* This is the EAGAIN path. The svc_recv routine will 537 dprintk("svcrdma: recvfrom: ctxt=%p on xprt=%p, rqstp=%p\n",
681 * return -EAGAIN, the nfsd thread will go to call into
682 * svc_recv again and we shouldn't be on the active
683 * transport list
684 */
685 if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
686 goto defer;
687 goto out;
688 }
689 dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n",
690 ctxt, rdma_xprt, rqstp); 538 ctxt, rdma_xprt, rqstp);
691 atomic_inc(&rdma_stat_recv); 539 atomic_inc(&rdma_stat_recv);
692 540
@@ -694,7 +542,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
694 rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len); 542 rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
695 543
696 /* Decode the RDMA header. */ 544 /* Decode the RDMA header. */
697 rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base; 545 p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
698 ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg); 546 ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
699 if (ret < 0) 547 if (ret < 0)
700 goto out_err; 548 goto out_err;
@@ -702,9 +550,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
702 goto out_drop; 550 goto out_drop;
703 rqstp->rq_xprt_hlen = ret; 551 rqstp->rq_xprt_hlen = ret;
704 552
705 if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) { 553 if (svc_rdma_is_backchannel_reply(xprt, p)) {
706 ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, 554 ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
707 &rmsgp->rm_xid,
708 &rqstp->rq_arg); 555 &rqstp->rq_arg);
709 svc_rdma_put_context(ctxt, 0); 556 svc_rdma_put_context(ctxt, 0);
710 if (ret) 557 if (ret)
@@ -712,39 +559,34 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
712 return ret; 559 return ret;
713 } 560 }
714 561
715 /* Read read-list data. */ 562 p += rpcrdma_fixed_maxsz;
716 ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt); 563 if (*p != xdr_zero)
717 if (ret > 0) { 564 goto out_readchunk;
718 /* read-list posted, defer until data received from client. */
719 goto defer;
720 } else if (ret < 0) {
721 /* Post of read-list failed, free context. */
722 svc_rdma_put_context(ctxt, 1);
723 return 0;
724 }
725 565
726complete: 566complete:
727 ret = rqstp->rq_arg.head[0].iov_len
728 + rqstp->rq_arg.page_len
729 + rqstp->rq_arg.tail[0].iov_len;
730 svc_rdma_put_context(ctxt, 0); 567 svc_rdma_put_context(ctxt, 0);
731 out: 568 dprintk("svcrdma: recvfrom: xprt=%p, rqstp=%p, rq_arg.len=%u\n",
732 dprintk("svcrdma: ret=%d, rq_arg.len=%u, " 569 rdma_xprt, rqstp, rqstp->rq_arg.len);
733 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
734 ret, rqstp->rq_arg.len,
735 rqstp->rq_arg.head[0].iov_base,
736 rqstp->rq_arg.head[0].iov_len);
737 rqstp->rq_prot = IPPROTO_MAX; 570 rqstp->rq_prot = IPPROTO_MAX;
738 svc_xprt_copy_addrs(rqstp, xprt); 571 svc_xprt_copy_addrs(rqstp, xprt);
739 return ret; 572 return rqstp->rq_arg.len;
573
574out_readchunk:
575 ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
576 if (ret < 0)
577 goto out_postfail;
578 return 0;
740 579
741out_err: 580out_err:
742 svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret); 581 svc_rdma_send_error(rdma_xprt, p, ret);
743 svc_rdma_put_context(ctxt, 0); 582 svc_rdma_put_context(ctxt, 0);
744 return 0; 583 return 0;
745 584
746defer: 585out_postfail:
747 return 0; 586 if (ret == -EINVAL)
587 svc_rdma_send_error(rdma_xprt, p, ret);
588 svc_rdma_put_context(ctxt, 1);
589 return ret;
748 590
749out_drop: 591out_drop:
750 svc_rdma_put_context(ctxt, 1); 592 svc_rdma_put_context(ctxt, 1);
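The short-circuit "p += rpcrdma_fixed_maxsz" in the function above works because every RPC-over-RDMA version 1 header begins with the same four XDR words. For illustration, a chunk-free RDMA_MSG header is seven 4-byte words (28 bytes) on the wire:

	word 0   rdma_xid      copied from the RPC XID
	word 1   rdma_vers     always 1 (rpcrdma_version)
	word 2   rdma_credit   requested credit value
	word 3   rdma_proc     rdma_msg, rdma_nomsg, rdma_done, or rdma_error
	word 4   0             empty Read list
	word 5   0             empty Write list
	word 6   0             no Reply chunk

so after skipping the four fixed words, a non-zero word 4 is exactly the condition that sends svc_rdma_recvfrom() to out_readchunk.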
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 0cf620277693..933f79bed270 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -12,6 +12,9 @@
12 12
13#define RPCDBG_FACILITY RPCDBG_SVCXPRT 13#define RPCDBG_FACILITY RPCDBG_SVCXPRT
14 14
15static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
16static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
17
15/* Each R/W context contains state for one chain of RDMA Read or 18/* Each R/W context contains state for one chain of RDMA Read or
16 * Write Work Requests. 19 * Write Work Requests.
17 * 20 *
@@ -113,22 +116,20 @@ struct svc_rdma_chunk_ctxt {
113 struct svcxprt_rdma *cc_rdma; 116 struct svcxprt_rdma *cc_rdma;
114 struct list_head cc_rwctxts; 117 struct list_head cc_rwctxts;
115 int cc_sqecount; 118 int cc_sqecount;
116 enum dma_data_direction cc_dir;
117}; 119};
118 120
119static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, 121static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
120 struct svc_rdma_chunk_ctxt *cc, 122 struct svc_rdma_chunk_ctxt *cc)
121 enum dma_data_direction dir)
122{ 123{
123 cc->cc_rdma = rdma; 124 cc->cc_rdma = rdma;
124 svc_xprt_get(&rdma->sc_xprt); 125 svc_xprt_get(&rdma->sc_xprt);
125 126
126 INIT_LIST_HEAD(&cc->cc_rwctxts); 127 INIT_LIST_HEAD(&cc->cc_rwctxts);
127 cc->cc_sqecount = 0; 128 cc->cc_sqecount = 0;
128 cc->cc_dir = dir;
129} 129}
130 130
131static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc) 131static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
132 enum dma_data_direction dir)
132{ 133{
133 struct svcxprt_rdma *rdma = cc->cc_rdma; 134 struct svcxprt_rdma *rdma = cc->cc_rdma;
134 struct svc_rdma_rw_ctxt *ctxt; 135 struct svc_rdma_rw_ctxt *ctxt;
@@ -138,7 +139,7 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
138 139
139 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp, 140 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
140 rdma->sc_port_num, ctxt->rw_sg_table.sgl, 141 rdma->sc_port_num, ctxt->rw_sg_table.sgl,
141 ctxt->rw_nents, cc->cc_dir); 142 ctxt->rw_nents, dir);
142 svc_rdma_put_rw_ctxt(rdma, ctxt); 143 svc_rdma_put_rw_ctxt(rdma, ctxt);
143 } 144 }
144 svc_xprt_put(&rdma->sc_xprt); 145 svc_xprt_put(&rdma->sc_xprt);
@@ -176,13 +177,14 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
176 info->wi_seg_no = 0; 177 info->wi_seg_no = 0;
177 info->wi_nsegs = be32_to_cpup(++chunk); 178 info->wi_nsegs = be32_to_cpup(++chunk);
178 info->wi_segs = ++chunk; 179 info->wi_segs = ++chunk;
179 svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE); 180 svc_rdma_cc_init(rdma, &info->wi_cc);
181 info->wi_cc.cc_cqe.done = svc_rdma_write_done;
180 return info; 182 return info;
181} 183}
182 184
183static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) 185static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
184{ 186{
185 svc_rdma_cc_release(&info->wi_cc); 187 svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
186 kfree(info); 188 kfree(info);
187} 189}
188 190
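With cc_dir gone from the chunk context, the DMA direction is now named at each release site instead of being cached at init time; the two call sites in this patch illustrate the convention:

	/* Write and Reply chunk I/O maps source pages for the wire */
	svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);

	/* Read chunk I/O maps sink pages for incoming payload */
	svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);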
@@ -216,6 +218,76 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
216 svc_rdma_write_info_free(info); 218 svc_rdma_write_info_free(info);
217} 219}
218 220
221/* State for pulling a Read chunk.
222 */
223struct svc_rdma_read_info {
224 struct svc_rdma_op_ctxt *ri_readctxt;
225 unsigned int ri_position;
226 unsigned int ri_pageno;
227 unsigned int ri_pageoff;
228 unsigned int ri_chunklen;
229
230 struct svc_rdma_chunk_ctxt ri_cc;
231};
232
233static struct svc_rdma_read_info *
234svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
235{
236 struct svc_rdma_read_info *info;
237
238 info = kmalloc(sizeof(*info), GFP_KERNEL);
239 if (!info)
240 return info;
241
242 svc_rdma_cc_init(rdma, &info->ri_cc);
243 info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
244 return info;
245}
246
247static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
248{
249 svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
250 kfree(info);
251}
252
253/**
254 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
255 * @cq: controlling Completion Queue
256 * @wc: Work Completion
257 *
258 */
259static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
260{
261 struct ib_cqe *cqe = wc->wr_cqe;
262 struct svc_rdma_chunk_ctxt *cc =
263 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
264 struct svcxprt_rdma *rdma = cc->cc_rdma;
265 struct svc_rdma_read_info *info =
266 container_of(cc, struct svc_rdma_read_info, ri_cc);
267
268 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
269 wake_up(&rdma->sc_send_wait);
270
271 if (unlikely(wc->status != IB_WC_SUCCESS)) {
272 set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
273 if (wc->status != IB_WC_WR_FLUSH_ERR)
274 pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
275 ib_wc_status_msg(wc->status),
276 wc->status, wc->vendor_err);
277 svc_rdma_put_context(info->ri_readctxt, 1);
278 } else {
279 spin_lock(&rdma->sc_rq_dto_lock);
280 list_add_tail(&info->ri_readctxt->list,
281 &rdma->sc_read_complete_q);
282 spin_unlock(&rdma->sc_rq_dto_lock);
283
284 set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
285 svc_xprt_enqueue(&rdma->sc_xprt);
286 }
287
288 svc_rdma_read_info_free(info);
289}
290
219/* This function sleeps when the transport's Send Queue is congested. 291/* This function sleeps when the transport's Send Queue is congested.
220 * 292 *
221 * Assumptions: 293 * Assumptions:
@@ -232,6 +304,9 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
232 struct ib_cqe *cqe; 304 struct ib_cqe *cqe;
233 int ret; 305 int ret;
234 306
307 if (cc->cc_sqecount > rdma->sc_sq_depth)
308 return -EINVAL;
309
235 first_wr = NULL; 310 first_wr = NULL;
236 cqe = &cc->cc_cqe; 311 cqe = &cc->cc_cqe;
237 list_for_each(tmp, &cc->cc_rwctxts) { 312 list_for_each(tmp, &cc->cc_rwctxts) {
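The new up-front test rejects a chunk chain that could never fit in the Send Queue; the normal case reserves cc_sqecount slots from sc_sq_avail and sleeps on sc_send_wait until completions return them. A simplified sketch of that accounting, using the names from this patch (the kernel's actual wait loop may be structured differently):

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;			/* can never be posted */

	while (atomic_sub_return(cc->cc_sqecount, &rdma->sc_sq_avail) < 0) {
		/* overdrawn: give the slots back and wait for completions */
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
	}
	/* ...then ib_post_send() the chained Work Requests */

The matching atomic_add() and wake_up() can be seen in svc_rdma_wc_read_done() below.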
@@ -295,8 +370,9 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
295 struct scatterlist *sg; 370 struct scatterlist *sg;
296 struct page **page; 371 struct page **page;
297 372
298 page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK; 373 page_off = info->wi_next_off + xdr->page_base;
299 page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT; 374 page_no = page_off >> PAGE_SHIFT;
375 page_off = offset_in_page(page_off);
300 page = xdr->pages + page_no; 376 page = xdr->pages + page_no;
301 info->wi_next_off += remaining; 377 info->wi_next_off += remaining;
302 sg = ctxt->rw_sg_table.sgl; 378 sg = ctxt->rw_sg_table.sgl;
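The rewritten arithmetic is equivalent to the old pair of expressions but evaluates the sum only once. For example, with 4 KB pages and wi_next_off + page_base = 9000:

	page_no  = 9000 >> PAGE_SHIFT     = 2
	page_off = offset_in_page(9000)   = 808	(9000 - 2 * 4096)

exactly what the old "& ~PAGE_MASK" / ">> PAGE_SHIFT" pair produced.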
@@ -332,7 +408,6 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
332 __be32 *seg; 408 __be32 *seg;
333 int ret; 409 int ret;
334 410
335 cc->cc_cqe.done = svc_rdma_write_done;
336 seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz; 411 seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
337 do { 412 do {
338 unsigned int write_len; 413 unsigned int write_len;
@@ -425,6 +500,7 @@ static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
425 * 500 *
426 * Returns a non-negative number of bytes the chunk consumed, or 501 * Returns a non-negative number of bytes the chunk consumed, or
427 * %-E2BIG if the payload was larger than the Write chunk, 502 * %-E2BIG if the payload was larger than the Write chunk,
503 * %-EINVAL if client provided too many segments,
428 * %-ENOMEM if rdma_rw context pool was exhausted, 504 * %-ENOMEM if rdma_rw context pool was exhausted,
429 * %-ENOTCONN if posting failed (connection is lost), 505 * %-ENOTCONN if posting failed (connection is lost),
430 * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 506 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
@@ -465,6 +541,7 @@ out_err:
465 * 541 *
466 * Returns a non-negative number of bytes the chunk consumed, or 542 * Returns a non-negative number of bytes the chunk consumed, or
467 * %-E2BIG if the payload was larger than the Reply chunk, 543 * %-E2BIG if the payload was larger than the Reply chunk,
544 * %-EINVAL if client provided too many segments,
468 * %-ENOMEM if rdma_rw context pool was exhausted, 545 * %-ENOMEM if rdma_rw context pool was exhausted,
469 * %-ENOTCONN if posting failed (connection is lost), 546 * %-ENOTCONN if posting failed (connection is lost),
470 * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 547 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
@@ -510,3 +587,353 @@ out_err:
510 svc_rdma_write_info_free(info); 587 svc_rdma_write_info_free(info);
511 return ret; 588 return ret;
512} 589}
590
591static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
592 struct svc_rqst *rqstp,
593 u32 rkey, u32 len, u64 offset)
594{
595 struct svc_rdma_op_ctxt *head = info->ri_readctxt;
596 struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
597 struct svc_rdma_rw_ctxt *ctxt;
598 unsigned int sge_no, seg_len;
599 struct scatterlist *sg;
600 int ret;
601
602 sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
603 ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
604 if (!ctxt)
605 goto out_noctx;
606 ctxt->rw_nents = sge_no;
607
608 dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
609 len, offset, rkey, sge_no);
610
611 sg = ctxt->rw_sg_table.sgl;
612 for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
613 seg_len = min_t(unsigned int, len,
614 PAGE_SIZE - info->ri_pageoff);
615
616 head->arg.pages[info->ri_pageno] =
617 rqstp->rq_pages[info->ri_pageno];
618 if (!info->ri_pageoff)
619 head->count++;
620
621 sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
622 seg_len, info->ri_pageoff);
623 sg = sg_next(sg);
624
625 info->ri_pageoff += seg_len;
626 if (info->ri_pageoff == PAGE_SIZE) {
627 info->ri_pageno++;
628 info->ri_pageoff = 0;
629 }
630 len -= seg_len;
631
632 /* Safety check */
633 if (len &&
634 &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
635 goto out_overrun;
636 }
637
638 ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
639 cc->cc_rdma->sc_port_num,
640 ctxt->rw_sg_table.sgl, ctxt->rw_nents,
641 0, offset, rkey, DMA_FROM_DEVICE);
642 if (ret < 0)
643 goto out_initerr;
644
645 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
646 cc->cc_sqecount += ret;
647 return 0;
648
649out_noctx:
650 dprintk("svcrdma: no R/W ctxs available\n");
651 return -ENOMEM;
652
653out_overrun:
654 dprintk("svcrdma: request overruns rq_pages\n");
655 return -EINVAL;
656
657out_initerr:
658 svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
659 pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
660 return -EIO;
661}
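The sge_no calculation at the top of svc_rdma_build_read_segment() sizes the scatterlist by the number of sink pages the segment touches, counting a partial leading page. A worked example assuming 4 KB pages, ri_pageoff = 100, and len = 9000:

	PAGE_ALIGN(100 + 9000) = PAGE_ALIGN(9100) = 12288
	12288 >> PAGE_SHIFT = 3 scatterlist entries

that is, the tail of the current sink page, one full page, and the head of a third.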
662
663static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
664 struct svc_rdma_read_info *info,
665 __be32 *p)
666{
667 int ret;
668
669 info->ri_chunklen = 0;
670 while (*p++ != xdr_zero) {
671 u32 rs_handle, rs_length;
672 u64 rs_offset;
673
674 if (be32_to_cpup(p++) != info->ri_position)
675 break;
676 rs_handle = be32_to_cpup(p++);
677 rs_length = be32_to_cpup(p++);
678 p = xdr_decode_hyper(p, &rs_offset);
679
680 ret = svc_rdma_build_read_segment(info, rqstp,
681 rs_handle, rs_length,
682 rs_offset);
683 if (ret < 0)
684 break;
685
686 info->ri_chunklen += rs_length;
687 }
688
689 return ret;
690}
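The decode loop above walks the Read list exactly as it arrives on the wire: each list entry is a one-word discriminator followed by a five-word read segment, and the loop stops at the first entry whose Position differs from ri_position or at the terminating zero discriminator. For reference:

	word 0     non-zero: another list entry follows
	word 1     rdma_position  XDR offset where the chunk data belongs
	word 2     rdma_handle    client R_key for this segment
	word 3     rdma_length    bytes to pull with RDMA Read
	words 4-5  rdma_offset    64-bit remote address (xdr_decode_hyper)

ri_chunklen accumulates rdma_length across all segments that share the same Position.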
691
692/* If there is inline content following the Read chunk, append it to
693 * the page list immediately following the data payload. This has to
694 * be done after the reader function has determined how many pages
695 * were consumed for RDMA Read.
696 *
697 * On entry, ri_pageno and ri_pageoff point directly to the end of the
698 * page list. On exit, both have been updated to the new "next byte".
699 *
700 * Assumptions:
701 * - Inline content fits entirely in rq_pages[0]
702 * - Trailing content is only a handful of bytes
703 */
704static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
705 struct svc_rdma_read_info *info)
706{
707 struct svc_rdma_op_ctxt *head = info->ri_readctxt;
708 unsigned int tail_length, remaining;
709 u8 *srcp, *destp;
710
711 /* Assert that all inline content fits in page 0. This is an
712 * implementation limit, not a protocol limit.
713 */
714 if (head->arg.head[0].iov_len > PAGE_SIZE) {
715 pr_warn_once("svcrdma: too much trailing inline content\n");
716 return -EINVAL;
717 }
718
719 srcp = head->arg.head[0].iov_base;
720 srcp += info->ri_position;
721 tail_length = head->arg.head[0].iov_len - info->ri_position;
722 remaining = tail_length;
723
724 /* If there is room on the last page in the page list, try to
725 * fit the trailing content there.
726 */
727 if (info->ri_pageoff > 0) {
728 unsigned int len;
729
730 len = min_t(unsigned int, remaining,
731 PAGE_SIZE - info->ri_pageoff);
732 destp = page_address(rqstp->rq_pages[info->ri_pageno]);
733 destp += info->ri_pageoff;
734
735 memcpy(destp, srcp, len);
736 srcp += len;
737 destp += len;
738 info->ri_pageoff += len;
739 remaining -= len;
740
741 if (info->ri_pageoff == PAGE_SIZE) {
742 info->ri_pageno++;
743 info->ri_pageoff = 0;
744 }
745 }
746
747 /* Otherwise, a fresh page is needed. */
748 if (remaining) {
749 head->arg.pages[info->ri_pageno] =
750 rqstp->rq_pages[info->ri_pageno];
751 head->count++;
752
753 destp = page_address(rqstp->rq_pages[info->ri_pageno]);
754 memcpy(destp, srcp, remaining);
755 info->ri_pageoff += remaining;
756 }
757
758 head->arg.page_len += tail_length;
759 head->arg.len += tail_length;
760 head->arg.buflen += tail_length;
761 return 0;
762}
763
764/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
765 * data lands in the page list of head->arg.pages.
766 *
767 * Currently NFSD does not look at the head->arg.tail[0] iovec.
768 * Therefore, XDR round-up of the Read chunk and trailing
769 * inline content must both be added at the end of the pagelist.
770 */
771static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
772 struct svc_rdma_read_info *info,
773 __be32 *p)
774{
775 struct svc_rdma_op_ctxt *head = info->ri_readctxt;
776 int ret;
777
778 dprintk("svcrdma: Reading Read chunk at position %u\n",
779 info->ri_position);
780
781 info->ri_pageno = head->hdr_count;
782 info->ri_pageoff = 0;
783
784 ret = svc_rdma_build_read_chunk(rqstp, info, p);
785 if (ret < 0)
786 goto out;
787
788 /* Read chunk may need XDR round-up (see RFC 5666, s. 3.7).
789 */
790 if (info->ri_chunklen & 3) {
791 u32 padlen = 4 - (info->ri_chunklen & 3);
792
793 info->ri_chunklen += padlen;
794
795 /* NB: data payload always starts on XDR alignment,
796 * thus the pad can never contain a page boundary.
797 */
798 info->ri_pageoff += padlen;
799 if (info->ri_pageoff == PAGE_SIZE) {
800 info->ri_pageno++;
801 info->ri_pageoff = 0;
802 }
803 }
804
805 head->arg.page_len = info->ri_chunklen;
806 head->arg.len += info->ri_chunklen;
807 head->arg.buflen += info->ri_chunklen;
808
809 if (info->ri_position < head->arg.head[0].iov_len) {
810 ret = svc_rdma_copy_tail(rqstp, info);
811 if (ret < 0)
812 goto out;
813 }
814 head->arg.head[0].iov_len = info->ri_position;
815
816out:
817 return ret;
818}
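The round-up above keeps the end of the chunk XDR-aligned without touching the transferred bytes themselves. For example, a 13-byte chunk gives padlen = 4 - (13 & 3) = 3, so ri_chunklen becomes 16 and ri_pageoff advances three bytes past the received payload; a chunk whose length is already a multiple of four takes neither branch.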
819
820/* Construct RDMA Reads to pull over a Position Zero Read chunk.
821 * The start of the data lands in the first page just after
822 * the Transport header, and the rest lands in the page list of
823 * head->arg.pages.
824 *
825 * Assumptions:
826 * - A PZRC has an XDR-aligned length (no implicit round-up).
827 * - There can be no trailing inline content (IOW, we assume
828 * a PZRC is never sent in an RDMA_MSG message, though it's
829 * allowed by spec).
830 */
831static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
832 struct svc_rdma_read_info *info,
833 __be32 *p)
834{
835 struct svc_rdma_op_ctxt *head = info->ri_readctxt;
836 int ret;
837
838 dprintk("svcrdma: Reading Position Zero Read chunk\n");
839
840 info->ri_pageno = head->hdr_count - 1;
841 info->ri_pageoff = offset_in_page(head->byte_len);
842
843 ret = svc_rdma_build_read_chunk(rqstp, info, p);
844 if (ret < 0)
845 goto out;
846
847 head->arg.len += info->ri_chunklen;
848 head->arg.buflen += info->ri_chunklen;
849
850 if (head->arg.buflen <= head->sge[0].length) {
851 /* Transport header and RPC message fit entirely
852 * in page where head iovec resides.
853 */
854 head->arg.head[0].iov_len = info->ri_chunklen;
855 } else {
856 /* Transport header and part of RPC message reside
857 * in the head iovec's page.
858 */
859 head->arg.head[0].iov_len =
860 head->sge[0].length - head->byte_len;
861 head->arg.page_len =
862 info->ri_chunklen - head->arg.head[0].iov_len;
863 }
864
865out:
866 return ret;
867}
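A worked example of the split above, assuming a 4 KB first sink page (sge[0].length = 4096) and a Receive that carried only a 40-byte transport header (byte_len = 40): a 10000-byte Position Zero chunk no longer fits in the head page, so the else branch yields

	head[0].iov_len = 4096 - 40    = 4056 bytes of chunk data in the head page
	page_len        = 10000 - 4056 = 5944 bytes in arg.pages

while a chunk small enough to fit would simply set head[0].iov_len to ri_chunklen.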
868
869/**
870 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
871 * @rdma: controlling RDMA transport
872 * @rqstp: set of pages to use as Read sink buffers
873 * @head: pages under I/O collect here
874 * @p: pointer to start of Read chunk
875 *
876 * Returns:
877 * %0 if all needed RDMA Reads were posted successfully,
878 * %-EINVAL if client provided too many segments,
879 * %-ENOMEM if rdma_rw context pool was exhausted,
880 * %-ENOTCONN if posting failed (connection is lost),
881 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
882 *
883 * Assumptions:
884 * - All Read segments in @p have the same Position value.
885 */
886int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
887 struct svc_rdma_op_ctxt *head, __be32 *p)
888{
889 struct svc_rdma_read_info *info;
890 struct page **page;
891 int ret;
892
893 /* The request (with page list) is constructed in
894 * head->arg. Pages involved with RDMA Read I/O are
895 * transferred there.
896 */
897 head->hdr_count = head->count;
898 head->arg.head[0] = rqstp->rq_arg.head[0];
899 head->arg.tail[0] = rqstp->rq_arg.tail[0];
900 head->arg.pages = head->pages;
901 head->arg.page_base = 0;
902 head->arg.page_len = 0;
903 head->arg.len = rqstp->rq_arg.len;
904 head->arg.buflen = rqstp->rq_arg.buflen;
905
906 info = svc_rdma_read_info_alloc(rdma);
907 if (!info)
908 return -ENOMEM;
909 info->ri_readctxt = head;
910
911 info->ri_position = be32_to_cpup(p + 1);
912 if (info->ri_position)
913 ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
914 else
915 ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
916
917 /* Mark the start of the pages that can be used for the reply */
918 if (info->ri_pageoff > 0)
919 info->ri_pageno++;
920 rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
921 rqstp->rq_next_page = rqstp->rq_respages + 1;
922
923 if (ret < 0)
924 goto out;
925
926 ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
927
928out:
929 /* Read sink pages have been moved from rqstp->rq_pages to
930 * head->arg.pages. Force svc_recv to refill those slots
931 * in rq_pages.
932 */
933 for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
934 *page = NULL;
935
936 if (ret < 0)
937 svc_rdma_read_info_free(info);
938 return ret;
939}
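The two builders dispatched above differ only in where the pulled bytes belong in the reassembled Call. ri_position is the XDR stream offset supplied by the client: a non-zero Position (for instance, a hypothetical NFS WRITE that sends its 32 KB payload as a chunk at Position 148) leaves the inline part of the Call in the head iovec and splices the chunk data in at that offset, while Position zero means the entire Call message was conveyed by RDMA Read (the RDMA_NOMSG case) and reassembly starts in the first sink page.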
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1736337f3a55..7c3a211e0e9a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -313,13 +313,17 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
313 dma_addr = ib_dma_map_page(dev, virt_to_page(base), 313 dma_addr = ib_dma_map_page(dev, virt_to_page(base),
314 offset, len, DMA_TO_DEVICE); 314 offset, len, DMA_TO_DEVICE);
315 if (ib_dma_mapping_error(dev, dma_addr)) 315 if (ib_dma_mapping_error(dev, dma_addr))
316 return -EIO; 316 goto out_maperr;
317 317
318 ctxt->sge[sge_no].addr = dma_addr; 318 ctxt->sge[sge_no].addr = dma_addr;
319 ctxt->sge[sge_no].length = len; 319 ctxt->sge[sge_no].length = len;
320 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; 320 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
321 svc_rdma_count_mappings(rdma, ctxt); 321 svc_rdma_count_mappings(rdma, ctxt);
322 return 0; 322 return 0;
323
324out_maperr:
325 pr_err("svcrdma: failed to map buffer\n");
326 return -EIO;
323} 327}
324 328
325static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, 329static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -334,13 +338,17 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
334 338
335 dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); 339 dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
336 if (ib_dma_mapping_error(dev, dma_addr)) 340 if (ib_dma_mapping_error(dev, dma_addr))
337 return -EIO; 341 goto out_maperr;
338 342
339 ctxt->sge[sge_no].addr = dma_addr; 343 ctxt->sge[sge_no].addr = dma_addr;
340 ctxt->sge[sge_no].length = len; 344 ctxt->sge[sge_no].length = len;
341 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; 345 ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
342 svc_rdma_count_mappings(rdma, ctxt); 346 svc_rdma_count_mappings(rdma, ctxt);
343 return 0; 347 return 0;
348
349out_maperr:
350 pr_err("svcrdma: failed to map page\n");
351 return -EIO;
344} 352}
345 353
346/** 354/**
@@ -547,7 +555,6 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
547 return 0; 555 return 0;
548 556
549err: 557err:
550 pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
551 svc_rdma_unmap_dma(ctxt); 558 svc_rdma_unmap_dma(ctxt);
552 svc_rdma_put_context(ctxt, 1); 559 svc_rdma_put_context(ctxt, 1);
553 return ret; 560 return ret;
@@ -677,7 +684,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
677 return 0; 684 return 0;
678 685
679 err2: 686 err2:
680 if (ret != -E2BIG) 687 if (ret != -E2BIG && ret != -EINVAL)
681 goto err1; 688 goto err1;
682 689
683 ret = svc_rdma_post_recv(rdma, GFP_KERNEL); 690 ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a9d9cb1ba4c6..e660d4965b18 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -202,7 +202,6 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
202out: 202out:
203 ctxt->count = 0; 203 ctxt->count = 0;
204 ctxt->mapped_sges = 0; 204 ctxt->mapped_sges = 0;
205 ctxt->frmr = NULL;
206 return ctxt; 205 return ctxt;
207 206
208out_empty: 207out_empty:
@@ -226,22 +225,13 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
226{ 225{
227 struct svcxprt_rdma *xprt = ctxt->xprt; 226 struct svcxprt_rdma *xprt = ctxt->xprt;
228 struct ib_device *device = xprt->sc_cm_id->device; 227 struct ib_device *device = xprt->sc_cm_id->device;
229 u32 lkey = xprt->sc_pd->local_dma_lkey;
230 unsigned int i; 228 unsigned int i;
231 229
232 for (i = 0; i < ctxt->mapped_sges; i++) { 230 for (i = 0; i < ctxt->mapped_sges; i++)
233 /* 231 ib_dma_unmap_page(device,
234 * Unmap the DMA addr in the SGE if the lkey matches 232 ctxt->sge[i].addr,
235 * the local_dma_lkey, otherwise, ignore it since it is 233 ctxt->sge[i].length,
236 * an FRMR lkey and will be unmapped later when the 234 ctxt->direction);
237 * last WR that uses it completes.
238 */
239 if (ctxt->sge[i].lkey == lkey)
240 ib_dma_unmap_page(device,
241 ctxt->sge[i].addr,
242 ctxt->sge[i].length,
243 ctxt->direction);
244 }
245 ctxt->mapped_sges = 0; 235 ctxt->mapped_sges = 0;
246} 236}
247 237
@@ -346,36 +336,6 @@ out:
346 svc_xprt_put(&xprt->sc_xprt); 336 svc_xprt_put(&xprt->sc_xprt);
347} 337}
348 338
349static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
350 struct ib_wc *wc,
351 const char *opname)
352{
353 if (wc->status != IB_WC_SUCCESS)
354 goto err;
355
356out:
357 atomic_inc(&xprt->sc_sq_avail);
358 wake_up(&xprt->sc_send_wait);
359 return;
360
361err:
362 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
363 if (wc->status != IB_WC_WR_FLUSH_ERR)
364 pr_err("svcrdma: %s: %s (%u/0x%x)\n",
365 opname, ib_wc_status_msg(wc->status),
366 wc->status, wc->vendor_err);
367 goto out;
368}
369
370static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
371 const char *opname)
372{
373 struct svcxprt_rdma *xprt = cq->cq_context;
374
375 svc_rdma_send_wc_common(xprt, wc, opname);
376 svc_xprt_put(&xprt->sc_xprt);
377}
378
379/** 339/**
380 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC 340 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
381 * @cq: completion queue 341 * @cq: completion queue
@@ -384,73 +344,28 @@ static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
384 */ 344 */
385void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 345void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
386{ 346{
387 struct ib_cqe *cqe = wc->wr_cqe;
388 struct svc_rdma_op_ctxt *ctxt;
389
390 svc_rdma_send_wc_common_put(cq, wc, "send");
391
392 ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
393 svc_rdma_unmap_dma(ctxt);
394 svc_rdma_put_context(ctxt, 1);
395}
396
397/**
398 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
399 * @cq: completion queue
400 * @wc: completed WR
401 *
402 */
403void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
404{
405 svc_rdma_send_wc_common_put(cq, wc, "fastreg");
406}
407
408/**
409 * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
410 * @cq: completion queue
411 * @wc: completed WR
412 *
413 */
414void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
415{
416 struct svcxprt_rdma *xprt = cq->cq_context; 347 struct svcxprt_rdma *xprt = cq->cq_context;
417 struct ib_cqe *cqe = wc->wr_cqe; 348 struct ib_cqe *cqe = wc->wr_cqe;
418 struct svc_rdma_op_ctxt *ctxt; 349 struct svc_rdma_op_ctxt *ctxt;
419 350
420 svc_rdma_send_wc_common(xprt, wc, "read"); 351 atomic_inc(&xprt->sc_sq_avail);
352 wake_up(&xprt->sc_send_wait);
421 353
422 ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); 354 ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
423 svc_rdma_unmap_dma(ctxt); 355 svc_rdma_unmap_dma(ctxt);
424 svc_rdma_put_frmr(xprt, ctxt->frmr); 356 svc_rdma_put_context(ctxt, 1);
425
426 if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
427 struct svc_rdma_op_ctxt *read_hdr;
428
429 read_hdr = ctxt->read_hdr;
430 spin_lock(&xprt->sc_rq_dto_lock);
431 list_add_tail(&read_hdr->list,
432 &xprt->sc_read_complete_q);
433 spin_unlock(&xprt->sc_rq_dto_lock);
434 357
435 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 358 if (unlikely(wc->status != IB_WC_SUCCESS)) {
436 svc_xprt_enqueue(&xprt->sc_xprt); 359 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
360 if (wc->status != IB_WC_WR_FLUSH_ERR)
361 pr_err("svcrdma: Send: %s (%u/0x%x)\n",
362 ib_wc_status_msg(wc->status),
363 wc->status, wc->vendor_err);
437 } 364 }
438 365
439 svc_rdma_put_context(ctxt, 0);
440 svc_xprt_put(&xprt->sc_xprt); 366 svc_xprt_put(&xprt->sc_xprt);
441} 367}
442 368
443/**
444 * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
445 * @cq: completion queue
446 * @wc: completed WR
447 *
448 */
449void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
450{
451 svc_rdma_send_wc_common_put(cq, wc, "localInv");
452}
453
454static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, 369static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
455 int listener) 370 int listener)
456{ 371{
@@ -462,14 +377,12 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
462 INIT_LIST_HEAD(&cma_xprt->sc_accept_q); 377 INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
463 INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); 378 INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
464 INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); 379 INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
465 INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
466 INIT_LIST_HEAD(&cma_xprt->sc_ctxts); 380 INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
467 INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); 381 INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
468 init_waitqueue_head(&cma_xprt->sc_send_wait); 382 init_waitqueue_head(&cma_xprt->sc_send_wait);
469 383
470 spin_lock_init(&cma_xprt->sc_lock); 384 spin_lock_init(&cma_xprt->sc_lock);
471 spin_lock_init(&cma_xprt->sc_rq_dto_lock); 385 spin_lock_init(&cma_xprt->sc_rq_dto_lock);
472 spin_lock_init(&cma_xprt->sc_frmr_q_lock);
473 spin_lock_init(&cma_xprt->sc_ctxt_lock); 386 spin_lock_init(&cma_xprt->sc_ctxt_lock);
474 spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); 387 spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
475 388
@@ -780,86 +693,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
780 return ERR_PTR(ret); 693 return ERR_PTR(ret);
781} 694}
782 695
783static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
784{
785 struct ib_mr *mr;
786 struct scatterlist *sg;
787 struct svc_rdma_fastreg_mr *frmr;
788 u32 num_sg;
789
790 frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
791 if (!frmr)
792 goto err;
793
794 num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
795 mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
796 if (IS_ERR(mr))
797 goto err_free_frmr;
798
799 sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
800 if (!sg)
801 goto err_free_mr;
802
803 sg_init_table(sg, RPCSVC_MAXPAGES);
804
805 frmr->mr = mr;
806 frmr->sg = sg;
807 INIT_LIST_HEAD(&frmr->frmr_list);
808 return frmr;
809
810 err_free_mr:
811 ib_dereg_mr(mr);
812 err_free_frmr:
813 kfree(frmr);
814 err:
815 return ERR_PTR(-ENOMEM);
816}
817
818static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
819{
820 struct svc_rdma_fastreg_mr *frmr;
821
822 while (!list_empty(&xprt->sc_frmr_q)) {
823 frmr = list_entry(xprt->sc_frmr_q.next,
824 struct svc_rdma_fastreg_mr, frmr_list);
825 list_del_init(&frmr->frmr_list);
826 kfree(frmr->sg);
827 ib_dereg_mr(frmr->mr);
828 kfree(frmr);
829 }
830}
831
832struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
833{
834 struct svc_rdma_fastreg_mr *frmr = NULL;
835
836 spin_lock(&rdma->sc_frmr_q_lock);
837 if (!list_empty(&rdma->sc_frmr_q)) {
838 frmr = list_entry(rdma->sc_frmr_q.next,
839 struct svc_rdma_fastreg_mr, frmr_list);
840 list_del_init(&frmr->frmr_list);
841 frmr->sg_nents = 0;
842 }
843 spin_unlock(&rdma->sc_frmr_q_lock);
844 if (frmr)
845 return frmr;
846
847 return rdma_alloc_frmr(rdma);
848}
849
850void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
851 struct svc_rdma_fastreg_mr *frmr)
852{
853 if (frmr) {
854 ib_dma_unmap_sg(rdma->sc_cm_id->device,
855 frmr->sg, frmr->sg_nents, frmr->direction);
856 spin_lock(&rdma->sc_frmr_q_lock);
857 WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
858 list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
859 spin_unlock(&rdma->sc_frmr_q_lock);
860 }
861}
862
863/* 696/*
864 * This is the xpo_recvfrom function for listening endpoints. Its 697 * This is the xpo_recvfrom function for listening endpoints. Its
865 * purpose is to accept incoming connections. The CMA callback handler 698 * purpose is to accept incoming connections. The CMA callback handler
@@ -908,8 +741,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
908 * capabilities of this particular device */ 741 * capabilities of this particular device */
909 newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge, 742 newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
910 (size_t)RPCSVC_MAXPAGES); 743 (size_t)RPCSVC_MAXPAGES);
911 newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
912 RPCSVC_MAXPAGES);
913 newxprt->sc_max_req_size = svcrdma_max_req_size; 744 newxprt->sc_max_req_size = svcrdma_max_req_size;
914 newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr, 745 newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
915 svcrdma_max_requests); 746 svcrdma_max_requests);
@@ -952,7 +783,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
952 memset(&qp_attr, 0, sizeof qp_attr); 783 memset(&qp_attr, 0, sizeof qp_attr);
953 qp_attr.event_handler = qp_event_handler; 784 qp_attr.event_handler = qp_event_handler;
954 qp_attr.qp_context = &newxprt->sc_xprt; 785 qp_attr.qp_context = &newxprt->sc_xprt;
955 qp_attr.port_num = newxprt->sc_cm_id->port_num; 786 qp_attr.port_num = newxprt->sc_port_num;
956 qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests; 787 qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
957 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; 788 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
958 qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; 789 qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
@@ -976,47 +807,12 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
976 } 807 }
977 newxprt->sc_qp = newxprt->sc_cm_id->qp; 808 newxprt->sc_qp = newxprt->sc_cm_id->qp;
978 809
979 /* 810 if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
980 * Use the most secure set of MR resources based on the
981 * transport type and available memory management features in
982 * the device. Here's the table implemented below:
983 *
984 * Fast Global DMA Remote WR
985 * Reg LKEY MR Access
986 * Sup'd Sup'd Needed Needed
987 *
988 * IWARP N N Y Y
989 * N Y Y Y
990 * Y N Y N
991 * Y Y N -
992 *
993 * IB N N Y N
994 * N Y N -
995 * Y N Y N
996 * Y Y N -
997 *
998 * NB: iWARP requires remote write access for the data sink
999 * of an RDMA_READ. IB does not.
1000 */
1001 newxprt->sc_reader = rdma_read_chunk_lcl;
1002 if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
1003 newxprt->sc_frmr_pg_list_len =
1004 dev->attrs.max_fast_reg_page_list_len;
1005 newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
1006 newxprt->sc_reader = rdma_read_chunk_frmr;
1007 } else
1008 newxprt->sc_snd_w_inv = false; 811 newxprt->sc_snd_w_inv = false;
1009 812 if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
1010 /* 813 !rdma_ib_or_roce(dev, newxprt->sc_port_num))
1011 * Determine if a DMA MR is required and if so, what privs are required
1012 */
1013 if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
1014 !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
1015 goto errout; 814 goto errout;
1016 815
1017 if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
1018 newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
1019
1020 /* Post receive buffers */ 816 /* Post receive buffers */
1021 for (i = 0; i < newxprt->sc_max_requests; i++) { 817 for (i = 0; i < newxprt->sc_max_requests; i++) {
1022 ret = svc_rdma_post_recv(newxprt, GFP_KERNEL); 818 ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
@@ -1056,7 +852,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
1056 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 852 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
1057 dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap)); 853 dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap));
1058 dprintk(" max_sge : %d\n", newxprt->sc_max_sge); 854 dprintk(" max_sge : %d\n", newxprt->sc_max_sge);
1059 dprintk(" max_sge_rd : %d\n", newxprt->sc_max_sge_rd);
1060 dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth); 855 dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth);
1061 dprintk(" max_requests : %d\n", newxprt->sc_max_requests); 856 dprintk(" max_requests : %d\n", newxprt->sc_max_requests);
1062 dprintk(" ord : %d\n", newxprt->sc_ord); 857 dprintk(" ord : %d\n", newxprt->sc_ord);
@@ -1117,12 +912,6 @@ static void __svc_rdma_free(struct work_struct *work)
1117 pr_err("svcrdma: sc_xprt still in use? (%d)\n", 912 pr_err("svcrdma: sc_xprt still in use? (%d)\n",
1118 kref_read(&xprt->xpt_ref)); 913 kref_read(&xprt->xpt_ref));
1119 914
1120 /*
1121 * Destroy queued, but not processed read completions. Note
1122 * that this cleanup has to be done before destroying the
1123 * cm_id because the device ptr is needed to unmap the dma in
1124 * svc_rdma_put_context.
1125 */
1126 while (!list_empty(&rdma->sc_read_complete_q)) { 915 while (!list_empty(&rdma->sc_read_complete_q)) {
1127 struct svc_rdma_op_ctxt *ctxt; 916 struct svc_rdma_op_ctxt *ctxt;
1128 ctxt = list_first_entry(&rdma->sc_read_complete_q, 917 ctxt = list_first_entry(&rdma->sc_read_complete_q,
@@ -1130,8 +919,6 @@ static void __svc_rdma_free(struct work_struct *work)
1130 list_del(&ctxt->list); 919 list_del(&ctxt->list);
1131 svc_rdma_put_context(ctxt, 1); 920 svc_rdma_put_context(ctxt, 1);
1132 } 921 }
1133
1134 /* Destroy queued, but not processed recv completions */
1135 while (!list_empty(&rdma->sc_rq_dto_q)) { 922 while (!list_empty(&rdma->sc_rq_dto_q)) {
1136 struct svc_rdma_op_ctxt *ctxt; 923 struct svc_rdma_op_ctxt *ctxt;
1137 ctxt = list_first_entry(&rdma->sc_rq_dto_q, 924 ctxt = list_first_entry(&rdma->sc_rq_dto_q,
@@ -1151,7 +938,6 @@ static void __svc_rdma_free(struct work_struct *work)
1151 xprt->xpt_bc_xprt = NULL; 938 xprt->xpt_bc_xprt = NULL;
1152 } 939 }
1153 940
1154 rdma_dealloc_frmr_q(rdma);
1155 svc_rdma_destroy_rw_ctxts(rdma); 941 svc_rdma_destroy_rw_ctxts(rdma);
1156 svc_rdma_destroy_ctxts(rdma); 942 svc_rdma_destroy_ctxts(rdma);
1157 943