author	Jon Paul Maloy <jon.maloy@ericsson.com>	2015-03-13 16:08:10 -0400
committer	David S. Miller <davem@davemloft.net>	2015-03-14 14:38:32 -0400
commit	05dcc5aa4dcced4f59f925625cea669e82b75519 (patch)
tree	0a516e1012ee7e9b7eee037d8e31278a425e7d68
parent	2cdf3918e47e98c8f34f7a64455ea9fd433756e7 (diff)
tipc: split link outqueue
struct tipc_link contains one single queue for outgoing packets, where both
transmitted and waiting packets are queued. This infrastructure is hard to
maintain, because we need to keep a number of fields to keep track of which
packets are sent or unsent, and the number of packets in each category.

A lot of code becomes simpler if we split this queue into a transmission
queue, where sent/unacknowledged packets are kept, and a backlog queue,
where we keep the not yet sent packets.

In this commit we do this separation.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
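The core idea is easiest to see in isolation: a packet goes straight to the
wire while the transmit queue holds fewer than "window" packets, and onto the
backlog otherwise; acknowledgments release packets from the transmit queue and
pull the backlog forward. The following stand-alone sketch models that
discipline. It is illustrative user-space C, not kernel code: the pktq type,
its helpers, and the WINDOW constant are invented for the example, and only
the transmq/backlogq roles mirror the patch.

/* Minimal user-space model of the queue split introduced by this patch.
 * All types and helpers are illustrative; in the kernel the queues are
 * sk_buff_head and the send window is link->window.
 */
#include <assert.h>
#include <stdio.h>

#define WINDOW 3	/* max packets in flight (transmq) */

struct pktq { int seq[64]; int head, tail; };

static void q_push(struct pktq *q, int seq) { q->seq[q->tail++ % 64] = seq; }
static int  q_len(const struct pktq *q)     { return q->tail - q->head; }
static int  q_pop(struct pktq *q)           { return q->seq[q->head++ % 64]; }
static int  q_peek(const struct pktq *q)    { return q->seq[q->head % 64]; }

struct link {
	struct pktq transmq;	/* sent but not yet acknowledged */
	struct pktq backlogq;	/* not yet sent */
	int next_out_no;
};

/* Send one packet: to the wire while the window has room, otherwise
 * onto the backlog. No next_out pointer is needed. */
static void link_xmit(struct link *l)
{
	int seq = l->next_out_no++;
	if (q_len(&l->transmq) < WINDOW)
		q_push(&l->transmq, seq);	/* "tipc_bearer_send()" */
	else
		q_push(&l->backlogq, seq);
}

/* Acknowledge everything up to 'acked', then refill the window from
 * the backlog, as tipc_link_push_packets() now does. */
static void link_ack(struct link *l, int acked)
{
	while (q_len(&l->transmq) && q_peek(&l->transmq) <= acked)
		q_pop(&l->transmq);		/* release acked packet */
	while (q_len(&l->transmq) < WINDOW && q_len(&l->backlogq))
		q_push(&l->transmq, q_pop(&l->backlogq));
}

int main(void)
{
	struct link l = { .next_out_no = 1 };
	for (int i = 0; i < 5; i++)
		link_xmit(&l);
	assert(q_len(&l.transmq) == 3 && q_len(&l.backlogq) == 2);
	link_ack(&l, 2);			/* acks seqnos 1 and 2 */
	assert(q_len(&l.transmq) == 3 && q_len(&l.backlogq) == 0);
	printf("in flight: %d, backlog: %d\n",
	       q_len(&l.transmq), q_len(&l.backlogq));
	return 0;
}

With the two roles separated, queue membership itself says whether a packet
has been sent, so the next_out pointer and the unacked/unsent counters in the
old code become unnecessary.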
Diffstat:
-rw-r--r--  net/tipc/bcast.c   48
-rw-r--r--  net/tipc/link.c   208
-rw-r--r--  net/tipc/link.h    17
-rw-r--r--  net/tipc/msg.c     32
-rw-r--r--  net/tipc/msg.h      6
-rw-r--r--  net/tipc/node.c     4
-rw-r--r--  net/tipc/node.h     2
7 files changed, 150 insertions(+), 167 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 5ee5076a8b27..17cb0ff5f344 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -135,9 +135,10 @@ static void bclink_set_last_sent(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_link *bcl = tn->bcl;
+	struct sk_buff *skb = skb_peek(&bcl->backlogq);
 
-	if (bcl->next_out)
-		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
+	if (skb)
+		bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1);
 	else
 		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
 }
@@ -180,7 +181,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
 	struct sk_buff *skb;
 	struct tipc_link *bcl = tn->bcl;
 
-	skb_queue_walk(&bcl->outqueue, skb) {
+	skb_queue_walk(&bcl->transmq, skb) {
 		if (more(buf_seqno(skb), after)) {
 			tipc_link_retransmit(bcl, skb, mod(to - after));
 			break;
@@ -210,7 +211,6 @@ void tipc_bclink_wakeup_users(struct net *net)
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
 	struct sk_buff *skb, *tmp;
-	struct sk_buff *next;
 	unsigned int released = 0;
 	struct net *net = n_ptr->net;
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -221,7 +221,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	tipc_bclink_lock(net);
 
 	/* Bail out if tx queue is empty (no clean up is required) */
-	skb = skb_peek(&tn->bcl->outqueue);
+	skb = skb_peek(&tn->bcl->transmq);
 	if (!skb)
 		goto exit;
 
@@ -248,27 +248,19 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	skb_queue_walk(&tn->bcl->outqueue, skb) {
+	skb_queue_walk(&tn->bcl->transmq, skb) {
 		if (more(buf_seqno(skb), n_ptr->bclink.acked))
 			break;
 	}
 
 	/* Update packets that node is now acknowledging */
-	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
+	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
 		if (more(buf_seqno(skb), acked))
 			break;
-
-		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
-		if (skb != tn->bcl->next_out) {
-			bcbuf_decr_acks(skb);
-		} else {
-			bcbuf_set_acks(skb, 0);
-			tn->bcl->next_out = next;
-			bclink_set_last_sent(net);
-		}
-
+		bcbuf_decr_acks(skb);
+		bclink_set_last_sent(net);
 		if (bcbuf_acks(skb) == 0) {
-			__skb_unlink(skb, &tn->bcl->outqueue);
+			__skb_unlink(skb, &tn->bcl->transmq);
 			kfree_skb(skb);
 			released = 1;
 		}
@@ -276,7 +268,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-	if (unlikely(tn->bcl->next_out)) {
+	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
 		tipc_link_push_packets(tn->bcl);
 		bclink_set_last_sent(net);
 	}
@@ -323,7 +315,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
-		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
 		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
@@ -398,7 +390,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 	if (likely(bclink->bcast_nodes.count)) {
 		rc = __tipc_link_xmit(net, bcl, list);
 		if (likely(!rc)) {
-			u32 len = skb_queue_len(&bcl->outqueue);
+			u32 len = skb_queue_len(&bcl->transmq);
 
 			bclink_set_last_sent(net);
 			bcl->stats.queue_sz_counts++;
@@ -563,25 +555,25 @@ receive:
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (skb_queue_empty(&node->bclink.deferred_queue)) {
+		if (skb_queue_empty(&node->bclink.deferdq)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
+		msg = buf_msg(skb_peek(&node->bclink.deferdq));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-		buf = __skb_dequeue(&node->bclink.deferred_queue);
+		buf = __skb_dequeue(&node->bclink.deferdq);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
 					       buf);
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
@@ -638,7 +630,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
 	msg_set_non_seq(msg, 1);
 	msg_set_mc_netid(msg, tn->net_id);
 	tn->bcl->stats.sent_info++;
-
 	if (WARN_ON(!bclink->bcast_nodes.count)) {
 		dump_stack();
 		return 0;
@@ -917,8 +908,9 @@ int tipc_bclink_init(struct net *net)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->outqueue);
-	__skb_queue_head_init(&bcl->deferred_queue);
+	__skb_queue_head_init(&bcl->transmq);
+	__skb_queue_head_init(&bcl->backlogq);
+	__skb_queue_head_init(&bcl->deferdq);
 	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
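In the broadcast variant above, a buffer must stay on transmq until every
peer has acknowledged it: tipc_bclink_acknowledge() calls bcbuf_decr_acks()
once per newly acked packet, and the buffer is unlinked and freed only when
its counter reaches zero. A minimal sketch of that release rule, with an
invented bcast_pkt type standing in for the skb and the counter the kernel
keeps in the skb control block:

#include <stdbool.h>
#include <stdio.h>

struct bcast_pkt {
	int seqno;
	int acks;	/* peers that still have to acknowledge */
};

/* Called once per acknowledging peer, as in tipc_bclink_acknowledge(). */
static bool bcast_pkt_ack(struct bcast_pkt *p)
{
	if (--p->acks == 0) {
		printf("seqno %d released\n", p->seqno);	/* kfree_skb() */
		return true;	/* unlink from transmq */
	}
	return false;
}

int main(void)
{
	struct bcast_pkt p = { .seqno = 42, .acks = 3 };	/* 3 peers */
	bcast_pkt_ack(&p);	/* peer A acks: kept */
	bcast_pkt_ack(&p);	/* peer B acks: kept */
	bcast_pkt_ack(&p);	/* peer C acks: released */
	return 0;
}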
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 2652c3286e2f..7e0036f5a364 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -194,10 +194,10 @@ static void link_timeout(unsigned long data)
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
 	l_ptr->stats.queue_sz_counts++;
 
-	skb = skb_peek(&l_ptr->outqueue);
+	skb = skb_peek(&l_ptr->transmq);
 	if (skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
@@ -229,7 +229,7 @@ static void link_timeout(unsigned long data)
 	/* do all other link processing performed on a periodic basis */
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
-	if (l_ptr->next_out)
+	if (skb_queue_len(&l_ptr->backlogq))
 		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
@@ -313,8 +313,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->outqueue);
-	__skb_queue_head_init(&l_ptr->deferred_queue);
+	__skb_queue_head_init(&l_ptr->transmq);
+	__skb_queue_head_init(&l_ptr->backlogq);
+	__skb_queue_head_init(&l_ptr->deferdq);
 	skb_queue_head_init(&l_ptr->wakeupq);
 	skb_queue_head_init(&l_ptr->inputq);
 	skb_queue_head_init(&l_ptr->namedq);
@@ -400,7 +401,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 void link_prepare_wakeup(struct tipc_link *link)
 {
-	uint pend_qsz = skb_queue_len(&link->outqueue);
+	uint pend_qsz = skb_queue_len(&link->backlogq);
 	struct sk_buff *skb, *tmp;
 
 	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -430,8 +431,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	__skb_queue_purge(&l_ptr->deferred_queue);
-	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferdq);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -464,15 +466,15 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues, except inputq: */
-	__skb_queue_purge(&l_ptr->outqueue);
-	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
+	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
-	l_ptr->next_out = NULL;
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
 	l_ptr->fsm_msg_cnt = 0;
@@ -742,54 +744,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 			  struct sk_buff_head *list)
 {
 	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	uint psz = msg_size(msg);
-	uint sndlim = link->queue_limit[0];
+	unsigned int maxwin = link->window;
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
 	uint ack = mod(link->next_in_no - 1);
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff_head *transmq = &link->transmq;
+	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
 		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
-	if (unlikely(psz > mtu)) {
+	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
-	/* Prepare each packet for sending, and add to outqueue: */
+	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
 		msg = buf_msg(skb);
-		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_seqno(msg, seqno);
+		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (skb_queue_len(outqueue) < sndlim) {
-			__skb_queue_tail(outqueue, skb);
-			tipc_bearer_send(net, link->bearer_id,
-					 skb, addr);
-			link->next_out = NULL;
-			link->unacked_window = 0;
-		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			__skb_queue_tail(transmq, skb);
+			tipc_bearer_send(net, link->bearer_id, skb, addr);
+			link->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
 			link->stats.sent_bundled++;
 			continue;
-		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
-						link->addr)) {
+		}
+		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			if (!link->next_out)
-				link->next_out = skb_peek_tail(outqueue);
-		} else {
-			__skb_queue_tail(outqueue, skb);
-			if (!link->next_out)
-				link->next_out = skb;
 		}
+		__skb_queue_tail(backlogq, skb);
 		seqno++;
 	}
 	link->next_out_no = seqno;
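One detail worth spelling out from the hunk above: the old code wrote the
link-level ack and the sequence number into header word 2 with a single
shifted OR, while the new code uses the two field setters. Both produce the
same word, since TIPC keeps the ack in the upper 16 bits of word 2 and the
seqno in the lower 16, and ack has already been reduced modulo 2^16
(ack = mod(link->next_in_no - 1)). A self-contained sketch of the
equivalence, using a bare uint32_t in place of struct tipc_msg:

#include <assert.h>
#include <stdint.h>

static uint32_t mod(uint32_t n) { return n & 0xffff; }	/* 16-bit seqno space */

static void set_word2(uint32_t *w2, uint32_t ack, uint32_t seqno)
{
	*w2 = (ack << 16) | mod(seqno);		/* old: msg_set_word(msg, 2, ...) */
}

static void set_ack(uint32_t *w2, uint32_t ack)		/* new: msg_set_ack() */
{
	*w2 = (*w2 & 0x0000ffff) | (mod(ack) << 16);
}

static void set_seqno(uint32_t *w2, uint32_t seqno)	/* new: msg_set_seqno() */
{
	*w2 = (*w2 & 0xffff0000) | mod(seqno);
}

int main(void)
{
	uint32_t a = 0, b = 0;
	uint32_t ack = mod(41), seqno = 77;

	set_word2(&a, ack, seqno);
	set_seqno(&b, seqno);
	set_ack(&b, ack);
	assert(a == b);		/* same header word either way */
	return 0;
}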
@@ -895,14 +894,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
-struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
-				    const struct sk_buff *skb)
-{
-	if (skb_queue_is_last(list, skb))
-		return NULL;
-	return skb->next;
-}
-
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -911,30 +902,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
  *
  * Called with node locked
  */
-void tipc_link_push_packets(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *link)
 {
-	struct sk_buff_head *outqueue = &l_ptr->outqueue;
-	struct sk_buff *skb = l_ptr->next_out;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
-	u32 next, first;
+	unsigned int ack = mod(link->next_in_no - 1);
 
-	skb_queue_walk_from(outqueue, skb) {
-		msg = buf_msg(skb);
-		next = msg_seqno(msg);
-		first = buf_seqno(skb_peek(outqueue));
-
-		if (mod(next - first) < l_ptr->queue_limit[0]) {
-			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			if (msg_user(msg) == MSG_BUNDLER)
-				TIPC_SKB_CB(skb)->bundling = false;
-			tipc_bearer_send(l_ptr->owner->net,
-					 l_ptr->bearer_id, skb,
-					 &l_ptr->media_addr);
-			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
-		} else {
+	while (skb_queue_len(&link->transmq) < link->window) {
+		skb = __skb_dequeue(&link->backlogq);
+		if (!skb)
 			break;
-		}
+		msg = buf_msg(skb);
+		msg_set_ack(msg, ack);
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		link->rcv_unacked = 0;
+		__skb_queue_tail(&link->transmq, skb);
+		tipc_bearer_send(link->owner->net, link->bearer_id,
+				 skb, &link->media_addr);
 	}
 }
 
@@ -1021,8 +1005,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 		l_ptr->stale_count = 1;
 	}
 
-	skb_queue_walk_from(&l_ptr->outqueue, skb) {
-		if (!retransmits || skb == l_ptr->next_out)
+	skb_queue_walk_from(&l_ptr->transmq, skb) {
+		if (!retransmits)
 			break;
 		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1039,12 +1023,12 @@ static void link_retrieve_defq(struct tipc_link *link,
 {
 	u32 seq_no;
 
-	if (skb_queue_empty(&link->deferred_queue))
+	if (skb_queue_empty(&link->deferdq))
 		return;
 
-	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+	seq_no = buf_seqno(skb_peek(&link->deferdq));
 	if (seq_no == mod(link->next_in_no))
-		skb_queue_splice_tail_init(&link->deferred_queue, list);
+		skb_queue_splice_tail_init(&link->deferdq, list);
 }
 
 /**
@@ -1121,17 +1105,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
-		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
-			if (skb1 == l_ptr->next_out ||
-			    more(buf_seqno(skb1), ackd))
+		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
+			if (more(buf_seqno(skb1), ackd))
 				break;
-			__skb_unlink(skb1, &l_ptr->outqueue);
+			__skb_unlink(skb1, &l_ptr->transmq);
 			kfree_skb(skb1);
 			released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
-		if (unlikely(l_ptr->next_out))
+		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
 			tipc_link_push_packets(l_ptr);
 
 		if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1166,10 +1149,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			goto unlock;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
 			link_retrieve_defq(l_ptr, &head);
-
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
@@ -1336,9 +1318,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
+	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
 		l_ptr->stats.deferred_recv++;
-		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 	} else {
 		l_ptr->stats.duplicates++;
@@ -1375,11 +1357,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 	if (!tipc_link_is_up(l_ptr))
 		return;
-	if (l_ptr->next_out)
-		next_sent = buf_seqno(l_ptr->next_out);
+	if (skb_queue_len(&l_ptr->backlogq))
+		next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
 	msg_set_next_sent(msg, next_sent);
-	if (!skb_queue_empty(&l_ptr->deferred_queue)) {
-		u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
+	if (!skb_queue_empty(&l_ptr->deferdq)) {
+		u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
 		gap = mod(rec - mod(l_ptr->next_in_no));
 	}
 	msg_set_seq_gap(msg, gap);
@@ -1431,10 +1413,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 	buf->priority = TC_PRIO_CONTROL;
-
 	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
 			 &l_ptr->media_addr);
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	kfree_skb(buf);
 }
 
@@ -1569,7 +1550,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1616,7 +1597,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
+	int msgcount;
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
 	struct sk_buff *skb;
@@ -1627,10 +1608,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (skb_queue_empty(&l_ptr->outqueue)) {
+	if (skb_queue_empty(&l_ptr->transmq)) {
 		skb = tipc_buf_acquire(INT_H_SIZE);
 		if (skb) {
 			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1646,7 +1629,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	split_bundles = (l_ptr->owner->active_links[0] !=
 			 l_ptr->owner->active_links[1]);
 
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	skb_queue_walk(&l_ptr->transmq, skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1677,39 +1660,46 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
-			      struct tipc_link *tunnel)
+void tipc_link_dup_queue_xmit(struct tipc_link *link,
+			      struct tipc_link *tnl)
 {
 	struct sk_buff *skb;
-	struct tipc_msg tunnel_hdr;
+	struct tipc_msg tnl_hdr;
+	struct sk_buff_head *queue = &link->transmq;
+	int mcnt;
+
+	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
+		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
+	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
+	msg_set_msgcnt(&tnl_hdr, mcnt);
+	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
 
-	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
-	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+tunnel_queue:
+	skb_queue_walk(queue, skb) {
 		struct sk_buff *outskb;
 		struct tipc_msg *msg = buf_msg(skb);
-		u32 length = msg_size(msg);
+		u32 len = msg_size(msg);
 
-		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
-		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
-		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		msg_set_ack(msg, mod(link->next_in_no - 1));
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
+		outskb = tipc_buf_acquire(len + INT_H_SIZE);
 		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
-					       length);
-		__tipc_link_xmit_skb(tunnel, outskb);
-		if (!tipc_link_is_up(l_ptr))
+		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
					       skb->data, len);
+		__tipc_link_xmit_skb(tnl, outskb);
+		if (!tipc_link_is_up(link))
 			return;
 	}
+	if (queue == &link->backlogq)
+		return;
+	queue = &link->backlogq;
+	goto tunnel_queue;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
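Because unsent packets now live on a separate queue, tipc_link_dup_queue_xmit()
has to tunnel both of them; the tunnel_queue label above walks transmq first
and then jumps back for backlogq, preserving sequence order toward the peer
endpoint. The same two-pass iteration, restated as a plain loop over invented
toy types:

#include <stdio.h>

struct queue { const char *name; int first, last; };	/* seqno range */

static void tunnel_pkt(int seqno) { printf("tunnel seqno %d\n", seqno); }

int main(void)
{
	struct queue transmq  = { "transmq",  1, 3 };
	struct queue backlogq = { "backlogq", 4, 5 };
	struct queue *order[] = { &transmq, &backlogq };

	/* transmq packets are tunnelled before any backlogq packet,
	 * exactly the order the goto in the kernel code produces. */
	for (int q = 0; q < 2; q++)
		for (int s = order[q]->first; s <= order[q]->last; s++)
			tunnel_pkt(s);	/* __tipc_link_xmit_skb(tnl, outskb) */
	return 0;
}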
@@ -1823,6 +1813,8 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 
 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
 {
+	l_ptr->window = window;
+
 	/* Data messages from this node, inclusive FIRST_FRAGM */
 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
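The stored window now bounds the transmit queue, while queue_limit[] only
bounds the backlog per message importance. A quick worked example of the two
levels visible in this hunk; the remaining importance levels are set the same
way further down in the function, outside this hunk's context, and window = 50
matches TIPC's default link window (TIPC_DEF_LINK_WIN):

#include <stdio.h>

int main(void)
{
	unsigned int window = 50;
	unsigned int low = window;		/* TIPC_LOW_IMPORTANCE */
	unsigned int med = (window / 3) * 4;	/* TIPC_MEDIUM_IMPORTANCE */

	/* __tipc_link_xmit() returns tipc_link_cong() once backlogq holds
	 * queue_limit[imp] packets, so higher importance buys
	 * proportionally more backlog headroom. */
	printf("window=%u low=%u medium=%u\n", window, low, med);
	return 0;
}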
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 7aeb52092bf3..eec3ecf2d450 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -124,7 +124,8 @@ struct tipc_stats {
  * @max_pkt: current maximum packet size for this link
  * @max_pkt_target: desired maximum packet size for this link
  * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
- * @outqueue: outbound message queue
+ * @transmitq: queue for sent, non-acked messages
+ * @backlogq: queue for messages waiting to be sent
  * @next_out_no: next sequence number to use for outbound messages
  * @last_retransmitted: sequence number of most recently retransmitted message
  * @stale_count: # of identical retransmit requests made by peer
@@ -177,20 +178,21 @@ struct tipc_link {
 	u32 max_pkt_probes;
 
 	/* Sending */
-	struct sk_buff_head outqueue;
+	struct sk_buff_head transmq;
+	struct sk_buff_head backlogq;
 	u32 next_out_no;
+	u32 window;
 	u32 last_retransmitted;
 	u32 stale_count;
 
 	/* Reception */
 	u32 next_in_no;
-	struct sk_buff_head deferred_queue;
-	u32 unacked_window;
+	u32 rcv_unacked;
+	struct sk_buff_head deferdq;
 	struct sk_buff_head inputq;
 	struct sk_buff_head namedq;
 
 	/* Congestion handling */
-	struct sk_buff *next_out;
 	struct sk_buff_head wakeupq;
 
 	/* Fragmentation/reassembly */
@@ -302,9 +304,4 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
 	return l_ptr->state == RESET_RESET;
 }
 
-static inline int link_congested(struct tipc_link *l_ptr)
-{
-	return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
-}
-
 #endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 333d2ae1cf76..47c8fd8e2fb2 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -330,33 +330,36 @@ error:
 
 /**
  * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
- * @list: the buffer chain of the existing buffer ("bundle")
+ * @bskb: the buffer to append to ("bundle")
  * @skb: buffer to be appended
  * @mtu: max allowable size for the bundle buffer
  * Consumes buffer if successful
  * Returns true if bundling could be performed, otherwise false
  */
-bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
+bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
 {
-	struct sk_buff *bskb = skb_peek_tail(list);
-	struct tipc_msg *bmsg = buf_msg(bskb);
+	struct tipc_msg *bmsg;
 	struct tipc_msg *msg = buf_msg(skb);
-	unsigned int bsz = msg_size(bmsg);
+	unsigned int bsz;
 	unsigned int msz = msg_size(msg);
-	u32 start = align(bsz);
+	u32 start, pad;
 	u32 max = mtu - INT_H_SIZE;
-	u32 pad = start - bsz;
 
 	if (likely(msg_user(msg) == MSG_FRAGMENTER))
 		return false;
+	if (!bskb)
+		return false;
+	bmsg = buf_msg(bskb);
+	bsz = msg_size(bmsg);
+	start = align(bsz);
+	pad = start - bsz;
+
 	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
 		return false;
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
 		return false;
 	if (likely(msg_user(bmsg) != MSG_BUNDLER))
 		return false;
-	if (likely(!TIPC_SKB_CB(bskb)->bundling))
-		return false;
 	if (unlikely(skb_tailroom(bskb) < (pad + msz)))
 		return false;
 	if (unlikely(max < (start + msz)))
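The arithmetic guarding the append is compact enough to misread: align()
rounds the current bundle size up to a 4-byte boundary, pad is the filler
that rounding implies, and the message fits only if the buffer tailroom
covers pad + msz and the padded total stays within the MTU budget. Worked
with concrete numbers, taking align() and INT_H_SIZE (40, the internal
header prepended to a bundle) as defined in TIPC's headers:

#include <assert.h>
#include <stdint.h>

#define INT_H_SIZE 40

static uint32_t align(uint32_t i) { return (i + 3) & ~3u; }

int main(void)
{
	uint32_t mtu = 1500;
	uint32_t bsz = 318;		/* current bundle payload size */
	uint32_t msz = 100;		/* message we want to append */

	uint32_t start = align(bsz);	/* 320: append offset */
	uint32_t pad   = start - bsz;	/* 2 padding bytes */
	uint32_t max   = mtu - INT_H_SIZE;

	assert(pad + msz == 102);	/* tailroom actually consumed */
	assert(start + msz <= max);	/* 420 <= 1460: bundling is ok */
	return 0;
}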
@@ -419,12 +422,11 @@ none:
  * Replaces buffer if successful
  * Returns true if success, otherwise false
  */
-bool tipc_msg_make_bundle(struct sk_buff_head *list,
-			  struct sk_buff *skb, u32 mtu, u32 dnode)
+bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
 {
 	struct sk_buff *bskb;
 	struct tipc_msg *bmsg;
-	struct tipc_msg *msg = buf_msg(skb);
+	struct tipc_msg *msg = buf_msg(*skb);
 	u32 msz = msg_size(msg);
 	u32 max = mtu - INT_H_SIZE;
 
@@ -448,9 +450,9 @@ bool tipc_msg_make_bundle(struct sk_buff_head *list,
 	msg_set_seqno(bmsg, msg_seqno(msg));
 	msg_set_ack(bmsg, msg_ack(msg));
 	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
-	TIPC_SKB_CB(bskb)->bundling = true;
-	__skb_queue_tail(list, bskb);
-	return tipc_msg_bundle(list, skb, mtu);
+	tipc_msg_bundle(bskb, *skb, mtu);
+	*skb = bskb;
+	return true;
 }
 
 /**
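tipc_msg_make_bundle() now returns the new bundle through its skb argument
instead of queueing it itself: on success the caller's pointer is redirected
to the bundle, so __tipc_link_xmit() can fall through and queue *skb like
any other buffer. A toy sketch of that in/out pointer idiom; the buf type
and wrap() helper are invented for illustration:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct buf { const char *desc; struct buf *inner; };

static bool wrap(struct buf **b)
{
	struct buf *bundle = malloc(sizeof(*bundle));
	if (!bundle)
		return false;	/* caller keeps the original buffer */
	bundle->desc = "bundle";
	bundle->inner = *b;	/* original message becomes the payload */
	*b = bundle;		/* caller's pointer now names the bundle */
	return true;
}

int main(void)
{
	struct buf msg = { .desc = "data message", .inner = NULL };
	struct buf *skb = &msg;

	if (wrap(&skb))
		printf("queueing %s (contains %s)\n",
		       skb->desc, skb->inner->desc);
	free(skb != &msg ? skb : NULL);
	return 0;
}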
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 62306b8d2410..e5fc5fdb2ea7 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -767,9 +767,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
 			       uint data_sz, u32 dnode, u32 onode,
 			       u32 dport, u32 oport, int errcode);
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
-bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
-bool tipc_msg_make_bundle(struct sk_buff_head *list,
-			  struct sk_buff *skb, u32 mtu, u32 dnode);
+bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu);
+
+bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode);
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int mtu, struct sk_buff_head *list);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 86152de8248d..26d1de1bf34d 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,7 +111,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->publ_list);
 	INIT_LIST_HEAD(&n_ptr->conn_sks);
-	__skb_queue_head_init(&n_ptr->bclink.deferred_queue);
+	__skb_queue_head_init(&n_ptr->bclink.deferdq);
 	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
 		if (n_ptr->addr < temp_node->addr)
@@ -354,7 +354,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 
 	/* Flush broadcast link info associated with lost node */
 	if (n_ptr->bclink.recv_permitted) {
-		__skb_queue_purge(&n_ptr->bclink.deferred_queue);
+		__skb_queue_purge(&n_ptr->bclink.deferdq);
 
 		if (n_ptr->bclink.reasm_buf) {
 			kfree_skb(n_ptr->bclink.reasm_buf);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index f78be64e105b..e89ac04ec2c3 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -84,7 +84,7 @@ struct tipc_node_bclink {
 	u32 last_sent;
 	u32 oos_state;
 	u32 deferred_size;
-	struct sk_buff_head deferred_queue;
+	struct sk_buff_head deferdq;
 	struct sk_buff *reasm_buf;
 	int inputq_map;
 	bool recv_permitted;