aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds/ib_send.c
diff options
context:
space:
mode:
authorZach Brown <zach.brown@oracle.com>2010-07-14 16:55:35 -0400
committerAndy Grover <andy.grover@oracle.com>2010-09-08 21:16:40 -0400
commitf046011cd73c372267befd10242988eb744649fe (patch)
treed184275400dee81f2a5027728bda849bec338d99 /net/rds/ib_send.c
parentef87b7ea39a91906218a262686bcb8bad8b6b46e (diff)
RDS/IB: track signaled sends
We're seeing bugs today where IB connection shutdown clears the send ring while the tasklet is processing completed sends. Implementation details cause this to dereference a null pointer. Shutdown needs to wait for send completion to stop before tearing down the connection. We can't simply wait for the ring to empty because it may contain unsignaled sends that will never be processed. This patch tracks the number of signaled sends that we've posted and waits for them to complete. It also makes sure that the tasklet has finished executing. Signed-off-by: Zach Brown <zach.brown@oracle.com>
Diffstat (limited to 'net/rds/ib_send.c')
-rw-r--r--net/rds/ib_send.c47
1 files changed, 42 insertions, 5 deletions
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4af009b..15f75692574c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
220} 220}
221 221
222/* 222/*
223 * The only fast path caller always has a non-zero nr, so we don't
224 * bother testing nr before performing the atomic sub.
225 */
226static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
227{
228 if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
229 waitqueue_active(&rds_ib_ring_empty_wait))
230 wake_up(&rds_ib_ring_empty_wait);
231 BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
232}
233
234/*
223 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc 235 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
224 * operations performed in the send path. As the sender allocs and potentially 236 * operations performed in the send path. As the sender allocs and potentially
225 * unallocs the next free entry in the ring it doesn't alter which is 237 * unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
236 u32 oldest; 248 u32 oldest;
237 u32 i = 0; 249 u32 i = 0;
238 int ret; 250 int ret;
251 int nr_sig = 0;
239 252
240 rdsdebug("cq %p conn %p\n", cq, conn); 253 rdsdebug("cq %p conn %p\n", cq, conn);
241 rds_ib_stats_inc(s_ib_tx_cq_call); 254 rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
262 275
263 for (i = 0; i < completed; i++) { 276 for (i = 0; i < completed; i++) {
264 send = &ic->i_sends[oldest]; 277 send = &ic->i_sends[oldest];
278 if (send->s_wr.send_flags & IB_SEND_SIGNALED)
279 nr_sig++;
265 280
266 rm = rds_ib_send_unmap_op(ic, send, wc.status); 281 rm = rds_ib_send_unmap_op(ic, send, wc.status);
267 282
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
282 } 297 }
283 298
284 rds_ib_ring_free(&ic->i_send_ring, completed); 299 rds_ib_ring_free(&ic->i_send_ring, completed);
300 rds_ib_sub_signaled(ic, nr_sig);
301 nr_sig = 0;
285 302
286 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || 303 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
287 test_bit(0, &conn->c_map_queued)) 304 test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
440 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 457 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
441} 458}
442 459
443static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, 460static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
444 struct rds_ib_send_work *send, 461 struct rds_ib_send_work *send,
445 bool notify) 462 bool notify)
446{ 463{
447 /* 464 /*
448 * We want to delay signaling completions just enough to get 465 * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
452 if (ic->i_unsignaled_wrs-- == 0 || notify) { 469 if (ic->i_unsignaled_wrs-- == 0 || notify) {
453 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; 470 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
454 send->s_wr.send_flags |= IB_SEND_SIGNALED; 471 send->s_wr.send_flags |= IB_SEND_SIGNALED;
472 return 1;
455 } 473 }
474 return 0;
456} 475}
457 476
458/* 477/*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
488 int bytes_sent = 0; 507 int bytes_sent = 0;
489 int ret; 508 int ret;
490 int flow_controlled = 0; 509 int flow_controlled = 0;
510 int nr_sig = 0;
491 511
492 BUG_ON(off % RDS_FRAG_SIZE); 512 BUG_ON(off % RDS_FRAG_SIZE);
493 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); 513 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
645 if (ic->i_flowctl && flow_controlled && i == (work_alloc-1)) 665 if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
646 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 666 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
647 667
668 if (send->s_wr.send_flags & IB_SEND_SIGNALED)
669 nr_sig++;
670
648 rdsdebug("send %p wr %p num_sge %u next %p\n", send, 671 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
649 &send->s_wr, send->s_wr.num_sge, send->s_wr.next); 672 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
650 673
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
689 if (ic->i_flowctl && i < credit_alloc) 712 if (ic->i_flowctl && i < credit_alloc)
690 rds_ib_send_add_credits(conn, credit_alloc - i); 713 rds_ib_send_add_credits(conn, credit_alloc - i);
691 714
715 if (nr_sig)
716 atomic_add(nr_sig, &ic->i_signaled_sends);
717
692 /* XXX need to worry about failed_wr and partial sends. */ 718 /* XXX need to worry about failed_wr and partial sends. */
693 failed_wr = &first->s_wr; 719 failed_wr = &first->s_wr;
694 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 720 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
699 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " 725 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
700 "returned %d\n", &conn->c_faddr, ret); 726 "returned %d\n", &conn->c_faddr, ret);
701 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 727 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
728 rds_ib_sub_signaled(ic, nr_sig);
702 if (prev->s_op) { 729 if (prev->s_op) {
703 ic->i_data_op = prev->s_op; 730 ic->i_data_op = prev->s_op;
704 prev->s_op = NULL; 731 prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
728 u32 pos; 755 u32 pos;
729 u32 work_alloc; 756 u32 work_alloc;
730 int ret; 757 int ret;
758 int nr_sig = 0;
731 759
732 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); 760 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
733 761
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
752 send->s_wr.wr.atomic.compare_add = op->op_swap_add; 780 send->s_wr.wr.atomic.compare_add = op->op_swap_add;
753 send->s_wr.wr.atomic.swap = 0; 781 send->s_wr.wr.atomic.swap = 0;
754 } 782 }
755 rds_ib_set_wr_signal_state(ic, send, op->op_notify); 783 nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
756 send->s_wr.num_sge = 1; 784 send->s_wr.num_sge = 1;
757 send->s_wr.next = NULL; 785 send->s_wr.next = NULL;
758 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; 786 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
778 rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, 806 rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
779 send->s_sge[0].addr, send->s_sge[0].length); 807 send->s_sge[0].addr, send->s_sge[0].length);
780 808
809 if (nr_sig)
810 atomic_add(nr_sig, &ic->i_signaled_sends);
811
781 failed_wr = &send->s_wr; 812 failed_wr = &send->s_wr;
782 ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr); 813 ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
783 rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, 814 rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
787 printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 " 818 printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
788 "returned %d\n", &conn->c_faddr, ret); 819 "returned %d\n", &conn->c_faddr, ret);
789 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 820 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
821 rds_ib_sub_signaled(ic, nr_sig);
790 goto out; 822 goto out;
791 } 823 }
792 824
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
817 int sent; 849 int sent;
818 int ret; 850 int ret;
819 int num_sge; 851 int num_sge;
852 int nr_sig = 0;
820 853
821 /* map the op the first time we see it */ 854 /* map the op the first time we see it */
822 if (!op->op_mapped) { 855 if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
859 send->s_queued = jiffies; 892 send->s_queued = jiffies;
860 send->s_op = NULL; 893 send->s_op = NULL;
861 894
862 rds_ib_set_wr_signal_state(ic, send, op->op_notify); 895 nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
863 896
864 send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; 897 send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
865 send->s_wr.wr.rdma.remote_addr = remote_addr; 898 send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
910 work_alloc = i; 943 work_alloc = i;
911 } 944 }
912 945
946 if (nr_sig)
947 atomic_add(nr_sig, &ic->i_signaled_sends);
948
913 failed_wr = &first->s_wr; 949 failed_wr = &first->s_wr;
914 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 950 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
915 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, 951 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
919 printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 " 955 printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
920 "returned %d\n", &conn->c_faddr, ret); 956 "returned %d\n", &conn->c_faddr, ret);
921 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 957 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
958 rds_ib_sub_signaled(ic, nr_sig);
922 goto out; 959 goto out;
923 } 960 }
924 961