aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-03-25 12:07:26 -0400
committerDavid S. Miller <davem@davemloft.net>2015-03-25 14:05:56 -0400
commit8b4ed8634f8b3f9aacfc42b4a872d30c36b9e255 (patch)
treea53fedc6a39f617600e4aebf4957d67195d7a37a /net/tipc
parent3127a0200d4a46cf279bb388cc0f71827cd60699 (diff)
tipc: eliminate race condition at dual link establishment
Despite recent improvements, the establishment of dual parallel links still has a small glitch where messages can bypass each other. When the second link in a dual-link configuration is established, part of the first link's traffic will be steered over to the new link. Although we do have a mechanism to ensure that packets sent before and after the establishment of the new link arrive in sequence to the destination node, this is not enough. The arriving messages will still be delivered upwards in different threads, something entailing a risk of message disordering during the transition phase. To fix this, we introduce a synchronization mechanism between the two parallel links, so that traffic arriving on the new link cannot be added to its input queue until we are guaranteed that all pre-establishment messages have been delivered on the old, parallel link. This problem seems to always have been around, but its occurrence is so rare that it has not been noticed until recent intensive testing. Reviewed-by: Ying Xue <ying.xue@windriver.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/link.c45
-rw-r--r--net/tipc/link.h2
-rw-r--r--net/tipc/msg.h8
3 files changed, 55 insertions, 0 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 58e2460682da..1287161e9424 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -139,6 +139,13 @@ static void tipc_link_put(struct tipc_link *l_ptr)
139 kref_put(&l_ptr->ref, tipc_link_release); 139 kref_put(&l_ptr->ref, tipc_link_release);
140} 140}
141 141
142static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
143{
144 if (l->owner->active_links[0] != l)
145 return l->owner->active_links[0];
146 return l->owner->active_links[1];
147}
148
142static void link_init_max_pkt(struct tipc_link *l_ptr) 149static void link_init_max_pkt(struct tipc_link *l_ptr)
143{ 150{
144 struct tipc_node *node = l_ptr->owner; 151 struct tipc_node *node = l_ptr->owner;
@@ -1026,6 +1033,32 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
1026 } 1033 }
1027} 1034}
1028 1035
1036/* link_synch(): check if all packets arrived before the synch
1037 * point have been consumed
1038 * Returns true if the parallel links are synched, otherwise false
1039 */
1040static bool link_synch(struct tipc_link *l)
1041{
1042 unsigned int post_synch;
1043 struct tipc_link *pl;
1044
1045 pl = tipc_parallel_link(l);
1046 if (pl == l)
1047 goto synched;
1048
1049 /* Was last pre-synch packet added to input queue ? */
1050 if (less_eq(pl->next_in_no, l->synch_point))
1051 return false;
1052
1053 /* Is it still in the input queue ? */
1054 post_synch = mod(pl->next_in_no - l->synch_point) - 1;
1055 if (skb_queue_len(&pl->inputq) > post_synch)
1056 return false;
1057synched:
1058 l->flags &= ~LINK_SYNCHING;
1059 return true;
1060}
1061
1029static void link_retrieve_defq(struct tipc_link *link, 1062static void link_retrieve_defq(struct tipc_link *link,
1030 struct sk_buff_head *list) 1063 struct sk_buff_head *list)
1031{ 1064{
@@ -1156,6 +1189,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1156 skb = NULL; 1189 skb = NULL;
1157 goto unlock; 1190 goto unlock;
1158 } 1191 }
1192 /* Synchronize with parallel link if applicable */
1193 if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
1194 link_handle_out_of_seq_msg(l_ptr, skb);
1195 if (link_synch(l_ptr))
1196 link_retrieve_defq(l_ptr, &head);
1197 skb = NULL;
1198 goto unlock;
1199 }
1159 l_ptr->next_in_no++; 1200 l_ptr->next_in_no++;
1160 if (unlikely(!skb_queue_empty(&l_ptr->deferdq))) 1201 if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
1161 link_retrieve_defq(l_ptr, &head); 1202 link_retrieve_defq(l_ptr, &head);
@@ -1231,6 +1272,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1231 1272
1232 switch (msg_user(msg)) { 1273 switch (msg_user(msg)) {
1233 case CHANGEOVER_PROTOCOL: 1274 case CHANGEOVER_PROTOCOL:
1275 if (msg_dup(msg)) {
1276 link->flags |= LINK_SYNCHING;
1277 link->synch_point = msg_seqno(msg_get_wrapped(msg));
1278 }
1234 if (!tipc_link_tunnel_rcv(node, &skb)) 1279 if (!tipc_link_tunnel_rcv(node, &skb))
1235 break; 1280 break;
1236 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { 1281 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 99543a46095a..d2b5663643da 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -60,6 +60,7 @@
60 */ 60 */
61#define LINK_STARTED 0x0001 61#define LINK_STARTED 0x0001
62#define LINK_STOPPED 0x0002 62#define LINK_STOPPED 0x0002
63#define LINK_SYNCHING 0x0004
63 64
64/* Starting value for maximum packet size negotiation on unicast links 65/* Starting value for maximum packet size negotiation on unicast links
65 * (unless bearer MTU is less) 66 * (unless bearer MTU is less)
@@ -170,6 +171,7 @@ struct tipc_link {
170 /* Changeover */ 171 /* Changeover */
171 u32 exp_msg_count; 172 u32 exp_msg_count;
172 u32 reset_checkpoint; 173 u32 reset_checkpoint;
174 u32 synch_point;
173 175
174 /* Max packet negotiation */ 176 /* Max packet negotiation */
175 u32 max_pkt; 177 u32 max_pkt;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 6445db09c0c4..d273207ede28 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -554,6 +554,14 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
554 msg_set_bits(m, 1, 15, 0x1fff, n); 554 msg_set_bits(m, 1, 15, 0x1fff, n);
555} 555}
556 556
557static inline bool msg_dup(struct tipc_msg *m)
558{
559 if (likely(msg_user(m) != CHANGEOVER_PROTOCOL))
560 return false;
561 if (msg_type(m) != DUPLICATE_MSG)
562 return false;
563 return true;
564}
557 565
558/* 566/*
559 * Word 2 567 * Word 2