aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAllan Stephens <allan.stephens@windriver.com>2011-10-27 16:43:09 -0400
committerPaul Gortmaker <paul.gortmaker@windriver.com>2012-02-06 16:59:19 -0500
commit63e7f1ac2855ba56f15d8189694ca9bd16ae4107 (patch)
tree8e8764a8cdf8cabb15d4975187d1d91c587fbba2
parentb76b27cad5ade1d483d4b94df6b35976bccf1055 (diff)
tipc: Prevent loss of fragmented messages over broadcast link
Modifies broadcast link so that an incoming fragmented message is not lost if reassembly cannot begin because there currently is no buffer big enough to hold the entire reassembled message. The broadcast link now ignores the first fragment completely, which causes the sending node to retransmit the first fragment so that reassembly can be re-attempted. Previously, the sender would have had no reason to retransmit the 1st fragment, so we would never have a chance to re-try the allocation. To do this cleanly without duplicaton, a new bclink_accept_pkt() function is introduced. Signed-off-by: Allan Stephens <allan.stephens@windriver.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
-rw-r--r--net/tipc/bcast.c64
1 files changed, 42 insertions, 22 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index a9b7132d34f2..41ecf313073c 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -389,7 +389,33 @@ exit:
389 return res; 389 return res;
390} 390}
391 391
392/** 392/*
393 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
394 *
395 * Called with both sending node's lock and bc_lock taken.
396 */
397
398static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
399{
400 bclink_update_last_sent(node, seqno);
401 node->bclink.last_in = seqno;
402 node->bclink.oos_state = 0;
403 bcl->stats.recv_info++;
404
405 /*
406 * Unicast an ACK periodically, ensuring that
407 * all nodes in the cluster don't ACK at the same time
408 */
409
410 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
411 tipc_link_send_proto_msg(
412 node->active_links[node->addr & 1],
413 STATE_MSG, 0, 0, 0, 0, 0);
414 bcl->stats.sent_acks++;
415 }
416}
417
418/*
393 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards 419 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
394 * 420 *
395 * tipc_net_lock is read_locked, no other locks set 421 * tipc_net_lock is read_locked, no other locks set
@@ -443,29 +469,12 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
443 next_in = mod(node->bclink.last_in + 1); 469 next_in = mod(node->bclink.last_in + 1);
444 470
445 if (likely(seqno == next_in)) { 471 if (likely(seqno == next_in)) {
446 bclink_update_last_sent(node, seqno);
447receive: 472receive:
448 node->bclink.last_in = seqno;
449 node->bclink.oos_state = 0;
450
451 spin_lock_bh(&bc_lock);
452 bcl->stats.recv_info++;
453
454 /*
455 * Unicast an ACK periodically, ensuring that
456 * all nodes in the cluster don't ACK at the same time
457 */
458
459 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
460 tipc_link_send_proto_msg(
461 node->active_links[node->addr & 1],
462 STATE_MSG, 0, 0, 0, 0, 0);
463 bcl->stats.sent_acks++;
464 }
465
466 /* Deliver message to destination */ 473 /* Deliver message to destination */
467 474
468 if (likely(msg_isdata(msg))) { 475 if (likely(msg_isdata(msg))) {
476 spin_lock_bh(&bc_lock);
477 bclink_accept_pkt(node, seqno);
469 spin_unlock_bh(&bc_lock); 478 spin_unlock_bh(&bc_lock);
470 tipc_node_unlock(node); 479 tipc_node_unlock(node);
471 if (likely(msg_mcast(msg))) 480 if (likely(msg_mcast(msg)))
@@ -473,24 +482,35 @@ receive:
473 else 482 else
474 buf_discard(buf); 483 buf_discard(buf);
475 } else if (msg_user(msg) == MSG_BUNDLER) { 484 } else if (msg_user(msg) == MSG_BUNDLER) {
485 spin_lock_bh(&bc_lock);
486 bclink_accept_pkt(node, seqno);
476 bcl->stats.recv_bundles++; 487 bcl->stats.recv_bundles++;
477 bcl->stats.recv_bundled += msg_msgcnt(msg); 488 bcl->stats.recv_bundled += msg_msgcnt(msg);
478 spin_unlock_bh(&bc_lock); 489 spin_unlock_bh(&bc_lock);
479 tipc_node_unlock(node); 490 tipc_node_unlock(node);
480 tipc_link_recv_bundle(buf); 491 tipc_link_recv_bundle(buf);
481 } else if (msg_user(msg) == MSG_FRAGMENTER) { 492 } else if (msg_user(msg) == MSG_FRAGMENTER) {
493 int ret = tipc_link_recv_fragment(&node->bclink.defragm,
494 &buf, &msg);
495 if (ret < 0)
496 goto unlock;
497 spin_lock_bh(&bc_lock);
498 bclink_accept_pkt(node, seqno);
482 bcl->stats.recv_fragments++; 499 bcl->stats.recv_fragments++;
483 if (tipc_link_recv_fragment(&node->bclink.defragm, 500 if (ret > 0)
484 &buf, &msg))
485 bcl->stats.recv_fragmented++; 501 bcl->stats.recv_fragmented++;
486 spin_unlock_bh(&bc_lock); 502 spin_unlock_bh(&bc_lock);
487 tipc_node_unlock(node); 503 tipc_node_unlock(node);
488 tipc_net_route_msg(buf); 504 tipc_net_route_msg(buf);
489 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 505 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
506 spin_lock_bh(&bc_lock);
507 bclink_accept_pkt(node, seqno);
490 spin_unlock_bh(&bc_lock); 508 spin_unlock_bh(&bc_lock);
491 tipc_node_unlock(node); 509 tipc_node_unlock(node);
492 tipc_named_recv(buf); 510 tipc_named_recv(buf);
493 } else { 511 } else {
512 spin_lock_bh(&bc_lock);
513 bclink_accept_pkt(node, seqno);
494 spin_unlock_bh(&bc_lock); 514 spin_unlock_bh(&bc_lock);
495 tipc_node_unlock(node); 515 tipc_node_unlock(node);
496 buf_discard(buf); 516 buf_discard(buf);