aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/node.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-07-30 18:24:23 -0400
committerDavid S. Miller <davem@davemloft.net>2015-07-30 20:25:14 -0400
commit598411d70f85dcf5b5c6c2369cc48637c251b656 (patch)
tree488847de32b5fa2edb9ed7dea046e5b10d7ecea3 /net/tipc/node.c
parentcf148816acb6def45474001302368eb472995e62 (diff)
tipc: make resetting of links non-atomic
In order to facilitate future improvements to the locking structure, we want to make resetting and establishing of links non-atomic. I.e., the functions tipc_node_link_up() and tipc_node_link_down() should be called from outside the node lock context, and grab/release the node lock themselves. This requires that we can freeze the link state from the moment it is set to RESETTING or PEER_RESET in one lock context until it is set to RESET or ESTABLISHING in a later context. The recently introduced link FSM makes this possible, so we are now ready to introduce the above change. This commit implements this. Tested-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--net/tipc/node.c166
1 files changed, 97 insertions, 69 deletions
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d03e88f2273b..cdca57be85bf 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -66,8 +66,12 @@ enum {
66 NODE_SYNCH_END_EVT = 0xcee 66 NODE_SYNCH_END_EVT = 0xcee
67}; 67};
68 68
69static void tipc_node_link_down(struct tipc_node *n, int bearer_id); 69static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
70static void node_lost_contact(struct tipc_node *n_ptr); 70 struct sk_buff_head *xmitq,
71 struct tipc_media_addr **maddr);
72static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73 bool delete);
74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
71static void node_established_contact(struct tipc_node *n_ptr); 75static void node_established_contact(struct tipc_node *n_ptr);
72static void tipc_node_delete(struct tipc_node *node); 76static void tipc_node_delete(struct tipc_node *node);
73static void tipc_node_timeout(unsigned long data); 77static void tipc_node_timeout(unsigned long data);
@@ -275,9 +279,8 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
275static void tipc_node_timeout(unsigned long data) 279static void tipc_node_timeout(unsigned long data)
276{ 280{
277 struct tipc_node *n = (struct tipc_node *)data; 281 struct tipc_node *n = (struct tipc_node *)data;
282 struct tipc_link_entry *le;
278 struct sk_buff_head xmitq; 283 struct sk_buff_head xmitq;
279 struct tipc_link *l;
280 struct tipc_media_addr *maddr;
281 int bearer_id; 284 int bearer_id;
282 int rc = 0; 285 int rc = 0;
283 286
@@ -285,17 +288,16 @@ static void tipc_node_timeout(unsigned long data)
285 288
286 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { 289 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
287 tipc_node_lock(n); 290 tipc_node_lock(n);
288 l = n->links[bearer_id].link; 291 le = &n->links[bearer_id];
289 if (l) { 292 if (le->link) {
290 /* Link tolerance may change asynchronously: */ 293 /* Link tolerance may change asynchronously: */
291 tipc_node_calculate_timer(n, l); 294 tipc_node_calculate_timer(n, le->link);
292 rc = tipc_link_timeout(l, &xmitq); 295 rc = tipc_link_timeout(le->link, &xmitq);
293 if (rc & TIPC_LINK_DOWN_EVT)
294 tipc_node_link_down(n, bearer_id);
295 } 296 }
296 tipc_node_unlock(n); 297 tipc_node_unlock(n);
297 maddr = &n->links[bearer_id].maddr; 298 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
298 tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); 299 if (rc & TIPC_LINK_DOWN_EVT)
300 tipc_node_link_down(n, bearer_id, false);
299 } 301 }
300 if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) 302 if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
301 tipc_node_get(n); 303 tipc_node_get(n);
@@ -303,18 +305,21 @@ static void tipc_node_timeout(unsigned long data)
303} 305}
304 306
305/** 307/**
306 * tipc_node_link_up - handle addition of link 308 * __tipc_node_link_up - handle addition of link
307 * 309 * Node lock must be held by caller
308 * Link becomes active (alone or shared) or standby, depending on its priority. 310 * Link becomes active (alone or shared) or standby, depending on its priority.
309 */ 311 */
310static void tipc_node_link_up(struct tipc_node *n, int bearer_id, 312static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
311 struct sk_buff_head *xmitq) 313 struct sk_buff_head *xmitq)
312{ 314{
313 int *slot0 = &n->active_links[0]; 315 int *slot0 = &n->active_links[0];
314 int *slot1 = &n->active_links[1]; 316 int *slot1 = &n->active_links[1];
315 struct tipc_link *ol = node_active_link(n, 0); 317 struct tipc_link *ol = node_active_link(n, 0);
316 struct tipc_link *nl = n->links[bearer_id].link; 318 struct tipc_link *nl = n->links[bearer_id].link;
317 319
320 if (!nl || !tipc_link_is_up(nl))
321 return;
322
318 if (n->working_links > 1) { 323 if (n->working_links > 1) {
319 pr_warn("Attempt to establish 3rd link to %x\n", n->addr); 324 pr_warn("Attempt to establish 3rd link to %x\n", n->addr);
320 return; 325 return;
@@ -356,28 +361,40 @@ static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
356} 361}
357 362
358/** 363/**
359 * tipc_node_link_down - handle loss of link 364 * tipc_node_link_up - handle addition of link
365 *
366 * Link becomes active (alone or shared) or standby, depending on its priority.
360 */ 367 */
361static void tipc_node_link_down(struct tipc_node *n, int bearer_id) 368static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
369 struct sk_buff_head *xmitq)
362{ 370{
371 tipc_node_lock(n);
372 __tipc_node_link_up(n, bearer_id, xmitq);
373 tipc_node_unlock(n);
374}
375
376/**
377 * __tipc_node_link_down - handle loss of link
378 */
379static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
380 struct sk_buff_head *xmitq,
381 struct tipc_media_addr **maddr)
382{
383 struct tipc_link_entry *le = &n->links[*bearer_id];
363 int *slot0 = &n->active_links[0]; 384 int *slot0 = &n->active_links[0];
364 int *slot1 = &n->active_links[1]; 385 int *slot1 = &n->active_links[1];
365 struct tipc_media_addr *maddr = &n->links[bearer_id].maddr;
366 int i, highest = 0; 386 int i, highest = 0;
367 struct tipc_link *l, *_l, *tnl; 387 struct tipc_link *l, *_l, *tnl;
368 struct sk_buff_head xmitq;
369 388
370 l = n->links[bearer_id].link; 389 l = n->links[*bearer_id].link;
371 if (!l || tipc_link_is_reset(l)) 390 if (!l || tipc_link_is_reset(l))
372 return; 391 return;
373 392
374 __skb_queue_head_init(&xmitq);
375
376 n->working_links--; 393 n->working_links--;
377 n->action_flags |= TIPC_NOTIFY_LINK_DOWN; 394 n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
378 n->link_id = l->peer_bearer_id << 16 | bearer_id; 395 n->link_id = l->peer_bearer_id << 16 | *bearer_id;
379 396
380 tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); 397 tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
381 398
382 pr_debug("Lost link <%s> on network plane %c\n", 399 pr_debug("Lost link <%s> on network plane %c\n",
383 l->name, l->net_plane); 400 l->name, l->net_plane);
@@ -404,18 +421,40 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id)
404 421
405 if (!tipc_node_is_up(n)) { 422 if (!tipc_node_is_up(n)) {
406 tipc_link_reset(l); 423 tipc_link_reset(l);
407 node_lost_contact(n); 424 node_lost_contact(n, &le->inputq);
408 return; 425 return;
409 } 426 }
410 427
411 /* There is still a working link => initiate failover */ 428 /* There is still a working link => initiate failover */
412 tnl = node_active_link(n, 0); 429 tnl = node_active_link(n, 0);
413 tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
414 n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); 430 n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
415 tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq); 431 tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
416 tipc_link_reset(l); 432 tipc_link_reset(l);
417 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); 433 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
418 tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr); 434 tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
435 *maddr = &n->links[tnl->bearer_id].maddr;
436 *bearer_id = tnl->bearer_id;
437}
438
439static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
440{
441 struct tipc_link_entry *le = &n->links[bearer_id];
442 struct tipc_media_addr *maddr;
443 struct sk_buff_head xmitq;
444
445 __skb_queue_head_init(&xmitq);
446
447 tipc_node_lock(n);
448 __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
449 if (delete && le->link) {
450 kfree(le->link);
451 le->link = NULL;
452 n->link_cnt--;
453 }
454 tipc_node_unlock(n);
455
456 tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
457 tipc_sk_rcv(n->net, &le->inputq);
419} 458}
420 459
421bool tipc_node_is_up(struct tipc_node *n) 460bool tipc_node_is_up(struct tipc_node *n)
@@ -437,7 +476,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
437 bool sign_match = false; 476 bool sign_match = false;
438 bool link_up = false; 477 bool link_up = false;
439 bool accept_addr = false; 478 bool accept_addr = false;
440 479 bool reset = true;
441 *dupl_addr = false; 480 *dupl_addr = false;
442 *respond = false; 481 *respond = false;
443 482
@@ -460,6 +499,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
460 499
461 if (sign_match && addr_match && link_up) { 500 if (sign_match && addr_match && link_up) {
462 /* All is fine. Do nothing. */ 501 /* All is fine. Do nothing. */
502 reset = false;
463 } else if (sign_match && addr_match && !link_up) { 503 } else if (sign_match && addr_match && !link_up) {
464 /* Respond. The link will come up in due time */ 504 /* Respond. The link will come up in due time */
465 *respond = true; 505 *respond = true;
@@ -531,29 +571,21 @@ void tipc_node_check_dest(struct net *net, u32 onode,
531 } 571 }
532 memcpy(&l->media_addr, maddr, sizeof(*maddr)); 572 memcpy(&l->media_addr, maddr, sizeof(*maddr));
533 memcpy(curr_maddr, maddr, sizeof(*maddr)); 573 memcpy(curr_maddr, maddr, sizeof(*maddr));
534 tipc_node_link_down(n, b->identity);
535exit: 574exit:
536 tipc_node_unlock(n); 575 tipc_node_unlock(n);
576 if (reset)
577 tipc_node_link_down(n, b->identity, false);
537 tipc_node_put(n); 578 tipc_node_put(n);
538} 579}
539 580
540void tipc_node_delete_links(struct net *net, int bearer_id) 581void tipc_node_delete_links(struct net *net, int bearer_id)
541{ 582{
542 struct tipc_net *tn = net_generic(net, tipc_net_id); 583 struct tipc_net *tn = net_generic(net, tipc_net_id);
543 struct tipc_link *l;
544 struct tipc_node *n; 584 struct tipc_node *n;
545 585
546 rcu_read_lock(); 586 rcu_read_lock();
547 list_for_each_entry_rcu(n, &tn->node_list, list) { 587 list_for_each_entry_rcu(n, &tn->node_list, list) {
548 tipc_node_lock(n); 588 tipc_node_link_down(n, bearer_id, true);
549 l = n->links[bearer_id].link;
550 if (l) {
551 tipc_node_link_down(n, bearer_id);
552 n->links[bearer_id].link = NULL;
553 n->link_cnt--;
554 }
555 tipc_node_unlock(n);
556 kfree(l);
557 } 589 }
558 rcu_read_unlock(); 590 rcu_read_unlock();
559} 591}
@@ -561,19 +593,14 @@ void tipc_node_delete_links(struct net *net, int bearer_id)
561static void tipc_node_reset_links(struct tipc_node *n) 593static void tipc_node_reset_links(struct tipc_node *n)
562{ 594{
563 char addr_string[16]; 595 char addr_string[16];
564 u32 i; 596 int i;
565
566 tipc_node_lock(n);
567 597
568 pr_warn("Resetting all links to %s\n", 598 pr_warn("Resetting all links to %s\n",
569 tipc_addr_string_fill(addr_string, n->addr)); 599 tipc_addr_string_fill(addr_string, n->addr));
570 600
571 for (i = 0; i < MAX_BEARERS; i++) { 601 for (i = 0; i < MAX_BEARERS; i++) {
572 if (!n->links[i].link) 602 tipc_node_link_down(n, i, false);
573 continue;
574 tipc_node_link_down(n, i);
575 } 603 }
576 tipc_node_unlock(n);
577} 604}
578 605
579void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) 606void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
@@ -798,10 +825,12 @@ static void node_established_contact(struct tipc_node *n_ptr)
798 tipc_bclink_add_node(n_ptr->net, n_ptr->addr); 825 tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
799} 826}
800 827
801static void node_lost_contact(struct tipc_node *n_ptr) 828static void node_lost_contact(struct tipc_node *n_ptr,
829 struct sk_buff_head *inputq)
802{ 830{
803 char addr_string[16]; 831 char addr_string[16];
804 struct tipc_sock_conn *conn, *safe; 832 struct tipc_sock_conn *conn, *safe;
833 struct tipc_link *l;
805 struct list_head *conns = &n_ptr->conn_sks; 834 struct list_head *conns = &n_ptr->conn_sks;
806 struct sk_buff *skb; 835 struct sk_buff *skb;
807 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); 836 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
@@ -827,14 +856,11 @@ static void node_lost_contact(struct tipc_node *n_ptr)
827 856
828 /* Abort any ongoing link failover */ 857 /* Abort any ongoing link failover */
829 for (i = 0; i < MAX_BEARERS; i++) { 858 for (i = 0; i < MAX_BEARERS; i++) {
830 struct tipc_link *l_ptr = n_ptr->links[i].link; 859 l = n_ptr->links[i].link;
831 if (!l_ptr) 860 if (l)
832 continue; 861 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
833 tipc_link_fsm_evt(l_ptr, LINK_FAILOVER_END_EVT);
834 kfree_skb(l_ptr->failover_reasm_skb);
835 l_ptr->failover_reasm_skb = NULL;
836 tipc_link_reset_fragments(l_ptr);
837 } 862 }
863
838 /* Prevent re-contact with node until cleanup is done */ 864 /* Prevent re-contact with node until cleanup is done */
839 tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); 865 tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
840 866
@@ -848,7 +874,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
848 conn->peer_node, conn->port, 874 conn->peer_node, conn->port,
849 conn->peer_port, TIPC_ERR_NO_NODE); 875 conn->peer_port, TIPC_ERR_NO_NODE);
850 if (likely(skb)) { 876 if (likely(skb)) {
851 skb_queue_tail(n_ptr->inputq, skb); 877 skb_queue_tail(inputq, skb);
852 n_ptr->action_flags |= TIPC_MSG_EVT; 878 n_ptr->action_flags |= TIPC_MSG_EVT;
853 } 879 }
854 list_del(&conn->list); 880 list_del(&conn->list);
@@ -1025,9 +1051,9 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1025 l = tipc_node_select_link(n, selector, &bearer_id, &maddr); 1051 l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
1026 if (likely(l)) 1052 if (likely(l))
1027 rc = tipc_link_xmit(l, list, &xmitq); 1053 rc = tipc_link_xmit(l, list, &xmitq);
1028 if (unlikely(rc == -ENOBUFS))
1029 tipc_node_link_down(n, bearer_id);
1030 tipc_node_unlock(n); 1054 tipc_node_unlock(n);
1055 if (unlikely(rc == -ENOBUFS))
1056 tipc_node_link_down(n, bearer_id, false);
1031 tipc_node_put(n); 1057 tipc_node_put(n);
1032 } 1058 }
1033 if (likely(!rc)) { 1059 if (likely(!rc)) {
@@ -1081,8 +1107,8 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1081 u16 rcv_nxt, syncpt, dlv_nxt; 1107 u16 rcv_nxt, syncpt, dlv_nxt;
1082 int state = n->state; 1108 int state = n->state;
1083 struct tipc_link *l, *pl = NULL; 1109 struct tipc_link *l, *pl = NULL;
1084 struct sk_buff_head; 1110 struct tipc_media_addr *maddr;
1085 int i; 1111 int i, pb_id;
1086 1112
1087 l = n->links[bearer_id].link; 1113 l = n->links[bearer_id].link;
1088 if (!l) 1114 if (!l)
@@ -1123,9 +1149,11 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1123 /* Initiate or update failover mode if applicable */ 1149 /* Initiate or update failover mode if applicable */
1124 if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { 1150 if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
1125 syncpt = oseqno + exp_pkts - 1; 1151 syncpt = oseqno + exp_pkts - 1;
1126 if (pl && tipc_link_is_up(pl)) 1152 if (pl && tipc_link_is_up(pl)) {
1127 tipc_node_link_down(n, pl->bearer_id); 1153 pb_id = pl->bearer_id;
1128 1154 __tipc_node_link_down(n, &pb_id, xmitq, &maddr);
1155 tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
1156 }
1129 /* If pkts arrive out of order, use lowest calculated syncpt */ 1157 /* If pkts arrive out of order, use lowest calculated syncpt */
1130 if (less(syncpt, n->sync_point)) 1158 if (less(syncpt, n->sync_point))
1131 n->sync_point = syncpt; 1159 n->sync_point = syncpt;
@@ -1146,7 +1174,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1146 syncpt = iseqno + exp_pkts - 1; 1174 syncpt = iseqno + exp_pkts - 1;
1147 if (!tipc_link_is_up(l)) { 1175 if (!tipc_link_is_up(l)) {
1148 tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); 1176 tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
1149 tipc_node_link_up(n, bearer_id, xmitq); 1177 __tipc_node_link_up(n, bearer_id, xmitq);
1150 } 1178 }
1151 if (n->state == SELF_UP_PEER_UP) { 1179 if (n->state == SELF_UP_PEER_UP) {
1152 n->sync_point = syncpt; 1180 n->sync_point = syncpt;
@@ -1224,7 +1252,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1224 if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) 1252 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1225 tipc_bclink_sync_state(n, hdr); 1253 tipc_bclink_sync_state(n, hdr);
1226 1254
1227 /* Release acked broadcast messages */ 1255 /* Release acked broadcast packets */
1228 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) 1256 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1229 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); 1257 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1230 1258
@@ -1233,14 +1261,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1233 rc = tipc_link_rcv(le->link, skb, &xmitq); 1261 rc = tipc_link_rcv(le->link, skb, &xmitq);
1234 skb = NULL; 1262 skb = NULL;
1235 } 1263 }
1264unlock:
1265 tipc_node_unlock(n);
1236 1266
1237 if (unlikely(rc & TIPC_LINK_UP_EVT)) 1267 if (unlikely(rc & TIPC_LINK_UP_EVT))
1238 tipc_node_link_up(n, bearer_id, &xmitq); 1268 tipc_node_link_up(n, bearer_id, &xmitq);
1239 1269
1240 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1270 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1241 tipc_node_link_down(n, bearer_id); 1271 tipc_node_link_down(n, bearer_id, false);
1242unlock:
1243 tipc_node_unlock(n);
1244 1272
1245 if (!skb_queue_empty(&le->inputq)) 1273 if (!skb_queue_empty(&le->inputq))
1246 tipc_sk_rcv(net, &le->inputq); 1274 tipc_sk_rcv(net, &le->inputq);