path: root/net/rds/send.c
Diffstat (limited to 'net/rds/send.c')
-rw-r--r--	net/rds/send.c	82
1 file changed, 43 insertions, 39 deletions
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
 
 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare. We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
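release_in_xmit() clears the bit, orders the clear with smp_mb__after_clear_bit(), and only then peeks at conn->c_waitq, so a sleeper that re-tests the bit after waking cannot miss the wakeup. The sleeping side is not part of this file; the sketch below is an assumed illustration of what such a waiter (for instance a connection teardown path) could look like, not code from this patch.

/*
 * Assumed sketch, not part of this diff: a path that must not proceed
 * while rds_send_xmit() owns RDS_IN_XMIT can sleep on c_waitq and
 * re-test the bit, relying on release_in_xmit() for the wakeup.
 */
static void rds_wait_for_xmit_idle(struct rds_connection *conn)
{
	wait_event(conn->c_waitq,
		   !test_bit(RDS_IN_XMIT, &conn->c_flags));
}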
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 
 restart:
-	if (!rds_conn_up(conn))
-		goto out;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
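The new comment relies on a store/load ordering: rds_send_xmit() sets RDS_IN_XMIT first and tests the connection state second, while rds_conn_shutdown() is described as doing the opposite. A small stand-alone illustration of why the opposite orderings are enough, using C11 sequentially consistent atomics in place of the kernel's bitops and barriers (illustrative names, not RDS code):

#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool in_xmit;	/* stands in for RDS_IN_XMIT          */
static atomic_bool conn_down;	/* stands in for the connection state */

/* Sender: publish "I am transmitting", then test the connection state. */
static bool sender_may_transmit(void)
{
	atomic_store(&in_xmit, true);
	if (atomic_load(&conn_down)) {
		atomic_store(&in_xmit, false);	/* back out, like release_in_xmit() */
		return false;
	}
	return true;
}

/* Shutdown: publish "going down", then test for an active transmitter. */
static bool shutdown_must_wait(void)
{
	atomic_store(&conn_down, true);
	return atomic_load(&in_xmit);
}

/*
 * With both sides ordered this way, the two loads cannot both miss the
 * other side's store: either the sender sees conn_down and backs out, or
 * shutdown sees in_xmit and knows it must wait for the sender to finish.
 */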
@@ -178,7 +197,7 @@ restart:
 		if (!rm) {
 			unsigned int len;
 
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}
 
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 
 			if (!rm)
 				break;
@@ -207,10 +226,10 @@ restart:
 		 */
 		if (rm->rdma.op_active &&
 		    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move(&rm->m_conn_item, &to_be_dropped);
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
 		}
 
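The three hunks above also switch the c_lock critical sections from spin_lock()/spin_unlock() to the irqsave variants. The patch does not spell out the reason here, but a plausible one is that this code used to run entirely under spin_trylock_irqsave(&conn->c_send_lock, ...), so interrupts were already masked; with that lock replaced by the RDS_IN_XMIT bit, rds_send_xmit() can run with interrupts enabled and the inner lock must manage the interrupt state itself. In isolation the new shape is simply:

/* Illustrative fragment only; queue_maintenance() is a hypothetical name. */
static void queue_maintenance(struct rds_connection *conn)
{
	unsigned long flags;

	spin_lock_irqsave(&conn->c_lock, flags);
	/* ... move messages between c_send_queue and c_retrans ... */
	spin_unlock_irqrestore(&conn->c_lock, flags);
}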
@@ -336,19 +355,7 @@ restart:
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock. If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem. If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty. It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT. In that case they'd back off and
+	 * not try and send their newly queued message. We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
 		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			if (gen == atomic_read(&conn->c_send_generation)) {
-				goto restart;
-			}
+			goto restart;
 		}
 	}
 out:
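Finally, the tail of rds_send_xmit() now follows a common lock-free handoff pattern: clear the RDS_IN_XMIT ownership bit, then re-check the send queue, so that a message queued by a sender who backed off while the bit was held is never stranded. A self-contained sketch of the same pattern with illustrative names (C11 atomics stand in for the kernel's bitops and smp_mb(); this is not RDS code):

#include <stdatomic.h>

struct fake_conn {
	atomic_flag in_xmit;	/* plays the role of RDS_IN_XMIT       */
	atomic_int queued;	/* stands in for the send queue length */
};

/* Worker: drain the queue, drop ownership, then re-check the queue. */
static void fake_xmit(struct fake_conn *c)
{
restart:
	while (atomic_load(&c->queued) > 0)
		atomic_fetch_sub(&c->queued, 1);	/* "transmit" one message */

	atomic_flag_clear(&c->in_xmit);			/* like release_in_xmit() */

	/*
	 * A producer may have enqueued after our last look at the queue but
	 * before the clear above; it saw the flag set and backed off, so it
	 * is our job to notice its message and go around again.
	 */
	if (atomic_load(&c->queued) > 0 &&
	    !atomic_flag_test_and_set(&c->in_xmit))
		goto restart;
}

/* Producer: queue one message, then try to become the transmitter. */
static void fake_send(struct fake_conn *c)
{
	atomic_fetch_add(&c->queued, 1);
	if (!atomic_flag_test_and_set(&c->in_xmit))	/* like acquire_in_xmit() */
		fake_xmit(c);
	/* else: whoever owns the flag will see our message, either while
	 * draining or in its re-check after clearing the flag. */
}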