summary | refs | log | tree | commit | diff | stats
path: root/net/sunrpc
diff options
context:
space:
mode:
author: Chuck Lever <chuck.lever@oracle.com>  2017-10-16 15:01:22 -0400
committer: Anna Schumaker <Anna.Schumaker@Netapp.com>  2017-11-17 13:47:54 -0500
commit: e1352c9610e3235f5e1b159038762d0c01c6ef36 (patch)
tree: 44cdb50836c18cf5ed49d0ccb62dea681f574e93 /net/sunrpc
parent: 5381e0ec72eeb9467796ac4181ccb7bbce6d3e81 (diff)
xprtrdma: Refactor rpcrdma_reply_handler some more
Clean up: I'd like to be able to invoke the tail of rpcrdma_reply_handler in two different places. Split the tail out into its own helper function.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c  | 105
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h |  21
2 files changed, 69 insertions(+), 57 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index e355cd322a32..418bcc6b3e1d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1211,6 +1211,60 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1211 return -EREMOTEIO; 1211 return -EREMOTEIO;
1212} 1212}
1213 1213
1214/* Perform XID lookup, reconstruction of the RPC reply, and
1215 * RPC completion while holding the transport lock to ensure
1216 * the rep, rqst, and rq_task pointers remain stable.
1217 */
1218void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1219{
1220 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1221 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1222 struct rpc_rqst *rqst = rep->rr_rqst;
1223 unsigned long cwnd;
1224 int status;
1225
1226 xprt->reestablish_timeout = 0;
1227
1228 switch (rep->rr_proc) {
1229 case rdma_msg:
1230 status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1231 break;
1232 case rdma_nomsg:
1233 status = rpcrdma_decode_nomsg(r_xprt, rep);
1234 break;
1235 case rdma_error:
1236 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1237 break;
1238 default:
1239 status = -EIO;
1240 }
1241 if (status < 0)
1242 goto out_badheader;
1243
1244out:
1245 spin_lock(&xprt->recv_lock);
1246 cwnd = xprt->cwnd;
1247 xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1248 if (xprt->cwnd > cwnd)
1249 xprt_release_rqst_cong(rqst->rq_task);
1250
1251 xprt_complete_rqst(rqst->rq_task, status);
1252 xprt_unpin_rqst(rqst);
1253 spin_unlock(&xprt->recv_lock);
1254 return;
1255
1256/* If the incoming reply terminated a pending RPC, the next
1257 * RPC call will post a replacement receive buffer as it is
1258 * being marshaled.
1259 */
1260out_badheader:
1261 dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1262 rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
1263 r_xprt->rx_stats.bad_reply_count++;
1264 status = -EIO;
1265 goto out;
1266}
1267
1214/* Process received RPC/RDMA messages. 1268/* Process received RPC/RDMA messages.
1215 * 1269 *
1216 * Errors must result in the RPC task either being awakened, or 1270 * Errors must result in the RPC task either being awakened, or
@@ -1225,8 +1279,6 @@ rpcrdma_reply_handler(struct work_struct *work)
1225 struct rpc_xprt *xprt = &r_xprt->rx_xprt; 1279 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1226 struct rpcrdma_req *req; 1280 struct rpcrdma_req *req;
1227 struct rpc_rqst *rqst; 1281 struct rpc_rqst *rqst;
1228 unsigned long cwnd;
1229 int status;
1230 __be32 *p; 1282 __be32 *p;
1231 1283
1232 dprintk("RPC: %s: incoming rep %p\n", __func__, rep); 1284 dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
@@ -1263,6 +1315,7 @@ rpcrdma_reply_handler(struct work_struct *work)
1263 spin_unlock(&xprt->recv_lock); 1315 spin_unlock(&xprt->recv_lock);
1264 req = rpcr_to_rdmar(rqst); 1316 req = rpcr_to_rdmar(rqst);
1265 req->rl_reply = rep; 1317 req->rl_reply = rep;
1318 rep->rr_rqst = rqst;
1266 1319
1267 dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", 1320 dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
1268 __func__, rep, req, be32_to_cpu(rep->rr_xid)); 1321 __func__, rep, req, be32_to_cpu(rep->rr_xid));
@@ -1280,36 +1333,7 @@ rpcrdma_reply_handler(struct work_struct *work)
1280 &req->rl_registered); 1333 &req->rl_registered);
1281 } 1334 }
1282 1335
1283 xprt->reestablish_timeout = 0; 1336 rpcrdma_complete_rqst(rep);
1284
1285 switch (rep->rr_proc) {
1286 case rdma_msg:
1287 status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1288 break;
1289 case rdma_nomsg:
1290 status = rpcrdma_decode_nomsg(r_xprt, rep);
1291 break;
1292 case rdma_error:
1293 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1294 break;
1295 default:
1296 status = -EIO;
1297 }
1298 if (status < 0)
1299 goto out_badheader;
1300
1301out:
1302 spin_lock(&xprt->recv_lock);
1303 cwnd = xprt->cwnd;
1304 xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1305 if (xprt->cwnd > cwnd)
1306 xprt_release_rqst_cong(rqst->rq_task);
1307
1308 xprt_complete_rqst(rqst->rq_task, status);
1309 xprt_unpin_rqst(rqst);
1310 spin_unlock(&xprt->recv_lock);
1311 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
1312 __func__, xprt, rqst, status);
1313 return; 1337 return;
1314 1338
1315out_badstatus: 1339out_badstatus:
@@ -1325,20 +1349,8 @@ out_badversion:
1325 __func__, be32_to_cpu(rep->rr_vers)); 1349 __func__, be32_to_cpu(rep->rr_vers));
1326 goto repost; 1350 goto repost;
1327 1351
1328/* If the incoming reply terminated a pending RPC, the next 1352/* The RPC transaction has already been terminated, or the header
1329 * RPC call will post a replacement receive buffer as it is 1353 * is corrupt.
1330 * being marshaled.
1331 */
1332out_badheader:
1333 dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1334 rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
1335 r_xprt->rx_stats.bad_reply_count++;
1336 status = -EIO;
1337 goto out;
1338
1339/* The req was still available, but by the time the recv_lock
1340 * was acquired, the rqst and task had been released. Thus the RPC
1341 * has already been terminated.
1342 */ 1354 */
1343out_norqst: 1355out_norqst:
1344 spin_unlock(&xprt->recv_lock); 1356 spin_unlock(&xprt->recv_lock);
@@ -1348,7 +1360,6 @@ out_norqst:
1348 1360
1349out_shortreply: 1361out_shortreply:
1350 dprintk("RPC: %s: short/invalid reply\n", __func__); 1362 dprintk("RPC: %s: short/invalid reply\n", __func__);
1351 goto repost;
1352 1363
1353/* If no pending RPC transaction was matched, post a replacement 1364/* If no pending RPC transaction was matched, post a replacement
1354 * receive buffer before returning. 1365 * receive buffer before returning.
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 858b4c52047d..d68a1351d95e 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -202,18 +202,17 @@ enum {
202}; 202};
203 203
204/* 204/*
205 * struct rpcrdma_rep -- this structure encapsulates state required to recv 205 * struct rpcrdma_rep -- this structure encapsulates state required
206 * and complete a reply, asychronously. It needs several pieces of 206 * to receive and complete an RPC Reply, asychronously. It needs
207 * state: 207 * several pieces of state:
208 * o recv buffer (posted to provider)
209 * o ib_sge (also donated to provider)
210 * o status of reply (length, success or not)
211 * o bookkeeping state to get run by reply handler (list, etc)
212 * 208 *
213 * These are allocated during initialization, per-transport instance. 209 * o receive buffer and ib_sge (donated to provider)
210 * o status of receive (success or not, length, inv rkey)
211 * o bookkeeping state to get run by reply handler (XDR stream)
214 * 212 *
215 * N of these are associated with a transport instance, and stored in 213 * These structures are allocated during transport initialization.
216 * struct rpcrdma_buffer. N is the max number of outstanding requests. 214 * N of these are associated with a transport instance, managed by
215 * struct rpcrdma_buffer. N is the max number of outstanding RPCs.
217 */ 216 */
218 217
219struct rpcrdma_rep { 218struct rpcrdma_rep {
@@ -228,6 +227,7 @@ struct rpcrdma_rep {
228 struct work_struct rr_work; 227 struct work_struct rr_work;
229 struct xdr_buf rr_hdrbuf; 228 struct xdr_buf rr_hdrbuf;
230 struct xdr_stream rr_stream; 229 struct xdr_stream rr_stream;
230 struct rpc_rqst *rr_rqst;
231 struct list_head rr_list; 231 struct list_head rr_list;
232 struct ib_recv_wr rr_recv_wr; 232 struct ib_recv_wr rr_recv_wr;
233}; 233};
@@ -616,6 +616,7 @@ bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
616void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *); 616void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
617int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); 617int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
618void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); 618void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
619void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
619void rpcrdma_reply_handler(struct work_struct *work); 620void rpcrdma_reply_handler(struct work_struct *work);
620 621
621static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) 622static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)