aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2014-07-16 20:41:04 -0400
committerDavid S. Miller <davem@davemloft.net>2014-07-17 00:38:19 -0400
commit6f92ee54b316c116e125a6bb268abe308e4c14e6 (patch)
tree9eadd99bdd4bf1a5d2197e3bc2340decc0c683f3 /net/tipc
parent9fbfb8b120bd4fe89cd70d6c8841e6e1cfab2609 (diff)
tipc: ensure sequential message delivery across dual bearers
When we run broadcast packets over dual bearers/interfaces, the current transmission code is flipping bearers between each sent packet, with the purpose of leveraging the double bandwidth available. The receiving bclink is resequencing the packets if needed, so all messages are delivered upwards from the broadcast link in the correct order, even if they may arrive in concurrent interrupts. However, at the moment of delivery upwards to the socket, we release all spinlocks (bclink_lock, node_lock), so it is still possible that arriving messages bypass each other before they reach the socket queue. We fix this by applying the same technique we are using for unicast traffic. We use a link selector (i.e., the last bit of sending port number) to ensure that messages from the same sender socket always are sent over the same bearer. This guarantees sequential delivery between socket pairs, which is sufficient to satisfy the protocol spec, as well as all known user requirements. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c17
1 files changed, 5 insertions, 12 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index d890d480ae3b..dd13bfa09333 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -637,6 +637,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
637 struct tipc_media_addr *unused2) 637 struct tipc_media_addr *unused2)
638{ 638{
639 int bp_index; 639 int bp_index;
640 struct tipc_msg *msg = buf_msg(buf);
640 641
641 /* Prepare broadcast link message for reliable transmission, 642 /* Prepare broadcast link message for reliable transmission,
642 * if first time trying to send it; 643 * if first time trying to send it;
@@ -644,10 +645,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
644 * since they are sent in an unreliable manner and don't need it 645 * since they are sent in an unreliable manner and don't need it
645 */ 646 */
646 if (likely(!msg_non_seq(buf_msg(buf)))) { 647 if (likely(!msg_non_seq(buf_msg(buf)))) {
647 struct tipc_msg *msg;
648
649 bcbuf_set_acks(buf, bclink->bcast_nodes.count); 648 bcbuf_set_acks(buf, bclink->bcast_nodes.count);
650 msg = buf_msg(buf);
651 msg_set_non_seq(msg, 1); 649 msg_set_non_seq(msg, 1);
652 msg_set_mc_netid(msg, tipc_net_id); 650 msg_set_mc_netid(msg, tipc_net_id);
653 bcl->stats.sent_info++; 651 bcl->stats.sent_info++;
@@ -664,12 +662,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
664 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 662 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
665 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; 663 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
666 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; 664 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
667 struct tipc_bearer *b = p; 665 struct tipc_bearer *bp[2] = {p, s};
666 struct tipc_bearer *b = bp[msg_link_selector(msg)];
668 struct sk_buff *tbuf; 667 struct sk_buff *tbuf;
669 668
670 if (!p) 669 if (!p)
671 break; /* No more bearers to try */ 670 break; /* No more bearers to try */
672 671 if (!b)
672 b = p;
673 tipc_nmap_diff(&bcbearer->remains, &b->nodes, 673 tipc_nmap_diff(&bcbearer->remains, &b->nodes,
674 &bcbearer->remains_new); 674 &bcbearer->remains_new);
675 if (bcbearer->remains_new.count == bcbearer->remains.count) 675 if (bcbearer->remains_new.count == bcbearer->remains.count)
@@ -686,13 +686,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
686 tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); 686 tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
687 kfree_skb(tbuf); /* Bearer keeps a clone */ 687 kfree_skb(tbuf); /* Bearer keeps a clone */
688 } 688 }
689
690 /* Swap bearers for next packet */
691 if (s) {
692 bcbearer->bpairs[bp_index].primary = s;
693 bcbearer->bpairs[bp_index].secondary = p;
694 }
695
696 if (bcbearer->remains_new.count == 0) 689 if (bcbearer->remains_new.count == 0)
697 break; /* All targets reached */ 690 break; /* All targets reached */
698 691