diff options
-rw-r--r-- | net/rds/connection.c | 19 | ||||
-rw-r--r-- | net/rds/ib_send.c | 2 | ||||
-rw-r--r-- | net/rds/rds.h | 5 | ||||
-rw-r--r-- | net/rds/send.c | 82 | ||||
-rw-r--r-- | net/rds/threads.c | 2 |
5 files changed, 52 insertions, 58 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c index 5bb0eec5ada3..89871db77f8f 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c | |||
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, | |||
148 | spin_lock_init(&conn->c_lock); | 148 | spin_lock_init(&conn->c_lock); |
149 | conn->c_next_tx_seq = 1; | 149 | conn->c_next_tx_seq = 1; |
150 | 150 | ||
151 | spin_lock_init(&conn->c_send_lock); | 151 | init_waitqueue_head(&conn->c_waitq); |
152 | atomic_set(&conn->c_send_generation, 1); | ||
153 | atomic_set(&conn->c_senders, 0); | ||
154 | INIT_LIST_HEAD(&conn->c_send_queue); | 152 | INIT_LIST_HEAD(&conn->c_send_queue); |
155 | INIT_LIST_HEAD(&conn->c_retrans); | 153 | INIT_LIST_HEAD(&conn->c_retrans); |
156 | 154 | ||
@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn) | |||
275 | } | 273 | } |
276 | mutex_unlock(&conn->c_cm_lock); | 274 | mutex_unlock(&conn->c_cm_lock); |
277 | 275 | ||
278 | /* verify everybody's out of rds_send_xmit() */ | 276 | wait_event(conn->c_waitq, |
279 | spin_lock_irq(&conn->c_send_lock); | 277 | !test_bit(RDS_IN_XMIT, &conn->c_flags)); |
280 | spin_unlock_irq(&conn->c_send_lock); | ||
281 | |||
282 | while(atomic_read(&conn->c_senders)) { | ||
283 | schedule_timeout(1); | ||
284 | spin_lock_irq(&conn->c_send_lock); | ||
285 | spin_unlock_irq(&conn->c_send_lock); | ||
286 | } | ||
287 | 278 | ||
288 | conn->c_trans->conn_shutdown(conn); | 279 | conn->c_trans->conn_shutdown(conn); |
289 | rds_conn_reset(conn); | 280 | rds_conn_reset(conn); |
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn, | |||
477 | sizeof(cinfo->transport)); | 468 | sizeof(cinfo->transport)); |
478 | cinfo->flags = 0; | 469 | cinfo->flags = 0; |
479 | 470 | ||
480 | rds_conn_info_set(cinfo->flags, | 471 | rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags), |
481 | spin_is_locked(&conn->c_send_lock), SENDING); | 472 | SENDING); |
482 | /* XXX Future: return the state rather than these funky bits */ | 473 | /* XXX Future: return the state rather than these funky bits */ |
483 | rds_conn_info_set(cinfo->flags, | 474 | rds_conn_info_set(cinfo->flags, |
484 | atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, | 475 | atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, |
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 3f91e794eae9..e88cb4af009b 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c | |||
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) | |||
321 | * credits (see rds_ib_send_add_credits below). | 321 | * credits (see rds_ib_send_add_credits below). |
322 | * | 322 | * |
323 | * The RDS send code is essentially single-threaded; rds_send_xmit | 323 | * The RDS send code is essentially single-threaded; rds_send_xmit |
324 | * grabs c_send_lock to ensure exclusive access to the send ring. | 324 | * sets RDS_IN_XMIT to ensure exclusive access to the send ring. |
325 | * However, the ACK sending code is independent and can race with | 325 | * However, the ACK sending code is independent and can race with |
326 | * message SENDs. | 326 | * message SENDs. |
327 | * | 327 | * |
diff --git a/net/rds/rds.h b/net/rds/rds.h index 270ded76fd53..4510344ce8ca 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h | |||
@@ -80,6 +80,7 @@ enum { | |||
80 | /* Bits for c_flags */ | 80 | /* Bits for c_flags */ |
81 | #define RDS_LL_SEND_FULL 0 | 81 | #define RDS_LL_SEND_FULL 0 |
82 | #define RDS_RECONNECT_PENDING 1 | 82 | #define RDS_RECONNECT_PENDING 1 |
83 | #define RDS_IN_XMIT 2 | ||
83 | 84 | ||
84 | struct rds_connection { | 85 | struct rds_connection { |
85 | struct hlist_node c_hash_node; | 86 | struct hlist_node c_hash_node; |
@@ -91,9 +92,6 @@ struct rds_connection { | |||
91 | struct rds_cong_map *c_lcong; | 92 | struct rds_cong_map *c_lcong; |
92 | struct rds_cong_map *c_fcong; | 93 | struct rds_cong_map *c_fcong; |
93 | 94 | ||
94 | spinlock_t c_send_lock; /* protect send ring */ | ||
95 | atomic_t c_send_generation; | ||
96 | atomic_t c_senders; | ||
97 | struct rds_message *c_xmit_rm; | 95 | struct rds_message *c_xmit_rm; |
98 | unsigned long c_xmit_sg; | 96 | unsigned long c_xmit_sg; |
99 | unsigned int c_xmit_hdr_off; | 97 | unsigned int c_xmit_hdr_off; |
@@ -120,6 +118,7 @@ struct rds_connection { | |||
120 | struct delayed_work c_conn_w; | 118 | struct delayed_work c_conn_w; |
121 | struct work_struct c_down_w; | 119 | struct work_struct c_down_w; |
122 | struct mutex c_cm_lock; /* protect conn state & cm */ | 120 | struct mutex c_cm_lock; /* protect conn state & cm */ |
121 | wait_queue_head_t c_waitq; | ||
123 | 122 | ||
124 | struct list_head c_map_item; | 123 | struct list_head c_map_item; |
125 | unsigned long c_map_queued; | 124 | unsigned long c_map_queued; |
diff --git a/net/rds/send.c b/net/rds/send.c index b9e41afef323..81471b25373b 100644 --- a/net/rds/send.c +++ b/net/rds/send.c | |||
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444); | |||
53 | MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); | 53 | MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); |
54 | 54 | ||
55 | /* | 55 | /* |
56 | * Reset the send state. Caller must hold c_send_lock when calling here. | 56 | * Reset the send state. Callers must ensure that this doesn't race with |
57 | * rds_send_xmit(). | ||
57 | */ | 58 | */ |
58 | void rds_send_reset(struct rds_connection *conn) | 59 | void rds_send_reset(struct rds_connection *conn) |
59 | { | 60 | { |
60 | struct rds_message *rm, *tmp; | 61 | struct rds_message *rm, *tmp; |
61 | unsigned long flags; | 62 | unsigned long flags; |
62 | 63 | ||
63 | spin_lock_irqsave(&conn->c_send_lock, flags); | ||
64 | if (conn->c_xmit_rm) { | 64 | if (conn->c_xmit_rm) { |
65 | rm = conn->c_xmit_rm; | 65 | rm = conn->c_xmit_rm; |
66 | conn->c_xmit_rm = NULL; | 66 | conn->c_xmit_rm = NULL; |
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn) | |||
69 | * independently) but as the connection is down, there's | 69 | * independently) but as the connection is down, there's |
70 | * no ongoing RDMA to/from that memory */ | 70 | * no ongoing RDMA to/from that memory */ |
71 | rds_message_unmapped(rm); | 71 | rds_message_unmapped(rm); |
72 | spin_unlock_irqrestore(&conn->c_send_lock, flags); | ||
73 | |||
74 | rds_message_put(rm); | 72 | rds_message_put(rm); |
75 | } else { | ||
76 | spin_unlock_irqrestore(&conn->c_send_lock, flags); | ||
77 | } | 73 | } |
78 | 74 | ||
79 | conn->c_xmit_sg = 0; | 75 | conn->c_xmit_sg = 0; |
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn) | |||
98 | spin_unlock_irqrestore(&conn->c_lock, flags); | 94 | spin_unlock_irqrestore(&conn->c_lock, flags); |
99 | } | 95 | } |
100 | 96 | ||
97 | static int acquire_in_xmit(struct rds_connection *conn) | ||
98 | { | ||
99 | return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; | ||
100 | } | ||
101 | |||
102 | static void release_in_xmit(struct rds_connection *conn) | ||
103 | { | ||
104 | clear_bit(RDS_IN_XMIT, &conn->c_flags); | ||
105 | smp_mb__after_clear_bit(); | ||
106 | /* | ||
107 | * We don't use wait_on_bit()/wake_up_bit() because our waking is in a | ||
108 | * hot path and finding waiters is very rare. We don't want to walk | ||
109 | * the system-wide hashed waitqueue buckets in the fast path only to | ||
110 | * almost never find waiters. | ||
111 | */ | ||
112 | if (waitqueue_active(&conn->c_waitq)) | ||
113 | wake_up_all(&conn->c_waitq); | ||
114 | } | ||
115 | |||
101 | /* | 116 | /* |
102 | * We're making the concious trade-off here to only send one message | 117 | * We're making the concious trade-off here to only send one message |
103 | * down the connection at a time. | 118 | * down the connection at a time. |
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn) | |||
119 | unsigned int tmp; | 134 | unsigned int tmp; |
120 | struct scatterlist *sg; | 135 | struct scatterlist *sg; |
121 | int ret = 0; | 136 | int ret = 0; |
122 | int gen = 0; | ||
123 | LIST_HEAD(to_be_dropped); | 137 | LIST_HEAD(to_be_dropped); |
124 | 138 | ||
125 | restart: | 139 | restart: |
126 | if (!rds_conn_up(conn)) | ||
127 | goto out; | ||
128 | 140 | ||
129 | /* | 141 | /* |
130 | * sendmsg calls here after having queued its message on the send | 142 | * sendmsg calls here after having queued its message on the send |
@@ -133,18 +145,25 @@ restart: | |||
133 | * avoids blocking the caller and trading per-connection data between | 145 | * avoids blocking the caller and trading per-connection data between |
134 | * caches per message. | 146 | * caches per message. |
135 | */ | 147 | */ |
136 | if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) { | 148 | if (!acquire_in_xmit(conn)) { |
137 | rds_stats_inc(s_send_lock_contention); | 149 | rds_stats_inc(s_send_lock_contention); |
138 | ret = -ENOMEM; | 150 | ret = -ENOMEM; |
139 | goto out; | 151 | goto out; |
140 | } | 152 | } |
141 | atomic_inc(&conn->c_senders); | 153 | |
154 | /* | ||
155 | * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, | ||
156 | * we do the opposite to avoid races. | ||
157 | */ | ||
158 | if (!rds_conn_up(conn)) { | ||
159 | release_in_xmit(conn); | ||
160 | ret = 0; | ||
161 | goto out; | ||
162 | } | ||
142 | 163 | ||
143 | if (conn->c_trans->xmit_prepare) | 164 | if (conn->c_trans->xmit_prepare) |
144 | conn->c_trans->xmit_prepare(conn); | 165 | conn->c_trans->xmit_prepare(conn); |
145 | 166 | ||
146 | gen = atomic_inc_return(&conn->c_send_generation); | ||
147 | |||
148 | /* | 167 | /* |
149 | * spin trying to push headers and data down the connection until | 168 | * spin trying to push headers and data down the connection until |
150 | * the connection doesn't make forward progress. | 169 | * the connection doesn't make forward progress. |
@@ -178,7 +197,7 @@ restart: | |||
178 | if (!rm) { | 197 | if (!rm) { |
179 | unsigned int len; | 198 | unsigned int len; |
180 | 199 | ||
181 | spin_lock(&conn->c_lock); | 200 | spin_lock_irqsave(&conn->c_lock, flags); |
182 | 201 | ||
183 | if (!list_empty(&conn->c_send_queue)) { | 202 | if (!list_empty(&conn->c_send_queue)) { |
184 | rm = list_entry(conn->c_send_queue.next, | 203 | rm = list_entry(conn->c_send_queue.next, |
@@ -193,7 +212,7 @@ restart: | |||
193 | list_move_tail(&rm->m_conn_item, &conn->c_retrans); | 212 | list_move_tail(&rm->m_conn_item, &conn->c_retrans); |
194 | } | 213 | } |
195 | 214 | ||
196 | spin_unlock(&conn->c_lock); | 215 | spin_unlock_irqrestore(&conn->c_lock, flags); |
197 | 216 | ||
198 | if (!rm) | 217 | if (!rm) |
199 | break; | 218 | break; |
@@ -207,10 +226,10 @@ restart: | |||
207 | */ | 226 | */ |
208 | if (rm->rdma.op_active && | 227 | if (rm->rdma.op_active && |
209 | test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { | 228 | test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { |
210 | spin_lock(&conn->c_lock); | 229 | spin_lock_irqsave(&conn->c_lock, flags); |
211 | if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) | 230 | if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) |
212 | list_move(&rm->m_conn_item, &to_be_dropped); | 231 | list_move(&rm->m_conn_item, &to_be_dropped); |
213 | spin_unlock(&conn->c_lock); | 232 | spin_unlock_irqrestore(&conn->c_lock, flags); |
214 | continue; | 233 | continue; |
215 | } | 234 | } |
216 | 235 | ||
@@ -336,19 +355,7 @@ restart: | |||
336 | if (conn->c_trans->xmit_complete) | 355 | if (conn->c_trans->xmit_complete) |
337 | conn->c_trans->xmit_complete(conn); | 356 | conn->c_trans->xmit_complete(conn); |
338 | 357 | ||
339 | /* | 358 | release_in_xmit(conn); |
340 | * We might be racing with another sender who queued a message but | ||
341 | * backed off on noticing that we held the c_send_lock. If we check | ||
342 | * for queued messages after dropping the sem then either we'll | ||
343 | * see the queued message or the queuer will get the sem. If we | ||
344 | * notice the queued message then we trigger an immediate retry. | ||
345 | * | ||
346 | * We need to be careful only to do this when we stopped processing | ||
347 | * the send queue because it was empty. It's the only way we | ||
348 | * stop processing the loop when the transport hasn't taken | ||
349 | * responsibility for forward progress. | ||
350 | */ | ||
351 | spin_unlock_irqrestore(&conn->c_send_lock, flags); | ||
352 | 359 | ||
353 | /* Nuke any messages we decided not to retransmit. */ | 360 | /* Nuke any messages we decided not to retransmit. */ |
354 | if (!list_empty(&to_be_dropped)) { | 361 | if (!list_empty(&to_be_dropped)) { |
@@ -358,13 +365,12 @@ restart: | |||
358 | rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); | 365 | rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); |
359 | } | 366 | } |
360 | 367 | ||
361 | atomic_dec(&conn->c_senders); | ||
362 | |||
363 | /* | 368 | /* |
364 | * Other senders will see we have c_send_lock and exit. We | 369 | * Other senders can queue a message after we last test the send queue |
365 | * need to recheck the send queue and race again for c_send_lock | 370 | * but before we clear RDS_IN_XMIT. In that case they'd back off and |
366 | * to make sure messages don't just sit on the send queue, if | 371 | * not try and send their newly queued message. We need to check the |
367 | * somebody hasn't already beat us into the loop. | 372 | * send queue after having cleared RDS_IN_XMIT so that their message |
373 | * doesn't get stuck on the send queue. | ||
368 | * | 374 | * |
369 | * If the transport cannot continue (i.e ret != 0), then it must | 375 | * If the transport cannot continue (i.e ret != 0), then it must |
370 | * call us when more room is available, such as from the tx | 376 | * call us when more room is available, such as from the tx |
@@ -374,9 +380,7 @@ restart: | |||
374 | smp_mb(); | 380 | smp_mb(); |
375 | if (!list_empty(&conn->c_send_queue)) { | 381 | if (!list_empty(&conn->c_send_queue)) { |
376 | rds_stats_inc(s_send_lock_queue_raced); | 382 | rds_stats_inc(s_send_lock_queue_raced); |
377 | if (gen == atomic_read(&conn->c_send_generation)) { | 383 | goto restart; |
378 | goto restart; | ||
379 | } | ||
380 | } | 384 | } |
381 | } | 385 | } |
382 | out: | 386 | out: |
diff --git a/net/rds/threads.c b/net/rds/threads.c index 7a8ca7a1d983..2bab9bf07b91 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c | |||
@@ -61,7 +61,7 @@ | |||
61 | * | 61 | * |
62 | * Transition to state DISCONNECTING/DOWN: | 62 | * Transition to state DISCONNECTING/DOWN: |
63 | * - Inside the shutdown worker; synchronizes with xmit path | 63 | * - Inside the shutdown worker; synchronizes with xmit path |
64 | * through c_send_lock, and with connection management callbacks | 64 | * through RDS_IN_XMIT, and with connection management callbacks |
65 | * via c_cm_lock. | 65 | * via c_cm_lock. |
66 | * | 66 | * |
67 | * For receive callbacks, we rely on the underlying transport | 67 | * For receive callbacks, we rely on the underlying transport |