author		David Howells <dhowells@redhat.com>	2018-11-01 09:39:53 -0400
committer	David S. Miller <davem@davemloft.net>	2018-11-03 02:59:26 -0400
commit		c7e86acfcee30794dc99a0759924bf7b9d43f1ca (patch)
tree		c31ab320a0a156e97a0442c30e8859071a11d178
parent		284fb78ed7572117846f8e1d1d8e3dbfd16880c2 (diff)
rxrpc: Fix lockup due to no error backoff after ack transmit error
If the network becomes (partially) unavailable, say by disabling IPv6, the
background ACK transmission routine can get itself into a tizzy by
proposing immediate ACK retransmission.  Since we're in the call event
processor, that happens immediately without returning to the workqueue
manager.  The condition should clear after a while when either the network
comes back or the call times out.

Fix this by:

 (1) When re-proposing an ACK on failed Tx, don't schedule it immediately.
     This will allow a certain amount of time to elapse before we try
     again.

 (2) Enforce a return to the workqueue manager after a certain number of
     iterations of the call processing loop.

 (3) Add a backoff delay that increases the delay on deferred ACKs by a
     jiffy per failed transmission to a limit of HZ.  The backoff delay is
     cleared on a successful return from kernel_sendmsg().

 (4) Cancel calls immediately if the opening sendmsg fails.  The layer
     above can arrange retransmission or rotate to another server.

Fixes: 248f219cb8bc ("rxrpc: Rewrite the data and ack handling code")
Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
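[Editor's illustration, not part of the commit] To make the arithmetic in
(3) concrete, here is a minimal user-space sketch of the backoff pattern
the patch introduces.  HZ, the struct, and the helper below are stand-ins
for the real rxrpc definitions, not kernel code:

#include <stdio.h>

#define HZ 100				/* stand-in for the kernel tick rate */

struct fake_call {
	unsigned short tx_backoff;	/* mirrors rxrpc_call::tx_backoff */
};

/* Mirrors rxrpc_tx_backoff(): grow by one jiffy per failed transmission,
 * cap at HZ, and reset to zero as soon as a transmission succeeds.
 */
static void tx_backoff(struct fake_call *call, int ret)
{
	if (ret < 0) {
		if (call->tx_backoff < HZ)
			call->tx_backoff++;
	} else {
		call->tx_backoff = 0;
	}
}

int main(void)
{
	struct fake_call call = { 0 };
	int i;

	/* Three failed sends push the deferred-ACK deadline out by 3 jiffies. */
	for (i = 0; i < 3; i++)
		tx_backoff(&call, -1);
	printf("after 3 failures: defer ACKs by %hu extra jiffies\n", call.tx_backoff);

	/* One success clears the accumulated backoff entirely. */
	tx_backoff(&call, 0);
	printf("after a success:  defer ACKs by %hu extra jiffies\n", call.tx_backoff);
	return 0;
}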
-rw-r--r--	net/rxrpc/ar-internal.h	 1
-rw-r--r--	net/rxrpc/call_event.c	18
-rw-r--r--	net/rxrpc/output.c	35
3 files changed, 46 insertions(+), 8 deletions(-)
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 382196e57a26..bc628acf4f4f 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -611,6 +611,7 @@ struct rxrpc_call {
 	 * not hard-ACK'd packet follows this.
 	 */
 	rxrpc_seq_t		tx_top;		/* Highest Tx slot allocated. */
+	u16			tx_backoff;	/* Delay to insert due to Tx failure */
 
 	/* TCP-style slow-start congestion control [RFC5681].  Since the SMSS
 	 * is fixed, we keep these numbers in terms of segments (ie. DATA
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 8e7434e92097..468efc3660c0 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -123,6 +123,7 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 	else
 		ack_at = expiry;
 
+	ack_at += READ_ONCE(call->tx_backoff);
 	ack_at += now;
 	if (time_before(ack_at, call->ack_at)) {
 		WRITE_ONCE(call->ack_at, ack_at);
@@ -311,6 +312,7 @@ void rxrpc_process_call(struct work_struct *work)
 		container_of(work, struct rxrpc_call, processor);
 	rxrpc_serial_t *send_ack;
 	unsigned long now, next, t;
+	unsigned int iterations = 0;
 
 	rxrpc_see_call(call);
 
@@ -319,6 +321,11 @@ void rxrpc_process_call(struct work_struct *work)
 	       call->debug_id, rxrpc_call_states[call->state], call->events);
 
 recheck_state:
+	/* Limit the number of times we do this before returning to the manager */
+	iterations++;
+	if (iterations > 5)
+		goto requeue;
+
 	if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
 		rxrpc_send_abort_packet(call);
 		goto recheck_state;
@@ -447,13 +454,16 @@ recheck_state:
 	rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
 
 	/* other events may have been raised since we started checking */
-	if (call->events && call->state < RXRPC_CALL_COMPLETE) {
-		__rxrpc_queue_call(call);
-		goto out;
-	}
+	if (call->events && call->state < RXRPC_CALL_COMPLETE)
+		goto requeue;
 
 out_put:
 	rxrpc_put_call(call, rxrpc_call_put);
 out:
 	_leave("");
+	return;
+
+requeue:
+	__rxrpc_queue_call(call);
+	goto out;
 }
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 189418888839..736aa9281100 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -35,6 +35,21 @@ struct rxrpc_abort_buffer {
 static const char rxrpc_keepalive_string[] = "";
 
 /*
+ * Increase Tx backoff on transmission failure and clear it on success.
+ */
+static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
+{
+	if (ret < 0) {
+		u16 tx_backoff = READ_ONCE(call->tx_backoff);
+
+		if (tx_backoff < HZ)
+			WRITE_ONCE(call->tx_backoff, tx_backoff + 1);
+	} else {
+		WRITE_ONCE(call->tx_backoff, 0);
+	}
+}
+
+/*
  * Arrange for a keepalive ping a certain time after we last transmitted.  This
  * lets the far side know we're still interested in this call and helps keep
  * the route through any intervening firewall open.
@@ -210,6 +225,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr,
 				      rxrpc_tx_point_call_ack);
+	rxrpc_tx_backoff(call, ret);
 
 	if (call->state < RXRPC_CALL_COMPLETE) {
 		if (ret < 0) {
@@ -218,7 +234,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
 			rxrpc_propose_ACK(call, pkt->ack.reason,
 					  ntohs(pkt->ack.maxSkew),
 					  ntohl(pkt->ack.serial),
-					  true, true,
+					  false, true,
 					  rxrpc_propose_ack_retry_tx);
 		} else {
 			spin_lock_bh(&call->lock);
@@ -300,7 +316,7 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
 				      rxrpc_tx_point_call_abort);
-
+	rxrpc_tx_backoff(call, ret);
 
 	rxrpc_put_connection(conn);
 	return ret;
@@ -413,6 +429,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &whdr,
 				      rxrpc_tx_point_call_data_nofrag);
+	rxrpc_tx_backoff(call, ret);
 	if (ret == -EMSGSIZE)
 		goto send_fragmentable;
 
@@ -445,9 +462,18 @@ done:
 			rxrpc_reduce_call_timer(call, expect_rx_by, nowj,
 						rxrpc_timer_set_for_normal);
 		}
-	}
 
-	rxrpc_set_keepalive(call);
+		rxrpc_set_keepalive(call);
+	} else {
+		/* Cancel the call if the initial transmission fails,
+		 * particularly if that's due to network routing issues that
+		 * aren't going away anytime soon.  The layer above can arrange
+		 * the retransmission.
+		 */
+		if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags))
+			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+						  RX_USER_ABORT, ret);
+	}
 
 	_leave(" = %d [%u]", ret, call->peer->maxdata);
 	return ret;
@@ -506,6 +532,7 @@ send_fragmentable:
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &whdr,
 				      rxrpc_tx_point_call_data_frag);
+	rxrpc_tx_backoff(call, ret);
 
 	up_write(&conn->params.local->defrag_sem);
 	goto done;