aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2014-11-25 22:41:52 -0500
committerDavid S. Miller <davem@davemloft.net>2014-11-26 12:30:17 -0500
commit58dc55f25631178ee74cd27185956a8f7dcb3e32 (patch)
treea38c003514637757191edf01d906fd58b300e6b1
parent58d78b328a70f4b5ed1c00010499aaedb715ea5b (diff)
tipc: use generic SKB list APIs to manage link transmission queue
Use standard SKB list APIs associated with struct sk_buff_head to manage link transmission queue, having relevant code more clean. Signed-off-by: Ying Xue <ying.xue@windriver.com> Reviewed-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/bcast.c67
-rw-r--r--net/tipc/link.c190
-rw-r--r--net/tipc/link.h17
-rw-r--r--net/tipc/msg.c50
-rw-r--r--net/tipc/msg.h5
5 files changed, 153 insertions, 176 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 27648841e7ff..4a1a3c8627d0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
217 */ 217 */
218static void bclink_retransmit_pkt(u32 after, u32 to) 218static void bclink_retransmit_pkt(u32 after, u32 to)
219{ 219{
220 struct sk_buff *buf; 220 struct sk_buff *skb;
221 221
222 buf = bcl->first_out; 222 skb_queue_walk(&bcl->outqueue, skb) {
223 while (buf && less_eq(buf_seqno(buf), after)) 223 if (more(buf_seqno(skb), after))
224 buf = buf->next; 224 break;
225 tipc_link_retransmit(bcl, buf, mod(to - after)); 225 }
226 tipc_link_retransmit(bcl, skb, mod(to - after));
226} 227}
227 228
228/** 229/**
@@ -245,14 +246,14 @@ void tipc_bclink_wakeup_users(void)
245 */ 246 */
246void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 247void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
247{ 248{
248 struct sk_buff *crs; 249 struct sk_buff *skb, *tmp;
249 struct sk_buff *next; 250 struct sk_buff *next;
250 unsigned int released = 0; 251 unsigned int released = 0;
251 252
252 tipc_bclink_lock(); 253 tipc_bclink_lock();
253 /* Bail out if tx queue is empty (no clean up is required) */ 254 /* Bail out if tx queue is empty (no clean up is required) */
254 crs = bcl->first_out; 255 skb = skb_peek(&bcl->outqueue);
255 if (!crs) 256 if (!skb)
256 goto exit; 257 goto exit;
257 258
258 /* Determine which messages need to be acknowledged */ 259 /* Determine which messages need to be acknowledged */
@@ -271,41 +272,41 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
271 * Bail out if specified sequence number does not correspond 272 * Bail out if specified sequence number does not correspond
272 * to a message that has been sent and not yet acknowledged 273 * to a message that has been sent and not yet acknowledged
273 */ 274 */
274 if (less(acked, buf_seqno(crs)) || 275 if (less(acked, buf_seqno(skb)) ||
275 less(bcl->fsm_msg_cnt, acked) || 276 less(bcl->fsm_msg_cnt, acked) ||
276 less_eq(acked, n_ptr->bclink.acked)) 277 less_eq(acked, n_ptr->bclink.acked))
277 goto exit; 278 goto exit;
278 } 279 }
279 280
280 /* Skip over packets that node has previously acknowledged */ 281 /* Skip over packets that node has previously acknowledged */
281 while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) 282 skb_queue_walk(&bcl->outqueue, skb) {
282 crs = crs->next; 283 if (more(buf_seqno(skb), n_ptr->bclink.acked))
284 break;
285 }
283 286
284 /* Update packets that node is now acknowledging */ 287 /* Update packets that node is now acknowledging */
288 skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
289 if (more(buf_seqno(skb), acked))
290 break;
285 291
286 while (crs && less_eq(buf_seqno(crs), acked)) { 292 next = tipc_skb_queue_next(&bcl->outqueue, skb);
287 next = crs->next; 293 if (skb != bcl->next_out) {
288 294 bcbuf_decr_acks(skb);
289 if (crs != bcl->next_out) 295 } else {
290 bcbuf_decr_acks(crs); 296 bcbuf_set_acks(skb, 0);
291 else {
292 bcbuf_set_acks(crs, 0);
293 bcl->next_out = next; 297 bcl->next_out = next;
294 bclink_set_last_sent(); 298 bclink_set_last_sent();
295 } 299 }
296 300
297 if (bcbuf_acks(crs) == 0) { 301 if (bcbuf_acks(skb) == 0) {
298 bcl->first_out = next; 302 __skb_unlink(skb, &bcl->outqueue);
299 bcl->out_queue_size--; 303 kfree_skb(skb);
300 kfree_skb(crs);
301 released = 1; 304 released = 1;
302 } 305 }
303 crs = next;
304 } 306 }
305 n_ptr->bclink.acked = acked; 307 n_ptr->bclink.acked = acked;
306 308
307 /* Try resolving broadcast link congestion, if necessary */ 309 /* Try resolving broadcast link congestion, if necessary */
308
309 if (unlikely(bcl->next_out)) { 310 if (unlikely(bcl->next_out)) {
310 tipc_link_push_packets(bcl); 311 tipc_link_push_packets(bcl);
311 bclink_set_last_sent(); 312 bclink_set_last_sent();
@@ -327,19 +328,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
327 struct sk_buff *buf; 328 struct sk_buff *buf;
328 329
329 /* Ignore "stale" link state info */ 330 /* Ignore "stale" link state info */
330
331 if (less_eq(last_sent, n_ptr->bclink.last_in)) 331 if (less_eq(last_sent, n_ptr->bclink.last_in))
332 return; 332 return;
333 333
334 /* Update link synchronization state; quit if in sync */ 334 /* Update link synchronization state; quit if in sync */
335
336 bclink_update_last_sent(n_ptr, last_sent); 335 bclink_update_last_sent(n_ptr, last_sent);
337 336
338 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 337 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
339 return; 338 return;
340 339
341 /* Update out-of-sync state; quit if loss is still unconfirmed */ 340 /* Update out-of-sync state; quit if loss is still unconfirmed */
342
343 if ((++n_ptr->bclink.oos_state) == 1) { 341 if ((++n_ptr->bclink.oos_state) == 1) {
344 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) 342 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
345 return; 343 return;
@@ -347,12 +345,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
347 } 345 }
348 346
349 /* Don't NACK if one has been recently sent (or seen) */ 347 /* Don't NACK if one has been recently sent (or seen) */
350
351 if (n_ptr->bclink.oos_state & 0x1) 348 if (n_ptr->bclink.oos_state & 0x1)
352 return; 349 return;
353 350
354 /* Send NACK */ 351 /* Send NACK */
355
356 buf = tipc_buf_acquire(INT_H_SIZE); 352 buf = tipc_buf_acquire(INT_H_SIZE);
357 if (buf) { 353 if (buf) {
358 struct tipc_msg *msg = buf_msg(buf); 354 struct tipc_msg *msg = buf_msg(buf);
@@ -425,9 +421,11 @@ int tipc_bclink_xmit(struct sk_buff *buf)
425 if (likely(bclink->bcast_nodes.count)) { 421 if (likely(bclink->bcast_nodes.count)) {
426 rc = __tipc_link_xmit(bcl, buf); 422 rc = __tipc_link_xmit(bcl, buf);
427 if (likely(!rc)) { 423 if (likely(!rc)) {
424 u32 len = skb_queue_len(&bcl->outqueue);
425
428 bclink_set_last_sent(); 426 bclink_set_last_sent();
429 bcl->stats.queue_sz_counts++; 427 bcl->stats.queue_sz_counts++;
430 bcl->stats.accu_queue_sz += bcl->out_queue_size; 428 bcl->stats.accu_queue_sz += len;
431 } 429 }
432 bc = 1; 430 bc = 1;
433 } 431 }
@@ -462,7 +460,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
462 * Unicast an ACK periodically, ensuring that 460 * Unicast an ACK periodically, ensuring that
463 * all nodes in the cluster don't ACK at the same time 461 * all nodes in the cluster don't ACK at the same time
464 */ 462 */
465
466 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { 463 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
467 tipc_link_proto_xmit(node->active_links[node->addr & 1], 464 tipc_link_proto_xmit(node->active_links[node->addr & 1],
468 STATE_MSG, 0, 0, 0, 0, 0); 465 STATE_MSG, 0, 0, 0, 0, 0);
@@ -484,7 +481,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
484 int deferred = 0; 481 int deferred = 0;
485 482
486 /* Screen out unwanted broadcast messages */ 483 /* Screen out unwanted broadcast messages */
487
488 if (msg_mc_netid(msg) != tipc_net_id) 484 if (msg_mc_netid(msg) != tipc_net_id)
489 goto exit; 485 goto exit;
490 486
@@ -497,7 +493,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
497 goto unlock; 493 goto unlock;
498 494
499 /* Handle broadcast protocol message */ 495 /* Handle broadcast protocol message */
500
501 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 496 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
502 if (msg_type(msg) != STATE_MSG) 497 if (msg_type(msg) != STATE_MSG)
503 goto unlock; 498 goto unlock;
@@ -518,14 +513,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
518 } 513 }
519 514
520 /* Handle in-sequence broadcast message */ 515 /* Handle in-sequence broadcast message */
521
522 seqno = msg_seqno(msg); 516 seqno = msg_seqno(msg);
523 next_in = mod(node->bclink.last_in + 1); 517 next_in = mod(node->bclink.last_in + 1);
524 518
525 if (likely(seqno == next_in)) { 519 if (likely(seqno == next_in)) {
526receive: 520receive:
527 /* Deliver message to destination */ 521 /* Deliver message to destination */
528
529 if (likely(msg_isdata(msg))) { 522 if (likely(msg_isdata(msg))) {
530 tipc_bclink_lock(); 523 tipc_bclink_lock();
531 bclink_accept_pkt(node, seqno); 524 bclink_accept_pkt(node, seqno);
@@ -574,7 +567,6 @@ receive:
574 buf = NULL; 567 buf = NULL;
575 568
576 /* Determine new synchronization state */ 569 /* Determine new synchronization state */
577
578 tipc_node_lock(node); 570 tipc_node_lock(node);
579 if (unlikely(!tipc_node_is_up(node))) 571 if (unlikely(!tipc_node_is_up(node)))
580 goto unlock; 572 goto unlock;
@@ -594,7 +586,6 @@ receive:
594 goto unlock; 586 goto unlock;
595 587
596 /* Take in-sequence message from deferred queue & deliver it */ 588 /* Take in-sequence message from deferred queue & deliver it */
597
598 buf = node->bclink.deferred_head; 589 buf = node->bclink.deferred_head;
599 node->bclink.deferred_head = buf->next; 590 node->bclink.deferred_head = buf->next;
600 buf->next = NULL; 591 buf->next = NULL;
@@ -603,7 +594,6 @@ receive:
603 } 594 }
604 595
605 /* Handle out-of-sequence broadcast message */ 596 /* Handle out-of-sequence broadcast message */
606
607 if (less(next_in, seqno)) { 597 if (less(next_in, seqno)) {
608 deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, 598 deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
609 &node->bclink.deferred_tail, 599 &node->bclink.deferred_tail,
@@ -963,6 +953,7 @@ int tipc_bclink_init(void)
963 sprintf(bcbearer->media.name, "tipc-broadcast"); 953 sprintf(bcbearer->media.name, "tipc-broadcast");
964 954
965 spin_lock_init(&bclink->lock); 955 spin_lock_init(&bclink->lock);
956 __skb_queue_head_init(&bcl->outqueue);
966 __skb_queue_head_init(&bcl->waiting_sks); 957 __skb_queue_head_init(&bcl->waiting_sks);
967 bcl->next_out_no = 1; 958 bcl->next_out_no = 1;
968 spin_lock_init(&bclink->node.lock); 959 spin_lock_init(&bclink->node.lock);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ddee498e74bc..9e94bf935e48 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -171,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
171 */ 171 */
172static void link_timeout(struct tipc_link *l_ptr) 172static void link_timeout(struct tipc_link *l_ptr)
173{ 173{
174 struct sk_buff *skb;
175
174 tipc_node_lock(l_ptr->owner); 176 tipc_node_lock(l_ptr->owner);
175 177
176 /* update counters used in statistical profiling of send traffic */ 178 /* update counters used in statistical profiling of send traffic */
177 l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; 179 l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
178 l_ptr->stats.queue_sz_counts++; 180 l_ptr->stats.queue_sz_counts++;
179 181
180 if (l_ptr->first_out) { 182 skb = skb_peek(&l_ptr->outqueue);
181 struct tipc_msg *msg = buf_msg(l_ptr->first_out); 183 if (skb) {
184 struct tipc_msg *msg = buf_msg(skb);
182 u32 length = msg_size(msg); 185 u32 length = msg_size(msg);
183 186
184 if ((msg_user(msg) == MSG_FRAGMENTER) && 187 if ((msg_user(msg) == MSG_FRAGMENTER) &&
@@ -206,7 +209,6 @@ static void link_timeout(struct tipc_link *l_ptr)
206 } 209 }
207 210
208 /* do all other link processing performed on a periodic basis */ 211 /* do all other link processing performed on a periodic basis */
209
210 link_state_event(l_ptr, TIMEOUT_EVT); 212 link_state_event(l_ptr, TIMEOUT_EVT);
211 213
212 if (l_ptr->next_out) 214 if (l_ptr->next_out)
@@ -289,6 +291,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
289 link_init_max_pkt(l_ptr); 291 link_init_max_pkt(l_ptr);
290 292
291 l_ptr->next_out_no = 1; 293 l_ptr->next_out_no = 1;
294 __skb_queue_head_init(&l_ptr->outqueue);
292 __skb_queue_head_init(&l_ptr->waiting_sks); 295 __skb_queue_head_init(&l_ptr->waiting_sks);
293 296
294 link_reset_statistics(l_ptr); 297 link_reset_statistics(l_ptr);
@@ -367,7 +370,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
367 */ 370 */
368static void link_prepare_wakeup(struct tipc_link *link) 371static void link_prepare_wakeup(struct tipc_link *link)
369{ 372{
370 uint pend_qsz = link->out_queue_size; 373 uint pend_qsz = skb_queue_len(&link->outqueue);
371 struct sk_buff *skb, *tmp; 374 struct sk_buff *skb, *tmp;
372 375
373 skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { 376 skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
@@ -380,17 +383,6 @@ static void link_prepare_wakeup(struct tipc_link *link)
380} 383}
381 384
382/** 385/**
383 * link_release_outqueue - purge link's outbound message queue
384 * @l_ptr: pointer to link
385 */
386static void link_release_outqueue(struct tipc_link *l_ptr)
387{
388 kfree_skb_list(l_ptr->first_out);
389 l_ptr->first_out = NULL;
390 l_ptr->out_queue_size = 0;
391}
392
393/**
394 * tipc_link_reset_fragments - purge link's inbound message fragments queue 386 * tipc_link_reset_fragments - purge link's inbound message fragments queue
395 * @l_ptr: pointer to link 387 * @l_ptr: pointer to link
396 */ 388 */
@@ -407,7 +399,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
407void tipc_link_purge_queues(struct tipc_link *l_ptr) 399void tipc_link_purge_queues(struct tipc_link *l_ptr)
408{ 400{
409 kfree_skb_list(l_ptr->oldest_deferred_in); 401 kfree_skb_list(l_ptr->oldest_deferred_in);
410 kfree_skb_list(l_ptr->first_out); 402 __skb_queue_purge(&l_ptr->outqueue);
411 tipc_link_reset_fragments(l_ptr); 403 tipc_link_reset_fragments(l_ptr);
412} 404}
413 405
@@ -440,14 +432,12 @@ void tipc_link_reset(struct tipc_link *l_ptr)
440 } 432 }
441 433
442 /* Clean up all queues: */ 434 /* Clean up all queues: */
443 link_release_outqueue(l_ptr); 435 __skb_queue_purge(&l_ptr->outqueue);
444 kfree_skb_list(l_ptr->oldest_deferred_in); 436 kfree_skb_list(l_ptr->oldest_deferred_in);
445 if (!skb_queue_empty(&l_ptr->waiting_sks)) { 437 if (!skb_queue_empty(&l_ptr->waiting_sks)) {
446 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); 438 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
447 owner->action_flags |= TIPC_WAKEUP_USERS; 439 owner->action_flags |= TIPC_WAKEUP_USERS;
448 } 440 }
449 l_ptr->last_out = NULL;
450 l_ptr->first_out = NULL;
451 l_ptr->next_out = NULL; 441 l_ptr->next_out = NULL;
452 l_ptr->unacked_window = 0; 442 l_ptr->unacked_window = 0;
453 l_ptr->checkpoint = 1; 443 l_ptr->checkpoint = 1;
@@ -703,18 +693,17 @@ drop:
703/** 693/**
704 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked 694 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
705 * @link: link to use 695 * @link: link to use
706 * @buf: chain of buffers containing message 696 * @skb: chain of buffers containing message
707 * Consumes the buffer chain, except when returning -ELINKCONG 697 * Consumes the buffer chain, except when returning -ELINKCONG
708 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket 698 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
709 * user data messages) or -EHOSTUNREACH (all other messages/senders) 699 * user data messages) or -EHOSTUNREACH (all other messages/senders)
710 * Only the socket functions tipc_send_stream() and tipc_send_packet() need 700 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
711 * to act on the return value, since they may need to do more send attempts. 701 * to act on the return value, since they may need to do more send attempts.
712 */ 702 */
713int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) 703int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
714{ 704{
715 struct tipc_msg *msg = buf_msg(buf); 705 struct tipc_msg *msg = buf_msg(skb);
716 uint psz = msg_size(msg); 706 uint psz = msg_size(msg);
717 uint qsz = link->out_queue_size;
718 uint sndlim = link->queue_limit[0]; 707 uint sndlim = link->queue_limit[0];
719 uint imp = tipc_msg_tot_importance(msg); 708 uint imp = tipc_msg_tot_importance(msg);
720 uint mtu = link->max_pkt; 709 uint mtu = link->max_pkt;
@@ -722,58 +711,50 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
722 uint seqno = link->next_out_no; 711 uint seqno = link->next_out_no;
723 uint bc_last_in = link->owner->bclink.last_in; 712 uint bc_last_in = link->owner->bclink.last_in;
724 struct tipc_media_addr *addr = &link->media_addr; 713 struct tipc_media_addr *addr = &link->media_addr;
725 struct sk_buff *next = buf->next; 714 struct sk_buff_head *outqueue = &link->outqueue;
715 struct sk_buff *next;
726 716
727 /* Match queue limits against msg importance: */ 717 /* Match queue limits against msg importance: */
728 if (unlikely(qsz >= link->queue_limit[imp])) 718 if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
729 return tipc_link_cong(link, buf); 719 return tipc_link_cong(link, skb);
730 720
731 /* Has valid packet limit been used ? */ 721 /* Has valid packet limit been used ? */
732 if (unlikely(psz > mtu)) { 722 if (unlikely(psz > mtu)) {
733 kfree_skb_list(buf); 723 kfree_skb_list(skb);
734 return -EMSGSIZE; 724 return -EMSGSIZE;
735 } 725 }
736 726
737 /* Prepare each packet for sending, and add to outqueue: */ 727 /* Prepare each packet for sending, and add to outqueue: */
738 while (buf) { 728 while (skb) {
739 next = buf->next; 729 next = skb->next;
740 msg = buf_msg(buf); 730 msg = buf_msg(skb);
741 msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); 731 msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
742 msg_set_bcast_ack(msg, bc_last_in); 732 msg_set_bcast_ack(msg, bc_last_in);
743 733
744 if (!link->first_out) { 734 if (skb_queue_len(outqueue) < sndlim) {
745 link->first_out = buf; 735 __skb_queue_tail(outqueue, skb);
746 } else if (qsz < sndlim) { 736 tipc_bearer_send(link->bearer_id, skb, addr);
747 link->last_out->next = buf; 737 link->next_out = NULL;
748 } else if (tipc_msg_bundle(link->last_out, buf, mtu)) { 738 link->unacked_window = 0;
739 } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
749 link->stats.sent_bundled++; 740 link->stats.sent_bundled++;
750 buf = next; 741 skb = next;
751 next = buf->next;
752 continue; 742 continue;
753 } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) { 743 } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
744 link->addr)) {
754 link->stats.sent_bundled++; 745 link->stats.sent_bundled++;
755 link->stats.sent_bundles++; 746 link->stats.sent_bundles++;
756 link->last_out->next = buf;
757 if (!link->next_out) 747 if (!link->next_out)
758 link->next_out = buf; 748 link->next_out = skb_peek_tail(outqueue);
759 } else { 749 } else {
760 link->last_out->next = buf; 750 __skb_queue_tail(outqueue, skb);
761 if (!link->next_out) 751 if (!link->next_out)
762 link->next_out = buf; 752 link->next_out = skb;
763 }
764
765 /* Send packet if possible: */
766 if (likely(++qsz <= sndlim)) {
767 tipc_bearer_send(link->bearer_id, buf, addr);
768 link->next_out = next;
769 link->unacked_window = 0;
770 } 753 }
771 seqno++; 754 seqno++;
772 link->last_out = buf; 755 skb = next;
773 buf = next;
774 } 756 }
775 link->next_out_no = seqno; 757 link->next_out_no = seqno;
776 link->out_queue_size = qsz;
777 return 0; 758 return 0;
778} 759}
779 760
@@ -851,6 +832,14 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
851 kfree_skb(buf); 832 kfree_skb(buf);
852} 833}
853 834
835struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
836 const struct sk_buff *skb)
837{
838 if (skb_queue_is_last(list, skb))
839 return NULL;
840 return skb->next;
841}
842
854/* 843/*
855 * tipc_link_push_packets - push unsent packets to bearer 844 * tipc_link_push_packets - push unsent packets to bearer
856 * 845 *
@@ -861,15 +850,15 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
861 */ 850 */
862void tipc_link_push_packets(struct tipc_link *l_ptr) 851void tipc_link_push_packets(struct tipc_link *l_ptr)
863{ 852{
864 struct sk_buff *skb; 853 struct sk_buff_head *outqueue = &l_ptr->outqueue;
854 struct sk_buff *skb = l_ptr->next_out;
865 struct tipc_msg *msg; 855 struct tipc_msg *msg;
866 u32 next, first; 856 u32 next, first;
867 857
868 while (l_ptr->next_out) { 858 skb_queue_walk_from(outqueue, skb) {
869 skb = l_ptr->next_out;
870 msg = buf_msg(skb); 859 msg = buf_msg(skb);
871 next = msg_seqno(msg); 860 next = msg_seqno(msg);
872 first = buf_seqno(l_ptr->first_out); 861 first = buf_seqno(skb_peek(outqueue));
873 862
874 if (mod(next - first) < l_ptr->queue_limit[0]) { 863 if (mod(next - first) < l_ptr->queue_limit[0]) {
875 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 864 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -878,7 +867,7 @@ void tipc_link_push_packets(struct tipc_link *l_ptr)
878 TIPC_SKB_CB(skb)->bundling = false; 867 TIPC_SKB_CB(skb)->bundling = false;
879 tipc_bearer_send(l_ptr->bearer_id, skb, 868 tipc_bearer_send(l_ptr->bearer_id, skb,
880 &l_ptr->media_addr); 869 &l_ptr->media_addr);
881 l_ptr->next_out = skb->next; 870 l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
882 } else { 871 } else {
883 break; 872 break;
884 } 873 }
@@ -946,20 +935,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
946 } 935 }
947} 936}
948 937
949void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, 938void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
950 u32 retransmits) 939 u32 retransmits)
951{ 940{
952 struct tipc_msg *msg; 941 struct tipc_msg *msg;
953 942
954 if (!buf) 943 if (!skb)
955 return; 944 return;
956 945
957 msg = buf_msg(buf); 946 msg = buf_msg(skb);
958 947
959 /* Detect repeated retransmit failures */ 948 /* Detect repeated retransmit failures */
960 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 949 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
961 if (++l_ptr->stale_count > 100) { 950 if (++l_ptr->stale_count > 100) {
962 link_retransmit_failure(l_ptr, buf); 951 link_retransmit_failure(l_ptr, skb);
963 return; 952 return;
964 } 953 }
965 } else { 954 } else {
@@ -967,12 +956,13 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
967 l_ptr->stale_count = 1; 956 l_ptr->stale_count = 1;
968 } 957 }
969 958
970 while (retransmits && (buf != l_ptr->next_out) && buf) { 959 skb_queue_walk_from(&l_ptr->outqueue, skb) {
971 msg = buf_msg(buf); 960 if (!retransmits || skb == l_ptr->next_out)
961 break;
962 msg = buf_msg(skb);
972 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 963 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
973 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 964 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
974 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); 965 tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
975 buf = buf->next;
976 retransmits--; 966 retransmits--;
977 l_ptr->stats.retransmitted++; 967 l_ptr->stats.retransmitted++;
978 } 968 }
@@ -1067,12 +1057,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1067 while (head) { 1057 while (head) {
1068 struct tipc_node *n_ptr; 1058 struct tipc_node *n_ptr;
1069 struct tipc_link *l_ptr; 1059 struct tipc_link *l_ptr;
1070 struct sk_buff *crs;
1071 struct sk_buff *buf = head; 1060 struct sk_buff *buf = head;
1061 struct sk_buff *skb1, *tmp;
1072 struct tipc_msg *msg; 1062 struct tipc_msg *msg;
1073 u32 seq_no; 1063 u32 seq_no;
1074 u32 ackd; 1064 u32 ackd;
1075 u32 released = 0; 1065 u32 released;
1076 1066
1077 head = head->next; 1067 head = head->next;
1078 buf->next = NULL; 1068 buf->next = NULL;
@@ -1131,17 +1121,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1131 if (n_ptr->bclink.recv_permitted) 1121 if (n_ptr->bclink.recv_permitted)
1132 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1122 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1133 1123
1134 crs = l_ptr->first_out; 1124 released = 0;
1135 while ((crs != l_ptr->next_out) && 1125 skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
1136 less_eq(buf_seqno(crs), ackd)) { 1126 if (skb1 == l_ptr->next_out ||
1137 struct sk_buff *next = crs->next; 1127 more(buf_seqno(skb1), ackd))
1138 kfree_skb(crs); 1128 break;
1139 crs = next; 1129 __skb_unlink(skb1, &l_ptr->outqueue);
1140 released++; 1130 kfree_skb(skb1);
1141 } 1131 released = 1;
1142 if (released) {
1143 l_ptr->first_out = crs;
1144 l_ptr->out_queue_size -= released;
1145 } 1132 }
1146 1133
1147 /* Try sending any messages link endpoint has pending */ 1134 /* Try sending any messages link endpoint has pending */
@@ -1590,7 +1577,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1590 } 1577 }
1591 if (msg_seq_gap(msg)) { 1578 if (msg_seq_gap(msg)) {
1592 l_ptr->stats.recv_nacks++; 1579 l_ptr->stats.recv_nacks++;
1593 tipc_link_retransmit(l_ptr, l_ptr->first_out, 1580 tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
1594 msg_seq_gap(msg)); 1581 msg_seq_gap(msg));
1595 } 1582 }
1596 break; 1583 break;
@@ -1637,10 +1624,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1637 */ 1624 */
1638void tipc_link_failover_send_queue(struct tipc_link *l_ptr) 1625void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1639{ 1626{
1640 u32 msgcount = l_ptr->out_queue_size; 1627 u32 msgcount = skb_queue_len(&l_ptr->outqueue);
1641 struct sk_buff *crs = l_ptr->first_out;
1642 struct tipc_link *tunnel = l_ptr->owner->active_links[0]; 1628 struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1643 struct tipc_msg tunnel_hdr; 1629 struct tipc_msg tunnel_hdr;
1630 struct sk_buff *skb;
1644 int split_bundles; 1631 int split_bundles;
1645 1632
1646 if (!tunnel) 1633 if (!tunnel)
@@ -1651,14 +1638,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1651 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 1638 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1652 msg_set_msgcnt(&tunnel_hdr, msgcount); 1639 msg_set_msgcnt(&tunnel_hdr, msgcount);
1653 1640
1654 if (!l_ptr->first_out) { 1641 if (skb_queue_empty(&l_ptr->outqueue)) {
1655 struct sk_buff *buf; 1642 skb = tipc_buf_acquire(INT_H_SIZE);
1656 1643 if (skb) {
1657 buf = tipc_buf_acquire(INT_H_SIZE); 1644 skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1658 if (buf) {
1659 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
1660 msg_set_size(&tunnel_hdr, INT_H_SIZE); 1645 msg_set_size(&tunnel_hdr, INT_H_SIZE);
1661 __tipc_link_xmit(tunnel, buf); 1646 __tipc_link_xmit(tunnel, skb);
1662 } else { 1647 } else {
1663 pr_warn("%sunable to send changeover msg\n", 1648 pr_warn("%sunable to send changeover msg\n",
1664 link_co_err); 1649 link_co_err);
@@ -1669,8 +1654,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1669 split_bundles = (l_ptr->owner->active_links[0] != 1654 split_bundles = (l_ptr->owner->active_links[0] !=
1670 l_ptr->owner->active_links[1]); 1655 l_ptr->owner->active_links[1]);
1671 1656
1672 while (crs) { 1657 skb_queue_walk(&l_ptr->outqueue, skb) {
1673 struct tipc_msg *msg = buf_msg(crs); 1658 struct tipc_msg *msg = buf_msg(skb);
1674 1659
1675 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { 1660 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1676 struct tipc_msg *m = msg_get_wrapped(msg); 1661 struct tipc_msg *m = msg_get_wrapped(msg);
@@ -1688,7 +1673,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1688 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, 1673 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1689 msg_link_selector(msg)); 1674 msg_link_selector(msg));
1690 } 1675 }
1691 crs = crs->next;
1692 } 1676 }
1693} 1677}
1694 1678
@@ -1704,17 +1688,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1704void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, 1688void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1705 struct tipc_link *tunnel) 1689 struct tipc_link *tunnel)
1706{ 1690{
1707 struct sk_buff *iter; 1691 struct sk_buff *skb;
1708 struct tipc_msg tunnel_hdr; 1692 struct tipc_msg tunnel_hdr;
1709 1693
1710 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 1694 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1711 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); 1695 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
1712 msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); 1696 msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
1713 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 1697 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1714 iter = l_ptr->first_out; 1698 skb_queue_walk(&l_ptr->outqueue, skb) {
1715 while (iter) { 1699 struct sk_buff *outskb;
1716 struct sk_buff *outbuf; 1700 struct tipc_msg *msg = buf_msg(skb);
1717 struct tipc_msg *msg = buf_msg(iter);
1718 u32 length = msg_size(msg); 1701 u32 length = msg_size(msg);
1719 1702
1720 if (msg_user(msg) == MSG_BUNDLER) 1703 if (msg_user(msg) == MSG_BUNDLER)
@@ -1722,19 +1705,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1722 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ 1705 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */
1723 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1706 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1724 msg_set_size(&tunnel_hdr, length + INT_H_SIZE); 1707 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1725 outbuf = tipc_buf_acquire(length + INT_H_SIZE); 1708 outskb = tipc_buf_acquire(length + INT_H_SIZE);
1726 if (outbuf == NULL) { 1709 if (outskb == NULL) {
1727 pr_warn("%sunable to send duplicate msg\n", 1710 pr_warn("%sunable to send duplicate msg\n",
1728 link_co_err); 1711 link_co_err);
1729 return; 1712 return;
1730 } 1713 }
1731 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 1714 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1732 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 1715 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1733 length); 1716 length);
1734 __tipc_link_xmit(tunnel, outbuf); 1717 __tipc_link_xmit(tunnel, outskb);
1735 if (!tipc_link_is_up(l_ptr)) 1718 if (!tipc_link_is_up(l_ptr))
1736 return; 1719 return;
1737 iter = iter->next;
1738 } 1720 }
1739} 1721}
1740 1722
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 771123413d5f..96f1e1bf0798 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -119,9 +119,7 @@ struct tipc_stats {
119 * @max_pkt: current maximum packet size for this link 119 * @max_pkt: current maximum packet size for this link
120 * @max_pkt_target: desired maximum packet size for this link 120 * @max_pkt_target: desired maximum packet size for this link
121 * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) 121 * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
122 * @out_queue_size: # of messages in outbound message queue 122 * @outqueue: outbound message queue
123 * @first_out: ptr to first outbound message in queue
124 * @last_out: ptr to last outbound message in queue
125 * @next_out_no: next sequence number to use for outbound messages 123 * @next_out_no: next sequence number to use for outbound messages
126 * @last_retransmitted: sequence number of most recently retransmitted message 124 * @last_retransmitted: sequence number of most recently retransmitted message
127 * @stale_count: # of identical retransmit requests made by peer 125 * @stale_count: # of identical retransmit requests made by peer
@@ -173,9 +171,7 @@ struct tipc_link {
173 u32 max_pkt_probes; 171 u32 max_pkt_probes;
174 172
175 /* Sending */ 173 /* Sending */
176 u32 out_queue_size; 174 struct sk_buff_head outqueue;
177 struct sk_buff *first_out;
178 struct sk_buff *last_out;
179 u32 next_out_no; 175 u32 next_out_no;
180 u32 last_retransmitted; 176 u32 last_retransmitted;
181 u32 stale_count; 177 u32 stale_count;
@@ -233,6 +229,8 @@ u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
233void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); 229void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
234void tipc_link_retransmit(struct tipc_link *l_ptr, 230void tipc_link_retransmit(struct tipc_link *l_ptr,
235 struct sk_buff *start, u32 retransmits); 231 struct sk_buff *start, u32 retransmits);
232struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
233 const struct sk_buff *skb);
236 234
237int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); 235int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
238int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); 236int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
@@ -258,6 +256,11 @@ static inline int less_eq(u32 left, u32 right)
258 return mod(right - left) < 32768u; 256 return mod(right - left) < 32768u;
259} 257}
260 258
259static inline int more(u32 left, u32 right)
260{
261 return !less_eq(left, right);
262}
263
261static inline int less(u32 left, u32 right) 264static inline int less(u32 left, u32 right)
262{ 265{
263 return less_eq(left, right) && (mod(right) != mod(left)); 266 return less_eq(left, right) && (mod(right) != mod(left));
@@ -294,7 +297,7 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
294 297
295static inline int link_congested(struct tipc_link *l_ptr) 298static inline int link_congested(struct tipc_link *l_ptr)
296{ 299{
297 return l_ptr->out_queue_size >= l_ptr->queue_limit[0]; 300 return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
298} 301}
299 302
300#endif 303#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 94db39217248..ce7514ae6bf3 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -265,16 +265,17 @@ error:
265 265
266/** 266/**
267 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one 267 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
268 * @bbuf: the existing buffer ("bundle") 268 * @list: the buffer chain of the existing buffer ("bundle")
269 * @buf: buffer to be appended 269 * @skb: buffer to be appended
270 * @mtu: max allowable size for the bundle buffer 270 * @mtu: max allowable size for the bundle buffer
271 * Consumes buffer if successful 271 * Consumes buffer if successful
272 * Returns true if bundling could be performed, otherwise false 272 * Returns true if bundling could be performed, otherwise false
273 */ 273 */
274bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) 274bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
275{ 275{
276 struct tipc_msg *bmsg = buf_msg(bbuf); 276 struct sk_buff *bskb = skb_peek_tail(list);
277 struct tipc_msg *msg = buf_msg(buf); 277 struct tipc_msg *bmsg = buf_msg(bskb);
278 struct tipc_msg *msg = buf_msg(skb);
278 unsigned int bsz = msg_size(bmsg); 279 unsigned int bsz = msg_size(bmsg);
279 unsigned int msz = msg_size(msg); 280 unsigned int msz = msg_size(msg);
280 u32 start = align(bsz); 281 u32 start = align(bsz);
@@ -289,35 +290,36 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
289 return false; 290 return false;
290 if (likely(msg_user(bmsg) != MSG_BUNDLER)) 291 if (likely(msg_user(bmsg) != MSG_BUNDLER))
291 return false; 292 return false;
292 if (likely(!TIPC_SKB_CB(bbuf)->bundling)) 293 if (likely(!TIPC_SKB_CB(bskb)->bundling))
293 return false; 294 return false;
294 if (unlikely(skb_tailroom(bbuf) < (pad + msz))) 295 if (unlikely(skb_tailroom(bskb) < (pad + msz)))
295 return false; 296 return false;
296 if (unlikely(max < (start + msz))) 297 if (unlikely(max < (start + msz)))
297 return false; 298 return false;
298 299
299 skb_put(bbuf, pad + msz); 300 skb_put(bskb, pad + msz);
300 skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); 301 skb_copy_to_linear_data_offset(bskb, start, skb->data, msz);
301 msg_set_size(bmsg, start + msz); 302 msg_set_size(bmsg, start + msz);
302 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); 303 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
303 bbuf->next = buf->next; 304 kfree_skb(skb);
304 kfree_skb(buf);
305 return true; 305 return true;
306} 306}
307 307
308/** 308/**
309 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail 309 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
310 * @buf: buffer to be appended and replaced 310 * @list: the buffer chain
311 * @mtu: max allowable size for the bundle buffer, inclusive header 311 * @skb: buffer to be appended and replaced
312 * @mtu: max allowable size for the bundle buffer, inclusive header
312 * @dnode: destination node for message. (Not always present in header) 313 * @dnode: destination node for message. (Not always present in header)
313 * Replaces buffer if successful 314 * Replaces buffer if successful
314 * Returns true if success, otherwise false 315 * Returns true if success, otherwise false
315 */ 316 */
316bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) 317bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
318 u32 mtu, u32 dnode)
317{ 319{
318 struct sk_buff *bbuf; 320 struct sk_buff *bskb;
319 struct tipc_msg *bmsg; 321 struct tipc_msg *bmsg;
320 struct tipc_msg *msg = buf_msg(*buf); 322 struct tipc_msg *msg = buf_msg(skb);
321 u32 msz = msg_size(msg); 323 u32 msz = msg_size(msg);
322 u32 max = mtu - INT_H_SIZE; 324 u32 max = mtu - INT_H_SIZE;
323 325
@@ -330,21 +332,19 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
330 if (msz > (max / 2)) 332 if (msz > (max / 2))
331 return false; 333 return false;
332 334
333 bbuf = tipc_buf_acquire(max); 335 bskb = tipc_buf_acquire(max);
334 if (!bbuf) 336 if (!bskb)
335 return false; 337 return false;
336 338
337 skb_trim(bbuf, INT_H_SIZE); 339 skb_trim(bskb, INT_H_SIZE);
338 bmsg = buf_msg(bbuf); 340 bmsg = buf_msg(bskb);
339 tipc_msg_init(bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode); 341 tipc_msg_init(bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode);
340 msg_set_seqno(bmsg, msg_seqno(msg)); 342 msg_set_seqno(bmsg, msg_seqno(msg));
341 msg_set_ack(bmsg, msg_ack(msg)); 343 msg_set_ack(bmsg, msg_ack(msg));
342 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); 344 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
343 bbuf->next = (*buf)->next; 345 TIPC_SKB_CB(bskb)->bundling = true;
344 TIPC_SKB_CB(bbuf)->bundling = true; 346 __skb_queue_tail(list, bskb);
345 tipc_msg_bundle(bbuf, *buf, mtu); 347 return tipc_msg_bundle(list, skb, mtu);
346 *buf = bbuf;
347 return true;
348} 348}
349 349
350/** 350/**
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 8ca874d6b4dc..53e425f12343 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -734,9 +734,10 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
734 734
735int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); 735int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
736 736
737bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); 737bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
738 738
739bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); 739bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
740 u32 mtu, u32 dnode);
740 741
741int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 742int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
742 int offset, int dsz, int mtu , struct sk_buff **chain); 743 int offset, int dsz, int mtu , struct sk_buff **chain);