aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2012-11-16 00:51:31 -0500
committerPaul Gortmaker <paul.gortmaker@windriver.com>2012-11-22 14:33:21 -0500
commitc64f7a6a1fb13565687ae5415736095f82557880 (patch)
treebc83cfacea0766a5152db4ca0700658192b16cf6 /net/tipc
parent389dd9bcf65e10929cedfeb79c49bd02069b8899 (diff)
tipc: introduce message to synchronize broadcast link
Upon establishing a first link between two nodes, there is currently a risk that the two endpoints will disagree on exactly which sequence number reception and acknowleding of broadcast packets should start. The following scenarios may happen: 1: Node A sends an ACTIVATE message to B, telling it to start acking packets from sequence number N. 2: Node A sends out broadcast N, but does not expect an acknowledge from B, since B is not yet in its broadcast receiver's list. 3: Node A receives ACK for N from all nodes except B, and releases packet N. 4: Node B receives the ACTIVATE, activates its link endpoint, and stores the value N as sequence number of first expected packet. 5: Node B sends a NAME_DISTR message to A. 6: Node A receives the NAME_DISTR message, and activates its endpoint. At this moment B is added to A's broadcast receiver's set. Node A also sets sequence number 0 as the first broadcast packet to be received from B. 7: Node A sends broadcast N+1. 8: B receives N+1, determines there is a gap in the sequence, since it is expecting N, and sends a NACK for N back to A. 9: Node A has already released N, so no retransmission is possible. The broadcast link in direction A->B is stale. In addition to, or instead of, 7-9 above, the following may happen: 10: Node B sends broadcast M > 0 to A. 11: Node A receives M, falsely decides there must be a gap, since it is expecting packet 0, and asks for retransmission of packets [0,M-1]. 12: Node B has already released these packets, so the broadcast link is stale in direction B->A. We solve this problem by introducing a new unicast message type, BCAST_PROTOCOL/STATE, to convey the sequence number of the next sent broadcast packet to the other endpoint, at exactly the moment that endpoint is added to the own node's broadcast receivers list, and before any other unicast messages are permitted to be sent. Furthermore, we don't allow any node to start receiving and processing broadcast packets until this new synchronization message has been received. To maintain backwards compatibility, we still open up for broadcast reception if we receive a NAME_DISTR message without any preceding broadcast sync message. In this case, we must assume that the other end has an older code version, and will never send out the new synchronization message. Hence, for mixed old and new nodes, the issue arising in 7-12 of the above may happen with the same probability as before. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/link.c61
-rw-r--r--net/tipc/node.c5
2 files changed, 60 insertions, 6 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 20f128fc2be1..87bf5aad704b 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/link.c: TIPC link code 2 * net/tipc/link.c: TIPC link code
3 * 3 *
4 * Copyright (c) 1996-2007, Ericsson AB 4 * Copyright (c) 1996-2007, 2012, Ericsson AB
5 * Copyright (c) 2004-2007, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -103,6 +103,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
103static void link_print(struct tipc_link *l_ptr, const char *str); 103static void link_print(struct tipc_link *l_ptr, const char *str);
104static void link_start(struct tipc_link *l_ptr); 104static void link_start(struct tipc_link *l_ptr);
105static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); 105static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
106static void tipc_link_send_sync(struct tipc_link *l);
107static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
106 108
107/* 109/*
108 * Simple link routines 110 * Simple link routines
@@ -712,6 +714,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
712 link_activate(l_ptr); 714 link_activate(l_ptr);
713 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 715 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
714 l_ptr->fsm_msg_cnt++; 716 l_ptr->fsm_msg_cnt++;
717 if (l_ptr->owner->working_links == 1)
718 tipc_link_send_sync(l_ptr);
715 link_set_timer(l_ptr, cont_intv); 719 link_set_timer(l_ptr, cont_intv);
716 break; 720 break;
717 case RESET_MSG: 721 case RESET_MSG:
@@ -745,6 +749,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
745 link_activate(l_ptr); 749 link_activate(l_ptr);
746 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 750 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
747 l_ptr->fsm_msg_cnt++; 751 l_ptr->fsm_msg_cnt++;
752 if (l_ptr->owner->working_links == 1)
753 tipc_link_send_sync(l_ptr);
748 link_set_timer(l_ptr, cont_intv); 754 link_set_timer(l_ptr, cont_intv);
749 break; 755 break;
750 case RESET_MSG: 756 case RESET_MSG:
@@ -941,7 +947,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
941 return res; 947 return res;
942} 948}
943 949
944/** 950/*
951 * tipc_link_send_sync - synchronize broadcast link endpoints.
952 *
953 * Give a newly added peer node the sequence number where it should
954 * start receiving and acking broadcast packets.
955 *
956 * Called with node locked
957 */
958static void tipc_link_send_sync(struct tipc_link *l)
959{
960 struct sk_buff *buf;
961 struct tipc_msg *msg;
962
963 buf = tipc_buf_acquire(INT_H_SIZE);
964 if (!buf)
965 return;
966
967 msg = buf_msg(buf);
968 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
969 msg_set_last_bcast(msg, l->owner->bclink.acked);
970 link_add_chain_to_outqueue(l, buf, 0);
971 tipc_link_push_queue(l);
972}
973
974/*
975 * tipc_link_recv_sync - synchronize broadcast link endpoints.
976 * Receive the sequence number where we should start receiving and
977 * acking broadcast packets from a newly added peer node, and open
978 * up for reception of such packets.
979 *
980 * Called with node locked
981 */
982static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
983{
984 struct tipc_msg *msg = buf_msg(buf);
985
986 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
987 n->bclink.recv_permitted = true;
988 kfree_skb(buf);
989}
990
991/*
945 * tipc_link_send_names - send name table entries to new neighbor 992 * tipc_link_send_names - send name table entries to new neighbor
946 * 993 *
947 * Send routine for bulk delivery of name table messages when contact 994 * Send routine for bulk delivery of name table messages when contact
@@ -1691,9 +1738,14 @@ deliver:
1691 tipc_link_recv_bundle(buf); 1738 tipc_link_recv_bundle(buf);
1692 continue; 1739 continue;
1693 case NAME_DISTRIBUTOR: 1740 case NAME_DISTRIBUTOR:
1741 n_ptr->bclink.recv_permitted = true;
1694 tipc_node_unlock(n_ptr); 1742 tipc_node_unlock(n_ptr);
1695 tipc_named_recv(buf); 1743 tipc_named_recv(buf);
1696 continue; 1744 continue;
1745 case BCAST_PROTOCOL:
1746 tipc_link_recv_sync(n_ptr, buf);
1747 tipc_node_unlock(n_ptr);
1748 continue;
1697 case CONN_MANAGER: 1749 case CONN_MANAGER:
1698 tipc_node_unlock(n_ptr); 1750 tipc_node_unlock(n_ptr);
1699 tipc_port_recv_proto_msg(buf); 1751 tipc_port_recv_proto_msg(buf);
@@ -1736,16 +1788,19 @@ deliver:
1736 continue; 1788 continue;
1737 } 1789 }
1738 1790
1791 /* Link is not in state WORKING_WORKING */
1739 if (msg_user(msg) == LINK_PROTOCOL) { 1792 if (msg_user(msg) == LINK_PROTOCOL) {
1740 link_recv_proto_msg(l_ptr, buf); 1793 link_recv_proto_msg(l_ptr, buf);
1741 head = link_insert_deferred_queue(l_ptr, head); 1794 head = link_insert_deferred_queue(l_ptr, head);
1742 tipc_node_unlock(n_ptr); 1795 tipc_node_unlock(n_ptr);
1743 continue; 1796 continue;
1744 } 1797 }
1798
1799 /* Traffic message. Conditionally activate link */
1745 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 1800 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1746 1801
1747 if (link_working_working(l_ptr)) { 1802 if (link_working_working(l_ptr)) {
1748 /* Re-insert in front of queue */ 1803 /* Re-insert buffer in front of queue */
1749 buf->next = head; 1804 buf->next = head;
1750 head = buf; 1805 head = buf;
1751 tipc_node_unlock(n_ptr); 1806 tipc_node_unlock(n_ptr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 283a59f0f1c8..48f39dd3eae8 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/node.c: TIPC node management routines 2 * net/tipc/node.c: TIPC node management routines
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, 2012 Ericsson AB
5 * Copyright (c) 2005-2006, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -263,10 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
263static void node_established_contact(struct tipc_node *n_ptr) 263static void node_established_contact(struct tipc_node *n_ptr)
264{ 264{
265 tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); 265 tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
266 266 n_ptr->bclink.oos_state = 0;
267 n_ptr->bclink.acked = tipc_bclink_get_last_sent(); 267 n_ptr->bclink.acked = tipc_bclink_get_last_sent();
268 tipc_bclink_add_node(n_ptr->addr); 268 tipc_bclink_add_node(n_ptr->addr);
269 n_ptr->bclink.recv_permitted = true;
270} 269}
271 270
272static void node_name_purge_complete(unsigned long node_addr) 271static void node_name_purge_complete(unsigned long node_addr)