aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/node.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-07-30 18:24:19 -0400
committerDavid S. Miller <davem@davemloft.net>2015-07-30 20:25:14 -0400
commit6e498158a827fd515b514842e9a06bdf0f75ab86 (patch)
tree0f9312078445c1bd7d2ed669c564ca6c7a330764 /net/tipc/node.c
parent66996b6c47ed7f6bbb01a768e23fae262c7db8e0 (diff)
tipc: move link synch and failover to link aggregation level
Link failover and synchronization have until now been handled by the links themselves, forcing them to have knowledge about and to access parallel links in order to make the two algorithms work correctly. In this commit, we move the control part of this functionality to the link aggregation level in node.c, which is the right location for this. As a result, the two algorithms become easier to follow, and the link implementation becomes simpler. Tested-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--net/tipc/node.c291
1 file changed, 190 insertions, 101 deletions
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6b18d73830ca..b0372bb107f6 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -42,6 +42,31 @@
42#include "bcast.h" 42#include "bcast.h"
43#include "discover.h" 43#include "discover.h"
44 44
45/* Node FSM states and events:
46 */
47enum {
48 SELF_DOWN_PEER_DOWN = 0xdd,
49 SELF_UP_PEER_UP = 0xaa,
50 SELF_DOWN_PEER_LEAVING = 0xd1,
51 SELF_UP_PEER_COMING = 0xac,
52 SELF_COMING_PEER_UP = 0xca,
53 SELF_LEAVING_PEER_DOWN = 0x1d,
54 NODE_FAILINGOVER = 0xf0,
55 NODE_SYNCHING = 0xcc
56};
57
58enum {
59 SELF_ESTABL_CONTACT_EVT = 0xece,
60 SELF_LOST_CONTACT_EVT = 0x1ce,
61 PEER_ESTABL_CONTACT_EVT = 0x9ece,
62 PEER_LOST_CONTACT_EVT = 0x91ce,
63 NODE_FAILOVER_BEGIN_EVT = 0xfbe,
64 NODE_FAILOVER_END_EVT = 0xfee,
65 NODE_SYNCH_BEGIN_EVT = 0xcbe,
66 NODE_SYNCH_END_EVT = 0xcee
67};
68
69static void tipc_node_link_down(struct tipc_node *n, int bearer_id);
45static void node_lost_contact(struct tipc_node *n_ptr); 70static void node_lost_contact(struct tipc_node *n_ptr);
46static void node_established_contact(struct tipc_node *n_ptr); 71static void node_established_contact(struct tipc_node *n_ptr);
47static void tipc_node_delete(struct tipc_node *node); 72static void tipc_node_delete(struct tipc_node *node);
@@ -281,69 +306,75 @@ static void tipc_node_timeout(unsigned long data)
281 * 306 *
282 * Link becomes active (alone or shared) or standby, depending on its priority. 307 * Link becomes active (alone or shared) or standby, depending on its priority.
283 */ 308 */
284void tipc_node_link_up(struct tipc_node *n, int bearer_id) 309static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
310 struct sk_buff_head *xmitq)
285{ 311{
286 int *slot0 = &n->active_links[0]; 312 int *slot0 = &n->active_links[0];
287 int *slot1 = &n->active_links[1]; 313 int *slot1 = &n->active_links[1];
288 struct tipc_link_entry *links = n->links; 314 struct tipc_link *ol = node_active_link(n, 0);
289 struct tipc_link *l = n->links[bearer_id].link; 315 struct tipc_link *nl = n->links[bearer_id].link;
290
291 /* Leave room for tunnel header when returning 'mtu' to users: */
292 links[bearer_id].mtu = l->mtu - INT_H_SIZE;
293 316
317 if (n->working_links > 1) {
318 pr_warn("Attempt to establish 3rd link to %x\n", n->addr);
319 return;
320 }
294 n->working_links++; 321 n->working_links++;
295 n->action_flags |= TIPC_NOTIFY_LINK_UP; 322 n->action_flags |= TIPC_NOTIFY_LINK_UP;
296 n->link_id = l->peer_bearer_id << 16 | l->bearer_id; 323 n->link_id = nl->peer_bearer_id << 16 | bearer_id;
324
325 /* Leave room for tunnel header when returning 'mtu' to users: */
326 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
297 327
298 tipc_bearer_add_dest(n->net, bearer_id, n->addr); 328 tipc_bearer_add_dest(n->net, bearer_id, n->addr);
299 329
300 pr_debug("Established link <%s> on network plane %c\n", 330 pr_debug("Established link <%s> on network plane %c\n",
301 l->name, l->net_plane); 331 nl->name, nl->net_plane);
302 332
303 /* No active links ? => take both active slots */ 333 /* First link? => give it both slots */
304 if (!tipc_node_is_up(n)) { 334 if (!ol) {
305 *slot0 = bearer_id; 335 *slot0 = bearer_id;
306 *slot1 = bearer_id; 336 *slot1 = bearer_id;
337 nl->exec_mode = TIPC_LINK_OPEN;
307 node_established_contact(n); 338 node_established_contact(n);
308 return; 339 return;
309 } 340 }
310 341
311 /* Lower prio than current active ? => no slot */ 342 /* Second link => redistribute slots */
312 if (l->priority < links[*slot0].link->priority) { 343 if (nl->priority > ol->priority) {
313 pr_debug("New link <%s> becomes standby\n", l->name); 344 pr_debug("Old link <%s> becomes standby\n", ol->name);
314 return;
315 }
316 tipc_link_dup_queue_xmit(links[*slot0].link, l);
317
318 /* Same prio as current active ? => take one slot */
319 if (l->priority == links[*slot0].link->priority) {
320 *slot0 = bearer_id; 345 *slot0 = bearer_id;
321 return; 346 *slot1 = bearer_id;
347 } else if (nl->priority == ol->priority) {
348 *slot0 = bearer_id;
349 } else {
350 pr_debug("New link <%s> is standby\n", nl->name);
322 } 351 }
323 352
324 /* Higher prio than current active => take both active slots */ 353 /* Prepare synchronization with first link */
325 pr_debug("Old link <%s> now standby\n", links[*slot0].link->name); 354 tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
326 *slot0 = bearer_id;
327 *slot1 = bearer_id;
328} 355}
329 356
330/** 357/**
331 * tipc_node_link_down - handle loss of link 358 * tipc_node_link_down - handle loss of link
332 */ 359 */
333void tipc_node_link_down(struct tipc_node *n, int bearer_id) 360static void tipc_node_link_down(struct tipc_node *n, int bearer_id)
334{ 361{
335 int *slot0 = &n->active_links[0]; 362 int *slot0 = &n->active_links[0];
336 int *slot1 = &n->active_links[1]; 363 int *slot1 = &n->active_links[1];
364 struct tipc_media_addr *maddr = &n->links[bearer_id].maddr;
337 int i, highest = 0; 365 int i, highest = 0;
338 struct tipc_link *l, *_l; 366 struct tipc_link *l, *_l, *tnl;
367 struct sk_buff_head xmitq;
339 368
340 l = n->links[bearer_id].link; 369 l = n->links[bearer_id].link;
341 if (!l || !tipc_link_is_up(l)) 370 if (!l || !tipc_link_is_up(l))
342 return; 371 return;
343 372
373 __skb_queue_head_init(&xmitq);
374
344 n->working_links--; 375 n->working_links--;
345 n->action_flags |= TIPC_NOTIFY_LINK_DOWN; 376 n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
346 n->link_id = l->peer_bearer_id << 16 | l->bearer_id; 377 n->link_id = l->peer_bearer_id << 16 | bearer_id;
347 378
348 tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); 379 tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr);
349 380
@@ -370,13 +401,19 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id)
370 *slot1 = i; 401 *slot1 = i;
371 } 402 }
372 403
373 if (tipc_node_is_up(n)) 404 if (!tipc_node_is_up(n)) {
374 tipc_link_failover_send_queue(l); 405 tipc_link_reset(l);
406 node_lost_contact(n);
407 return;
408 }
375 409
410 /* There is still a working link => initiate failover */
411 tnl = node_active_link(n, 0);
412 tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
413 n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
414 tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq);
376 tipc_link_reset(l); 415 tipc_link_reset(l);
377 416 tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr);
378 if (!tipc_node_is_up(n))
379 node_lost_contact(n);
380} 417}
381 418
382bool tipc_node_is_up(struct tipc_node *n) 419bool tipc_node_is_up(struct tipc_node *n)
@@ -652,37 +689,22 @@ illegal_evt:
652 pr_err("Illegal node fsm evt %x in state %x\n", evt, state); 689 pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
653} 690}
654 691
655bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l, 692bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
656 struct tipc_msg *hdr)
657{ 693{
658 int state = n->state; 694 int state = n->state;
659 695
660 if (likely(state == SELF_UP_PEER_UP)) 696 if (likely(state == SELF_UP_PEER_UP))
661 return true; 697 return true;
662 698
663 if (state == SELF_DOWN_PEER_DOWN)
664 return true;
665
666 if (state == SELF_UP_PEER_COMING) {
667 /* If not traffic msg, peer may still be ESTABLISHING */
668 if (tipc_link_is_up(l) && msg_is_traffic(hdr))
669 tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
670 return true;
671 }
672
673 if (state == SELF_COMING_PEER_UP)
674 return true;
675
676 if (state == SELF_LEAVING_PEER_DOWN) 699 if (state == SELF_LEAVING_PEER_DOWN)
677 return false; 700 return false;
678 701
679 if (state == SELF_DOWN_PEER_LEAVING) { 702 if (state == SELF_DOWN_PEER_LEAVING) {
680 if (msg_peer_is_up(hdr)) 703 if (msg_peer_node_is_up(hdr))
681 return false; 704 return false;
682 tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
683 return true;
684 } 705 }
685 return false; 706
707 return true;
686} 708}
687 709
688static void node_established_contact(struct tipc_node *n_ptr) 710static void node_established_contact(struct tipc_node *n_ptr)
@@ -727,10 +749,8 @@ static void node_lost_contact(struct tipc_node *n_ptr)
727 if (!l_ptr) 749 if (!l_ptr)
728 continue; 750 continue;
729 l_ptr->exec_mode = TIPC_LINK_OPEN; 751 l_ptr->exec_mode = TIPC_LINK_OPEN;
730 l_ptr->failover_checkpt = 0; 752 kfree_skb(l_ptr->failover_reasm_skb);
731 l_ptr->failover_pkts = 0; 753 l_ptr->failover_reasm_skb = NULL;
732 kfree_skb(l_ptr->failover_skb);
733 l_ptr->failover_skb = NULL;
734 tipc_link_reset_fragments(l_ptr); 754 tipc_link_reset_fragments(l_ptr);
735 } 755 }
736 /* Prevent re-contact with node until cleanup is done */ 756 /* Prevent re-contact with node until cleanup is done */
@@ -961,38 +981,111 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
961 return 0; 981 return 0;
962} 982}
963 983
964/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet, 984/**
965 * in order to control parallel link failover or synchronization 985 * tipc_node_check_state - check and if necessary update node state
986 * @skb: TIPC packet
987 * @bearer_id: identity of bearer delivering the packet
988 * Returns true if state is ok, otherwise consumes buffer and returns false
966 */ 989 */
967static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id, 990static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
968 struct sk_buff *skb) 991 int bearer_id)
969{ 992{
970 struct tipc_link *tnl, *pl;
971 struct tipc_msg *hdr = buf_msg(skb); 993 struct tipc_msg *hdr = buf_msg(skb);
994 int usr = msg_user(hdr);
995 int mtyp = msg_type(hdr);
972 u16 oseqno = msg_seqno(hdr); 996 u16 oseqno = msg_seqno(hdr);
973 int pb_id = msg_bearer_id(hdr); 997 u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
998 u16 exp_pkts = msg_msgcnt(hdr);
999 u16 rcv_nxt, syncpt, dlv_nxt;
1000 int state = n->state;
1001 struct tipc_link *l, *pl = NULL;
1002 struct sk_buff_head;
1003 int i;
974 1004
975 if (pb_id >= MAX_BEARERS) 1005 l = n->links[bearer_id].link;
976 return; 1006 if (!l)
1007 return false;
1008 rcv_nxt = l->rcv_nxt;
977 1009
978 tnl = n->links[bearer_id].link;
979 if (!tnl)
980 return;
981 1010
982 /* Ignore if duplicate */ 1011 if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
983 if (less(oseqno, tnl->rcv_nxt)) 1012 return true;
984 return;
985 1013
986 pl = n->links[pb_id].link; 1014 /* Find parallel link, if any */
987 if (!pl) 1015 for (i = 0; i < MAX_BEARERS; i++) {
988 return; 1016 if ((i != bearer_id) && n->links[i].link) {
1017 pl = n->links[i].link;
1018 break;
1019 }
1020 }
989 1021
990 if (msg_type(hdr) == FAILOVER_MSG) { 1022 /* Update node accessibility if applicable */
991 if (tipc_link_is_up(pl)) { 1023 if (state == SELF_UP_PEER_COMING) {
992 tipc_node_link_down(n, pb_id); 1024 if (!tipc_link_is_up(l))
1025 return true;
1026 if (!msg_peer_link_is_up(hdr))
1027 return true;
1028 tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
1029 }
1030
1031 if (state == SELF_DOWN_PEER_LEAVING) {
1032 if (msg_peer_node_is_up(hdr))
1033 return false;
1034 tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
1035 }
1036
1037 /* Ignore duplicate packets */
1038 if (less(oseqno, rcv_nxt))
1039 return true;
1040
1041 /* Initiate or update failover mode if applicable */
1042 if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
1043 syncpt = oseqno + exp_pkts - 1;
1044 if (pl && tipc_link_is_up(pl)) {
1045 tipc_node_link_down(n, pl->bearer_id);
993 pl->exec_mode = TIPC_LINK_BLOCKED; 1046 pl->exec_mode = TIPC_LINK_BLOCKED;
994 } 1047 }
1048 /* If pkts arrive out of order, use lowest calculated syncpt */
1049 if (less(syncpt, n->sync_point))
1050 n->sync_point = syncpt;
1051 }
1052
1053 /* Open parallel link when tunnel link reaches synch point */
1054 if ((n->state == NODE_FAILINGOVER) && (more(rcv_nxt, n->sync_point))) {
1055 tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
1056 if (pl)
1057 pl->exec_mode = TIPC_LINK_OPEN;
1058 return true;
1059 }
1060
1061 /* Initiate or update synch mode if applicable */
1062 if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) {
1063 syncpt = iseqno + exp_pkts - 1;
1064 if (n->state == SELF_UP_PEER_UP) {
1065 n->sync_point = syncpt;
1066 tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
1067 }
1068 l->exec_mode = TIPC_LINK_TUNNEL;
1069 if (less(syncpt, n->sync_point))
1070 n->sync_point = syncpt;
995 } 1071 }
1072
1073 /* Open tunnel link when parallel link reaches synch point */
1074 if ((n->state == NODE_SYNCHING) && (l->exec_mode == TIPC_LINK_TUNNEL)) {
1075 if (pl)
1076 dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq));
1077 if (!pl || more(dlv_nxt, n->sync_point)) {
1078 tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
1079 l->exec_mode = TIPC_LINK_OPEN;
1080 return true;
1081 }
1082 if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
1083 return true;
1084 if (usr == LINK_PROTOCOL)
1085 return true;
1086 return false;
1087 }
1088 return true;
996} 1089}
997 1090
998/** 1091/**
@@ -1008,12 +1101,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1008{ 1101{
1009 struct sk_buff_head xmitq; 1102 struct sk_buff_head xmitq;
1010 struct tipc_node *n; 1103 struct tipc_node *n;
1011 struct tipc_link *l; 1104 struct tipc_msg *hdr = buf_msg(skb);
1012 struct tipc_msg *hdr; 1105 int usr = msg_user(hdr);
1013 struct tipc_media_addr *maddr;
1014 int bearer_id = b->identity; 1106 int bearer_id = b->identity;
1107 struct tipc_link_entry *le;
1015 int rc = 0; 1108 int rc = 0;
1016 int usr;
1017 1109
1018 __skb_queue_head_init(&xmitq); 1110 __skb_queue_head_init(&xmitq);
1019 1111
@@ -1022,8 +1114,6 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1022 goto discard; 1114 goto discard;
1023 1115
1024 /* Handle arrival of a non-unicast link packet */ 1116 /* Handle arrival of a non-unicast link packet */
1025 hdr = buf_msg(skb);
1026 usr = msg_user(hdr);
1027 if (unlikely(msg_non_seq(hdr))) { 1117 if (unlikely(msg_non_seq(hdr))) {
1028 if (usr == LINK_CONFIG) 1118 if (usr == LINK_CONFIG)
1029 tipc_disc_rcv(net, skb, b); 1119 tipc_disc_rcv(net, skb, b);
@@ -1036,42 +1126,41 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1036 n = tipc_node_find(net, msg_prevnode(hdr)); 1126 n = tipc_node_find(net, msg_prevnode(hdr));
1037 if (unlikely(!n)) 1127 if (unlikely(!n))
1038 goto discard; 1128 goto discard;
1039 tipc_node_lock(n); 1129 le = &n->links[bearer_id];
1040 1130
1041 /* Prepare links for tunneled reception if applicable */ 1131 tipc_node_lock(n);
1042 if (unlikely(usr == TUNNEL_PROTOCOL))
1043 tipc_node_tnl_init(n, bearer_id, skb);
1044 1132
1045 /* Locate link endpoint that should handle packet */ 1133 /* Is reception permitted at the moment ? */
1046 l = n->links[bearer_id].link; 1134 if (!tipc_node_filter_pkt(n, hdr))
1047 if (unlikely(!l))
1048 goto unlock; 1135 goto unlock;
1049 1136
1050 /* Is reception of this packet permitted at the moment ? */ 1137 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1051 if (unlikely(n->state != SELF_UP_PEER_UP))
1052 if (!tipc_node_filter_skb(n, l, hdr))
1053 goto unlock;
1054
1055 if (unlikely(usr == LINK_PROTOCOL))
1056 tipc_bclink_sync_state(n, hdr); 1138 tipc_bclink_sync_state(n, hdr);
1057 1139
1058 /* Release acked broadcast messages */ 1140 /* Release acked broadcast messages */
1059 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) 1141 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1060 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); 1142 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1061 1143
1062 /* Check protocol and update link state */ 1144 /* Check and if necessary update node state */
1063 rc = tipc_link_rcv(l, skb, &xmitq); 1145 if (likely(tipc_node_check_state(n, skb, bearer_id))) {
1146 rc = tipc_link_rcv(le->link, skb, &xmitq);
1147 skb = NULL;
1148 }
1064 1149
1065 if (unlikely(rc & TIPC_LINK_UP_EVT)) 1150 if (unlikely(rc & TIPC_LINK_UP_EVT))
1066 tipc_node_link_up(n, bearer_id); 1151 tipc_node_link_up(n, bearer_id, &xmitq);
1152
1067 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1153 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1068 tipc_node_link_down(n, bearer_id); 1154 tipc_node_link_down(n, bearer_id);
1069 skb = NULL;
1070unlock: 1155unlock:
1071 tipc_node_unlock(n); 1156 tipc_node_unlock(n);
1072 tipc_sk_rcv(net, &n->links[bearer_id].inputq); 1157
1073 maddr = &n->links[bearer_id].maddr; 1158 if (!skb_queue_empty(&le->inputq))
1074 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); 1159 tipc_sk_rcv(net, &le->inputq);
1160
1161 if (!skb_queue_empty(&xmitq))
1162 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1163
1075 tipc_node_put(n); 1164 tipc_node_put(n);
1076discard: 1165discard:
1077 kfree_skb(skb); 1166 kfree_skb(skb);