aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c229
1 files changed, 87 insertions, 142 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 13b987745820..da6018beb6eb 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/link.c: TIPC link code 2 * net/tipc/link.c: TIPC link code
3 * 3 *
4 * Copyright (c) 1996-2007, 2012, Ericsson AB 4 * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
5 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems 5 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -78,8 +78,8 @@ static const char *link_unk_evt = "Unknown link event ";
78static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, 78static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
79 struct sk_buff *buf); 79 struct sk_buff *buf);
80static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); 80static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
81static int link_recv_changeover_msg(struct tipc_link **l_ptr, 81static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
82 struct sk_buff **buf); 82 struct sk_buff **buf);
83static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); 83static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
84static int link_send_sections_long(struct tipc_port *sender, 84static int link_send_sections_long(struct tipc_port *sender,
85 struct iovec const *msg_sect, 85 struct iovec const *msg_sect,
@@ -87,7 +87,6 @@ static int link_send_sections_long(struct tipc_port *sender,
87static void link_state_event(struct tipc_link *l_ptr, u32 event); 87static void link_state_event(struct tipc_link *l_ptr, u32 event);
88static void link_reset_statistics(struct tipc_link *l_ptr); 88static void link_reset_statistics(struct tipc_link *l_ptr);
89static void link_print(struct tipc_link *l_ptr, const char *str); 89static void link_print(struct tipc_link *l_ptr, const char *str);
90static void link_start(struct tipc_link *l_ptr);
91static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); 90static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
92static void tipc_link_send_sync(struct tipc_link *l); 91static void tipc_link_send_sync(struct tipc_link *l);
93static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); 92static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
@@ -278,9 +277,11 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
278 277
279 tipc_node_attach_link(n_ptr, l_ptr); 278 tipc_node_attach_link(n_ptr, l_ptr);
280 279
281 k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); 280 k_init_timer(&l_ptr->timer, (Handler)link_timeout,
281 (unsigned long)l_ptr);
282 list_add_tail(&l_ptr->link_list, &b_ptr->links); 282 list_add_tail(&l_ptr->link_list, &b_ptr->links);
283 tipc_k_signal((Handler)link_start, (unsigned long)l_ptr); 283
284 link_state_event(l_ptr, STARTING_EVT);
284 285
285 return l_ptr; 286 return l_ptr;
286} 287}
@@ -305,19 +306,13 @@ void tipc_link_delete(struct tipc_link *l_ptr)
305 tipc_node_lock(l_ptr->owner); 306 tipc_node_lock(l_ptr->owner);
306 tipc_link_reset(l_ptr); 307 tipc_link_reset(l_ptr);
307 tipc_node_detach_link(l_ptr->owner, l_ptr); 308 tipc_node_detach_link(l_ptr->owner, l_ptr);
308 tipc_link_stop(l_ptr); 309 tipc_link_purge_queues(l_ptr);
309 list_del_init(&l_ptr->link_list); 310 list_del_init(&l_ptr->link_list);
310 tipc_node_unlock(l_ptr->owner); 311 tipc_node_unlock(l_ptr->owner);
311 k_term_timer(&l_ptr->timer); 312 k_term_timer(&l_ptr->timer);
312 kfree(l_ptr); 313 kfree(l_ptr);
313} 314}
314 315
315static void link_start(struct tipc_link *l_ptr)
316{
317 tipc_node_lock(l_ptr->owner);
318 link_state_event(l_ptr, STARTING_EVT);
319 tipc_node_unlock(l_ptr->owner);
320}
321 316
322/** 317/**
323 * link_schedule_port - schedule port for deferred sending 318 * link_schedule_port - schedule port for deferred sending
@@ -386,14 +381,7 @@ exit:
386 */ 381 */
387static void link_release_outqueue(struct tipc_link *l_ptr) 382static void link_release_outqueue(struct tipc_link *l_ptr)
388{ 383{
389 struct sk_buff *buf = l_ptr->first_out; 384 kfree_skb_list(l_ptr->first_out);
390 struct sk_buff *next;
391
392 while (buf) {
393 next = buf->next;
394 kfree_skb(buf);
395 buf = next;
396 }
397 l_ptr->first_out = NULL; 385 l_ptr->first_out = NULL;
398 l_ptr->out_queue_size = 0; 386 l_ptr->out_queue_size = 0;
399} 387}
@@ -410,37 +398,20 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
410} 398}
411 399
412/** 400/**
413 * tipc_link_stop - purge all inbound and outbound messages associated with link 401 * tipc_link_purge_queues - purge all pkt queues associated with link
414 * @l_ptr: pointer to link 402 * @l_ptr: pointer to link
415 */ 403 */
416void tipc_link_stop(struct tipc_link *l_ptr) 404void tipc_link_purge_queues(struct tipc_link *l_ptr)
417{ 405{
418 struct sk_buff *buf; 406 kfree_skb_list(l_ptr->oldest_deferred_in);
419 struct sk_buff *next; 407 kfree_skb_list(l_ptr->first_out);
420
421 buf = l_ptr->oldest_deferred_in;
422 while (buf) {
423 next = buf->next;
424 kfree_skb(buf);
425 buf = next;
426 }
427
428 buf = l_ptr->first_out;
429 while (buf) {
430 next = buf->next;
431 kfree_skb(buf);
432 buf = next;
433 }
434
435 tipc_link_reset_fragments(l_ptr); 408 tipc_link_reset_fragments(l_ptr);
436
437 kfree_skb(l_ptr->proto_msg_queue); 409 kfree_skb(l_ptr->proto_msg_queue);
438 l_ptr->proto_msg_queue = NULL; 410 l_ptr->proto_msg_queue = NULL;
439} 411}
440 412
441void tipc_link_reset(struct tipc_link *l_ptr) 413void tipc_link_reset(struct tipc_link *l_ptr)
442{ 414{
443 struct sk_buff *buf;
444 u32 prev_state = l_ptr->state; 415 u32 prev_state = l_ptr->state;
445 u32 checkpoint = l_ptr->next_in_no; 416 u32 checkpoint = l_ptr->next_in_no;
446 int was_active_link = tipc_link_is_active(l_ptr); 417 int was_active_link = tipc_link_is_active(l_ptr);
@@ -461,8 +432,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
461 tipc_node_link_down(l_ptr->owner, l_ptr); 432 tipc_node_link_down(l_ptr->owner, l_ptr);
462 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); 433 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
463 434
464 if (was_active_link && tipc_node_active_links(l_ptr->owner) && 435 if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
465 l_ptr->owner->permit_changeover) {
466 l_ptr->reset_checkpoint = checkpoint; 436 l_ptr->reset_checkpoint = checkpoint;
467 l_ptr->exp_msg_count = START_CHANGEOVER; 437 l_ptr->exp_msg_count = START_CHANGEOVER;
468 } 438 }
@@ -471,12 +441,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
471 link_release_outqueue(l_ptr); 441 link_release_outqueue(l_ptr);
472 kfree_skb(l_ptr->proto_msg_queue); 442 kfree_skb(l_ptr->proto_msg_queue);
473 l_ptr->proto_msg_queue = NULL; 443 l_ptr->proto_msg_queue = NULL;
474 buf = l_ptr->oldest_deferred_in; 444 kfree_skb_list(l_ptr->oldest_deferred_in);
475 while (buf) {
476 struct sk_buff *next = buf->next;
477 kfree_skb(buf);
478 buf = next;
479 }
480 if (!list_empty(&l_ptr->waiting_ports)) 445 if (!list_empty(&l_ptr->waiting_ports))
481 tipc_link_wakeup_ports(l_ptr, 1); 446 tipc_link_wakeup_ports(l_ptr, 1);
482 447
@@ -517,10 +482,11 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
517 if (!l_ptr->started && (event != STARTING_EVT)) 482 if (!l_ptr->started && (event != STARTING_EVT))
518 return; /* Not yet. */ 483 return; /* Not yet. */
519 484
520 if (link_blocked(l_ptr)) { 485 /* Check whether changeover is going on */
486 if (l_ptr->exp_msg_count) {
521 if (event == TIMEOUT_EVT) 487 if (event == TIMEOUT_EVT)
522 link_set_timer(l_ptr, cont_intv); 488 link_set_timer(l_ptr, cont_intv);
523 return; /* Changeover going on */ 489 return;
524 } 490 }
525 491
526 switch (l_ptr->state) { 492 switch (l_ptr->state) {
@@ -790,8 +756,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
790 return link_send_long_buf(l_ptr, buf); 756 return link_send_long_buf(l_ptr, buf);
791 757
792 /* Packet can be queued or sent. */ 758 /* Packet can be queued or sent. */
793 if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) && 759 if (likely(!link_congested(l_ptr))) {
794 !link_congested(l_ptr))) {
795 link_add_to_outqueue(l_ptr, buf, msg); 760 link_add_to_outqueue(l_ptr, buf, msg);
796 761
797 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); 762 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
@@ -957,14 +922,13 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
957 922
958 if (likely(!link_congested(l_ptr))) { 923 if (likely(!link_congested(l_ptr))) {
959 if (likely(msg_size(msg) <= l_ptr->max_pkt)) { 924 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
960 if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) { 925 link_add_to_outqueue(l_ptr, buf, msg);
961 link_add_to_outqueue(l_ptr, buf, msg); 926 tipc_bearer_send(l_ptr->b_ptr, buf,
962 tipc_bearer_send(l_ptr->b_ptr, buf, 927 &l_ptr->media_addr);
963 &l_ptr->media_addr); 928 l_ptr->unacked_window = 0;
964 l_ptr->unacked_window = 0; 929 return res;
965 return res; 930 }
966 } 931 else
967 } else
968 *used_max_pkt = l_ptr->max_pkt; 932 *used_max_pkt = l_ptr->max_pkt;
969 } 933 }
970 return tipc_link_send_buf(l_ptr, buf); /* All other cases */ 934 return tipc_link_send_buf(l_ptr, buf); /* All other cases */
@@ -1013,8 +977,7 @@ exit:
1013 } 977 }
1014 978
1015 /* Exit if link (or bearer) is congested */ 979 /* Exit if link (or bearer) is congested */
1016 if (link_congested(l_ptr) || 980 if (link_congested(l_ptr)) {
1017 tipc_bearer_blocked(l_ptr->b_ptr)) {
1018 res = link_schedule_port(l_ptr, 981 res = link_schedule_port(l_ptr,
1019 sender->ref, res); 982 sender->ref, res);
1020 goto exit; 983 goto exit;
@@ -1127,10 +1090,7 @@ again:
1127 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { 1090 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1128 res = -EFAULT; 1091 res = -EFAULT;
1129error: 1092error:
1130 for (; buf_chain; buf_chain = buf) { 1093 kfree_skb_list(buf_chain);
1131 buf = buf_chain->next;
1132 kfree_skb(buf_chain);
1133 }
1134 return res; 1094 return res;
1135 } 1095 }
1136 sect_crs += sz; 1096 sect_crs += sz;
@@ -1180,18 +1140,12 @@ error:
1180 if (l_ptr->max_pkt < max_pkt) { 1140 if (l_ptr->max_pkt < max_pkt) {
1181 sender->max_pkt = l_ptr->max_pkt; 1141 sender->max_pkt = l_ptr->max_pkt;
1182 tipc_node_unlock(node); 1142 tipc_node_unlock(node);
1183 for (; buf_chain; buf_chain = buf) { 1143 kfree_skb_list(buf_chain);
1184 buf = buf_chain->next;
1185 kfree_skb(buf_chain);
1186 }
1187 goto again; 1144 goto again;
1188 } 1145 }
1189 } else { 1146 } else {
1190reject: 1147reject:
1191 for (; buf_chain; buf_chain = buf) { 1148 kfree_skb_list(buf_chain);
1192 buf = buf_chain->next;
1193 kfree_skb(buf_chain);
1194 }
1195 return tipc_port_reject_sections(sender, hdr, msg_sect, 1149 return tipc_port_reject_sections(sender, hdr, msg_sect,
1196 len, TIPC_ERR_NO_NODE); 1150 len, TIPC_ERR_NO_NODE);
1197 } 1151 }
@@ -1209,7 +1163,7 @@ reject:
1209/* 1163/*
1210 * tipc_link_push_packet: Push one unsent packet to the media 1164 * tipc_link_push_packet: Push one unsent packet to the media
1211 */ 1165 */
1212u32 tipc_link_push_packet(struct tipc_link *l_ptr) 1166static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1213{ 1167{
1214 struct sk_buff *buf = l_ptr->first_out; 1168 struct sk_buff *buf = l_ptr->first_out;
1215 u32 r_q_size = l_ptr->retransm_queue_size; 1169 u32 r_q_size = l_ptr->retransm_queue_size;
@@ -1281,9 +1235,6 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
1281{ 1235{
1282 u32 res; 1236 u32 res;
1283 1237
1284 if (tipc_bearer_blocked(l_ptr->b_ptr))
1285 return;
1286
1287 do { 1238 do {
1288 res = tipc_link_push_packet(l_ptr); 1239 res = tipc_link_push_packet(l_ptr);
1289 } while (!res); 1240 } while (!res);
@@ -1370,26 +1321,15 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1370 1321
1371 msg = buf_msg(buf); 1322 msg = buf_msg(buf);
1372 1323
1373 if (tipc_bearer_blocked(l_ptr->b_ptr)) { 1324 /* Detect repeated retransmit failures */
1374 if (l_ptr->retransm_queue_size == 0) { 1325 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1375 l_ptr->retransm_queue_head = msg_seqno(msg); 1326 if (++l_ptr->stale_count > 100) {
1376 l_ptr->retransm_queue_size = retransmits; 1327 link_retransmit_failure(l_ptr, buf);
1377 } else { 1328 return;
1378 pr_err("Unexpected retransmit on link %s (qsize=%d)\n",
1379 l_ptr->name, l_ptr->retransm_queue_size);
1380 } 1329 }
1381 return;
1382 } else { 1330 } else {
1383 /* Detect repeated retransmit failures on unblocked bearer */ 1331 l_ptr->last_retransmitted = msg_seqno(msg);
1384 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 1332 l_ptr->stale_count = 1;
1385 if (++l_ptr->stale_count > 100) {
1386 link_retransmit_failure(l_ptr, buf);
1387 return;
1388 }
1389 } else {
1390 l_ptr->last_retransmitted = msg_seqno(msg);
1391 l_ptr->stale_count = 1;
1392 }
1393 } 1333 }
1394 1334
1395 while (retransmits && (buf != l_ptr->next_out) && buf) { 1335 while (retransmits && (buf != l_ptr->next_out) && buf) {
@@ -1451,6 +1391,12 @@ static int link_recv_buf_validate(struct sk_buff *buf)
1451 u32 hdr_size; 1391 u32 hdr_size;
1452 u32 min_hdr_size; 1392 u32 min_hdr_size;
1453 1393
1394 /* If this packet comes from the defer queue, the skb has already
1395 * been validated
1396 */
1397 if (unlikely(TIPC_SKB_CB(buf)->deferred))
1398 return 1;
1399
1454 if (unlikely(buf->len < MIN_H_SIZE)) 1400 if (unlikely(buf->len < MIN_H_SIZE))
1455 return 0; 1401 return 0;
1456 1402
@@ -1476,14 +1422,14 @@ static int link_recv_buf_validate(struct sk_buff *buf)
1476} 1422}
1477 1423
1478/** 1424/**
1479 * tipc_recv_msg - process TIPC messages arriving from off-node 1425 * tipc_rcv - process TIPC packets/messages arriving from off-node
1480 * @head: pointer to message buffer chain 1426 * @head: pointer to message buffer chain
1481 * @tb_ptr: pointer to bearer message arrived on 1427 * @tb_ptr: pointer to bearer message arrived on
1482 * 1428 *
1483 * Invoked with no locks held. Bearer pointer must point to a valid bearer 1429 * Invoked with no locks held. Bearer pointer must point to a valid bearer
1484 * structure (i.e. cannot be NULL), but bearer can be inactive. 1430 * structure (i.e. cannot be NULL), but bearer can be inactive.
1485 */ 1431 */
1486void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) 1432void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1487{ 1433{
1488 read_lock_bh(&tipc_net_lock); 1434 read_lock_bh(&tipc_net_lock);
1489 while (head) { 1435 while (head) {
@@ -1658,7 +1604,7 @@ deliver:
1658 continue; 1604 continue;
1659 case CHANGEOVER_PROTOCOL: 1605 case CHANGEOVER_PROTOCOL:
1660 type = msg_type(msg); 1606 type = msg_type(msg);
1661 if (link_recv_changeover_msg(&l_ptr, &buf)) { 1607 if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {
1662 msg = buf_msg(buf); 1608 msg = buf_msg(buf);
1663 seq_no = msg_seqno(msg); 1609 seq_no = msg_seqno(msg);
1664 if (type == ORIGINAL_MSG) 1610 if (type == ORIGINAL_MSG)
@@ -1763,6 +1709,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1763 &l_ptr->newest_deferred_in, buf)) { 1709 &l_ptr->newest_deferred_in, buf)) {
1764 l_ptr->deferred_inqueue_sz++; 1710 l_ptr->deferred_inqueue_sz++;
1765 l_ptr->stats.deferred_recv++; 1711 l_ptr->stats.deferred_recv++;
1712 TIPC_SKB_CB(buf)->deferred = true;
1766 if ((l_ptr->deferred_inqueue_sz % 16) == 1) 1713 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1767 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1714 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1768 } else 1715 } else
@@ -1787,7 +1734,8 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1787 l_ptr->proto_msg_queue = NULL; 1734 l_ptr->proto_msg_queue = NULL;
1788 } 1735 }
1789 1736
1790 if (link_blocked(l_ptr)) 1737 /* Don't send protocol message during link changeover */
1738 if (l_ptr->exp_msg_count)
1791 return; 1739 return;
1792 1740
1793 /* Abort non-RESET send if communication with node is prohibited */ 1741 /* Abort non-RESET send if communication with node is prohibited */
@@ -1862,12 +1810,6 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1862 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 1810 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1863 buf->priority = TC_PRIO_CONTROL; 1811 buf->priority = TC_PRIO_CONTROL;
1864 1812
1865 /* Defer message if bearer is already blocked */
1866 if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1867 l_ptr->proto_msg_queue = buf;
1868 return;
1869 }
1870
1871 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); 1813 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1872 l_ptr->unacked_window = 0; 1814 l_ptr->unacked_window = 0;
1873 kfree_skb(buf); 1815 kfree_skb(buf);
@@ -1886,7 +1828,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
1886 u32 msg_tol; 1828 u32 msg_tol;
1887 struct tipc_msg *msg = buf_msg(buf); 1829 struct tipc_msg *msg = buf_msg(buf);
1888 1830
1889 if (link_blocked(l_ptr)) 1831 /* Discard protocol message during link changeover */
1832 if (l_ptr->exp_msg_count)
1890 goto exit; 1833 goto exit;
1891 1834
1892 /* record unnumbered packet arrival (force mismatch on next timeout) */ 1835 /* record unnumbered packet arrival (force mismatch on next timeout) */
@@ -1896,8 +1839,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
1896 if (tipc_own_addr > msg_prevnode(msg)) 1839 if (tipc_own_addr > msg_prevnode(msg))
1897 l_ptr->b_ptr->net_plane = msg_net_plane(msg); 1840 l_ptr->b_ptr->net_plane = msg_net_plane(msg);
1898 1841
1899 l_ptr->owner->permit_changeover = msg_redundant_link(msg);
1900
1901 switch (msg_type(msg)) { 1842 switch (msg_type(msg)) {
1902 1843
1903 case RESET_MSG: 1844 case RESET_MSG:
@@ -2013,13 +1954,13 @@ exit:
2013} 1954}
2014 1955
2015 1956
2016/* 1957/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
2017 * tipc_link_tunnel(): Send one message via a link belonging to 1958 * a different bearer. Owner node is locked.
2018 * another bearer. Owner node is locked.
2019 */ 1959 */
2020static void tipc_link_tunnel(struct tipc_link *l_ptr, 1960static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
2021 struct tipc_msg *tunnel_hdr, struct tipc_msg *msg, 1961 struct tipc_msg *tunnel_hdr,
2022 u32 selector) 1962 struct tipc_msg *msg,
1963 u32 selector)
2023{ 1964{
2024 struct tipc_link *tunnel; 1965 struct tipc_link *tunnel;
2025 struct sk_buff *buf; 1966 struct sk_buff *buf;
@@ -2042,12 +1983,13 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr,
2042} 1983}
2043 1984
2044 1985
2045 1986/* tipc_link_failover_send_queue(): A link has gone down, but a second
2046/* 1987 * link is still active. We can do failover. Tunnel the failing link's
2047 * changeover(): Send whole message queue via the remaining link 1988 * whole send queue via the remaining link. This way, we don't lose
2048 * Owner node is locked. 1989 * any packets, and sequence order is preserved for subsequent traffic
1990 * sent over the remaining link. Owner node is locked.
2049 */ 1991 */
2050void tipc_link_changeover(struct tipc_link *l_ptr) 1992void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
2051{ 1993{
2052 u32 msgcount = l_ptr->out_queue_size; 1994 u32 msgcount = l_ptr->out_queue_size;
2053 struct sk_buff *crs = l_ptr->first_out; 1995 struct sk_buff *crs = l_ptr->first_out;
@@ -2058,11 +2000,6 @@ void tipc_link_changeover(struct tipc_link *l_ptr)
2058 if (!tunnel) 2000 if (!tunnel)
2059 return; 2001 return;
2060 2002
2061 if (!l_ptr->owner->permit_changeover) {
2062 pr_warn("%speer did not permit changeover\n", link_co_err);
2063 return;
2064 }
2065
2066 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2003 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2067 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); 2004 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2068 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2005 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
@@ -2096,20 +2033,30 @@ void tipc_link_changeover(struct tipc_link *l_ptr)
2096 msgcount = msg_msgcnt(msg); 2033 msgcount = msg_msgcnt(msg);
2097 while (msgcount--) { 2034 while (msgcount--) {
2098 msg_set_seqno(m, msg_seqno(msg)); 2035 msg_set_seqno(m, msg_seqno(msg));
2099 tipc_link_tunnel(l_ptr, &tunnel_hdr, m, 2036 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
2100 msg_link_selector(m)); 2037 msg_link_selector(m));
2101 pos += align(msg_size(m)); 2038 pos += align(msg_size(m));
2102 m = (struct tipc_msg *)pos; 2039 m = (struct tipc_msg *)pos;
2103 } 2040 }
2104 } else { 2041 } else {
2105 tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, 2042 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
2106 msg_link_selector(msg)); 2043 msg_link_selector(msg));
2107 } 2044 }
2108 crs = crs->next; 2045 crs = crs->next;
2109 } 2046 }
2110} 2047}
2111 2048
2112void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel) 2049/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a
2050 * duplicate of the first link's send queue via the new link. This way, we
2051 * are guaranteed that currently queued packets from a socket are delivered
2052 * before future traffic from the same socket, even if this is using the
2053 * new link. The last arriving copy of each duplicate packet is dropped at
2054 * the receiving end by the regular protocol check, so packet cardinality
2055 * and sequence order is preserved per sender/receiver socket pair.
2056 * Owner node is locked.
2057 */
2058void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
2059 struct tipc_link *tunnel)
2113{ 2060{
2114 struct sk_buff *iter; 2061 struct sk_buff *iter;
2115 struct tipc_msg tunnel_hdr; 2062 struct tipc_msg tunnel_hdr;
@@ -2165,12 +2112,14 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2165 return eb; 2112 return eb;
2166} 2113}
2167 2114
2168/* 2115/* tipc_link_tunnel_rcv(): Receive a tunneled packet, sent
2169 * link_recv_changeover_msg(): Receive tunneled packet sent 2116 * via other link as result of a failover (ORIGINAL_MSG) or
2170 * via other link. Node is locked. Return extracted buffer. 2117 * a new active link (DUPLICATE_MSG). Failover packets are
2118 * returned to the active link for delivery upwards.
2119 * Owner node is locked.
2171 */ 2120 */
2172static int link_recv_changeover_msg(struct tipc_link **l_ptr, 2121static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
2173 struct sk_buff **buf) 2122 struct sk_buff **buf)
2174{ 2123{
2175 struct sk_buff *tunnel_buf = *buf; 2124 struct sk_buff *tunnel_buf = *buf;
2176 struct tipc_link *dest_link; 2125 struct tipc_link *dest_link;
@@ -2307,11 +2256,7 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2307 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); 2256 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2308 if (fragm == NULL) { 2257 if (fragm == NULL) {
2309 kfree_skb(buf); 2258 kfree_skb(buf);
2310 while (buf_chain) { 2259 kfree_skb_list(buf_chain);
2311 buf = buf_chain;
2312 buf_chain = buf_chain->next;
2313 kfree_skb(buf);
2314 }
2315 return -ENOMEM; 2260 return -ENOMEM;
2316 } 2261 }
2317 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 2262 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);