summary | refs | log | tree | commit | diff | stats
path: root/net/sunrpc
diff options
context:
space:
mode:
author: Chuck Lever <chuck.lever@oracle.com> 2015-10-24 17:26:45 -0400
committer: Anna Schumaker <Anna.Schumaker@Netapp.com> 2015-11-02 13:45:15 -0500
commit: 4220a07264c0517006a534aed201e29c8d297306 (patch)
tree: 08e527549cda9af62cabb81412623bed7f7c8a3f /net/sunrpc
parent: 7b3d770c67bc07db5035999e4f864c5f2ff7b10e (diff)
xprtrdma: Prevent loss of completion signals
Commit 8301a2c047cc ("xprtrdma: Limit work done by completion handler") was supposed to prevent xprtrdma's upcall handlers from starving other softIRQ work by letting them return to the provider before all CQEs have been polled.

The logic assumes the provider will call the upcall handler again immediately if the CQ is re-armed while there are still queued CQEs.

This assumption is invalid. The IBTA spec says that after a CQ is armed, the hardware must interrupt only when a new CQE is inserted. xprtrdma can't rely on the provider calling again, even though some providers do.

Therefore, leaving CQEs on queue makes sense only when there is another mechanism that ensures all remaining CQEs are consumed in a timely fashion. xprtrdma does not have such a mechanism. If a CQE remains queued, the transport can wait forever to send the next RPC.

Finally, move the wcs array back onto the stack to ensure that the poll array is always local to the CPU where the completion upcall is running.

Fixes: 8301a2c047cc ("xprtrdma: Limit work done by completion ...")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Reviewed-by: Devesh Sharma <devesh.sharma@avagotech.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- net/sunrpc/xprtrdma/verbs.c 74
-rw-r--r-- net/sunrpc/xprtrdma/xprt_rdma.h 5
2 files changed, 38 insertions, 41 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 61eea73557ff..6661b1b95758 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -158,25 +158,30 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
158 } 158 }
159} 159}
160 160
161static int 161/* The common case is a single send completion is waiting. By
162rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 162 * passing two WC entries to ib_poll_cq, a return code of 1
163 * means there is exactly one WC waiting and no more. We don't
164 * have to invoke ib_poll_cq again to know that the CQ has been
165 * properly drained.
166 */
167static void
168rpcrdma_sendcq_poll(struct ib_cq *cq)
163{ 169{
164 struct ib_wc *wcs; 170 struct ib_wc *pos, wcs[2];
165 int budget, count, rc; 171 int count, rc;
166 172
167 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
168 do { 173 do {
169 wcs = ep->rep_send_wcs; 174 pos = wcs;
170 175
171 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 176 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
172 if (rc <= 0) 177 if (rc < 0)
173 return rc; 178 break;
174 179
175 count = rc; 180 count = rc;
176 while (count-- > 0) 181 while (count-- > 0)
177 rpcrdma_sendcq_process_wc(wcs++); 182 rpcrdma_sendcq_process_wc(pos++);
178 } while (rc == RPCRDMA_POLLSIZE && --budget); 183 } while (rc == ARRAY_SIZE(wcs));
179 return 0; 184 return;
180} 185}
181 186
182/* Handle provider send completion upcalls. 187/* Handle provider send completion upcalls.
@@ -184,10 +189,8 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
184static void 189static void
185rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) 190rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
186{ 191{
187 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
188
189 do { 192 do {
190 rpcrdma_sendcq_poll(cq, ep); 193 rpcrdma_sendcq_poll(cq);
191 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | 194 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
192 IB_CQ_REPORT_MISSED_EVENTS) > 0); 195 IB_CQ_REPORT_MISSED_EVENTS) > 0);
193} 196}
@@ -226,31 +229,32 @@ out_fail:
226 goto out_schedule; 229 goto out_schedule;
227} 230}
228 231
229static int 232/* The wc array is on stack: automatic memory is always CPU-local.
230rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) 233 *
234 * struct ib_wc is 64 bytes, making the poll array potentially
235 * large. But this is at the bottom of the call chain. Further
236 * substantial work is done in another thread.
237 */
238static void
239rpcrdma_recvcq_poll(struct ib_cq *cq)
231{ 240{
232 struct list_head sched_list; 241 struct ib_wc *pos, wcs[4];
233 struct ib_wc *wcs; 242 LIST_HEAD(sched_list);
234 int budget, count, rc; 243 int count, rc;
235 244
236 INIT_LIST_HEAD(&sched_list);
237 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
238 do { 245 do {
239 wcs = ep->rep_recv_wcs; 246 pos = wcs;
240 247
241 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); 248 rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
242 if (rc <= 0) 249 if (rc < 0)
243 goto out_schedule; 250 break;
244 251
245 count = rc; 252 count = rc;
246 while (count-- > 0) 253 while (count-- > 0)
247 rpcrdma_recvcq_process_wc(wcs++, &sched_list); 254 rpcrdma_recvcq_process_wc(pos++, &sched_list);
248 } while (rc == RPCRDMA_POLLSIZE && --budget); 255 } while (rc == ARRAY_SIZE(wcs));
249 rc = 0;
250 256
251out_schedule:
252 rpcrdma_schedule_tasklet(&sched_list); 257 rpcrdma_schedule_tasklet(&sched_list);
253 return rc;
254} 258}
255 259
256/* Handle provider receive completion upcalls. 260/* Handle provider receive completion upcalls.
@@ -258,10 +262,8 @@ out_schedule:
258static void 262static void
259rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) 263rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
260{ 264{
261 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
262
263 do { 265 do {
264 rpcrdma_recvcq_poll(cq, ep); 266 rpcrdma_recvcq_poll(cq);
265 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | 267 } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
266 IB_CQ_REPORT_MISSED_EVENTS) > 0); 268 IB_CQ_REPORT_MISSED_EVENTS) > 0);
267} 269}
@@ -623,7 +625,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
623 625
624 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; 626 cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
625 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, 627 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
626 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 628 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
627 if (IS_ERR(sendcq)) { 629 if (IS_ERR(sendcq)) {
628 rc = PTR_ERR(sendcq); 630 rc = PTR_ERR(sendcq);
629 dprintk("RPC: %s: failed to create send CQ: %i\n", 631 dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -640,7 +642,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
640 642
641 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; 643 cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
642 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, 644 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
643 rpcrdma_cq_async_error_upcall, ep, &cq_attr); 645 rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
644 if (IS_ERR(recvcq)) { 646 if (IS_ERR(recvcq)) {
645 rc = PTR_ERR(recvcq); 647 rc = PTR_ERR(recvcq);
646 dprintk("RPC: %s: failed to create recv CQ: %i\n", 648 dprintk("RPC: %s: failed to create recv CQ: %i\n",
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c09414e6f91b..42c8d44a175b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
77 * RDMA Endpoint -- one per transport instance 77 * RDMA Endpoint -- one per transport instance
78 */ 78 */
79 79
80#define RPCRDMA_WC_BUDGET (128)
81#define RPCRDMA_POLLSIZE (16)
82
83struct rpcrdma_ep { 80struct rpcrdma_ep {
84 atomic_t rep_cqcount; 81 atomic_t rep_cqcount;
85 int rep_cqinit; 82 int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
89 struct rdma_conn_param rep_remote_cma; 86 struct rdma_conn_param rep_remote_cma;
90 struct sockaddr_storage rep_remote_addr; 87 struct sockaddr_storage rep_remote_addr;
91 struct delayed_work rep_connect_worker; 88 struct delayed_work rep_connect_worker;
92 struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
93 struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
94}; 89};
95 90
96/* 91/*