author	Tom Tucker <tom@opengridcomputing.com>	2008-05-06 12:33:11 -0400
committer	Tom Tucker <tom@opengridcomputing.com>	2008-05-19 08:33:44 -0400
commit	dbcd00eba99945acfc433508a58eadc5dcd18cad (patch)
tree	c9bd31141dc48b45819d153ead3d311a0db51e40 /net/sunrpc/xprtrdma
parent	0e7f011a19696cc25d68a8d6631fc6c5aa60a54c (diff)
svcrdma: Fix race with dto_tasklet in svc_rdma_send
The svc_rdma_send function will attempt to reap SQ WR to make room for a new request if it finds the SQ full. This function races with the dto_tasklet, which also reaps SQ WR. To avoid polling and arming the CQ unnecessarily, move the test_and_clear_bit of the RDMAXPRT_SQ_PENDING flag and the arming of the CQ into the sq_cq_reap function. Refactor the rq_cq_reap function to match sq_cq_reap so that the code is easier to follow.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
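The pattern behind the fix is simple: the reap function itself tests and clears its PENDING bit and re-arms CQ notification, so a caller that loses the race just returns instead of polling and arming the CQ a second time. Below is a minimal userspace sketch of that pattern, using C11 atomics and pthreads in place of kernel bit operations and tasklets; the names cq_pending, reap_cq and rearm_notification are illustrative only and do not come from the kernel sources.

/*
 * Userspace sketch (not the kernel code) of the pattern this commit moves
 * into rq_cq_reap()/sq_cq_reap(): the reap function itself tests and clears
 * the PENDING flag and re-arms notification, so a racing caller simply
 * returns instead of polling and arming the CQ a second time.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <pthread.h>

static atomic_bool cq_pending = true;	/* set by the (simulated) completion interrupt */

static void rearm_notification(void)
{
	/* stands in for ib_req_notify_cq(cq, IB_CQ_NEXT_COMP) */
	printf("CQ re-armed\n");
}

static void reap_cq(const char *caller)
{
	/* Only the caller that wins the test-and-clear does the work;
	 * atomic_exchange() returns the previous value, like test_and_clear_bit(). */
	if (!atomic_exchange(&cq_pending, false))
		return;

	rearm_notification();
	printf("%s: polled completions\n", caller);
}

static void *tasklet(void *arg)
{
	(void)arg;
	reap_cq("dto_tasklet");
	return NULL;
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, tasklet, NULL);
	/* svc_rdma_send() path racing with the tasklet: safe either way. */
	reap_cq("svc_rdma_send");
	pthread_join(t, NULL);
	return 0;
}

Either caller may win the exchange, but they never both poll for the same pending event, which mirrors how sq_cq_reap now serializes the dto_tasklet against the reap attempt in svc_rdma_send.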
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	40
1 file changed, 22 insertions(+), 18 deletions(-)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 1e0af2f205e9..73734173f994 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -228,23 +228,8 @@ static void dto_tasklet_func(unsigned long data)
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-			rq_cq_reap(xprt);
-			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-			/*
-			 * If data arrived before established event,
-			 * don't enqueue. This defers RPC I/O until the
-			 * RDMA connection is complete.
-			 */
-			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-				svc_xprt_enqueue(&xprt->sc_xprt);
-		}
-
-		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-			sq_cq_reap(xprt);
-		}
+		rq_cq_reap(xprt);
+		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
 		spin_lock_irqsave(&dto_lock, flags);
@@ -297,6 +282,10 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 	struct ib_wc wc;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 
+	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_rq_poll);
 
 	spin_lock_bh(&xprt->sc_rq_dto_lock);
@@ -316,6 +305,15 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 
 	if (ctxt)
 		atomic_inc(&rdma_stat_rq_prod);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	/*
+	 * If data arrived before established event,
+	 * don't enqueue. This defers RPC I/O until the
+	 * RDMA connection is complete.
+	 */
+	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
@@ -328,6 +326,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
 	struct ib_cq *cq = xprt->sc_sq_cq;
 	int ret;
 
+
+	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_sq_poll);
 	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -1010,7 +1013,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 	if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
 		spin_unlock_bh(&xprt->sc_lock);
 		atomic_inc(&rdma_stat_sq_starve);
-		/* See if we can reap some SQ WR */
+
+		/* See if we can opportunistically reap SQ WR to make room */
 		sq_cq_reap(xprt);
 
 		/* Wait until SQ WR available if SQ still full */