author     Zach Brown <zach.brown@oracle.com>      2010-06-04 17:41:41 -0400
committer  Andy Grover <andy.grover@oracle.com>    2010-09-08 21:15:27 -0400
commit     0f4b1c7e89e699f588807a914ec6e6396c851a72
tree       8b882f85f03089283f6d222bf8c7d5616a102ecd /net/rds/send.c
parent     501dcccdb7a2335cde07d4acb56e636182d62944
rds: fix rds_send_xmit() serialization
rds_send_xmit() was changed to hold an interrupt masking spinlock instead of a mutex so that it could be called from the IB receive tasklet path. This broke the TCP transport because its xmit method can block and masks and unmasks interrupts.

This patch serializes callers to rds_send_xmit() with a simple bit instead of the current spinlock or previous mutex. This enables rds_send_xmit() to be called from any context and to call functions which block. Getting rid of the c_send_lock exposes the bare c_lock acquisitions which are changed to block interrupts.

A waitqueue is added so that rds_conn_shutdown() can wait for callers to leave rds_send_xmit() before tearing down partial send state. This lets us get rid of c_senders.

rds_send_xmit() is changed to check the conn state after acquiring the RDS_IN_XMIT bit to resolve races with the shutdown path. Previously both worked with the conn state and then the lock in the same order, allowing them to race and execute the paths concurrently.

rds_send_reset() isn't racing with rds_send_xmit() now that rds_conn_shutdown() properly ensures that rds_send_xmit() can't start once the conn state has been changed. We can remove its previous use of the spinlock.

Finally, c_send_generation is redundant. Callers can race to test the c_flags bit by simply retrying instead of racing to test the c_send_generation atomic.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
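The other half of this handshake -- rds_conn_shutdown() waiting for in-flight senders -- lives outside this file (net/rds/connection.c and rds.h) and is not part of the diff below. As a rough, illustrative sketch only (the helper name is invented; RDS_IN_XMIT, c_flags and c_waitq are the fields this patch introduces; wait_event() and test_bit() are standard kernel primitives), the shutdown-side wait amounts to sleeping until the bit is clear:

#include <linux/wait.h>
#include <linux/bitops.h>
#include "rds.h"        /* struct rds_connection, RDS_IN_XMIT -- assumed available */

/*
 * Sketch, not taken from this patch: once the conn state has been moved out
 * of RDS_CONN_UP, sleep until any sender still inside rds_send_xmit() calls
 * release_in_xmit(), which clears RDS_IN_XMIT and wakes c_waitq.
 */
static void wait_for_rds_send_xmit(struct rds_connection *conn)
{
        wait_event(conn->c_waitq,
                   !test_bit(RDS_IN_XMIT, &conn->c_flags));
}

Any sender that wins RDS_IN_XMIT after the state change sees !rds_conn_up(), releases the bit and bails out, so this wait cannot be starved and no partial send state is touched during teardown.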
Diffstat (limited to 'net/rds/send.c')
-rw-r--r--  net/rds/send.c | 82
1 file changed, 43 insertions(+), 39 deletions(-)
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
         struct rds_message *rm, *tmp;
         unsigned long flags;
 
-        spin_lock_irqsave(&conn->c_send_lock, flags);
         if (conn->c_xmit_rm) {
                 rm = conn->c_xmit_rm;
                 conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
                  * independently) but as the connection is down, there's
                  * no ongoing RDMA to/from that memory */
                 rds_message_unmapped(rm);
-                spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
                 rds_message_put(rm);
-        } else {
-                spin_unlock_irqrestore(&conn->c_send_lock, flags);
         }
 
         conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
         spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+        return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+        clear_bit(RDS_IN_XMIT, &conn->c_flags);
+        smp_mb__after_clear_bit();
+        /*
+         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+         * hot path and finding waiters is very rare.  We don't want to walk
+         * the system-wide hashed waitqueue buckets in the fast path only to
+         * almost never find waiters.
+         */
+        if (waitqueue_active(&conn->c_waitq))
+                wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
         unsigned int tmp;
         struct scatterlist *sg;
         int ret = 0;
-        int gen = 0;
         LIST_HEAD(to_be_dropped);
 
 restart:
-        if (!rds_conn_up(conn))
-                goto out;
 
         /*
          * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
          * avoids blocking the caller and trading per-connection data between
          * caches per message.
          */
-        if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+        if (!acquire_in_xmit(conn)) {
                 rds_stats_inc(s_send_lock_contention);
                 ret = -ENOMEM;
                 goto out;
         }
-        atomic_inc(&conn->c_senders);
+
+        /*
+         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+         * we do the opposite to avoid races.
+         */
+        if (!rds_conn_up(conn)) {
+                release_in_xmit(conn);
+                ret = 0;
+                goto out;
+        }
 
         if (conn->c_trans->xmit_prepare)
                 conn->c_trans->xmit_prepare(conn);
 
-        gen = atomic_inc_return(&conn->c_send_generation);
-
         /*
          * spin trying to push headers and data down the connection until
          * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
                 if (!rm) {
                         unsigned int len;
 
-                        spin_lock(&conn->c_lock);
+                        spin_lock_irqsave(&conn->c_lock, flags);
 
                         if (!list_empty(&conn->c_send_queue)) {
                                 rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
                                 list_move_tail(&rm->m_conn_item, &conn->c_retrans);
                         }
 
-                        spin_unlock(&conn->c_lock);
+                        spin_unlock_irqrestore(&conn->c_lock, flags);
 
                         if (!rm)
                                 break;
@@ -207,10 +226,10 @@ restart:
                  */
                 if (rm->rdma.op_active &&
                     test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-                        spin_lock(&conn->c_lock);
+                        spin_lock_irqsave(&conn->c_lock, flags);
                         if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                 list_move(&rm->m_conn_item, &to_be_dropped);
-                        spin_unlock(&conn->c_lock);
+                        spin_unlock_irqrestore(&conn->c_lock, flags);
                         continue;
                 }
 
@@ -336,19 +355,7 @@ restart:
         if (conn->c_trans->xmit_complete)
                 conn->c_trans->xmit_complete(conn);
 
-        /*
-         * We might be racing with another sender who queued a message but
-         * backed off on noticing that we held the c_send_lock.  If we check
-         * for queued messages after dropping the sem then either we'll
-         * see the queued message or the queuer will get the sem.  If we
-         * notice the queued message then we trigger an immediate retry.
-         *
-         * We need to be careful only to do this when we stopped processing
-         * the send queue because it was empty.  It's the only way we
-         * stop processing the loop when the transport hasn't taken
-         * responsibility for forward progress.
-         */
-        spin_unlock_irqrestore(&conn->c_send_lock, flags);
+        release_in_xmit(conn);
 
         /* Nuke any messages we decided not to retransmit. */
         if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
                 rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
         }
 
-        atomic_dec(&conn->c_senders);
-
         /*
-         * Other senders will see we have c_send_lock and exit. We
-         * need to recheck the send queue and race again for c_send_lock
-         * to make sure messages don't just sit on the send queue, if
-         * somebody hasn't already beat us into the loop.
+         * Other senders can queue a message after we last test the send queue
+         * but before we clear RDS_IN_XMIT.  In that case they'd back off and
+         * not try and send their newly queued message.  We need to check the
+         * send queue after having cleared RDS_IN_XMIT so that their message
+         * doesn't get stuck on the send queue.
          *
          * If the transport cannot continue (i.e ret != 0), then it must
          * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
                 smp_mb();
                 if (!list_empty(&conn->c_send_queue)) {
                         rds_stats_inc(s_send_lock_queue_raced);
-                        if (gen == atomic_read(&conn->c_send_generation)) {
-                                goto restart;
-                        }
+                        goto restart;
                 }
         }
 out: