author    Jon Paul Maloy <jon.maloy@ericsson.com>  2015-03-13 16:08:10 -0400
committer David S. Miller <davem@davemloft.net>    2015-03-14 14:38:32 -0400
commit    05dcc5aa4dcced4f59f925625cea669e82b75519
tree      0a516e1012ee7e9b7eee037d8e31278a425e7d68 /net/tipc/link.c
parent    2cdf3918e47e98c8f34f7a64455ea9fd433756e7
tipc: split link outqueue
struct tipc_link contains one single queue for outgoing packets, where both
transmitted and waiting packets are queued.

This infrastructure is hard to maintain, because we need to keep a number of
fields to keep track of which packets are sent or unsent, and the number of
packets in each category.

A lot of code becomes simpler if we split this queue into a transmission
queue, where sent/unacknowledged packets are kept, and a backlog queue,
where we keep the not yet sent packets.

In this commit we do this separation.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
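The mechanics of the patch can be sketched outside the kernel: keep at most
"window" packets in an in-flight queue (transmq), park everything else in a
backlog (backlogq), and refill the window whenever acknowledgments free up
space. The stand-alone C sketch below illustrates only that idea; every name
in it (toy_link, xmit, ack_and_push, WINDOW, NPKTS) is invented for
illustration, and the real logic lives in __tipc_link_xmit() and
tipc_link_push_packets() in the diff that follows.

/* Toy model of the transmq/backlogq split - not kernel code. */
#include <stdio.h>

#define WINDOW 4                  /* max unacknowledged packets in flight */
#define NPKTS  10

struct queue { int buf[NPKTS]; int head, tail; };

static void q_push(struct queue *q, int seqno) { q->buf[q->tail++] = seqno; }
static int  q_len(const struct queue *q)       { return q->tail - q->head; }
static int  q_pop(struct queue *q)             { return q->buf[q->head++]; }

struct toy_link { struct queue transmq, backlogq; };

/* Like __tipc_link_xmit(): send right away while the window has room,
 * otherwise append to the backlog. No next_out pointer is needed;
 * queue membership alone says whether a packet has been sent. */
static void xmit(struct toy_link *l, int seqno)
{
	if (q_len(&l->transmq) < WINDOW) {
		q_push(&l->transmq, seqno);
		printf("sent %d\n", seqno);  /* stands in for tipc_bearer_send() */
	} else {
		q_push(&l->backlogq, seqno);
	}
}

/* Like the ack handling in tipc_rcv() followed by tipc_link_push_packets():
 * release acked packets from transmq, then move backlog packets into the
 * freed window slots. */
static void ack_and_push(struct toy_link *l, int nacked)
{
	while (nacked-- && q_len(&l->transmq))
		printf("acked %d\n", q_pop(&l->transmq));
	while (q_len(&l->transmq) < WINDOW && q_len(&l->backlogq)) {
		int s = q_pop(&l->backlogq);
		q_push(&l->transmq, s);
		printf("sent %d (from backlog)\n", s);
	}
}

int main(void)
{
	struct toy_link l = { { {0}, 0, 0 }, { {0}, 0, 0 } };
	int s;

	for (s = 1; s <= NPKTS; s++)
		xmit(&l, s);         /* 1..4 go out, 5..10 queue up */
	ack_and_push(&l, 2);         /* acks 1-2, pushes 5-6        */
	return 0;
}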
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--  net/tipc/link.c | 208
1 file changed, 100 insertions(+), 108 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 2652c3286e2f..7e0036f5a364 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -194,10 +194,10 @@ static void link_timeout(unsigned long data)
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
 	l_ptr->stats.queue_sz_counts++;
 
-	skb = skb_peek(&l_ptr->outqueue);
+	skb = skb_peek(&l_ptr->transmq);
 	if (skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
@@ -229,7 +229,7 @@ static void link_timeout(unsigned long data)
 	/* do all other link processing performed on a periodic basis */
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
-	if (l_ptr->next_out)
+	if (skb_queue_len(&l_ptr->backlogq))
 		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
@@ -313,8 +313,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->outqueue);
-	__skb_queue_head_init(&l_ptr->deferred_queue);
+	__skb_queue_head_init(&l_ptr->transmq);
+	__skb_queue_head_init(&l_ptr->backlogq);
+	__skb_queue_head_init(&l_ptr->deferdq);
 	skb_queue_head_init(&l_ptr->wakeupq);
 	skb_queue_head_init(&l_ptr->inputq);
 	skb_queue_head_init(&l_ptr->namedq);
@@ -400,7 +401,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 void link_prepare_wakeup(struct tipc_link *link)
 {
-	uint pend_qsz = skb_queue_len(&link->outqueue);
+	uint pend_qsz = skb_queue_len(&link->backlogq);
 	struct sk_buff *skb, *tmp;
 
 	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -430,8 +431,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	__skb_queue_purge(&l_ptr->deferred_queue);
-	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferdq);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -464,15 +466,15 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues, except inputq: */
-	__skb_queue_purge(&l_ptr->outqueue);
-	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
+	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
-	l_ptr->next_out = NULL;
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
 	l_ptr->fsm_msg_cnt = 0;
@@ -742,54 +744,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list)
 {
 	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	uint psz = msg_size(msg);
-	uint sndlim = link->queue_limit[0];
+	unsigned int maxwin = link->window;
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
 	uint ack = mod(link->next_in_no - 1);
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff_head *transmq = &link->transmq;
+	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
 		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
-	if (unlikely(psz > mtu)) {
+	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
-	/* Prepare each packet for sending, and add to outqueue: */
+	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
 		msg = buf_msg(skb);
-		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_seqno(msg, seqno);
+		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (skb_queue_len(outqueue) < sndlim) {
-			__skb_queue_tail(outqueue, skb);
-			tipc_bearer_send(net, link->bearer_id,
-					 skb, addr);
-			link->next_out = NULL;
-			link->unacked_window = 0;
-		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			__skb_queue_tail(transmq, skb);
+			tipc_bearer_send(net, link->bearer_id, skb, addr);
+			link->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
 			link->stats.sent_bundled++;
 			continue;
-		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
-						link->addr)) {
+		}
+		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			if (!link->next_out)
-				link->next_out = skb_peek_tail(outqueue);
-		} else {
-			__skb_queue_tail(outqueue, skb);
-			if (!link->next_out)
-				link->next_out = skb;
 		}
+		__skb_queue_tail(backlogq, skb);
 		seqno++;
 	}
 	link->next_out_no = seqno;
@@ -895,14 +894,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
-struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
-				    const struct sk_buff *skb)
-{
-	if (skb_queue_is_last(list, skb))
-		return NULL;
-	return skb->next;
-}
-
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -911,30 +902,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
  *
  * Called with node locked
  */
-void tipc_link_push_packets(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *link)
 {
-	struct sk_buff_head *outqueue = &l_ptr->outqueue;
-	struct sk_buff *skb = l_ptr->next_out;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
-	u32 next, first;
+	unsigned int ack = mod(link->next_in_no - 1);
 
-	skb_queue_walk_from(outqueue, skb) {
-		msg = buf_msg(skb);
-		next = msg_seqno(msg);
-		first = buf_seqno(skb_peek(outqueue));
-
-		if (mod(next - first) < l_ptr->queue_limit[0]) {
-			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			if (msg_user(msg) == MSG_BUNDLER)
-				TIPC_SKB_CB(skb)->bundling = false;
-			tipc_bearer_send(l_ptr->owner->net,
-					 l_ptr->bearer_id, skb,
-					 &l_ptr->media_addr);
-			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
-		} else {
+	while (skb_queue_len(&link->transmq) < link->window) {
+		skb = __skb_dequeue(&link->backlogq);
+		if (!skb)
 			break;
-		}
+		msg = buf_msg(skb);
+		msg_set_ack(msg, ack);
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		link->rcv_unacked = 0;
+		__skb_queue_tail(&link->transmq, skb);
+		tipc_bearer_send(link->owner->net, link->bearer_id,
+				 skb, &link->media_addr);
 	}
 }
 
@@ -1021,8 +1005,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 		l_ptr->stale_count = 1;
 	}
 
-	skb_queue_walk_from(&l_ptr->outqueue, skb) {
-		if (!retransmits || skb == l_ptr->next_out)
+	skb_queue_walk_from(&l_ptr->transmq, skb) {
+		if (!retransmits)
 			break;
 		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1039,12 +1023,12 @@ static void link_retrieve_defq(struct tipc_link *link,
 {
 	u32 seq_no;
 
-	if (skb_queue_empty(&link->deferred_queue))
+	if (skb_queue_empty(&link->deferdq))
 		return;
 
-	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+	seq_no = buf_seqno(skb_peek(&link->deferdq));
 	if (seq_no == mod(link->next_in_no))
-		skb_queue_splice_tail_init(&link->deferred_queue, list);
+		skb_queue_splice_tail_init(&link->deferdq, list);
 }
 
 /**
@@ -1121,17 +1105,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
-		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
-			if (skb1 == l_ptr->next_out ||
-			    more(buf_seqno(skb1), ackd))
+		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
+			if (more(buf_seqno(skb1), ackd))
 				break;
-			__skb_unlink(skb1, &l_ptr->outqueue);
+			__skb_unlink(skb1, &l_ptr->transmq);
 			kfree_skb(skb1);
 			released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
-		if (unlikely(l_ptr->next_out))
+		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
 			tipc_link_push_packets(l_ptr);
 
 		if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1166,10 +1149,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			goto unlock;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
 			link_retrieve_defq(l_ptr, &head);
-
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
@@ -1336,9 +1318,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
+	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
 		l_ptr->stats.deferred_recv++;
-		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 	} else {
 		l_ptr->stats.duplicates++;
@@ -1375,11 +1357,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 	if (!tipc_link_is_up(l_ptr))
 		return;
-	if (l_ptr->next_out)
-		next_sent = buf_seqno(l_ptr->next_out);
+	if (skb_queue_len(&l_ptr->backlogq))
+		next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
 	msg_set_next_sent(msg, next_sent);
-	if (!skb_queue_empty(&l_ptr->deferred_queue)) {
-		u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
+	if (!skb_queue_empty(&l_ptr->deferdq)) {
+		u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
 		gap = mod(rec - mod(l_ptr->next_in_no));
 	}
 	msg_set_seq_gap(msg, gap);
@@ -1431,10 +1413,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 	buf->priority = TC_PRIO_CONTROL;
-
 	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
 			 &l_ptr->media_addr);
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	kfree_skb(buf);
 }
 
@@ -1569,7 +1550,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1616,7 +1597,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
+	int msgcount;
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
 	struct sk_buff *skb;
@@ -1627,10 +1608,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (skb_queue_empty(&l_ptr->outqueue)) {
+	if (skb_queue_empty(&l_ptr->transmq)) {
 		skb = tipc_buf_acquire(INT_H_SIZE);
 		if (skb) {
 			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1646,7 +1629,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	split_bundles = (l_ptr->owner->active_links[0] !=
 			 l_ptr->owner->active_links[1]);
 
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	skb_queue_walk(&l_ptr->transmq, skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1677,39 +1660,46 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
-			      struct tipc_link *tunnel)
+void tipc_link_dup_queue_xmit(struct tipc_link *link,
+			      struct tipc_link *tnl)
 {
 	struct sk_buff *skb;
-	struct tipc_msg tunnel_hdr;
+	struct tipc_msg tnl_hdr;
+	struct sk_buff_head *queue = &link->transmq;
+	int mcnt;
+
+	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
+		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
+	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
+	msg_set_msgcnt(&tnl_hdr, mcnt);
+	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
 
-	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
-	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+tunnel_queue:
+	skb_queue_walk(queue, skb) {
 		struct sk_buff *outskb;
 		struct tipc_msg *msg = buf_msg(skb);
-		u32 length = msg_size(msg);
+		u32 len = msg_size(msg);
 
-		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
-		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
-		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		msg_set_ack(msg, mod(link->next_in_no - 1));
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
+		outskb = tipc_buf_acquire(len + INT_H_SIZE);
 		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
-					       length);
-		__tipc_link_xmit_skb(tunnel, outskb);
-		if (!tipc_link_is_up(l_ptr))
+		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
+					       skb->data, len);
+		__tipc_link_xmit_skb(tnl, outskb);
+		if (!tipc_link_is_up(link))
 			return;
 	}
+	if (queue == &link->backlogq)
+		return;
+	queue = &link->backlogq;
+	goto tunnel_queue;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
@@ -1823,6 +1813,8 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 
 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
 {
+	l_ptr->window = window;
+
 	/* Data messages from this node, inclusive FIRST_FRAGM */
 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
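With the window now stored on the link, the importance limits computed here
bound the backlog queue (see the backlogq check in __tipc_link_xmit() earlier
in this patch) rather than a combined outqueue. A quick sketch of the
resulting numbers follows; the window value of 50 is an assumption for
illustration, since the default is not part of this diff.

/* Sketch of the limits computed by tipc_link_set_queue_limits() above,
 * evaluated for an assumed window of 50 packets. */
#include <stdio.h>

int main(void)
{
	unsigned int window = 50;                  /* assumed value          */
	unsigned int low    = window;              /* TIPC_LOW_IMPORTANCE    */
	unsigned int medium = (window / 3) * 4;    /* TIPC_MEDIUM_IMPORTANCE */

	/* Integer division first: 50 / 3 = 16, then 16 * 4 = 64. */
	printf("low=%u medium=%u\n", low, medium);
	return 0;
}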