author	Zach Brown <zach.brown@oracle.com>	2010-06-04 17:41:41 -0400
committer	Andy Grover <andy.grover@oracle.com>	2010-09-08 21:15:27 -0400
commit	0f4b1c7e89e699f588807a914ec6e6396c851a72 (patch)
tree	8b882f85f03089283f6d222bf8c7d5616a102ecd /net/rds
parent	501dcccdb7a2335cde07d4acb56e636182d62944 (diff)
rds: fix rds_send_xmit() serialization
rds_send_xmit() was changed to hold an interrupt masking spinlock instead of a mutex so that it could be called from the IB receive tasklet path. This broke the TCP transport because its xmit method can block and masks and unmasks interrupts.

This patch serializes callers to rds_send_xmit() with a simple bit instead of the current spinlock or previous mutex. This enables rds_send_xmit() to be called from any context and to call functions which block. Getting rid of the c_send_lock exposes the bare c_lock acquisitions which are changed to block interrupts.

A waitqueue is added so that rds_conn_shutdown() can wait for callers to leave rds_send_xmit() before tearing down partial send state. This lets us get rid of c_senders.

rds_send_xmit() is changed to check the conn state after acquiring the RDS_IN_XMIT bit to resolve races with the shutdown path. Previously both worked with the conn state and then the lock in the same order, allowing them to race and execute the paths concurrently.

rds_send_reset() isn't racing with rds_send_xmit() now that rds_conn_shutdown() properly ensures that rds_send_xmit() can't start once the conn state has been changed. We can remove its previous use of the spinlock.

Finally, c_send_generation is redundant. Callers can race to test the c_flags bit by simply retrying instead of racing to test the c_send_generation atomic.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
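For readers skimming the change, the serialization scheme boils down to the condensed sketch below. RDS_IN_XMIT, c_waitq, acquire_in_xmit() and release_in_xmit() mirror the patch itself; wait_for_xmit_to_finish() is an illustrative stand-in for the wait_event() call the patch places in rds_conn_shutdown(), and the struct is trimmed to the two fields involved.

/*
 * Condensed sketch of the bit-plus-waitqueue serialization this patch
 * adopts; not a drop-in replacement for the real struct rds_connection.
 */
#include <linux/bitops.h>
#include <linux/wait.h>

#define RDS_IN_XMIT	2

struct rds_connection_sketch {
	unsigned long		c_flags;
	wait_queue_head_t	c_waitq;	/* init_waitqueue_head() at create time */
};

/* Exactly one caller may own the send path; returns nonzero on success. */
static int acquire_in_xmit(struct rds_connection_sketch *conn)
{
	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
}

/*
 * The clear must be visible before we look for waiters, hence the barrier;
 * waitqueue_active() keeps the common no-waiter case off the waitqueue lock.
 */
static void release_in_xmit(struct rds_connection_sketch *conn)
{
	clear_bit(RDS_IN_XMIT, &conn->c_flags);
	smp_mb__after_clear_bit();
	if (waitqueue_active(&conn->c_waitq))
		wake_up_all(&conn->c_waitq);
}

/* Shutdown side: sleep until the current owner, if any, has left. */
static void wait_for_xmit_to_finish(struct rds_connection_sketch *conn)
{
	wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags));
}

The key property is that test_and_set_bit() is a full barrier, so whoever wins the bit sees all prior stores, while the barrier in release_in_xmit() prevents the waiter check from being hoisted above the clear.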
Diffstat (limited to 'net/rds')
-rw-r--r--	net/rds/connection.c	19
-rw-r--r--	net/rds/ib_send.c	2
-rw-r--r--	net/rds/rds.h	5
-rw-r--r--	net/rds/send.c	82
-rw-r--r--	net/rds/threads.c	2
5 files changed, 52 insertions, 58 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 5bb0eec5ada3..89871db77f8f 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	spin_lock_init(&conn->c_lock);
 	conn->c_next_tx_seq = 1;
 
-	spin_lock_init(&conn->c_send_lock);
-	atomic_set(&conn->c_send_generation, 1);
-	atomic_set(&conn->c_senders, 0);
+	init_waitqueue_head(&conn->c_waitq);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
 	}
 	mutex_unlock(&conn->c_cm_lock);
 
-	/* verify everybody's out of rds_send_xmit() */
-	spin_lock_irq(&conn->c_send_lock);
-	spin_unlock_irq(&conn->c_send_lock);
-
-	while(atomic_read(&conn->c_senders)) {
-		schedule_timeout(1);
-		spin_lock_irq(&conn->c_send_lock);
-		spin_unlock_irq(&conn->c_send_lock);
-	}
+	wait_event(conn->c_waitq,
+		   !test_bit(RDS_IN_XMIT, &conn->c_flags));
 
 	conn->c_trans->conn_shutdown(conn);
 	rds_conn_reset(conn);
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 			  sizeof(cinfo->transport));
 	cinfo->flags = 0;
 
-	rds_conn_info_set(cinfo->flags,
-			  spin_is_locked(&conn->c_send_lock), SENDING);
+	rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+			  SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 3f91e794eae9..e88cb4af009b 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
  * credits (see rds_ib_send_add_credits below).
  *
  * The RDS send code is essentially single-threaded; rds_send_xmit
- * grabs c_send_lock to ensure exclusive access to the send ring.
+ * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
  * However, the ACK sending code is independent and can race with
  * message SENDs.
  *
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 270ded76fd53..4510344ce8ca 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -80,6 +80,7 @@ enum {
 /* Bits for c_flags */
 #define RDS_LL_SEND_FULL	0
 #define RDS_RECONNECT_PENDING	1
+#define RDS_IN_XMIT		2
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;
@@ -91,9 +92,6 @@ struct rds_connection {
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
 
-	spinlock_t		c_send_lock;	/* protect send ring */
-	atomic_t		c_send_generation;
-	atomic_t		c_senders;
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
@@ -120,6 +118,7 @@ struct rds_connection {
 	struct delayed_work	c_conn_w;
 	struct work_struct	c_down_w;
 	struct mutex		c_cm_lock;	/* protect conn state & cm */
+	wait_queue_head_t	c_waitq;
 
 	struct list_head	c_map_item;
 	unsigned long		c_map_queued;
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state.  Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
 
 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare.  We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 
 restart:
-	if (!rds_conn_up(conn))
-		goto out;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
 		if (!rm) {
 			unsigned int len;
 
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}
 
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 
 			if (!rm)
 				break;
@@ -207,10 +226,10 @@ restart:
 		 */
 		if (rm->rdma.op_active &&
 		    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move(&rm->m_conn_item, &to_be_dropped);
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
 		}
 
@@ -336,19 +355,7 @@ restart:
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock.  If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem.  If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty.  It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
+	 * not try and send their newly queued message.  We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
 		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			if (gen == atomic_read(&conn->c_send_generation)) {
-				goto restart;
-			}
+			goto restart;
 		}
 	}
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 7a8ca7a1d983..2bab9bf07b91 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -61,7 +61,7 @@
  *
  * Transition to state DISCONNECTING/DOWN:
  * - Inside the shutdown worker; synchronizes with xmit path
- *   through c_send_lock, and with connection management callbacks
+ *   through RDS_IN_XMIT, and with connection management callbacks
  *   via c_cm_lock.
  *
  * For receive callbacks, we rely on the underlying transport
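One subtlety worth spelling out, since both the commit message and the new comment in rds_send_xmit() lean on it: the shutdown and transmit paths touch the connection state and RDS_IN_XMIT in opposite orders, and that ordering is what closes the race. The interleaving below is an illustration, not code from the patch.

/*
 * Shutdown path (rds_conn_shutdown):       Transmit path (rds_send_xmit):
 *
 *   1. move c_state out of RDS_CONN_UP       1. test_and_set_bit(RDS_IN_XMIT)
 *   2. wait_event(c_waitq,                   2. if (!rds_conn_up(conn))
 *        !test_bit(RDS_IN_XMIT))                    release_in_xmit(); bail out
 *
 * test_and_set_bit() is a full barrier and wait_event() re-reads the bit,
 * so either shutdown sees the bit set and sleeps until release_in_xmit()
 * wakes it, or the sender sees the state change and backs out before
 * touching the send ring.  They cannot both miss each other, which is why
 * the old c_senders counter and c_send_generation check are no longer needed.
 */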