aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2018-01-05 13:39:19 -0500
committerDavid S. Miller <davem@davemloft.net>2018-01-05 13:39:19 -0500
commitad521763e6060b3cb11df572cab3339526cd82c0 (patch)
tree67f61aa4784da3fba56aa43e3626d3f3b6562c78
parenteb9aa1bfbad8c9fc280adf43cb480911295cfa3f (diff)
parent3db6e0d172c94bd9953a1347c55ffb64b1d2e74f (diff)
Merge branch 'rds-use-RCU-between-work-enqueue-and-connection-teardown'
Sowmini Varadhan says: ==================== rds: use RCU between work-enqueue and connection teardown This patchset follows up on the root-cause mentioned in https://www.spinics.net/lists/netdev/msg472849.html Patch1 implements some code refactoring that was suggeseted as an enhancement in http://patchwork.ozlabs.org/patch/843157/ It replaces the c_destroy_in_prog bit in rds_connection with an atomically managed flag in rds_conn_path. Patch2 builds on Patch1 and uses RCU to make sure that work is only enqueued if the connection destroy is not already in progress: the test-flag-and-enqueue is done under rcu_read_lock, while destroy first sets the flag, uses synchronize_rcu to wait for existing reader threads to complete, and then starts all the work-cancellation. Since I have not been able to reproduce the original stack traces reported by syszbot, and these are fixes for a race condition that are based on code-inspection I am not marking these as reported-by at this time. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/rds/cong.c10
-rw-r--r--net/rds/connection.c24
-rw-r--r--net/rds/rds.h4
-rw-r--r--net/rds/send.c37
-rw-r--r--net/rds/tcp_connect.c2
-rw-r--r--net/rds/tcp_recv.c8
-rw-r--r--net/rds/tcp_send.c5
-rw-r--r--net/rds/threads.c20
8 files changed, 86 insertions, 24 deletions
diff --git a/net/rds/cong.c b/net/rds/cong.c
index 8398fee7c866..8d19fd25dce3 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
219 spin_lock_irqsave(&rds_cong_lock, flags); 219 spin_lock_irqsave(&rds_cong_lock, flags);
220 220
221 list_for_each_entry(conn, &map->m_conn_list, c_map_item) { 221 list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
222 if (!test_and_set_bit(0, &conn->c_map_queued)) { 222 struct rds_conn_path *cp = &conn->c_path[0];
223
224 rcu_read_lock();
225 if (!test_and_set_bit(0, &conn->c_map_queued) &&
226 !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
223 rds_stats_inc(s_cong_update_queued); 227 rds_stats_inc(s_cong_update_queued);
224 /* We cannot inline the call to rds_send_xmit() here 228 /* We cannot inline the call to rds_send_xmit() here
225 * for two reasons (both pertaining to a TCP transport): 229 * for two reasons (both pertaining to a TCP transport):
@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
235 * therefore trigger warnings. 239 * therefore trigger warnings.
236 * Defer the xmit to rds_send_worker() instead. 240 * Defer the xmit to rds_send_worker() instead.
237 */ 241 */
238 queue_delayed_work(rds_wq, 242 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
239 &conn->c_path[0].cp_send_w, 0);
240 } 243 }
244 rcu_read_unlock();
241 } 245 }
242 246
243 spin_unlock_irqrestore(&rds_cong_lock, flags); 247 spin_unlock_irqrestore(&rds_cong_lock, flags);
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 6492c0b608a4..b10c0ef36d8d 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
366 * to the conn hash, so we never trigger a reconnect on this 366 * to the conn hash, so we never trigger a reconnect on this
367 * conn - the reconnect is always triggered by the active peer. */ 367 * conn - the reconnect is always triggered by the active peer. */
368 cancel_delayed_work_sync(&cp->cp_conn_w); 368 cancel_delayed_work_sync(&cp->cp_conn_w);
369 if (conn->c_destroy_in_prog)
370 return;
371 rcu_read_lock(); 369 rcu_read_lock();
372 if (!hlist_unhashed(&conn->c_hash_node)) { 370 if (!hlist_unhashed(&conn->c_hash_node)) {
373 rcu_read_unlock(); 371 rcu_read_unlock();
@@ -384,10 +382,13 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
384{ 382{
385 struct rds_message *rm, *rtmp; 383 struct rds_message *rm, *rtmp;
386 384
385 set_bit(RDS_DESTROY_PENDING, &cp->cp_flags);
386
387 if (!cp->cp_transport_data) 387 if (!cp->cp_transport_data)
388 return; 388 return;
389 389
390 /* make sure lingering queued work won't try to ref the conn */ 390 /* make sure lingering queued work won't try to ref the conn */
391 synchronize_rcu();
391 cancel_delayed_work_sync(&cp->cp_send_w); 392 cancel_delayed_work_sync(&cp->cp_send_w);
392 cancel_delayed_work_sync(&cp->cp_recv_w); 393 cancel_delayed_work_sync(&cp->cp_recv_w);
393 394
@@ -405,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
405 if (cp->cp_xmit_rm) 406 if (cp->cp_xmit_rm)
406 rds_message_put(cp->cp_xmit_rm); 407 rds_message_put(cp->cp_xmit_rm);
407 408
409 WARN_ON(delayed_work_pending(&cp->cp_send_w));
410 WARN_ON(delayed_work_pending(&cp->cp_recv_w));
411 WARN_ON(delayed_work_pending(&cp->cp_conn_w));
412 WARN_ON(work_pending(&cp->cp_down_w));
413
408 cp->cp_conn->c_trans->conn_free(cp->cp_transport_data); 414 cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
409} 415}
410 416
@@ -426,7 +432,6 @@ void rds_conn_destroy(struct rds_connection *conn)
426 "%pI4\n", conn, &conn->c_laddr, 432 "%pI4\n", conn, &conn->c_laddr,
427 &conn->c_faddr); 433 &conn->c_faddr);
428 434
429 conn->c_destroy_in_prog = 1;
430 /* Ensure conn will not be scheduled for reconnect */ 435 /* Ensure conn will not be scheduled for reconnect */
431 spin_lock_irq(&rds_conn_lock); 436 spin_lock_irq(&rds_conn_lock);
432 hlist_del_init_rcu(&conn->c_hash_node); 437 hlist_del_init_rcu(&conn->c_hash_node);
@@ -685,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
685{ 690{
686 atomic_set(&cp->cp_state, RDS_CONN_ERROR); 691 atomic_set(&cp->cp_state, RDS_CONN_ERROR);
687 692
688 if (!destroy && cp->cp_conn->c_destroy_in_prog) 693 rcu_read_lock();
694 if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
695 rcu_read_unlock();
689 return; 696 return;
690 697 }
691 queue_work(rds_wq, &cp->cp_down_w); 698 queue_work(rds_wq, &cp->cp_down_w);
699 rcu_read_unlock();
692} 700}
693EXPORT_SYMBOL_GPL(rds_conn_path_drop); 701EXPORT_SYMBOL_GPL(rds_conn_path_drop);
694 702
@@ -705,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
705 */ 713 */
706void rds_conn_path_connect_if_down(struct rds_conn_path *cp) 714void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
707{ 715{
716 rcu_read_lock();
717 if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
718 rcu_read_unlock();
719 return;
720 }
708 if (rds_conn_path_state(cp) == RDS_CONN_DOWN && 721 if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
709 !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) 722 !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
710 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); 723 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
724 rcu_read_unlock();
711} 725}
712EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down); 726EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
713 727
diff --git a/net/rds/rds.h b/net/rds/rds.h
index d09f6c1facb4..374ae83b60d4 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -88,6 +88,7 @@ enum {
88#define RDS_RECONNECT_PENDING 1 88#define RDS_RECONNECT_PENDING 1
89#define RDS_IN_XMIT 2 89#define RDS_IN_XMIT 2
90#define RDS_RECV_REFILL 3 90#define RDS_RECV_REFILL 3
91#define RDS_DESTROY_PENDING 4
91 92
92/* Max number of multipaths per RDS connection. Must be a power of 2 */ 93/* Max number of multipaths per RDS connection. Must be a power of 2 */
93#define RDS_MPATH_WORKERS 8 94#define RDS_MPATH_WORKERS 8
@@ -139,8 +140,7 @@ struct rds_connection {
139 __be32 c_faddr; 140 __be32 c_faddr;
140 unsigned int c_loopback:1, 141 unsigned int c_loopback:1,
141 c_ping_triggered:1, 142 c_ping_triggered:1,
142 c_destroy_in_prog:1, 143 c_pad_to_32:30;
143 c_pad_to_32:29;
144 int c_npaths; 144 int c_npaths;
145 struct rds_connection *c_passive; 145 struct rds_connection *c_passive;
146 struct rds_transport *c_trans; 146 struct rds_transport *c_trans;
diff --git a/net/rds/send.c b/net/rds/send.c
index f72466c63f0c..d3e32d1f3c7d 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -162,6 +162,12 @@ restart:
162 goto out; 162 goto out;
163 } 163 }
164 164
165 if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
166 release_in_xmit(cp);
167 ret = -ENETUNREACH; /* dont requeue send work */
168 goto out;
169 }
170
165 /* 171 /*
166 * we record the send generation after doing the xmit acquire. 172 * we record the send generation after doing the xmit acquire.
167 * if someone else manages to jump in and do some work, we'll use 173 * if someone else manages to jump in and do some work, we'll use
@@ -437,7 +443,12 @@ over_batch:
437 !list_empty(&cp->cp_send_queue)) && !raced) { 443 !list_empty(&cp->cp_send_queue)) && !raced) {
438 if (batch_count < send_batch_count) 444 if (batch_count < send_batch_count)
439 goto restart; 445 goto restart;
440 queue_delayed_work(rds_wq, &cp->cp_send_w, 1); 446 rcu_read_lock();
447 if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
448 ret = -ENETUNREACH;
449 else
450 queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
451 rcu_read_unlock();
441 } else if (raced) { 452 } else if (raced) {
442 rds_stats_inc(s_send_lock_queue_raced); 453 rds_stats_inc(s_send_lock_queue_raced);
443 } 454 }
@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1151 else 1162 else
1152 cpath = &conn->c_path[0]; 1163 cpath = &conn->c_path[0];
1153 1164
1165 if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
1166 ret = -EAGAIN;
1167 goto out;
1168 }
1169
1154 rds_conn_path_connect_if_down(cpath); 1170 rds_conn_path_connect_if_down(cpath);
1155 1171
1156 ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); 1172 ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1190 rds_stats_inc(s_send_queued); 1206 rds_stats_inc(s_send_queued);
1191 1207
1192 ret = rds_send_xmit(cpath); 1208 ret = rds_send_xmit(cpath);
1193 if (ret == -ENOMEM || ret == -EAGAIN) 1209 if (ret == -ENOMEM || ret == -EAGAIN) {
1194 queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); 1210 ret = 0;
1195 1211 rcu_read_lock();
1212 if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags))
1213 ret = -ENETUNREACH;
1214 else
1215 queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1216 rcu_read_unlock();
1217 }
1218 if (ret)
1219 goto out;
1196 rds_message_put(rm); 1220 rds_message_put(rm);
1197 return payload_len; 1221 return payload_len;
1198 1222
@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1270 rds_stats_inc(s_send_pong); 1294 rds_stats_inc(s_send_pong);
1271 1295
1272 /* schedule the send work on rds_wq */ 1296 /* schedule the send work on rds_wq */
1273 queue_delayed_work(rds_wq, &cp->cp_send_w, 1); 1297 rcu_read_lock();
1298 if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
1299 queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1300 rcu_read_unlock();
1274 1301
1275 rds_message_put(rm); 1302 rds_message_put(rm);
1276 return 0; 1303 return 0;
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index 46f74dad0e16..534c67aeb20f 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -170,7 +170,7 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
170 cp->cp_conn, tc, sock); 170 cp->cp_conn, tc, sock);
171 171
172 if (sock) { 172 if (sock) {
173 if (cp->cp_conn->c_destroy_in_prog) 173 if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
174 rds_tcp_set_linger(sock); 174 rds_tcp_set_linger(sock);
175 sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN); 175 sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
176 lock_sock(sock->sk); 176 lock_sock(sock->sk);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index e006ef8e6d40..dd707b9e73e5 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -321,8 +321,12 @@ void rds_tcp_data_ready(struct sock *sk)
321 ready = tc->t_orig_data_ready; 321 ready = tc->t_orig_data_ready;
322 rds_tcp_stats_inc(s_tcp_data_ready_calls); 322 rds_tcp_stats_inc(s_tcp_data_ready_calls);
323 323
324 if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) 324 if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
325 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); 325 rcu_read_lock();
326 if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
327 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
328 rcu_read_unlock();
329 }
326out: 330out:
327 read_unlock_bh(&sk->sk_callback_lock); 331 read_unlock_bh(&sk->sk_callback_lock);
328 ready(sk); 332 ready(sk);
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index dc860d1bb608..73c74763ca72 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -202,8 +202,11 @@ void rds_tcp_write_space(struct sock *sk)
202 tc->t_last_seen_una = rds_tcp_snd_una(tc); 202 tc->t_last_seen_una = rds_tcp_snd_una(tc);
203 rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked); 203 rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
204 204
205 if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) 205 rcu_read_lock();
206 if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
207 !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
206 queue_delayed_work(rds_wq, &cp->cp_send_w, 0); 208 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
209 rcu_read_unlock();
207 210
208out: 211out:
209 read_unlock_bh(&sk->sk_callback_lock); 212 read_unlock_bh(&sk->sk_callback_lock);
diff --git a/net/rds/threads.c b/net/rds/threads.c
index f121daa402c8..eb76db1360b0 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -87,8 +87,12 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
87 87
88 cp->cp_reconnect_jiffies = 0; 88 cp->cp_reconnect_jiffies = 0;
89 set_bit(0, &cp->cp_conn->c_map_queued); 89 set_bit(0, &cp->cp_conn->c_map_queued);
90 queue_delayed_work(rds_wq, &cp->cp_send_w, 0); 90 rcu_read_lock();
91 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); 91 if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
92 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
93 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
94 }
95 rcu_read_unlock();
92} 96}
93EXPORT_SYMBOL_GPL(rds_connect_path_complete); 97EXPORT_SYMBOL_GPL(rds_connect_path_complete);
94 98
@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
133 set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); 137 set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
134 if (cp->cp_reconnect_jiffies == 0) { 138 if (cp->cp_reconnect_jiffies == 0) {
135 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; 139 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
136 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); 140 rcu_read_lock();
141 if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
142 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
143 rcu_read_unlock();
137 return; 144 return;
138 } 145 }
139 146
@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
141 rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n", 148 rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
142 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies, 149 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
143 conn, &conn->c_laddr, &conn->c_faddr); 150 conn, &conn->c_laddr, &conn->c_faddr);
144 queue_delayed_work(rds_wq, &cp->cp_conn_w, 151 rcu_read_lock();
145 rand % cp->cp_reconnect_jiffies); 152 if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
153 queue_delayed_work(rds_wq, &cp->cp_conn_w,
154 rand % cp->cp_reconnect_jiffies);
155 rcu_read_unlock();
146 156
147 cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2, 157 cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
148 rds_sysctl_reconnect_max_jiffies); 158 rds_sysctl_reconnect_max_jiffies);