Diffstat (limited to 'net/rds/send.c')
-rw-r--r--	net/rds/send.c	| 82
1 files changed, 43 insertions, 39 deletions
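
For context, a minimal sketch (not part of the patch below) of how a waiter is expected to pair with the release_in_xmit() helper this patch adds: rather than contending on c_send_lock, a path such as connection shutdown would sleep on c_waitq until RDS_IN_XMIT clears. The helper name rds_example_wait_xmit_done is invented for illustration; struct rds_connection, RDS_IN_XMIT, c_flags and c_waitq are assumed to come from net/rds/rds.h as in the rest of the patch.

/* Illustrative sketch only -- not part of this commit. */
#include <linux/wait.h>
#include <linux/bitops.h>
#include "rds.h"		/* struct rds_connection, RDS_IN_XMIT */

static void rds_example_wait_xmit_done(struct rds_connection *conn)
{
	/*
	 * Sleep until rds_send_xmit() drops RDS_IN_XMIT.  release_in_xmit()
	 * does clear_bit() + smp_mb__after_clear_bit() before its
	 * waitqueue_active() check, and wait_event() re-tests the condition
	 * after any wakeup, so a waiter should not miss the bit clearing.
	 */
	wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags));
}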
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
 
 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare. We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 
 restart:
-	if (!rds_conn_up(conn))
-		goto out;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
 		if (!rm) {
 			unsigned int len;
 
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}
 
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 
 			if (!rm)
 				break;
@@ -207,10 +226,10 @@ restart:
 		 */
 		if (rm->rdma.op_active &&
 		    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move(&rm->m_conn_item, &to_be_dropped);
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
 		}
 
@@ -336,19 +355,7 @@ restart:
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock. If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem. If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty. It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT. In that case they'd back off and
+	 * not try and send their newly queued message. We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
 		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			if (gen == atomic_read(&conn->c_send_generation)) {
-				goto restart;
-			}
+			goto restart;
 		}
 	}
 out: