aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma/svc_rdma_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/svc_rdma_transport.c')
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c290
1 files changed, 174 insertions, 116 deletions
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index af408fc12634..e132509d1db0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -103,8 +103,8 @@ static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
103 spin_lock_bh(&xprt->sc_ctxt_lock); 103 spin_lock_bh(&xprt->sc_ctxt_lock);
104 if (ctxt) { 104 if (ctxt) {
105 at_least_one = 1; 105 at_least_one = 1;
106 ctxt->next = xprt->sc_ctxt_head; 106 INIT_LIST_HEAD(&ctxt->free_list);
107 xprt->sc_ctxt_head = ctxt; 107 list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
108 } else { 108 } else {
109 /* kmalloc failed...give up for now */ 109 /* kmalloc failed...give up for now */
110 xprt->sc_ctxt_cnt--; 110 xprt->sc_ctxt_cnt--;
@@ -123,7 +123,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
123 123
124 while (1) { 124 while (1) {
125 spin_lock_bh(&xprt->sc_ctxt_lock); 125 spin_lock_bh(&xprt->sc_ctxt_lock);
126 if (unlikely(xprt->sc_ctxt_head == NULL)) { 126 if (unlikely(list_empty(&xprt->sc_ctxt_free))) {
127 /* Try to bump my cache. */ 127 /* Try to bump my cache. */
128 spin_unlock_bh(&xprt->sc_ctxt_lock); 128 spin_unlock_bh(&xprt->sc_ctxt_lock);
129 129
@@ -136,12 +136,15 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
136 schedule_timeout_uninterruptible(msecs_to_jiffies(500)); 136 schedule_timeout_uninterruptible(msecs_to_jiffies(500));
137 continue; 137 continue;
138 } 138 }
139 ctxt = xprt->sc_ctxt_head; 139 ctxt = list_entry(xprt->sc_ctxt_free.next,
140 xprt->sc_ctxt_head = ctxt->next; 140 struct svc_rdma_op_ctxt,
141 free_list);
142 list_del_init(&ctxt->free_list);
141 spin_unlock_bh(&xprt->sc_ctxt_lock); 143 spin_unlock_bh(&xprt->sc_ctxt_lock);
142 ctxt->xprt = xprt; 144 ctxt->xprt = xprt;
143 INIT_LIST_HEAD(&ctxt->dto_q); 145 INIT_LIST_HEAD(&ctxt->dto_q);
144 ctxt->count = 0; 146 ctxt->count = 0;
147 atomic_inc(&xprt->sc_ctxt_used);
145 break; 148 break;
146 } 149 }
147 return ctxt; 150 return ctxt;
@@ -159,14 +162,15 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
159 put_page(ctxt->pages[i]); 162 put_page(ctxt->pages[i]);
160 163
161 for (i = 0; i < ctxt->count; i++) 164 for (i = 0; i < ctxt->count; i++)
162 dma_unmap_single(xprt->sc_cm_id->device->dma_device, 165 ib_dma_unmap_single(xprt->sc_cm_id->device,
163 ctxt->sge[i].addr, 166 ctxt->sge[i].addr,
164 ctxt->sge[i].length, 167 ctxt->sge[i].length,
165 ctxt->direction); 168 ctxt->direction);
169
166 spin_lock_bh(&xprt->sc_ctxt_lock); 170 spin_lock_bh(&xprt->sc_ctxt_lock);
167 ctxt->next = xprt->sc_ctxt_head; 171 list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
168 xprt->sc_ctxt_head = ctxt;
169 spin_unlock_bh(&xprt->sc_ctxt_lock); 172 spin_unlock_bh(&xprt->sc_ctxt_lock);
173 atomic_dec(&xprt->sc_ctxt_used);
170} 174}
171 175
172/* ib_cq event handler */ 176/* ib_cq event handler */
@@ -228,23 +232,8 @@ static void dto_tasklet_func(unsigned long data)
228 list_del_init(&xprt->sc_dto_q); 232 list_del_init(&xprt->sc_dto_q);
229 spin_unlock_irqrestore(&dto_lock, flags); 233 spin_unlock_irqrestore(&dto_lock, flags);
230 234
231 if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) { 235 rq_cq_reap(xprt);
232 ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); 236 sq_cq_reap(xprt);
233 rq_cq_reap(xprt);
234 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
235 /*
236 * If data arrived before established event,
237 * don't enqueue. This defers RPC I/O until the
238 * RDMA connection is complete.
239 */
240 if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
241 svc_xprt_enqueue(&xprt->sc_xprt);
242 }
243
244 if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
245 ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
246 sq_cq_reap(xprt);
247 }
248 237
249 svc_xprt_put(&xprt->sc_xprt); 238 svc_xprt_put(&xprt->sc_xprt);
250 spin_lock_irqsave(&dto_lock, flags); 239 spin_lock_irqsave(&dto_lock, flags);
@@ -263,11 +252,15 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
263 struct svcxprt_rdma *xprt = cq_context; 252 struct svcxprt_rdma *xprt = cq_context;
264 unsigned long flags; 253 unsigned long flags;
265 254
255 /* Guard against unconditional flush call for destroyed QP */
256 if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
257 return;
258
266 /* 259 /*
267 * Set the bit regardless of whether or not it's on the list 260 * Set the bit regardless of whether or not it's on the list
268 * because it may be on the list already due to an SQ 261 * because it may be on the list already due to an SQ
269 * completion. 262 * completion.
270 */ 263 */
271 set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags); 264 set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
272 265
273 /* 266 /*
@@ -290,6 +283,8 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
290 * 283 *
291 * Take all completing WC off the CQE and enqueue the associated DTO 284 * Take all completing WC off the CQE and enqueue the associated DTO
292 * context on the dto_q for the transport. 285 * context on the dto_q for the transport.
286 *
287 * Note that caller must hold a transport reference.
293 */ 288 */
294static void rq_cq_reap(struct svcxprt_rdma *xprt) 289static void rq_cq_reap(struct svcxprt_rdma *xprt)
295{ 290{
@@ -297,29 +292,47 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
297 struct ib_wc wc; 292 struct ib_wc wc;
298 struct svc_rdma_op_ctxt *ctxt = NULL; 293 struct svc_rdma_op_ctxt *ctxt = NULL;
299 294
295 if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
296 return;
297
298 ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
300 atomic_inc(&rdma_stat_rq_poll); 299 atomic_inc(&rdma_stat_rq_poll);
301 300
302 spin_lock_bh(&xprt->sc_rq_dto_lock);
303 while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) { 301 while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
304 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; 302 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
305 ctxt->wc_status = wc.status; 303 ctxt->wc_status = wc.status;
306 ctxt->byte_len = wc.byte_len; 304 ctxt->byte_len = wc.byte_len;
307 if (wc.status != IB_WC_SUCCESS) { 305 if (wc.status != IB_WC_SUCCESS) {
308 /* Close the transport */ 306 /* Close the transport */
307 dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
309 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 308 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
310 svc_rdma_put_context(ctxt, 1); 309 svc_rdma_put_context(ctxt, 1);
310 svc_xprt_put(&xprt->sc_xprt);
311 continue; 311 continue;
312 } 312 }
313 spin_lock_bh(&xprt->sc_rq_dto_lock);
313 list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q); 314 list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
315 spin_unlock_bh(&xprt->sc_rq_dto_lock);
316 svc_xprt_put(&xprt->sc_xprt);
314 } 317 }
315 spin_unlock_bh(&xprt->sc_rq_dto_lock);
316 318
317 if (ctxt) 319 if (ctxt)
318 atomic_inc(&rdma_stat_rq_prod); 320 atomic_inc(&rdma_stat_rq_prod);
321
322 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
323 /*
324 * If data arrived before established event,
325 * don't enqueue. This defers RPC I/O until the
326 * RDMA connection is complete.
327 */
328 if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
329 svc_xprt_enqueue(&xprt->sc_xprt);
319} 330}
320 331
321/* 332/*
322 * Send Queue Completion Handler - potentially called on interrupt context. 333 * Send Queue Completion Handler - potentially called on interrupt context.
334 *
335 * Note that caller must hold a transport reference.
323 */ 336 */
324static void sq_cq_reap(struct svcxprt_rdma *xprt) 337static void sq_cq_reap(struct svcxprt_rdma *xprt)
325{ 338{
@@ -328,6 +341,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
328 struct ib_cq *cq = xprt->sc_sq_cq; 341 struct ib_cq *cq = xprt->sc_sq_cq;
329 int ret; 342 int ret;
330 343
344
345 if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
346 return;
347
348 ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
331 atomic_inc(&rdma_stat_sq_poll); 349 atomic_inc(&rdma_stat_sq_poll);
332 while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { 350 while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
333 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; 351 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -349,14 +367,16 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
349 367
350 case IB_WR_RDMA_READ: 368 case IB_WR_RDMA_READ:
351 if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) { 369 if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
370 struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
371 BUG_ON(!read_hdr);
352 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 372 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
353 set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
354 spin_lock_bh(&xprt->sc_read_complete_lock); 373 spin_lock_bh(&xprt->sc_read_complete_lock);
355 list_add_tail(&ctxt->dto_q, 374 list_add_tail(&read_hdr->dto_q,
356 &xprt->sc_read_complete_q); 375 &xprt->sc_read_complete_q);
357 spin_unlock_bh(&xprt->sc_read_complete_lock); 376 spin_unlock_bh(&xprt->sc_read_complete_lock);
358 svc_xprt_enqueue(&xprt->sc_xprt); 377 svc_xprt_enqueue(&xprt->sc_xprt);
359 } 378 }
379 svc_rdma_put_context(ctxt, 0);
360 break; 380 break;
361 381
362 default: 382 default:
@@ -365,6 +385,7 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
365 wc.opcode, wc.status); 385 wc.opcode, wc.status);
366 break; 386 break;
367 } 387 }
388 svc_xprt_put(&xprt->sc_xprt);
368 } 389 }
369 390
370 if (ctxt) 391 if (ctxt)
@@ -376,11 +397,15 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
376 struct svcxprt_rdma *xprt = cq_context; 397 struct svcxprt_rdma *xprt = cq_context;
377 unsigned long flags; 398 unsigned long flags;
378 399
400 /* Guard against unconditional flush call for destroyed QP */
401 if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
402 return;
403
379 /* 404 /*
380 * Set the bit regardless of whether or not it's on the list 405 * Set the bit regardless of whether or not it's on the list
381 * because it may be on the list already due to an RQ 406 * because it may be on the list already due to an RQ
382 * completion. 407 * completion.
383 */ 408 */
384 set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags); 409 set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
385 410
386 /* 411 /*
@@ -407,28 +432,29 @@ static void create_context_cache(struct svcxprt_rdma *xprt,
407 xprt->sc_ctxt_max = ctxt_max; 432 xprt->sc_ctxt_max = ctxt_max;
408 xprt->sc_ctxt_bump = ctxt_bump; 433 xprt->sc_ctxt_bump = ctxt_bump;
409 xprt->sc_ctxt_cnt = 0; 434 xprt->sc_ctxt_cnt = 0;
410 xprt->sc_ctxt_head = NULL; 435 atomic_set(&xprt->sc_ctxt_used, 0);
436
437 INIT_LIST_HEAD(&xprt->sc_ctxt_free);
411 for (i = 0; i < ctxt_count; i++) { 438 for (i = 0; i < ctxt_count; i++) {
412 ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); 439 ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
413 if (ctxt) { 440 if (ctxt) {
414 ctxt->next = xprt->sc_ctxt_head; 441 INIT_LIST_HEAD(&ctxt->free_list);
415 xprt->sc_ctxt_head = ctxt; 442 list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
416 xprt->sc_ctxt_cnt++; 443 xprt->sc_ctxt_cnt++;
417 } 444 }
418 } 445 }
419} 446}
420 447
421static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt) 448static void destroy_context_cache(struct svcxprt_rdma *xprt)
422{ 449{
423 struct svc_rdma_op_ctxt *next; 450 while (!list_empty(&xprt->sc_ctxt_free)) {
424 if (!ctxt) 451 struct svc_rdma_op_ctxt *ctxt;
425 return; 452 ctxt = list_entry(xprt->sc_ctxt_free.next,
426 453 struct svc_rdma_op_ctxt,
427 do { 454 free_list);
428 next = ctxt->next; 455 list_del_init(&ctxt->free_list);
429 kfree(ctxt); 456 kfree(ctxt);
430 ctxt = next; 457 }
431 } while (next);
432} 458}
433 459
434static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, 460static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -465,7 +491,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
465 reqs + 491 reqs +
466 cma_xprt->sc_sq_depth + 492 cma_xprt->sc_sq_depth +
467 RPCRDMA_MAX_THREADS + 1); /* max */ 493 RPCRDMA_MAX_THREADS + 1); /* max */
468 if (!cma_xprt->sc_ctxt_head) { 494 if (list_empty(&cma_xprt->sc_ctxt_free)) {
469 kfree(cma_xprt); 495 kfree(cma_xprt);
470 return NULL; 496 return NULL;
471 } 497 }
@@ -520,7 +546,12 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
520 recv_wr.num_sge = ctxt->count; 546 recv_wr.num_sge = ctxt->count;
521 recv_wr.wr_id = (u64)(unsigned long)ctxt; 547 recv_wr.wr_id = (u64)(unsigned long)ctxt;
522 548
549 svc_xprt_get(&xprt->sc_xprt);
523 ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); 550 ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
551 if (ret) {
552 svc_xprt_put(&xprt->sc_xprt);
553 svc_rdma_put_context(ctxt, 1);
554 }
524 return ret; 555 return ret;
525} 556}
526 557
@@ -539,6 +570,7 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
539{ 570{
540 struct svcxprt_rdma *listen_xprt = new_cma_id->context; 571 struct svcxprt_rdma *listen_xprt = new_cma_id->context;
541 struct svcxprt_rdma *newxprt; 572 struct svcxprt_rdma *newxprt;
573 struct sockaddr *sa;
542 574
543 /* Create a new transport */ 575 /* Create a new transport */
544 newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); 576 newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
@@ -551,6 +583,12 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
551 dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", 583 dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
552 newxprt, newxprt->sc_cm_id, listen_xprt); 584 newxprt, newxprt->sc_cm_id, listen_xprt);
553 585
586 /* Set the local and remote addresses in the transport */
587 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
588 svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
589 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
590 svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
591
554 /* 592 /*
555 * Enqueue the new transport on the accept queue of the listening 593 * Enqueue the new transport on the accept queue of the listening
556 * transport 594 * transport
@@ -627,6 +665,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
627 if (xprt) { 665 if (xprt) {
628 set_bit(XPT_CLOSE, &xprt->xpt_flags); 666 set_bit(XPT_CLOSE, &xprt->xpt_flags);
629 svc_xprt_enqueue(xprt); 667 svc_xprt_enqueue(xprt);
668 svc_xprt_put(xprt);
630 } 669 }
631 break; 670 break;
632 case RDMA_CM_EVENT_DEVICE_REMOVAL: 671 case RDMA_CM_EVENT_DEVICE_REMOVAL:
@@ -661,31 +700,27 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
661 700
662 cma_xprt = rdma_create_xprt(serv, 1); 701 cma_xprt = rdma_create_xprt(serv, 1);
663 if (!cma_xprt) 702 if (!cma_xprt)
664 return ERR_PTR(ENOMEM); 703 return ERR_PTR(-ENOMEM);
665 xprt = &cma_xprt->sc_xprt; 704 xprt = &cma_xprt->sc_xprt;
666 705
667 listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP); 706 listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
668 if (IS_ERR(listen_id)) { 707 if (IS_ERR(listen_id)) {
669 svc_xprt_put(&cma_xprt->sc_xprt); 708 ret = PTR_ERR(listen_id);
670 dprintk("svcrdma: rdma_create_id failed = %ld\n", 709 dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
671 PTR_ERR(listen_id)); 710 goto err0;
672 return (void *)listen_id;
673 } 711 }
712
674 ret = rdma_bind_addr(listen_id, sa); 713 ret = rdma_bind_addr(listen_id, sa);
675 if (ret) { 714 if (ret) {
676 rdma_destroy_id(listen_id);
677 svc_xprt_put(&cma_xprt->sc_xprt);
678 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); 715 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
679 return ERR_PTR(ret); 716 goto err1;
680 } 717 }
681 cma_xprt->sc_cm_id = listen_id; 718 cma_xprt->sc_cm_id = listen_id;
682 719
683 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); 720 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
684 if (ret) { 721 if (ret) {
685 rdma_destroy_id(listen_id);
686 svc_xprt_put(&cma_xprt->sc_xprt);
687 dprintk("svcrdma: rdma_listen failed = %d\n", ret); 722 dprintk("svcrdma: rdma_listen failed = %d\n", ret);
688 return ERR_PTR(ret); 723 goto err1;
689 } 724 }
690 725
691 /* 726 /*
@@ -696,6 +731,12 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
696 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); 731 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
697 732
698 return &cma_xprt->sc_xprt; 733 return &cma_xprt->sc_xprt;
734
735 err1:
736 rdma_destroy_id(listen_id);
737 err0:
738 kfree(cma_xprt);
739 return ERR_PTR(ret);
699} 740}
700 741
701/* 742/*
@@ -716,7 +757,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
716 struct rdma_conn_param conn_param; 757 struct rdma_conn_param conn_param;
717 struct ib_qp_init_attr qp_attr; 758 struct ib_qp_init_attr qp_attr;
718 struct ib_device_attr devattr; 759 struct ib_device_attr devattr;
719 struct sockaddr *sa;
720 int ret; 760 int ret;
721 int i; 761 int i;
722 762
@@ -826,7 +866,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
826 newxprt->sc_sq_depth = qp_attr.cap.max_send_wr; 866 newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
827 newxprt->sc_max_requests = qp_attr.cap.max_recv_wr; 867 newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
828 } 868 }
829 svc_xprt_get(&newxprt->sc_xprt);
830 newxprt->sc_qp = newxprt->sc_cm_id->qp; 869 newxprt->sc_qp = newxprt->sc_cm_id->qp;
831 870
832 /* Register all of physical memory */ 871 /* Register all of physical memory */
@@ -850,6 +889,13 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
850 /* Swap out the handler */ 889 /* Swap out the handler */
851 newxprt->sc_cm_id->event_handler = rdma_cma_handler; 890 newxprt->sc_cm_id->event_handler = rdma_cma_handler;
852 891
892 /*
893 * Arm the CQs for the SQ and RQ before accepting so we can't
894 * miss the first message
895 */
896 ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
897 ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
898
853 /* Accept Connection */ 899 /* Accept Connection */
854 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); 900 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
855 memset(&conn_param, 0, sizeof conn_param); 901 memset(&conn_param, 0, sizeof conn_param);
@@ -886,58 +932,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
886 newxprt->sc_max_requests, 932 newxprt->sc_max_requests,
887 newxprt->sc_ord); 933 newxprt->sc_ord);
888 934
889 /* Set the local and remote addresses in the transport */
890 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
891 svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
892 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
893 svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
894
895 ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
896 ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
897 return &newxprt->sc_xprt; 935 return &newxprt->sc_xprt;
898 936
899 errout: 937 errout:
900 dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); 938 dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
901 /* Take a reference in case the DTO handler runs */ 939 /* Take a reference in case the DTO handler runs */
902 svc_xprt_get(&newxprt->sc_xprt); 940 svc_xprt_get(&newxprt->sc_xprt);
903 if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) { 941 if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
904 ib_destroy_qp(newxprt->sc_qp); 942 ib_destroy_qp(newxprt->sc_qp);
905 svc_xprt_put(&newxprt->sc_xprt);
906 }
907 rdma_destroy_id(newxprt->sc_cm_id); 943 rdma_destroy_id(newxprt->sc_cm_id);
908 /* This call to put will destroy the transport */ 944 /* This call to put will destroy the transport */
909 svc_xprt_put(&newxprt->sc_xprt); 945 svc_xprt_put(&newxprt->sc_xprt);
910 return NULL; 946 return NULL;
911} 947}
912 948
913/*
914 * Post an RQ WQE to the RQ when the rqst is being released. This
915 * effectively returns an RQ credit to the client. The rq_xprt_ctxt
916 * will be null if the request is deferred due to an RDMA_READ or the
917 * transport had no data ready (EAGAIN). Note that an RPC deferred in
918 * svc_process will still return the credit, this is because the data
919 * is copied and no longer consume a WQE/WC.
920 */
921static void svc_rdma_release_rqst(struct svc_rqst *rqstp) 949static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
922{ 950{
923 int err;
924 struct svcxprt_rdma *rdma =
925 container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
926 if (rqstp->rq_xprt_ctxt) {
927 BUG_ON(rqstp->rq_xprt_ctxt != rdma);
928 err = svc_rdma_post_recv(rdma);
929 if (err)
930 dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
931 err);
932 }
933 rqstp->rq_xprt_ctxt = NULL;
934} 951}
935 952
936/* 953/*
937 * When connected, an svc_xprt has at least three references: 954 * When connected, an svc_xprt has at least two references:
938 *
939 * - A reference held by the QP. We still hold that here because this
940 * code deletes the QP and puts the reference.
941 * 955 *
942 * - A reference held by the cm_id between the ESTABLISHED and 956 * - A reference held by the cm_id between the ESTABLISHED and
943 * DISCONNECTED events. If the remote peer disconnected first, this 957 * DISCONNECTED events. If the remote peer disconnected first, this
@@ -946,7 +960,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
946 * - A reference held by the svc_recv code that called this function 960 * - A reference held by the svc_recv code that called this function
947 * as part of close processing. 961 * as part of close processing.
948 * 962 *
949 * At a minimum two references should still be held. 963 * At a minimum one references should still be held.
950 */ 964 */
951static void svc_rdma_detach(struct svc_xprt *xprt) 965static void svc_rdma_detach(struct svc_xprt *xprt)
952{ 966{
@@ -956,23 +970,53 @@ static void svc_rdma_detach(struct svc_xprt *xprt)
956 970
957 /* Disconnect and flush posted WQE */ 971 /* Disconnect and flush posted WQE */
958 rdma_disconnect(rdma->sc_cm_id); 972 rdma_disconnect(rdma->sc_cm_id);
959
960 /* Destroy the QP if present (not a listener) */
961 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
962 ib_destroy_qp(rdma->sc_qp);
963 svc_xprt_put(xprt);
964 }
965
966 /* Destroy the CM ID */
967 rdma_destroy_id(rdma->sc_cm_id);
968} 973}
969 974
970static void svc_rdma_free(struct svc_xprt *xprt) 975static void __svc_rdma_free(struct work_struct *work)
971{ 976{
972 struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt; 977 struct svcxprt_rdma *rdma =
978 container_of(work, struct svcxprt_rdma, sc_work);
973 dprintk("svcrdma: svc_rdma_free(%p)\n", rdma); 979 dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
980
974 /* We should only be called from kref_put */ 981 /* We should only be called from kref_put */
975 BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0); 982 BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
983
984 /*
985 * Destroy queued, but not processed read completions. Note
986 * that this cleanup has to be done before destroying the
987 * cm_id because the device ptr is needed to unmap the dma in
988 * svc_rdma_put_context.
989 */
990 spin_lock_bh(&rdma->sc_read_complete_lock);
991 while (!list_empty(&rdma->sc_read_complete_q)) {
992 struct svc_rdma_op_ctxt *ctxt;
993 ctxt = list_entry(rdma->sc_read_complete_q.next,
994 struct svc_rdma_op_ctxt,
995 dto_q);
996 list_del_init(&ctxt->dto_q);
997 svc_rdma_put_context(ctxt, 1);
998 }
999 spin_unlock_bh(&rdma->sc_read_complete_lock);
1000
1001 /* Destroy queued, but not processed recv completions */
1002 spin_lock_bh(&rdma->sc_rq_dto_lock);
1003 while (!list_empty(&rdma->sc_rq_dto_q)) {
1004 struct svc_rdma_op_ctxt *ctxt;
1005 ctxt = list_entry(rdma->sc_rq_dto_q.next,
1006 struct svc_rdma_op_ctxt,
1007 dto_q);
1008 list_del_init(&ctxt->dto_q);
1009 svc_rdma_put_context(ctxt, 1);
1010 }
1011 spin_unlock_bh(&rdma->sc_rq_dto_lock);
1012
1013 /* Warn if we leaked a resource or under-referenced */
1014 WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
1015
1016 /* Destroy the QP if present (not a listener) */
1017 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
1018 ib_destroy_qp(rdma->sc_qp);
1019
976 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) 1020 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
977 ib_destroy_cq(rdma->sc_sq_cq); 1021 ib_destroy_cq(rdma->sc_sq_cq);
978 1022
@@ -985,10 +1029,21 @@ static void svc_rdma_free(struct svc_xprt *xprt)
985 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) 1029 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
986 ib_dealloc_pd(rdma->sc_pd); 1030 ib_dealloc_pd(rdma->sc_pd);
987 1031
988 destroy_context_cache(rdma->sc_ctxt_head); 1032 /* Destroy the CM ID */
1033 rdma_destroy_id(rdma->sc_cm_id);
1034
1035 destroy_context_cache(rdma);
989 kfree(rdma); 1036 kfree(rdma);
990} 1037}
991 1038
1039static void svc_rdma_free(struct svc_xprt *xprt)
1040{
1041 struct svcxprt_rdma *rdma =
1042 container_of(xprt, struct svcxprt_rdma, sc_xprt);
1043 INIT_WORK(&rdma->sc_work, __svc_rdma_free);
1044 schedule_work(&rdma->sc_work);
1045}
1046
992static int svc_rdma_has_wspace(struct svc_xprt *xprt) 1047static int svc_rdma_has_wspace(struct svc_xprt *xprt)
993{ 1048{
994 struct svcxprt_rdma *rdma = 1049 struct svcxprt_rdma *rdma =
@@ -1018,7 +1073,7 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1018 int ret; 1073 int ret;
1019 1074
1020 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1075 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1021 return 0; 1076 return -ENOTCONN;
1022 1077
1023 BUG_ON(wr->send_flags != IB_SEND_SIGNALED); 1078 BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
1024 BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op != 1079 BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
@@ -1029,7 +1084,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1029 if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) { 1084 if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
1030 spin_unlock_bh(&xprt->sc_lock); 1085 spin_unlock_bh(&xprt->sc_lock);
1031 atomic_inc(&rdma_stat_sq_starve); 1086 atomic_inc(&rdma_stat_sq_starve);
1032 /* See if we can reap some SQ WR */ 1087
1088 /* See if we can opportunistically reap SQ WR to make room */
1033 sq_cq_reap(xprt); 1089 sq_cq_reap(xprt);
1034 1090
1035 /* Wait until SQ WR available if SQ still full */ 1091 /* Wait until SQ WR available if SQ still full */
@@ -1041,22 +1097,25 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1041 continue; 1097 continue;
1042 } 1098 }
1043 /* Bumped used SQ WR count and post */ 1099 /* Bumped used SQ WR count and post */
1100 svc_xprt_get(&xprt->sc_xprt);
1044 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); 1101 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1045 if (!ret) 1102 if (!ret)
1046 atomic_inc(&xprt->sc_sq_count); 1103 atomic_inc(&xprt->sc_sq_count);
1047 else 1104 else {
1105 svc_xprt_put(&xprt->sc_xprt);
1048 dprintk("svcrdma: failed to post SQ WR rc=%d, " 1106 dprintk("svcrdma: failed to post SQ WR rc=%d, "
1049 "sc_sq_count=%d, sc_sq_depth=%d\n", 1107 "sc_sq_count=%d, sc_sq_depth=%d\n",
1050 ret, atomic_read(&xprt->sc_sq_count), 1108 ret, atomic_read(&xprt->sc_sq_count),
1051 xprt->sc_sq_depth); 1109 xprt->sc_sq_depth);
1110 }
1052 spin_unlock_bh(&xprt->sc_lock); 1111 spin_unlock_bh(&xprt->sc_lock);
1053 break; 1112 break;
1054 } 1113 }
1055 return ret; 1114 return ret;
1056} 1115}
1057 1116
1058int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, 1117void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1059 enum rpcrdma_errcode err) 1118 enum rpcrdma_errcode err)
1060{ 1119{
1061 struct ib_send_wr err_wr; 1120 struct ib_send_wr err_wr;
1062 struct ib_sge sge; 1121 struct ib_sge sge;
@@ -1094,9 +1153,8 @@ int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1094 /* Post It */ 1153 /* Post It */
1095 ret = svc_rdma_send(xprt, &err_wr); 1154 ret = svc_rdma_send(xprt, &err_wr);
1096 if (ret) { 1155 if (ret) {
1097 dprintk("svcrdma: Error posting send = %d\n", ret); 1156 dprintk("svcrdma: Error %d posting send for protocol error\n",
1157 ret);
1098 svc_rdma_put_context(ctxt, 1); 1158 svc_rdma_put_context(ctxt, 1);
1099 } 1159 }
1100
1101 return ret;
1102} 1160}