aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/ib.h1
-rw-r--r--net/rds/ib_cm.c14
-rw-r--r--net/rds/ib_send.c47
3 files changed, 54 insertions, 8 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index acda2dbc6576..a13ced504145 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -108,6 +108,7 @@ struct rds_ib_connection {
108 struct rds_header *i_send_hdrs; 108 struct rds_header *i_send_hdrs;
109 u64 i_send_hdrs_dma; 109 u64 i_send_hdrs_dma;
110 struct rds_ib_send_work *i_sends; 110 struct rds_ib_send_work *i_sends;
111 atomic_t i_signaled_sends;
111 112
112 /* rx */ 113 /* rx */
113 struct tasklet_struct i_recv_tasklet; 114 struct tasklet_struct i_recv_tasklet;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 10f6a8815cd0..123c7d33b54e 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
615 } 615 }
616 616
617 /* 617 /*
618 * Don't wait for the send ring to be empty -- there may be completed 618 * We want to wait for tx and rx completion to finish
619 * non-signaled entries sitting on there. We unmap these below. 619 * before we tear down the connection, but we have to be
620 * careful not to get stuck waiting on a send ring that
621 * only has unsignaled sends in it. We've shutdown new
622 * sends before getting here so by waiting for signaled
623 * sends to complete we're ensured that there will be no
624 * more tx processing.
620 */ 625 */
621 wait_event(rds_ib_ring_empty_wait, 626 wait_event(rds_ib_ring_empty_wait,
622 rds_ib_ring_empty(&ic->i_recv_ring)); 627 rds_ib_ring_empty(&ic->i_recv_ring) &&
628 (atomic_read(&ic->i_signaled_sends) == 0));
629 tasklet_kill(&ic->i_recv_tasklet);
623 630
624 if (ic->i_send_hdrs) 631 if (ic->i_send_hdrs)
625 ib_dma_free_coherent(dev, 632 ib_dma_free_coherent(dev,
@@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
729#ifndef KERNEL_HAS_ATOMIC64 736#ifndef KERNEL_HAS_ATOMIC64
730 spin_lock_init(&ic->i_ack_lock); 737 spin_lock_init(&ic->i_ack_lock);
731#endif 738#endif
739 atomic_set(&ic->i_signaled_sends, 0);
732 740
733 /* 741 /*
734 * rds_ib_conn_shutdown() waits for these to be emptied so they 742 * rds_ib_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4af009b..15f75692574c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
220} 220}
221 221
222/* 222/*
223 * The only fast path caller always has a non-zero nr, so we don't
224 * bother testing nr before performing the atomic sub.
225 */
226static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
227{
228 if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
229 waitqueue_active(&rds_ib_ring_empty_wait))
230 wake_up(&rds_ib_ring_empty_wait);
231 BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
232}
233
234/*
223 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc 235 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
224 * operations performed in the send path. As the sender allocs and potentially 236 * operations performed in the send path. As the sender allocs and potentially
225 * unallocs the next free entry in the ring it doesn't alter which is 237 * unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
236 u32 oldest; 248 u32 oldest;
237 u32 i = 0; 249 u32 i = 0;
238 int ret; 250 int ret;
251 int nr_sig = 0;
239 252
240 rdsdebug("cq %p conn %p\n", cq, conn); 253 rdsdebug("cq %p conn %p\n", cq, conn);
241 rds_ib_stats_inc(s_ib_tx_cq_call); 254 rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
262 275
263 for (i = 0; i < completed; i++) { 276 for (i = 0; i < completed; i++) {
264 send = &ic->i_sends[oldest]; 277 send = &ic->i_sends[oldest];
278 if (send->s_wr.send_flags & IB_SEND_SIGNALED)
279 nr_sig++;
265 280
266 rm = rds_ib_send_unmap_op(ic, send, wc.status); 281 rm = rds_ib_send_unmap_op(ic, send, wc.status);
267 282
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
282 } 297 }
283 298
284 rds_ib_ring_free(&ic->i_send_ring, completed); 299 rds_ib_ring_free(&ic->i_send_ring, completed);
300 rds_ib_sub_signaled(ic, nr_sig);
301 nr_sig = 0;
285 302
286 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || 303 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
287 test_bit(0, &conn->c_map_queued)) 304 test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
440 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 457 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
441} 458}
442 459
443static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, 460static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
444 struct rds_ib_send_work *send, 461 struct rds_ib_send_work *send,
445 bool notify) 462 bool notify)
446{ 463{
447 /* 464 /*
448 * We want to delay signaling completions just enough to get 465 * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
452 if (ic->i_unsignaled_wrs-- == 0 || notify) { 469 if (ic->i_unsignaled_wrs-- == 0 || notify) {
453 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; 470 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
454 send->s_wr.send_flags |= IB_SEND_SIGNALED; 471 send->s_wr.send_flags |= IB_SEND_SIGNALED;
472 return 1;
455 } 473 }
474 return 0;
456} 475}
457 476
458/* 477/*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
488 int bytes_sent = 0; 507 int bytes_sent = 0;
489 int ret; 508 int ret;
490 int flow_controlled = 0; 509 int flow_controlled = 0;
510 int nr_sig = 0;
491 511
492 BUG_ON(off % RDS_FRAG_SIZE); 512 BUG_ON(off % RDS_FRAG_SIZE);
493 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); 513 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
645 if (ic->i_flowctl && flow_controlled && i == (work_alloc-1)) 665 if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
646 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 666 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
647 667
668 if (send->s_wr.send_flags & IB_SEND_SIGNALED)
669 nr_sig++;
670
648 rdsdebug("send %p wr %p num_sge %u next %p\n", send, 671 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
649 &send->s_wr, send->s_wr.num_sge, send->s_wr.next); 672 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
650 673
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
689 if (ic->i_flowctl && i < credit_alloc) 712 if (ic->i_flowctl && i < credit_alloc)
690 rds_ib_send_add_credits(conn, credit_alloc - i); 713 rds_ib_send_add_credits(conn, credit_alloc - i);
691 714
715 if (nr_sig)
716 atomic_add(nr_sig, &ic->i_signaled_sends);
717
692 /* XXX need to worry about failed_wr and partial sends. */ 718 /* XXX need to worry about failed_wr and partial sends. */
693 failed_wr = &first->s_wr; 719 failed_wr = &first->s_wr;
694 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 720 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
699 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " 725 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
700 "returned %d\n", &conn->c_faddr, ret); 726 "returned %d\n", &conn->c_faddr, ret);
701 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 727 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
728 rds_ib_sub_signaled(ic, nr_sig);
702 if (prev->s_op) { 729 if (prev->s_op) {
703 ic->i_data_op = prev->s_op; 730 ic->i_data_op = prev->s_op;
704 prev->s_op = NULL; 731 prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
728 u32 pos; 755 u32 pos;
729 u32 work_alloc; 756 u32 work_alloc;
730 int ret; 757 int ret;
758 int nr_sig = 0;
731 759
732 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); 760 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
733 761
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
752 send->s_wr.wr.atomic.compare_add = op->op_swap_add; 780 send->s_wr.wr.atomic.compare_add = op->op_swap_add;
753 send->s_wr.wr.atomic.swap = 0; 781 send->s_wr.wr.atomic.swap = 0;
754 } 782 }
755 rds_ib_set_wr_signal_state(ic, send, op->op_notify); 783 nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
756 send->s_wr.num_sge = 1; 784 send->s_wr.num_sge = 1;
757 send->s_wr.next = NULL; 785 send->s_wr.next = NULL;
758 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; 786 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
778 rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, 806 rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
779 send->s_sge[0].addr, send->s_sge[0].length); 807 send->s_sge[0].addr, send->s_sge[0].length);
780 808
809 if (nr_sig)
810 atomic_add(nr_sig, &ic->i_signaled_sends);
811
781 failed_wr = &send->s_wr; 812 failed_wr = &send->s_wr;
782 ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr); 813 ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
783 rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, 814 rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
787 printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 " 818 printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
788 "returned %d\n", &conn->c_faddr, ret); 819 "returned %d\n", &conn->c_faddr, ret);
789 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 820 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
821 rds_ib_sub_signaled(ic, nr_sig);
790 goto out; 822 goto out;
791 } 823 }
792 824
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
817 int sent; 849 int sent;
818 int ret; 850 int ret;
819 int num_sge; 851 int num_sge;
852 int nr_sig = 0;
820 853
821 /* map the op the first time we see it */ 854 /* map the op the first time we see it */
822 if (!op->op_mapped) { 855 if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
859 send->s_queued = jiffies; 892 send->s_queued = jiffies;
860 send->s_op = NULL; 893 send->s_op = NULL;
861 894
862 rds_ib_set_wr_signal_state(ic, send, op->op_notify); 895 nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
863 896
864 send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; 897 send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
865 send->s_wr.wr.rdma.remote_addr = remote_addr; 898 send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
910 work_alloc = i; 943 work_alloc = i;
911 } 944 }
912 945
946 if (nr_sig)
947 atomic_add(nr_sig, &ic->i_signaled_sends);
948
913 failed_wr = &first->s_wr; 949 failed_wr = &first->s_wr;
914 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 950 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
915 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, 951 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
919 printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 " 955 printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
920 "returned %d\n", &conn->c_faddr, ret); 956 "returned %d\n", &conn->c_faddr, ret);
921 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 957 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
958 rds_ib_sub_signaled(ic, nr_sig);
922 goto out; 959 goto out;
923 } 960 }
924 961