path: root/net/tipc/link.c
author	Ying Xue <ying.xue@windriver.com>	2014-11-25 22:41:52 -0500
committer	David S. Miller <davem@davemloft.net>	2014-11-26 12:30:17 -0500
commit	58dc55f25631178ee74cd27185956a8f7dcb3e32 (patch)
tree	a38c003514637757191edf01d906fd58b300e6b1 /net/tipc/link.c
parent	58d78b328a70f4b5ed1c00010499aaedb715ea5b (diff)
tipc: use generic SKB list APIs to manage link transmission queue
Use the standard SKB list APIs associated with struct sk_buff_head to manage the link transmission queue, making the relevant code cleaner.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
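For readers less familiar with the generic helpers, the sketch below (an editorial illustration, not part of the patch; outqueue_demo is a hypothetical function) shows how struct sk_buff_head replaces the hand-rolled first_out/last_out/out_queue_size bookkeeping this patch removes. The "__"-prefixed variants perform no locking, so the caller is assumed to hold the lock protecting the queue, as TIPC does via the node lock.

/* Editorial sketch -- not from the patch. Generic sk_buff_head helpers
 * standing in for TIPC's old open-coded queue fields. The caller is
 * assumed to hold the lock protecting the queue, since the "__"
 * variants do no locking of their own.
 */
#include <linux/skbuff.h>

static void outqueue_demo(struct sk_buff_head *q, struct sk_buff *skb)
{
	struct sk_buff *p;

	__skb_queue_head_init(q);	/* was: first_out = last_out = NULL */
	__skb_queue_tail(q, skb);	/* was: last_out->next = skb; last_out = skb; */

	p = skb_peek(q);		/* was: p = first_out; */
	pr_info("%u queued\n", skb_queue_len(q));	/* was: out_queue_size */

	skb_queue_walk(q, p)		/* was: for (p = first_out; p; p = p->next) */
		pr_info("buf %p\n", p);

	__skb_queue_purge(q);		/* was: kfree_skb_list(first_out); size = 0; */
}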
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--	net/tipc/link.c	190
1 file changed, 86 insertions(+), 104 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ddee498e74bc..9e94bf935e48 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -171,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
  */
 static void link_timeout(struct tipc_link *l_ptr)
 {
+	struct sk_buff *skb;
+
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
 	l_ptr->stats.queue_sz_counts++;
 
-	if (l_ptr->first_out) {
-		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
+	skb = skb_peek(&l_ptr->outqueue);
+	if (skb) {
+		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
 
 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
@@ -206,7 +209,6 @@ static void link_timeout(struct tipc_link *l_ptr)
 	}
 
 	/* do all other link processing performed on a periodic basis */
-
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
 	if (l_ptr->next_out)
@@ -289,6 +291,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
+	__skb_queue_head_init(&l_ptr->outqueue);
 	__skb_queue_head_init(&l_ptr->waiting_sks);
 
 	link_reset_statistics(l_ptr);
@@ -367,7 +370,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 static void link_prepare_wakeup(struct tipc_link *link)
 {
-	uint pend_qsz = link->out_queue_size;
+	uint pend_qsz = skb_queue_len(&link->outqueue);
 	struct sk_buff *skb, *tmp;
 
 	skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
@@ -380,17 +383,6 @@ static void link_prepare_wakeup(struct tipc_link *link)
 }
 
 /**
- * link_release_outqueue - purge link's outbound message queue
- * @l_ptr: pointer to link
- */
-static void link_release_outqueue(struct tipc_link *l_ptr)
-{
-	kfree_skb_list(l_ptr->first_out);
-	l_ptr->first_out = NULL;
-	l_ptr->out_queue_size = 0;
-}
-
-/**
  * tipc_link_reset_fragments - purge link's inbound message fragments queue
  * @l_ptr: pointer to link
  */
@@ -407,7 +399,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
 	kfree_skb_list(l_ptr->oldest_deferred_in);
-	kfree_skb_list(l_ptr->first_out);
+	__skb_queue_purge(&l_ptr->outqueue);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -440,14 +432,12 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues: */
-	link_release_outqueue(l_ptr);
+	__skb_queue_purge(&l_ptr->outqueue);
 	kfree_skb_list(l_ptr->oldest_deferred_in);
 	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
 		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
 		owner->action_flags |= TIPC_WAKEUP_USERS;
 	}
-	l_ptr->last_out = NULL;
-	l_ptr->first_out = NULL;
 	l_ptr->next_out = NULL;
 	l_ptr->unacked_window = 0;
 	l_ptr->checkpoint = 1;
@@ -703,18 +693,17 @@ drop:
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
- * @buf: chain of buffers containing message
+ * @skb: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
  * user data messages) or -EHOSTUNREACH (all other messages/senders)
  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
  * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_msg *msg = buf_msg(skb);
 	uint psz = msg_size(msg);
-	uint qsz = link->out_queue_size;
 	uint sndlim = link->queue_limit[0];
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
@@ -722,58 +711,50 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff *next = buf->next;
+	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff *next;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(qsz >= link->queue_limit[imp]))
-		return tipc_link_cong(link, buf);
+	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+		return tipc_link_cong(link, skb);
 
 	/* Has valid packet limit been used ? */
 	if (unlikely(psz > mtu)) {
-		kfree_skb_list(buf);
+		kfree_skb_list(skb);
 		return -EMSGSIZE;
 	}
 
 	/* Prepare each packet for sending, and add to outqueue: */
-	while (buf) {
-		next = buf->next;
-		msg = buf_msg(buf);
+	while (skb) {
+		next = skb->next;
+		msg = buf_msg(skb);
 		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (!link->first_out) {
-			link->first_out = buf;
-		} else if (qsz < sndlim) {
-			link->last_out->next = buf;
-		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+		if (skb_queue_len(outqueue) < sndlim) {
+			__skb_queue_tail(outqueue, skb);
+			tipc_bearer_send(link->bearer_id, skb, addr);
+			link->next_out = NULL;
+			link->unacked_window = 0;
+		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
 			link->stats.sent_bundled++;
-			buf = next;
-			next = buf->next;
+			skb = next;
 			continue;
-		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
+						link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			link->last_out->next = buf;
 			if (!link->next_out)
-				link->next_out = buf;
+				link->next_out = skb_peek_tail(outqueue);
 		} else {
-			link->last_out->next = buf;
+			__skb_queue_tail(outqueue, skb);
 			if (!link->next_out)
-				link->next_out = buf;
-		}
-
-		/* Send packet if possible: */
-		if (likely(++qsz <= sndlim)) {
-			tipc_bearer_send(link->bearer_id, buf, addr);
-			link->next_out = next;
-			link->unacked_window = 0;
+				link->next_out = skb;
 		}
 		seqno++;
-		link->last_out = buf;
-		buf = next;
+		skb = next;
 	}
 	link->next_out_no = seqno;
-	link->out_queue_size = qsz;
 	return 0;
 }
 
@@ -851,6 +832,14 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
+struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
+				    const struct sk_buff *skb)
+{
+	if (skb_queue_is_last(list, skb))
+		return NULL;
+	return skb->next;
+}
+
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -861,15 +850,15 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
  */
 void tipc_link_push_packets(struct tipc_link *l_ptr)
 {
-	struct sk_buff *skb;
+	struct sk_buff_head *outqueue = &l_ptr->outqueue;
+	struct sk_buff *skb = l_ptr->next_out;
 	struct tipc_msg *msg;
 	u32 next, first;
 
-	while (l_ptr->next_out) {
-		skb = l_ptr->next_out;
+	skb_queue_walk_from(outqueue, skb) {
 		msg = buf_msg(skb);
 		next = msg_seqno(msg);
-		first = buf_seqno(l_ptr->first_out);
+		first = buf_seqno(skb_peek(outqueue));
 
 		if (mod(next - first) < l_ptr->queue_limit[0]) {
 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -878,7 +867,7 @@ void tipc_link_push_packets(struct tipc_link *l_ptr)
 			TIPC_SKB_CB(skb)->bundling = false;
 			tipc_bearer_send(l_ptr->bearer_id, skb,
 					 &l_ptr->media_addr);
-			l_ptr->next_out = skb->next;
+			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
 		} else {
 			break;
 		}
@@ -946,20 +935,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 	}
 }
 
-void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
+void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 			  u32 retransmits)
 {
 	struct tipc_msg *msg;
 
-	if (!buf)
+	if (!skb)
 		return;
 
-	msg = buf_msg(buf);
+	msg = buf_msg(skb);
 
 	/* Detect repeated retransmit failures */
 	if (l_ptr->last_retransmitted == msg_seqno(msg)) {
 		if (++l_ptr->stale_count > 100) {
-			link_retransmit_failure(l_ptr, buf);
+			link_retransmit_failure(l_ptr, skb);
 			return;
 		}
 	} else {
@@ -967,12 +956,13 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 		l_ptr->stale_count = 1;
 	}
 
-	while (retransmits && (buf != l_ptr->next_out) && buf) {
-		msg = buf_msg(buf);
+	skb_queue_walk_from(&l_ptr->outqueue, skb) {
+		if (!retransmits || skb == l_ptr->next_out)
+			break;
+		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		buf = buf->next;
+		tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
 		retransmits--;
 		l_ptr->stats.retransmitted++;
 	}
@@ -1067,12 +1057,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 	while (head) {
 		struct tipc_node *n_ptr;
 		struct tipc_link *l_ptr;
-		struct sk_buff *crs;
 		struct sk_buff *buf = head;
+		struct sk_buff *skb1, *tmp;
 		struct tipc_msg *msg;
 		u32 seq_no;
 		u32 ackd;
-		u32 released = 0;
+		u32 released;
 
 		head = head->next;
 		buf->next = NULL;
@@ -1131,17 +1121,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (n_ptr->bclink.recv_permitted)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
-		crs = l_ptr->first_out;
-		while ((crs != l_ptr->next_out) &&
-		       less_eq(buf_seqno(crs), ackd)) {
-			struct sk_buff *next = crs->next;
-			kfree_skb(crs);
-			crs = next;
-			released++;
-		}
-		if (released) {
-			l_ptr->first_out = crs;
-			l_ptr->out_queue_size -= released;
+		released = 0;
+		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
+			if (skb1 == l_ptr->next_out ||
+			    more(buf_seqno(skb1), ackd))
+				break;
+			__skb_unlink(skb1, &l_ptr->outqueue);
+			kfree_skb(skb1);
+			released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
@@ -1590,7 +1577,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, l_ptr->first_out,
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1637,10 +1624,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = l_ptr->out_queue_size;
-	struct sk_buff *crs = l_ptr->first_out;
+	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
+	struct sk_buff *skb;
 	int split_bundles;
 
 	if (!tunnel)
@@ -1651,14 +1638,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (!l_ptr->first_out) {
-		struct sk_buff *buf;
-
-		buf = tipc_buf_acquire(INT_H_SIZE);
-		if (buf) {
-			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
+	if (skb_queue_empty(&l_ptr->outqueue)) {
+		skb = tipc_buf_acquire(INT_H_SIZE);
+		if (skb) {
+			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
-			__tipc_link_xmit(tunnel, buf);
+			__tipc_link_xmit(tunnel, skb);
 		} else {
 			pr_warn("%sunable to send changeover msg\n",
 				link_co_err);
@@ -1669,8 +1654,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	split_bundles = (l_ptr->owner->active_links[0] !=
			 l_ptr->owner->active_links[1]);
 
-	while (crs) {
-		struct tipc_msg *msg = buf_msg(crs);
+	skb_queue_walk(&l_ptr->outqueue, skb) {
+		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
 			struct tipc_msg *m = msg_get_wrapped(msg);
@@ -1688,7 +1673,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
 					      msg_link_selector(msg));
 		}
-		crs = crs->next;
 	}
 }
 
@@ -1704,17 +1688,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 			      struct tipc_link *tunnel)
 {
-	struct sk_buff *iter;
+	struct sk_buff *skb;
 	struct tipc_msg tunnel_hdr;
 
 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
+	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	iter = l_ptr->first_out;
-	while (iter) {
-		struct sk_buff *outbuf;
-		struct tipc_msg *msg = buf_msg(iter);
+	skb_queue_walk(&l_ptr->outqueue, skb) {
+		struct sk_buff *outskb;
+		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
 
 		if (msg_user(msg) == MSG_BUNDLER)
@@ -1722,19 +1705,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */
 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
-		if (outbuf == NULL) {
+		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
+		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
					       length);
-		__tipc_link_xmit(tunnel, outbuf);
+		__tipc_link_xmit(tunnel, outskb);
 		if (!tipc_link_is_up(l_ptr))
 			return;
-		iter = iter->next;
 	}
 }
 
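Editorial note on the new tipc_skb_queue_next() helper introduced above: struct sk_buff_head is a circular doubly-linked list, so the tail buffer's ->next points back at the list head rather than being NULL. Blindly following skb->next, as the old first_out chain walk did, would therefore run past the end of the queue; skb_queue_is_last() is the guard. Below is a minimal, hypothetical sketch (walk_from_position is not part of the patch) of the same seeded-iteration pattern tipc_link_push_packets() now uses with next_out.

/* Hypothetical sketch: resume iteration from a known position 'skb'
 * (e.g. link->next_out). skb_queue_walk_from() stops at the queue tail,
 * so the circular link back to the head is never dereferenced as an skb.
 */
static void walk_from_position(struct sk_buff_head *q, struct sk_buff *skb)
{
	if (!skb)		/* nothing pending beyond this point */
		return;
	skb_queue_walk_from(q, skb)
		pr_info("would send buffer %p\n", skb);
}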