aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c64
1 files changed, 42 insertions, 22 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index a9b7132d34f2..41ecf313073c 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -389,7 +389,33 @@ exit:
389 return res; 389 return res;
390} 390}
391 391
392/** 392/*
393 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
394 *
395 * Called with both sending node's lock and bc_lock taken.
396 */
397
398static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
399{
400 bclink_update_last_sent(node, seqno);
401 node->bclink.last_in = seqno;
402 node->bclink.oos_state = 0;
403 bcl->stats.recv_info++;
404
405 /*
406 * Unicast an ACK periodically, ensuring that
407 * all nodes in the cluster don't ACK at the same time
408 */
409
410 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
411 tipc_link_send_proto_msg(
412 node->active_links[node->addr & 1],
413 STATE_MSG, 0, 0, 0, 0, 0);
414 bcl->stats.sent_acks++;
415 }
416}
417
418/*
393 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards 419 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
394 * 420 *
395 * tipc_net_lock is read_locked, no other locks set 421 * tipc_net_lock is read_locked, no other locks set
@@ -443,29 +469,12 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
443 next_in = mod(node->bclink.last_in + 1); 469 next_in = mod(node->bclink.last_in + 1);
444 470
445 if (likely(seqno == next_in)) { 471 if (likely(seqno == next_in)) {
446 bclink_update_last_sent(node, seqno);
447receive: 472receive:
448 node->bclink.last_in = seqno;
449 node->bclink.oos_state = 0;
450
451 spin_lock_bh(&bc_lock);
452 bcl->stats.recv_info++;
453
454 /*
455 * Unicast an ACK periodically, ensuring that
456 * all nodes in the cluster don't ACK at the same time
457 */
458
459 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
460 tipc_link_send_proto_msg(
461 node->active_links[node->addr & 1],
462 STATE_MSG, 0, 0, 0, 0, 0);
463 bcl->stats.sent_acks++;
464 }
465
466 /* Deliver message to destination */ 473 /* Deliver message to destination */
467 474
468 if (likely(msg_isdata(msg))) { 475 if (likely(msg_isdata(msg))) {
476 spin_lock_bh(&bc_lock);
477 bclink_accept_pkt(node, seqno);
469 spin_unlock_bh(&bc_lock); 478 spin_unlock_bh(&bc_lock);
470 tipc_node_unlock(node); 479 tipc_node_unlock(node);
471 if (likely(msg_mcast(msg))) 480 if (likely(msg_mcast(msg)))
@@ -473,24 +482,35 @@ receive:
473 else 482 else
474 buf_discard(buf); 483 buf_discard(buf);
475 } else if (msg_user(msg) == MSG_BUNDLER) { 484 } else if (msg_user(msg) == MSG_BUNDLER) {
485 spin_lock_bh(&bc_lock);
486 bclink_accept_pkt(node, seqno);
476 bcl->stats.recv_bundles++; 487 bcl->stats.recv_bundles++;
477 bcl->stats.recv_bundled += msg_msgcnt(msg); 488 bcl->stats.recv_bundled += msg_msgcnt(msg);
478 spin_unlock_bh(&bc_lock); 489 spin_unlock_bh(&bc_lock);
479 tipc_node_unlock(node); 490 tipc_node_unlock(node);
480 tipc_link_recv_bundle(buf); 491 tipc_link_recv_bundle(buf);
481 } else if (msg_user(msg) == MSG_FRAGMENTER) { 492 } else if (msg_user(msg) == MSG_FRAGMENTER) {
493 int ret = tipc_link_recv_fragment(&node->bclink.defragm,
494 &buf, &msg);
495 if (ret < 0)
496 goto unlock;
497 spin_lock_bh(&bc_lock);
498 bclink_accept_pkt(node, seqno);
482 bcl->stats.recv_fragments++; 499 bcl->stats.recv_fragments++;
483 if (tipc_link_recv_fragment(&node->bclink.defragm, 500 if (ret > 0)
484 &buf, &msg))
485 bcl->stats.recv_fragmented++; 501 bcl->stats.recv_fragmented++;
486 spin_unlock_bh(&bc_lock); 502 spin_unlock_bh(&bc_lock);
487 tipc_node_unlock(node); 503 tipc_node_unlock(node);
488 tipc_net_route_msg(buf); 504 tipc_net_route_msg(buf);
489 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 505 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
506 spin_lock_bh(&bc_lock);
507 bclink_accept_pkt(node, seqno);
490 spin_unlock_bh(&bc_lock); 508 spin_unlock_bh(&bc_lock);
491 tipc_node_unlock(node); 509 tipc_node_unlock(node);
492 tipc_named_recv(buf); 510 tipc_named_recv(buf);
493 } else { 511 } else {
512 spin_lock_bh(&bc_lock);
513 bclink_accept_pkt(node, seqno);
494 spin_unlock_bh(&bc_lock); 514 spin_unlock_bh(&bc_lock);
495 tipc_node_unlock(node); 515 tipc_node_unlock(node);
496 buf_discard(buf); 516 buf_discard(buf);