author	Sowmini Varadhan <sowmini.varadhan@oracle.com>	2015-04-08 12:33:47 -0400
committer	David S. Miller <davem@davemloft.net>	2015-04-08 15:17:32 -0400
commit	443be0e5affe3acb6dd81e7402951677e0a0eb35 (patch)
tree	74cfc543b82a49fd7b920aade205cdca1ecb586b
parent	1789b2c077f6d6c82b04cfe49a0fec020dc42488 (diff)
RDS: make sure not to loop forever inside rds_send_xmit
If a determined set of concurrent senders keep the send queue full,
we can loop forever inside rds_send_xmit. This fix has two parts.

First we are dropping out of the while(1) loop after we've processed a
large batch of messages.

Second we add a generation number that gets bumped each time the
xmit bit lock is acquired. If someone else has jumped in and made
progress in the queue, we skip our goto restart.

Original patch by Chris Mason.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
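The shape of the fix, stripped of the surrounding RDS machinery, is sketched
below in minimal userspace C. All names here (toy_conn, toy_send_xmit,
queued, BATCH_LIMIT) are hypothetical, and C11 atomics stand in for the
kernel's bit lock and smp_mb(); this illustrates the technique, not the RDS
implementation itself.

#include <stdatomic.h>

#define BATCH_LIMIT 1024	/* same cap the patch chooses */

/* Hypothetical stand-in for struct rds_connection; illustration only. */
struct toy_conn {
	atomic_flag in_xmit;	/* plays the role of the RDS_IN_XMIT bit lock */
	atomic_ulong send_gen;	/* bumped only by the in_xmit holder */
	atomic_int queued;	/* stand-in for the depth of c_send_queue */
};

/* Shape of rds_send_xmit() after the patch, reduced to the two new ideas:
 * cap each batch, and use a generation number to decide whether the final
 * "queue still non-empty" race is ours to handle.
 */
static void toy_send_xmit(struct toy_conn *conn)
{
	unsigned long send_gen;
	int batch_count;

restart:
	batch_count = 0;

	/* acquire_in_xmit(): only one thread transmits at a time */
	if (atomic_flag_test_and_set(&conn->in_xmit))
		return;

	/* Only the lock holder increments send_gen; the kernel code uses a
	 * plain ++ under the same guarantee. */
	send_gen = atomic_fetch_add(&conn->send_gen, 1) + 1;

	while (atomic_load(&conn->queued) > 0) {
		atomic_fetch_sub(&conn->queued, 1);	/* "transmit" one message */
		if (++batch_count >= BATCH_LIMIT)
			break;	/* the patch's over_batch exit: avoid a softlockup */
	}

	/* release_in_xmit() */
	atomic_flag_clear(&conn->in_xmit);

	/* A sender may have queued work between our last queue check and the
	 * release above.  Restart only if nobody else has since taken the
	 * lock: an unchanged generation means the leftover work is ours. */
	if (atomic_load(&conn->queued) > 0 &&
	    send_gen == atomic_load(&conn->send_gen))
		goto restart;
}

The invariant doing the work: send_gen changes only while the transmit lock
is held, so observing an unchanged generation after release proves that no
other transmitter ran in between, and the restart duty falls to us.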
-rw-r--r--	net/rds/connection.c	1
-rw-r--r--	net/rds/rds.h	1
-rw-r--r--	net/rds/send.c	33
3 files changed, 33 insertions, 2 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 7952a5b1b4c4..14f041398ca1 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -193,6 +193,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	}
 
 	atomic_set(&conn->c_state, RDS_CONN_DOWN);
+	conn->c_send_gen = 0;
 	conn->c_reconnect_jiffies = 0;
 	INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
 	INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index c2a5eef41343..02d8fd5b40c0 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -110,6 +110,7 @@ struct rds_connection {
 	void			*c_transport_data;
 
 	atomic_t		c_state;
+	unsigned long		c_send_gen;
 	unsigned long		c_flags;
 	unsigned long		c_reconnect_jiffies;
 	struct delayed_work	c_send_w;
diff --git a/net/rds/send.c b/net/rds/send.c
index 42f65d4305c8..49f77efd82b9 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
 	struct scatterlist *sg;
 	int ret = 0;
 	LIST_HEAD(to_be_dropped);
+	int batch_count;
+	unsigned long send_gen = 0;
 
 restart:
+	batch_count = 0;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -157,6 +160,17 @@ restart:
 	}
 
 	/*
+	 * we record the send generation after doing the xmit acquire.
+	 * if someone else manages to jump in and do some work, we'll use
+	 * this to avoid a goto restart farther down.
+	 *
+	 * The acquire_in_xmit() check above ensures that only one
+	 * caller can increment c_send_gen at any time.
+	 */
+	conn->c_send_gen++;
+	send_gen = conn->c_send_gen;
+
+	/*
 	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 	 * we do the opposite to avoid races.
 	 */
@@ -202,6 +216,16 @@ restart:
 		if (!rm) {
 			unsigned int len;
 
+			batch_count++;
+
+			/* we want to process as big a batch as we can, but
+			 * we also want to avoid softlockups. If we've been
+			 * through a lot of messages, lets back off and see
+			 * if anyone else jumps in
+			 */
+			if (batch_count >= 1024)
+				goto over_batch;
+
 			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
@@ -357,9 +381,9 @@ restart:
 		}
 	}
 
+over_batch:
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
-
 	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
@@ -380,10 +404,15 @@ restart:
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
 	 * completion handler.
+	 *
+	 * We have an extra generation check here so that if someone manages
+	 * to jump in after our release_in_xmit, we'll see that they have done
+	 * some work and we will skip our goto
 	 */
 	if (ret == 0) {
 		smp_mb();
-		if (!list_empty(&conn->c_send_queue)) {
+		if (!list_empty(&conn->c_send_queue) &&
+		    send_gen == conn->c_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			goto restart;
 		}
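To see why the new generation test is sufficient, consider one possible
interleaving (CPU labels are illustrative). Without the send_gen comparison,
CPU0 would take the goto restart even though CPU1 already owns the queue,
which is exactly the livelock the patch removes:

   CPU0 (rds_send_xmit)                 CPU1 (concurrent rds_send_xmit)
   acquire_in_xmit()
   c_send_gen++            /* gen: 5 */
   ... drains send queue ...
   release_in_xmit()
                                        queues a message
                                        acquire_in_xmit()
                                        c_send_gen++      /* gen: 6 */
   smp_mb()
   c_send_queue is non-empty, but
   send_gen (5) != c_send_gen (6)
   -> skip the restart: CPU1 will transmit the remaining work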