aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/rds/connection.c19
-rw-r--r--net/rds/ib_send.c2
-rw-r--r--net/rds/rds.h5
-rw-r--r--net/rds/send.c82
-rw-r--r--net/rds/threads.c2
5 files changed, 52 insertions, 58 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 5bb0eec5ada3..89871db77f8f 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
148 spin_lock_init(&conn->c_lock); 148 spin_lock_init(&conn->c_lock);
149 conn->c_next_tx_seq = 1; 149 conn->c_next_tx_seq = 1;
150 150
151 spin_lock_init(&conn->c_send_lock); 151 init_waitqueue_head(&conn->c_waitq);
152 atomic_set(&conn->c_send_generation, 1);
153 atomic_set(&conn->c_senders, 0);
154 INIT_LIST_HEAD(&conn->c_send_queue); 152 INIT_LIST_HEAD(&conn->c_send_queue);
155 INIT_LIST_HEAD(&conn->c_retrans); 153 INIT_LIST_HEAD(&conn->c_retrans);
156 154
@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
275 } 273 }
276 mutex_unlock(&conn->c_cm_lock); 274 mutex_unlock(&conn->c_cm_lock);
277 275
278 /* verify everybody's out of rds_send_xmit() */ 276 wait_event(conn->c_waitq,
279 spin_lock_irq(&conn->c_send_lock); 277 !test_bit(RDS_IN_XMIT, &conn->c_flags));
280 spin_unlock_irq(&conn->c_send_lock);
281
282 while(atomic_read(&conn->c_senders)) {
283 schedule_timeout(1);
284 spin_lock_irq(&conn->c_send_lock);
285 spin_unlock_irq(&conn->c_send_lock);
286 }
287 278
288 conn->c_trans->conn_shutdown(conn); 279 conn->c_trans->conn_shutdown(conn);
289 rds_conn_reset(conn); 280 rds_conn_reset(conn);
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
477 sizeof(cinfo->transport)); 468 sizeof(cinfo->transport));
478 cinfo->flags = 0; 469 cinfo->flags = 0;
479 470
480 rds_conn_info_set(cinfo->flags, 471 rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
481 spin_is_locked(&conn->c_send_lock), SENDING); 472 SENDING);
482 /* XXX Future: return the state rather than these funky bits */ 473 /* XXX Future: return the state rather than these funky bits */
483 rds_conn_info_set(cinfo->flags, 474 rds_conn_info_set(cinfo->flags,
484 atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, 475 atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 3f91e794eae9..e88cb4af009b 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
321 * credits (see rds_ib_send_add_credits below). 321 * credits (see rds_ib_send_add_credits below).
322 * 322 *
323 * The RDS send code is essentially single-threaded; rds_send_xmit 323 * The RDS send code is essentially single-threaded; rds_send_xmit
324 * grabs c_send_lock to ensure exclusive access to the send ring. 324 * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
325 * However, the ACK sending code is independent and can race with 325 * However, the ACK sending code is independent and can race with
326 * message SENDs. 326 * message SENDs.
327 * 327 *
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 270ded76fd53..4510344ce8ca 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -80,6 +80,7 @@ enum {
80/* Bits for c_flags */ 80/* Bits for c_flags */
81#define RDS_LL_SEND_FULL 0 81#define RDS_LL_SEND_FULL 0
82#define RDS_RECONNECT_PENDING 1 82#define RDS_RECONNECT_PENDING 1
83#define RDS_IN_XMIT 2
83 84
84struct rds_connection { 85struct rds_connection {
85 struct hlist_node c_hash_node; 86 struct hlist_node c_hash_node;
@@ -91,9 +92,6 @@ struct rds_connection {
91 struct rds_cong_map *c_lcong; 92 struct rds_cong_map *c_lcong;
92 struct rds_cong_map *c_fcong; 93 struct rds_cong_map *c_fcong;
93 94
94 spinlock_t c_send_lock; /* protect send ring */
95 atomic_t c_send_generation;
96 atomic_t c_senders;
97 struct rds_message *c_xmit_rm; 95 struct rds_message *c_xmit_rm;
98 unsigned long c_xmit_sg; 96 unsigned long c_xmit_sg;
99 unsigned int c_xmit_hdr_off; 97 unsigned int c_xmit_hdr_off;
@@ -120,6 +118,7 @@ struct rds_connection {
120 struct delayed_work c_conn_w; 118 struct delayed_work c_conn_w;
121 struct work_struct c_down_w; 119 struct work_struct c_down_w;
122 struct mutex c_cm_lock; /* protect conn state & cm */ 120 struct mutex c_cm_lock; /* protect conn state & cm */
121 wait_queue_head_t c_waitq;
123 122
124 struct list_head c_map_item; 123 struct list_head c_map_item;
125 unsigned long c_map_queued; 124 unsigned long c_map_queued;
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
53MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); 53MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
54 54
55/* 55/*
56 * Reset the send state. Caller must hold c_send_lock when calling here. 56 * Reset the send state. Callers must ensure that this doesn't race with
57 * rds_send_xmit().
57 */ 58 */
58void rds_send_reset(struct rds_connection *conn) 59void rds_send_reset(struct rds_connection *conn)
59{ 60{
60 struct rds_message *rm, *tmp; 61 struct rds_message *rm, *tmp;
61 unsigned long flags; 62 unsigned long flags;
62 63
63 spin_lock_irqsave(&conn->c_send_lock, flags);
64 if (conn->c_xmit_rm) { 64 if (conn->c_xmit_rm) {
65 rm = conn->c_xmit_rm; 65 rm = conn->c_xmit_rm;
66 conn->c_xmit_rm = NULL; 66 conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
69 * independently) but as the connection is down, there's 69 * independently) but as the connection is down, there's
70 * no ongoing RDMA to/from that memory */ 70 * no ongoing RDMA to/from that memory */
71 rds_message_unmapped(rm); 71 rds_message_unmapped(rm);
72 spin_unlock_irqrestore(&conn->c_send_lock, flags);
73
74 rds_message_put(rm); 72 rds_message_put(rm);
75 } else {
76 spin_unlock_irqrestore(&conn->c_send_lock, flags);
77 } 73 }
78 74
79 conn->c_xmit_sg = 0; 75 conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
98 spin_unlock_irqrestore(&conn->c_lock, flags); 94 spin_unlock_irqrestore(&conn->c_lock, flags);
99} 95}
100 96
97static int acquire_in_xmit(struct rds_connection *conn)
98{
99 return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
100}
101
102static void release_in_xmit(struct rds_connection *conn)
103{
104 clear_bit(RDS_IN_XMIT, &conn->c_flags);
105 smp_mb__after_clear_bit();
106 /*
107 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
108 * hot path and finding waiters is very rare. We don't want to walk
109 * the system-wide hashed waitqueue buckets in the fast path only to
110 * almost never find waiters.
111 */
112 if (waitqueue_active(&conn->c_waitq))
113 wake_up_all(&conn->c_waitq);
114}
115
101/* 116/*
102 * We're making the concious trade-off here to only send one message 117 * We're making the concious trade-off here to only send one message
103 * down the connection at a time. 118 * down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
119 unsigned int tmp; 134 unsigned int tmp;
120 struct scatterlist *sg; 135 struct scatterlist *sg;
121 int ret = 0; 136 int ret = 0;
122 int gen = 0;
123 LIST_HEAD(to_be_dropped); 137 LIST_HEAD(to_be_dropped);
124 138
125restart: 139restart:
126 if (!rds_conn_up(conn))
127 goto out;
128 140
129 /* 141 /*
130 * sendmsg calls here after having queued its message on the send 142 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
133 * avoids blocking the caller and trading per-connection data between 145 * avoids blocking the caller and trading per-connection data between
134 * caches per message. 146 * caches per message.
135 */ 147 */
136 if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) { 148 if (!acquire_in_xmit(conn)) {
137 rds_stats_inc(s_send_lock_contention); 149 rds_stats_inc(s_send_lock_contention);
138 ret = -ENOMEM; 150 ret = -ENOMEM;
139 goto out; 151 goto out;
140 } 152 }
141 atomic_inc(&conn->c_senders); 153
154 /*
155 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
156 * we do the opposite to avoid races.
157 */
158 if (!rds_conn_up(conn)) {
159 release_in_xmit(conn);
160 ret = 0;
161 goto out;
162 }
142 163
143 if (conn->c_trans->xmit_prepare) 164 if (conn->c_trans->xmit_prepare)
144 conn->c_trans->xmit_prepare(conn); 165 conn->c_trans->xmit_prepare(conn);
145 166
146 gen = atomic_inc_return(&conn->c_send_generation);
147
148 /* 167 /*
149 * spin trying to push headers and data down the connection until 168 * spin trying to push headers and data down the connection until
150 * the connection doesn't make forward progress. 169 * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
178 if (!rm) { 197 if (!rm) {
179 unsigned int len; 198 unsigned int len;
180 199
181 spin_lock(&conn->c_lock); 200 spin_lock_irqsave(&conn->c_lock, flags);
182 201
183 if (!list_empty(&conn->c_send_queue)) { 202 if (!list_empty(&conn->c_send_queue)) {
184 rm = list_entry(conn->c_send_queue.next, 203 rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
193 list_move_tail(&rm->m_conn_item, &conn->c_retrans); 212 list_move_tail(&rm->m_conn_item, &conn->c_retrans);
194 } 213 }
195 214
196 spin_unlock(&conn->c_lock); 215 spin_unlock_irqrestore(&conn->c_lock, flags);
197 216
198 if (!rm) 217 if (!rm)
199 break; 218 break;
@@ -207,10 +226,10 @@ restart:
207 */ 226 */
208 if (rm->rdma.op_active && 227 if (rm->rdma.op_active &&
209 test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { 228 test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
210 spin_lock(&conn->c_lock); 229 spin_lock_irqsave(&conn->c_lock, flags);
211 if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) 230 if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
212 list_move(&rm->m_conn_item, &to_be_dropped); 231 list_move(&rm->m_conn_item, &to_be_dropped);
213 spin_unlock(&conn->c_lock); 232 spin_unlock_irqrestore(&conn->c_lock, flags);
214 continue; 233 continue;
215 } 234 }
216 235
@@ -336,19 +355,7 @@ restart:
336 if (conn->c_trans->xmit_complete) 355 if (conn->c_trans->xmit_complete)
337 conn->c_trans->xmit_complete(conn); 356 conn->c_trans->xmit_complete(conn);
338 357
339 /* 358 release_in_xmit(conn);
340 * We might be racing with another sender who queued a message but
341 * backed off on noticing that we held the c_send_lock. If we check
342 * for queued messages after dropping the sem then either we'll
343 * see the queued message or the queuer will get the sem. If we
344 * notice the queued message then we trigger an immediate retry.
345 *
346 * We need to be careful only to do this when we stopped processing
347 * the send queue because it was empty. It's the only way we
348 * stop processing the loop when the transport hasn't taken
349 * responsibility for forward progress.
350 */
351 spin_unlock_irqrestore(&conn->c_send_lock, flags);
352 359
353 /* Nuke any messages we decided not to retransmit. */ 360 /* Nuke any messages we decided not to retransmit. */
354 if (!list_empty(&to_be_dropped)) { 361 if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
358 rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); 365 rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
359 } 366 }
360 367
361 atomic_dec(&conn->c_senders);
362
363 /* 368 /*
364 * Other senders will see we have c_send_lock and exit. We 369 * Other senders can queue a message after we last test the send queue
365 * need to recheck the send queue and race again for c_send_lock 370 * but before we clear RDS_IN_XMIT. In that case they'd back off and
366 * to make sure messages don't just sit on the send queue, if 371 * not try and send their newly queued message. We need to check the
367 * somebody hasn't already beat us into the loop. 372 * send queue after having cleared RDS_IN_XMIT so that their message
373 * doesn't get stuck on the send queue.
368 * 374 *
369 * If the transport cannot continue (i.e ret != 0), then it must 375 * If the transport cannot continue (i.e ret != 0), then it must
370 * call us when more room is available, such as from the tx 376 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
374 smp_mb(); 380 smp_mb();
375 if (!list_empty(&conn->c_send_queue)) { 381 if (!list_empty(&conn->c_send_queue)) {
376 rds_stats_inc(s_send_lock_queue_raced); 382 rds_stats_inc(s_send_lock_queue_raced);
377 if (gen == atomic_read(&conn->c_send_generation)) { 383 goto restart;
378 goto restart;
379 }
380 } 384 }
381 } 385 }
382out: 386out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 7a8ca7a1d983..2bab9bf07b91 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -61,7 +61,7 @@
61 * 61 *
62 * Transition to state DISCONNECTING/DOWN: 62 * Transition to state DISCONNECTING/DOWN:
63 * - Inside the shutdown worker; synchronizes with xmit path 63 * - Inside the shutdown worker; synchronizes with xmit path
64 * through c_send_lock, and with connection management callbacks 64 * through RDS_IN_XMIT, and with connection management callbacks
65 * via c_cm_lock. 65 * via c_cm_lock.
66 * 66 *
67 * For receive callbacks, we rely on the underlying transport 67 * For receive callbacks, we rely on the underlying transport