author     Chuck Lever <chuck.lever@oracle.com>          2015-10-24 17:28:08 -0400
committer  Anna Schumaker <Anna.Schumaker@Netapp.com>    2015-11-02 13:45:15 -0500
commit     63cae47005af51c937f4cdcc4835f29075add2ba (patch)
tree       657deac560e388a810caf1f755432eda412e678d
parent     83128a60ca74e996c5e0336c4fff0579f4a8c909 (diff)
xprtrdma: Handle incoming backward direction RPC calls
Introduce a code path in the rpcrdma_reply_handler() to catch incoming
backward direction RPC calls and route them to the ULP's backchannel
server.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
-rw-r--r--    net/sunrpc/xprtrdma/backchannel.c    118
-rw-r--r--    net/sunrpc/xprtrdma/rpc_rdma.c        41
-rw-r--r--    net/sunrpc/xprtrdma/xprt_rdma.h        2
3 files changed, 161 insertions(+), 0 deletions(-)
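Editor's orientation note before the diff (not part of the patch): detection of a
backward direction call relies on the fact that an rdma_msg header whose three chunk
lists are all empty has a fixed size, RPCRDMA_HDRLEN_MIN (seven 32-bit XDR words), so
the embedded RPC call header begins at a known word offset. The sketch below spells
out that layout under those assumptions; the enum and its names are illustrative only
and do not exist in the kernel.

/* Illustrative word-offset map of a received backward direction call,
 * assuming an rdma_msg with empty read list, write list, and reply
 * chunk. These names are hypothetical; only the offsets matter.
 */
enum {
        BC_WORD_RDMA_XID    = 0,   /* rm_xid */
        BC_WORD_RDMA_VERS   = 1,   /* rm_vers */
        BC_WORD_RDMA_CREDIT = 2,   /* rm_credit */
        BC_WORD_RDMA_TYPE   = 3,   /* rm_type, must be rdma_msg */
        BC_WORD_READ_LIST   = 4,   /* xdr_zero: empty read chunk list */
        BC_WORD_WRITE_LIST  = 5,   /* xdr_zero: empty write chunk list */
        BC_WORD_REPLY_CHUNK = 6,   /* xdr_zero: no reply chunk */
        /* RPCRDMA_HDRLEN_MIN ends here (7 words == 28 bytes) */
        BC_WORD_RPC_XID     = 7,   /* checked against rm_xid in rpcrdma_is_bcall() */
        BC_WORD_RPC_DIR     = 8,   /* RPC_CALL marks a backward direction call */
};

The first six 32-bit fields of an RPC call header (xid, direction, rpcvers, prog,
vers, proc) occupy 24 bytes, which appears to be what the RPCRDMA_HDRLEN_MIN + 24
length test in rpcrdma_bc_receive_call() guarantees before those words are read.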
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ffc4853a068e..0b3387fe3f0d 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -5,6 +5,8 @@
  */
 
 #include <linux/module.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/svc.h>
 
 #include "xprt_rdma.h"
 
@@ -12,6 +14,8 @@
 # define RPCDBG_FACILITY RPCDBG_TRANS
 #endif
 
+#define RPCRDMA_BACKCHANNEL_DEBUG
+
 static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                  struct rpc_rqst *rqst)
 {
@@ -253,3 +257,117 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
         list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
         spin_unlock_bh(&xprt->bc_pa_lock);
 }
+
+/**
+ * rpcrdma_bc_receive_call - Handle a backward direction call
+ * @xprt: transport receiving the call
+ * @rep: receive buffer containing the call
+ *
+ * Called in the RPC reply handler, which runs in a tasklet.
+ * Be quick about it.
+ *
+ * Operational assumptions:
+ *    o Backchannel credits are ignored, just as the NFS server
+ *      forechannel currently does
+ *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
+ *      No replay detection is done at the transport level
+ */
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
+                             struct rpcrdma_rep *rep)
+{
+        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+        struct rpcrdma_msg *headerp;
+        struct svc_serv *bc_serv;
+        struct rpcrdma_req *req;
+        struct rpc_rqst *rqst;
+        struct xdr_buf *buf;
+        size_t size;
+        __be32 *p;
+
+        headerp = rdmab_to_msg(rep->rr_rdmabuf);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+        pr_info("RPC: %s: callback XID %08x, length=%u\n",
+                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
+        pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
+#endif
+
+        /* Sanity check:
+         * Need at least enough bytes for RPC/RDMA header, as code
+         * here references the header fields by array offset. Also,
+         * backward calls are always inline, so ensure there
+         * are some bytes beyond the RPC/RDMA header.
+         */
+        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
+                goto out_short;
+        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
+        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
+
+        /* Grab a free bc rqst */
+        spin_lock(&xprt->bc_pa_lock);
+        if (list_empty(&xprt->bc_pa_list)) {
+                spin_unlock(&xprt->bc_pa_lock);
+                goto out_overflow;
+        }
+        rqst = list_first_entry(&xprt->bc_pa_list,
+                                struct rpc_rqst, rq_bc_pa_list);
+        list_del(&rqst->rq_bc_pa_list);
+        spin_unlock(&xprt->bc_pa_lock);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+        pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
+#endif
+
+        /* Prepare rqst */
+        rqst->rq_reply_bytes_recvd = 0;
+        rqst->rq_bytes_sent = 0;
+        rqst->rq_xid = headerp->rm_xid;
+        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+
+        buf = &rqst->rq_rcv_buf;
+        memset(buf, 0, sizeof(*buf));
+        buf->head[0].iov_base = p;
+        buf->head[0].iov_len = size;
+        buf->len = size;
+
+        /* The receive buffer has to be hooked to the rpcrdma_req
+         * so that it can be reposted after the server is done
+         * parsing it but just before sending the backward
+         * direction reply.
+         */
+        req = rpcr_to_rdmar(rqst);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+        pr_info("RPC: %s: attaching rep %p to req %p\n",
+                __func__, rep, req);
+#endif
+        req->rl_reply = rep;
+
+        /* Defeat the retransmit detection logic in send_request */
+        req->rl_connect_cookie = 0;
+
+        /* Queue rqst for ULP's callback service */
+        bc_serv = xprt->bc_serv;
+        spin_lock(&bc_serv->sv_cb_lock);
+        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
+        spin_unlock(&bc_serv->sv_cb_lock);
+
+        wake_up(&bc_serv->sv_cb_waitq);
+
+        r_xprt->rx_stats.bcall_count++;
+        return;
+
+out_overflow:
+        pr_warn("RPC/RDMA backchannel overflow\n");
+        xprt_disconnect_done(xprt);
+        /* This receive buffer gets reposted automatically
+         * when the connection is re-established.
+         */
+        return;
+
+out_short:
+        pr_warn("RPC/RDMA short backward direction call\n");
+
+        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+                xprt_disconnect_done(xprt);
+        else
+                pr_warn("RPC: %s: reposting rep %p\n",
+                        __func__, rep);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index b7a21e551888..c10d9699441c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
         spin_unlock_bh(&xprt->transport_lock);
 }
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+        __be32 *p = (__be32 *)headerp;
+
+        if (headerp->rm_type != rdma_msg)
+                return false;
+        if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+                return false;
+        if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+                return false;
+        if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+                return false;
+
+        /* sanity */
+        if (p[7] != headerp->rm_xid)
+                return false;
+        /* call direction */
+        if (p[8] != cpu_to_be32(RPC_CALL))
+                return false;
+
+        return true;
+}
+#endif  /* CONFIG_SUNRPC_BACKCHANNEL */
+
 /*
  * This function is called when an async event is posted to
  * the connection which changes the connection state. All it
@@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
         headerp = rdmab_to_msg(rep->rr_rdmabuf);
         if (headerp->rm_vers != rpcrdma_version)
                 goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+        if (rpcrdma_is_bcall(headerp))
+                goto out_bcall;
+#endif
 
         /* Match incoming rpcrdma_rep to an rpcrdma_req to
          * get context for handling any incoming chunks.
@@ -878,6 +913,12 @@ out_badstatus:
         }
         return;
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+        rpcrdma_bc_receive_call(r_xprt, rep);
+        return;
+#endif
+
 out_shortreply:
         dprintk("RPC: %s: short/invalid reply\n", __func__);
         goto repost;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e2d23ea23df9..eb87d96e80ca 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -353,6 +353,7 @@ struct rpcrdma_stats {
         unsigned long failed_marshal_count;
         unsigned long bad_reply_count;
         unsigned long nomsg_call_count;
+        unsigned long bcall_count;
 };
 
 /*
@@ -520,6 +521,7 @@ void xprt_rdma_cleanup(void);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
 int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
 int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
 void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
 void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
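
Editor's note on where the queued request goes next (not part of this patch): the
ULP's callback service is expected to take rqsts off serv->sv_cb_list and run them
through the server-side RPC dispatcher. The sketch below is a rough, assumption-laden
consumer loop modeled loosely on how an NFSv4.1 callback thread behaves;
example_callback_svc is a hypothetical name, not a kernel symbol.

#include <linux/kthread.h>
#include <linux/sunrpc/svc.h>

/* Rough sketch of a ULP callback service thread (illustration only):
 * drain rqsts queued by rpcrdma_bc_receive_call() and hand each one
 * to the server-side RPC dispatcher.
 */
static int example_callback_svc(void *vrqstp)
{
        struct svc_rqst *rqstp = vrqstp;
        struct svc_serv *serv = rqstp->rq_server;
        struct rpc_rqst *req;

        while (!kthread_should_stop()) {
                /* rpcrdma_bc_receive_call() wakes this waitqueue */
                wait_event_interruptible(serv->sv_cb_waitq,
                                         !list_empty(&serv->sv_cb_list) ||
                                         kthread_should_stop());

                spin_lock_bh(&serv->sv_cb_lock);
                if (list_empty(&serv->sv_cb_list)) {
                        spin_unlock_bh(&serv->sv_cb_lock);
                        continue;
                }
                req = list_first_entry(&serv->sv_cb_list,
                                       struct rpc_rqst, rq_bc_list);
                list_del(&req->rq_bc_list);
                spin_unlock_bh(&serv->sv_cb_lock);

                /* Dispatch the backward direction call; the reply goes
                 * back down the same transport via the backchannel send
                 * path. Return value ignored in this sketch.
                 */
                bc_svc_process(serv, req, rqstp);
        }
        return 0;
}

Once the reply has been sent, the rqst is eventually returned to the transport's
bc_pa_list by xprt_rdma_bc_free_rqst(), visible in the context lines of the
backchannel.c hunk above.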