aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2014-11-25 22:41:55 -0500
committerDavid S. Miller <davem@davemloft.net>2014-11-26 12:30:17 -0500
commita6ca109443842e7251c68451f8137ae68ae6d8a6 (patch)
tree82658ff3ecd103abdad794b9b0833e45160b235e /net/tipc
parentf03273f1e2fc8a59c3831200dd1532cf29e37e35 (diff)
tipc: use generic SKB list APIs to manage TIPC outgoing packet chains
Use standard SKB list APIs associated with struct sk_buff_head to manage socket outgoing packet chain and name table outgoing packet chain, having relevant code simpler and more readable. Signed-off-by: Ying Xue <ying.xue@windriver.com> Reviewed-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c20
-rw-r--r--net/tipc/bcast.h2
-rw-r--r--net/tipc/link.c98
-rw-r--r--net/tipc/link.h5
-rw-r--r--net/tipc/msg.c74
-rw-r--r--net/tipc/msg.h6
-rw-r--r--net/tipc/name_distr.c46
-rw-r--r--net/tipc/socket.c127
8 files changed, 203 insertions, 175 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 7b238b1f339b..f0761c771734 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -398,20 +398,20 @@ static void bclink_peek_nack(struct tipc_msg *msg)
398 398
399/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster 399/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
400 * and to identified node local sockets 400 * and to identified node local sockets
401 * @buf: chain of buffers containing message 401 * @list: chain of buffers containing message
402 * Consumes the buffer chain, except when returning -ELINKCONG 402 * Consumes the buffer chain, except when returning -ELINKCONG
403 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 403 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
404 */ 404 */
405int tipc_bclink_xmit(struct sk_buff *buf) 405int tipc_bclink_xmit(struct sk_buff_head *list)
406{ 406{
407 int rc = 0; 407 int rc = 0;
408 int bc = 0; 408 int bc = 0;
409 struct sk_buff *clbuf; 409 struct sk_buff *skb;
410 410
411 /* Prepare clone of message for local node */ 411 /* Prepare clone of message for local node */
412 clbuf = tipc_msg_reassemble(buf); 412 skb = tipc_msg_reassemble(list);
413 if (unlikely(!clbuf)) { 413 if (unlikely(!skb)) {
414 kfree_skb_list(buf); 414 __skb_queue_purge(list);
415 return -EHOSTUNREACH; 415 return -EHOSTUNREACH;
416 } 416 }
417 417
@@ -419,7 +419,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)
419 if (likely(bclink)) { 419 if (likely(bclink)) {
420 tipc_bclink_lock(); 420 tipc_bclink_lock();
421 if (likely(bclink->bcast_nodes.count)) { 421 if (likely(bclink->bcast_nodes.count)) {
422 rc = __tipc_link_xmit(bcl, buf); 422 rc = __tipc_link_xmit(bcl, list);
423 if (likely(!rc)) { 423 if (likely(!rc)) {
424 u32 len = skb_queue_len(&bcl->outqueue); 424 u32 len = skb_queue_len(&bcl->outqueue);
425 425
@@ -433,13 +433,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
433 } 433 }
434 434
435 if (unlikely(!bc)) 435 if (unlikely(!bc))
436 kfree_skb_list(buf); 436 __skb_queue_purge(list);
437 437
438 /* Deliver message clone */ 438 /* Deliver message clone */
439 if (likely(!rc)) 439 if (likely(!rc))
440 tipc_sk_mcast_rcv(clbuf); 440 tipc_sk_mcast_rcv(skb);
441 else 441 else
442 kfree_skb(clbuf); 442 kfree_skb(skb);
443 443
444 return rc; 444 return rc;
445} 445}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 443de084d3e8..644d79129fba 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -100,7 +100,7 @@ int tipc_bclink_reset_stats(void);
100int tipc_bclink_set_queue_limits(u32 limit); 100int tipc_bclink_set_queue_limits(u32 limit);
101void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); 101void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
102uint tipc_bclink_get_mtu(void); 102uint tipc_bclink_get_mtu(void);
103int tipc_bclink_xmit(struct sk_buff *buf); 103int tipc_bclink_xmit(struct sk_buff_head *list);
104void tipc_bclink_wakeup_users(void); 104void tipc_bclink_wakeup_users(void);
105int tipc_nl_add_bc_link(struct tipc_nl_msg *msg); 105int tipc_nl_add_bc_link(struct tipc_nl_msg *msg);
106 106
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 0e04508cdba4..34bf15c90c78 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -664,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
664 * - For all other messages we discard the buffer and return -EHOSTUNREACH 664 * - For all other messages we discard the buffer and return -EHOSTUNREACH
665 * - For TIPC internal messages we also reset the link 665 * - For TIPC internal messages we also reset the link
666 */ 666 */
667static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) 667static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
668{ 668{
669 struct tipc_msg *msg = buf_msg(buf); 669 struct sk_buff *skb = skb_peek(list);
670 struct tipc_msg *msg = buf_msg(skb);
670 uint imp = tipc_msg_tot_importance(msg); 671 uint imp = tipc_msg_tot_importance(msg);
671 u32 oport = msg_tot_origport(msg); 672 u32 oport = msg_tot_origport(msg);
672 673
@@ -679,28 +680,29 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
679 goto drop; 680 goto drop;
680 if (unlikely(msg_reroute_cnt(msg))) 681 if (unlikely(msg_reroute_cnt(msg)))
681 goto drop; 682 goto drop;
682 if (TIPC_SKB_CB(buf)->wakeup_pending) 683 if (TIPC_SKB_CB(skb)->wakeup_pending)
683 return -ELINKCONG; 684 return -ELINKCONG;
684 if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp)) 685 if (link_schedule_user(link, oport, skb_queue_len(list), imp))
685 return -ELINKCONG; 686 return -ELINKCONG;
686drop: 687drop:
687 kfree_skb_list(buf); 688 __skb_queue_purge(list);
688 return -EHOSTUNREACH; 689 return -EHOSTUNREACH;
689} 690}
690 691
691/** 692/**
692 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked 693 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
693 * @link: link to use 694 * @link: link to use
694 * @skb: chain of buffers containing message 695 * @list: chain of buffers containing message
696 *
695 * Consumes the buffer chain, except when returning -ELINKCONG 697 * Consumes the buffer chain, except when returning -ELINKCONG
696 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket 698 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
697 * user data messages) or -EHOSTUNREACH (all other messages/senders) 699 * user data messages) or -EHOSTUNREACH (all other messages/senders)
698 * Only the socket functions tipc_send_stream() and tipc_send_packet() need 700 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
699 * to act on the return value, since they may need to do more send attempts. 701 * to act on the return value, since they may need to do more send attempts.
700 */ 702 */
701int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) 703int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
702{ 704{
703 struct tipc_msg *msg = buf_msg(skb); 705 struct tipc_msg *msg = buf_msg(skb_peek(list));
704 uint psz = msg_size(msg); 706 uint psz = msg_size(msg);
705 uint sndlim = link->queue_limit[0]; 707 uint sndlim = link->queue_limit[0];
706 uint imp = tipc_msg_tot_importance(msg); 708 uint imp = tipc_msg_tot_importance(msg);
@@ -710,21 +712,21 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
710 uint bc_last_in = link->owner->bclink.last_in; 712 uint bc_last_in = link->owner->bclink.last_in;
711 struct tipc_media_addr *addr = &link->media_addr; 713 struct tipc_media_addr *addr = &link->media_addr;
712 struct sk_buff_head *outqueue = &link->outqueue; 714 struct sk_buff_head *outqueue = &link->outqueue;
713 struct sk_buff *next; 715 struct sk_buff *skb, *tmp;
714 716
715 /* Match queue limits against msg importance: */ 717 /* Match queue limits against msg importance: */
716 if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) 718 if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
717 return tipc_link_cong(link, skb); 719 return tipc_link_cong(link, list);
718 720
719 /* Has valid packet limit been used ? */ 721 /* Has valid packet limit been used ? */
720 if (unlikely(psz > mtu)) { 722 if (unlikely(psz > mtu)) {
721 kfree_skb_list(skb); 723 __skb_queue_purge(list);
722 return -EMSGSIZE; 724 return -EMSGSIZE;
723 } 725 }
724 726
725 /* Prepare each packet for sending, and add to outqueue: */ 727 /* Prepare each packet for sending, and add to outqueue: */
726 while (skb) { 728 skb_queue_walk_safe(list, skb, tmp) {
727 next = skb->next; 729 __skb_unlink(skb, list);
728 msg = buf_msg(skb); 730 msg = buf_msg(skb);
729 msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); 731 msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
730 msg_set_bcast_ack(msg, bc_last_in); 732 msg_set_bcast_ack(msg, bc_last_in);
@@ -736,7 +738,6 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
736 link->unacked_window = 0; 738 link->unacked_window = 0;
737 } else if (tipc_msg_bundle(outqueue, skb, mtu)) { 739 } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
738 link->stats.sent_bundled++; 740 link->stats.sent_bundled++;
739 skb = next;
740 continue; 741 continue;
741 } else if (tipc_msg_make_bundle(outqueue, skb, mtu, 742 } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
742 link->addr)) { 743 link->addr)) {
@@ -750,22 +751,43 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
750 link->next_out = skb; 751 link->next_out = skb;
751 } 752 }
752 seqno++; 753 seqno++;
753 skb = next;
754 } 754 }
755 link->next_out_no = seqno; 755 link->next_out_no = seqno;
756 return 0; 756 return 0;
757} 757}
758 758
759static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
760{
761 __skb_queue_head_init(list);
762 __skb_queue_tail(list, skb);
763}
764
765static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
766{
767 struct sk_buff_head head;
768
769 skb2list(skb, &head);
770 return __tipc_link_xmit(link, &head);
771}
772
773int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
774{
775 struct sk_buff_head head;
776
777 skb2list(skb, &head);
778 return tipc_link_xmit(&head, dnode, selector);
779}
780
759/** 781/**
760 * tipc_link_xmit() is the general link level function for message sending 782 * tipc_link_xmit() is the general link level function for message sending
761 * @buf: chain of buffers containing message 783 * @list: chain of buffers containing message
762 * @dsz: amount of user data to be sent 784 * @dsz: amount of user data to be sent
763 * @dnode: address of destination node 785 * @dnode: address of destination node
764 * @selector: a number used for deterministic link selection 786 * @selector: a number used for deterministic link selection
765 * Consumes the buffer chain, except when returning -ELINKCONG 787 * Consumes the buffer chain, except when returning -ELINKCONG
766 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 788 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
767 */ 789 */
768int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) 790int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
769{ 791{
770 struct tipc_link *link = NULL; 792 struct tipc_link *link = NULL;
771 struct tipc_node *node; 793 struct tipc_node *node;
@@ -776,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
776 tipc_node_lock(node); 798 tipc_node_lock(node);
777 link = node->active_links[selector & 1]; 799 link = node->active_links[selector & 1];
778 if (link) 800 if (link)
779 rc = __tipc_link_xmit(link, buf); 801 rc = __tipc_link_xmit(link, list);
780 tipc_node_unlock(node); 802 tipc_node_unlock(node);
781 } 803 }
782 804
783 if (link) 805 if (link)
784 return rc; 806 return rc;
785 807
786 if (likely(in_own_node(dnode))) 808 if (likely(in_own_node(dnode))) {
787 return tipc_sk_rcv(buf); 809 /* As a node local message chain never contains more than one
810 * buffer, we just need to dequeue one SKB buffer from the
811 * head list.
812 */
813 return tipc_sk_rcv(__skb_dequeue(list));
814 }
815 __skb_queue_purge(list);
788 816
789 kfree_skb_list(buf);
790 return rc; 817 return rc;
791} 818}
792 819
@@ -800,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
800 */ 827 */
801static void tipc_link_sync_xmit(struct tipc_link *link) 828static void tipc_link_sync_xmit(struct tipc_link *link)
802{ 829{
803 struct sk_buff *buf; 830 struct sk_buff *skb;
804 struct tipc_msg *msg; 831 struct tipc_msg *msg;
805 832
806 buf = tipc_buf_acquire(INT_H_SIZE); 833 skb = tipc_buf_acquire(INT_H_SIZE);
807 if (!buf) 834 if (!skb)
808 return; 835 return;
809 836
810 msg = buf_msg(buf); 837 msg = buf_msg(skb);
811 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); 838 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
812 msg_set_last_bcast(msg, link->owner->bclink.acked); 839 msg_set_last_bcast(msg, link->owner->bclink.acked);
813 __tipc_link_xmit(link, buf); 840 __tipc_link_xmit_skb(link, skb);
814} 841}
815 842
816/* 843/*
@@ -1053,8 +1080,7 @@ void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
1053 u32 ackd; 1080 u32 ackd;
1054 u32 released; 1081 u32 released;
1055 1082
1056 __skb_queue_head_init(&head); 1083 skb2list(skb, &head);
1057 __skb_queue_tail(&head, skb);
1058 1084
1059 while ((skb = __skb_dequeue(&head))) { 1085 while ((skb = __skb_dequeue(&head))) {
1060 /* Ensure message is well-formed */ 1086 /* Ensure message is well-formed */
@@ -1573,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1573 u32 selector) 1599 u32 selector)
1574{ 1600{
1575 struct tipc_link *tunnel; 1601 struct tipc_link *tunnel;
1576 struct sk_buff *buf; 1602 struct sk_buff *skb;
1577 u32 length = msg_size(msg); 1603 u32 length = msg_size(msg);
1578 1604
1579 tunnel = l_ptr->owner->active_links[selector & 1]; 1605 tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1582,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1582 return; 1608 return;
1583 } 1609 }
1584 msg_set_size(tunnel_hdr, length + INT_H_SIZE); 1610 msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1585 buf = tipc_buf_acquire(length + INT_H_SIZE); 1611 skb = tipc_buf_acquire(length + INT_H_SIZE);
1586 if (!buf) { 1612 if (!skb) {
1587 pr_warn("%sunable to send tunnel msg\n", link_co_err); 1613 pr_warn("%sunable to send tunnel msg\n", link_co_err);
1588 return; 1614 return;
1589 } 1615 }
1590 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 1616 skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1591 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 1617 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1592 __tipc_link_xmit(tunnel, buf); 1618 __tipc_link_xmit_skb(tunnel, skb);
1593} 1619}
1594 1620
1595 1621
@@ -1620,7 +1646,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1620 if (skb) { 1646 if (skb) {
1621 skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); 1647 skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1622 msg_set_size(&tunnel_hdr, INT_H_SIZE); 1648 msg_set_size(&tunnel_hdr, INT_H_SIZE);
1623 __tipc_link_xmit(tunnel, skb); 1649 __tipc_link_xmit_skb(tunnel, skb);
1624 } else { 1650 } else {
1625 pr_warn("%sunable to send changeover msg\n", 1651 pr_warn("%sunable to send changeover msg\n",
1626 link_co_err); 1652 link_co_err);
@@ -1691,7 +1717,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1691 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); 1717 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1692 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, 1718 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1693 length); 1719 length);
1694 __tipc_link_xmit(tunnel, outskb); 1720 __tipc_link_xmit_skb(tunnel, outskb);
1695 if (!tipc_link_is_up(l_ptr)) 1721 if (!tipc_link_is_up(l_ptr))
1696 return; 1722 return;
1697 } 1723 }
diff --git a/net/tipc/link.h b/net/tipc/link.h
index de7b8833641a..55812e87ca1e 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -213,8 +213,9 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
213void tipc_link_reset_all(struct tipc_node *node); 213void tipc_link_reset_all(struct tipc_node *node);
214void tipc_link_reset(struct tipc_link *l_ptr); 214void tipc_link_reset(struct tipc_link *l_ptr);
215void tipc_link_reset_list(unsigned int bearer_id); 215void tipc_link_reset_list(unsigned int bearer_id);
216int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); 216int tipc_link_xmit_skb(struct sk_buff *skb, u32 dest, u32 selector);
217int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); 217int tipc_link_xmit(struct sk_buff_head *list, u32 dest, u32 selector);
218int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list);
218u32 tipc_link_get_max_pkt(u32 dest, u32 selector); 219u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
219void tipc_link_bundle_rcv(struct sk_buff *buf); 220void tipc_link_bundle_rcv(struct sk_buff *buf);
220void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 221void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ce7514ae6bf3..5b0659791c07 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -166,11 +166,12 @@ err:
166 * @offset: Posision in iov to start copying from 166 * @offset: Posision in iov to start copying from
167 * @dsz: Total length of user data 167 * @dsz: Total length of user data
168 * @pktmax: Max packet size that can be used 168 * @pktmax: Max packet size that can be used
169 * @chain: Buffer or chain of buffers to be returned to caller 169 * @list: Buffer or chain of buffers to be returned to caller
170 *
170 * Returns message data size or errno: -ENOMEM, -EFAULT 171 * Returns message data size or errno: -ENOMEM, -EFAULT
171 */ 172 */
172int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 173int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
173 int offset, int dsz, int pktmax , struct sk_buff **chain) 174 int dsz, int pktmax, struct sk_buff_head *list)
174{ 175{
175 int mhsz = msg_hdr_sz(mhdr); 176 int mhsz = msg_hdr_sz(mhdr);
176 int msz = mhsz + dsz; 177 int msz = mhsz + dsz;
@@ -179,22 +180,22 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
179 int pktrem = pktmax; 180 int pktrem = pktmax;
180 int drem = dsz; 181 int drem = dsz;
181 struct tipc_msg pkthdr; 182 struct tipc_msg pkthdr;
182 struct sk_buff *buf, *prev; 183 struct sk_buff *skb;
183 char *pktpos; 184 char *pktpos;
184 int rc; 185 int rc;
185 uint chain_sz = 0; 186
186 msg_set_size(mhdr, msz); 187 msg_set_size(mhdr, msz);
187 188
188 /* No fragmentation needed? */ 189 /* No fragmentation needed? */
189 if (likely(msz <= pktmax)) { 190 if (likely(msz <= pktmax)) {
190 buf = tipc_buf_acquire(msz); 191 skb = tipc_buf_acquire(msz);
191 *chain = buf; 192 if (unlikely(!skb))
192 if (unlikely(!buf))
193 return -ENOMEM; 193 return -ENOMEM;
194 skb_copy_to_linear_data(buf, mhdr, mhsz); 194 __skb_queue_tail(list, skb);
195 pktpos = buf->data + mhsz; 195 skb_copy_to_linear_data(skb, mhdr, mhsz);
196 TIPC_SKB_CB(buf)->chain_sz = 1; 196 pktpos = skb->data + mhsz;
197 if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iov, offset, dsz)) 197 if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iov, offset,
198 dsz))
198 return dsz; 199 return dsz;
199 rc = -EFAULT; 200 rc = -EFAULT;
200 goto error; 201 goto error;
@@ -207,15 +208,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
207 msg_set_fragm_no(&pkthdr, pktno); 208 msg_set_fragm_no(&pkthdr, pktno);
208 209
209 /* Prepare first fragment */ 210 /* Prepare first fragment */
210 *chain = buf = tipc_buf_acquire(pktmax); 211 skb = tipc_buf_acquire(pktmax);
211 if (!buf) 212 if (!skb)
212 return -ENOMEM; 213 return -ENOMEM;
213 chain_sz = 1; 214 __skb_queue_tail(list, skb);
214 pktpos = buf->data; 215 pktpos = skb->data;
215 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 216 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
216 pktpos += INT_H_SIZE; 217 pktpos += INT_H_SIZE;
217 pktrem -= INT_H_SIZE; 218 pktrem -= INT_H_SIZE;
218 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); 219 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
219 pktpos += mhsz; 220 pktpos += mhsz;
220 pktrem -= mhsz; 221 pktrem -= mhsz;
221 222
@@ -238,28 +239,25 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
238 pktsz = drem + INT_H_SIZE; 239 pktsz = drem + INT_H_SIZE;
239 else 240 else
240 pktsz = pktmax; 241 pktsz = pktmax;
241 prev = buf; 242 skb = tipc_buf_acquire(pktsz);
242 buf = tipc_buf_acquire(pktsz); 243 if (!skb) {
243 if (!buf) {
244 rc = -ENOMEM; 244 rc = -ENOMEM;
245 goto error; 245 goto error;
246 } 246 }
247 chain_sz++; 247 __skb_queue_tail(list, skb);
248 prev->next = buf;
249 msg_set_type(&pkthdr, FRAGMENT); 248 msg_set_type(&pkthdr, FRAGMENT);
250 msg_set_size(&pkthdr, pktsz); 249 msg_set_size(&pkthdr, pktsz);
251 msg_set_fragm_no(&pkthdr, ++pktno); 250 msg_set_fragm_no(&pkthdr, ++pktno);
252 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 251 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
253 pktpos = buf->data + INT_H_SIZE; 252 pktpos = skb->data + INT_H_SIZE;
254 pktrem = pktsz - INT_H_SIZE; 253 pktrem = pktsz - INT_H_SIZE;
255 254
256 } while (1); 255 } while (1);
257 TIPC_SKB_CB(*chain)->chain_sz = chain_sz; 256 msg_set_type(buf_msg(skb), LAST_FRAGMENT);
258 msg_set_type(buf_msg(buf), LAST_FRAGMENT);
259 return dsz; 257 return dsz;
260error: 258error:
261 kfree_skb_list(*chain); 259 __skb_queue_purge(list);
262 *chain = NULL; 260 __skb_queue_head_init(list);
263 return rc; 261 return rc;
264} 262}
265 263
@@ -430,22 +428,23 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
430/* tipc_msg_reassemble() - clone a buffer chain of fragments and 428/* tipc_msg_reassemble() - clone a buffer chain of fragments and
431 * reassemble the clones into one message 429 * reassemble the clones into one message
432 */ 430 */
433struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) 431struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
434{ 432{
435 struct sk_buff *buf = chain; 433 struct sk_buff *skb;
436 struct sk_buff *frag = buf; 434 struct sk_buff *frag = NULL;
437 struct sk_buff *head = NULL; 435 struct sk_buff *head = NULL;
438 int hdr_sz; 436 int hdr_sz;
439 437
440 /* Copy header if single buffer */ 438 /* Copy header if single buffer */
441 if (!buf->next) { 439 if (skb_queue_len(list) == 1) {
442 hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); 440 skb = skb_peek(list);
443 return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); 441 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
442 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC);
444 } 443 }
445 444
446 /* Clone all fragments and reassemble */ 445 /* Clone all fragments and reassemble */
447 while (buf) { 446 skb_queue_walk(list, skb) {
448 frag = skb_clone(buf, GFP_ATOMIC); 447 frag = skb_clone(skb, GFP_ATOMIC);
449 if (!frag) 448 if (!frag)
450 goto error; 449 goto error;
451 frag->next = NULL; 450 frag->next = NULL;
@@ -453,7 +452,6 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
453 break; 452 break;
454 if (!head) 453 if (!head)
455 goto error; 454 goto error;
456 buf = buf->next;
457 } 455 }
458 return frag; 456 return frag;
459error: 457error:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 53e425f12343..d5c83d7ecb47 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -739,9 +739,9 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
739bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb, 739bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
740 u32 mtu, u32 dnode); 740 u32 mtu, u32 dnode);
741 741
742int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 742int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
743 int offset, int dsz, int mtu , struct sk_buff **chain); 743 int dsz, int mtu, struct sk_buff_head *list);
744 744
745struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); 745struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
746 746
747#endif 747#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 6c2638d3c659..56248db75274 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -114,9 +114,9 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
114 return buf; 114 return buf;
115} 115}
116 116
117void named_cluster_distribute(struct sk_buff *buf) 117void named_cluster_distribute(struct sk_buff *skb)
118{ 118{
119 struct sk_buff *obuf; 119 struct sk_buff *oskb;
120 struct tipc_node *node; 120 struct tipc_node *node;
121 u32 dnode; 121 u32 dnode;
122 122
@@ -127,15 +127,15 @@ void named_cluster_distribute(struct sk_buff *buf)
127 continue; 127 continue;
128 if (!tipc_node_active_links(node)) 128 if (!tipc_node_active_links(node))
129 continue; 129 continue;
130 obuf = skb_copy(buf, GFP_ATOMIC); 130 oskb = skb_copy(skb, GFP_ATOMIC);
131 if (!obuf) 131 if (!oskb)
132 break; 132 break;
133 msg_set_destnode(buf_msg(obuf), dnode); 133 msg_set_destnode(buf_msg(oskb), dnode);
134 tipc_link_xmit(obuf, dnode, dnode); 134 tipc_link_xmit_skb(oskb, dnode, dnode);
135 } 135 }
136 rcu_read_unlock(); 136 rcu_read_unlock();
137 137
138 kfree_skb(buf); 138 kfree_skb(skb);
139} 139}
140 140
141/** 141/**
@@ -190,15 +190,15 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
190 190
191/** 191/**
192 * named_distribute - prepare name info for bulk distribution to another node 192 * named_distribute - prepare name info for bulk distribution to another node
193 * @msg_list: list of messages (buffers) to be returned from this function 193 * @list: list of messages (buffers) to be returned from this function
194 * @dnode: node to be updated 194 * @dnode: node to be updated
195 * @pls: linked list of publication items to be packed into buffer chain 195 * @pls: linked list of publication items to be packed into buffer chain
196 */ 196 */
197static void named_distribute(struct list_head *msg_list, u32 dnode, 197static void named_distribute(struct sk_buff_head *list, u32 dnode,
198 struct publ_list *pls) 198 struct publ_list *pls)
199{ 199{
200 struct publication *publ; 200 struct publication *publ;
201 struct sk_buff *buf = NULL; 201 struct sk_buff *skb = NULL;
202 struct distr_item *item = NULL; 202 struct distr_item *item = NULL;
203 uint dsz = pls->size * ITEM_SIZE; 203 uint dsz = pls->size * ITEM_SIZE;
204 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; 204 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
@@ -207,15 +207,15 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
207 207
208 list_for_each_entry(publ, &pls->list, local_list) { 208 list_for_each_entry(publ, &pls->list, local_list) {
209 /* Prepare next buffer: */ 209 /* Prepare next buffer: */
210 if (!buf) { 210 if (!skb) {
211 msg_rem = min_t(uint, rem, msg_dsz); 211 msg_rem = min_t(uint, rem, msg_dsz);
212 rem -= msg_rem; 212 rem -= msg_rem;
213 buf = named_prepare_buf(PUBLICATION, msg_rem, dnode); 213 skb = named_prepare_buf(PUBLICATION, msg_rem, dnode);
214 if (!buf) { 214 if (!skb) {
215 pr_warn("Bulk publication failure\n"); 215 pr_warn("Bulk publication failure\n");
216 return; 216 return;
217 } 217 }
218 item = (struct distr_item *)msg_data(buf_msg(buf)); 218 item = (struct distr_item *)msg_data(buf_msg(skb));
219 } 219 }
220 220
221 /* Pack publication into message: */ 221 /* Pack publication into message: */
@@ -225,8 +225,8 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
225 225
226 /* Append full buffer to list: */ 226 /* Append full buffer to list: */
227 if (!msg_rem) { 227 if (!msg_rem) {
228 list_add_tail((struct list_head *)buf, msg_list); 228 __skb_queue_tail(list, skb);
229 buf = NULL; 229 skb = NULL;
230 } 230 }
231 } 231 }
232} 232}
@@ -236,18 +236,16 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
236 */ 236 */
237void tipc_named_node_up(u32 dnode) 237void tipc_named_node_up(u32 dnode)
238{ 238{
239 LIST_HEAD(msg_list); 239 struct sk_buff_head head;
240 struct sk_buff *buf_chain; 240
241 __skb_queue_head_init(&head);
241 242
242 read_lock_bh(&tipc_nametbl_lock); 243 read_lock_bh(&tipc_nametbl_lock);
243 named_distribute(&msg_list, dnode, &publ_cluster); 244 named_distribute(&head, dnode, &publ_cluster);
244 named_distribute(&msg_list, dnode, &publ_zone); 245 named_distribute(&head, dnode, &publ_zone);
245 read_unlock_bh(&tipc_nametbl_lock); 246 read_unlock_bh(&tipc_nametbl_lock);
246 247
247 /* Convert circular list to linear list and send: */ 248 tipc_link_xmit(&head, dnode, dnode);
248 buf_chain = (struct sk_buff *)msg_list.next;
249 ((struct sk_buff *)msg_list.prev)->next = NULL;
250 tipc_link_xmit(buf_chain, dnode, dnode);
251} 249}
252 250
253static void tipc_publ_subscribe(struct publication *publ, u32 addr) 251static void tipc_publ_subscribe(struct publication *publ, u32 addr)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 341fbd1b5f74..9658d9b63876 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -244,12 +244,12 @@ static void tsk_advance_rx_queue(struct sock *sk)
244 */ 244 */
245static void tsk_rej_rx_queue(struct sock *sk) 245static void tsk_rej_rx_queue(struct sock *sk)
246{ 246{
247 struct sk_buff *buf; 247 struct sk_buff *skb;
248 u32 dnode; 248 u32 dnode;
249 249
250 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 250 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
251 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) 251 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
252 tipc_link_xmit(buf, dnode, 0); 252 tipc_link_xmit_skb(skb, dnode, 0);
253 } 253 }
254} 254}
255 255
@@ -462,7 +462,7 @@ static int tipc_release(struct socket *sock)
462{ 462{
463 struct sock *sk = sock->sk; 463 struct sock *sk = sock->sk;
464 struct tipc_sock *tsk; 464 struct tipc_sock *tsk;
465 struct sk_buff *buf; 465 struct sk_buff *skb;
466 u32 dnode; 466 u32 dnode;
467 467
468 /* 468 /*
@@ -481,11 +481,11 @@ static int tipc_release(struct socket *sock)
481 */ 481 */
482 dnode = tsk_peer_node(tsk); 482 dnode = tsk_peer_node(tsk);
483 while (sock->state != SS_DISCONNECTING) { 483 while (sock->state != SS_DISCONNECTING) {
484 buf = __skb_dequeue(&sk->sk_receive_queue); 484 skb = __skb_dequeue(&sk->sk_receive_queue);
485 if (buf == NULL) 485 if (skb == NULL)
486 break; 486 break;
487 if (TIPC_SKB_CB(buf)->handle != NULL) 487 if (TIPC_SKB_CB(skb)->handle != NULL)
488 kfree_skb(buf); 488 kfree_skb(skb);
489 else { 489 else {
490 if ((sock->state == SS_CONNECTING) || 490 if ((sock->state == SS_CONNECTING) ||
491 (sock->state == SS_CONNECTED)) { 491 (sock->state == SS_CONNECTED)) {
@@ -493,8 +493,8 @@ static int tipc_release(struct socket *sock)
493 tsk->connected = 0; 493 tsk->connected = 0;
494 tipc_node_remove_conn(dnode, tsk->ref); 494 tipc_node_remove_conn(dnode, tsk->ref);
495 } 495 }
496 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) 496 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
497 tipc_link_xmit(buf, dnode, 0); 497 tipc_link_xmit_skb(skb, dnode, 0);
498 } 498 }
499 } 499 }
500 500
@@ -502,12 +502,12 @@ static int tipc_release(struct socket *sock)
502 tipc_sk_ref_discard(tsk->ref); 502 tipc_sk_ref_discard(tsk->ref);
503 k_cancel_timer(&tsk->timer); 503 k_cancel_timer(&tsk->timer);
504 if (tsk->connected) { 504 if (tsk->connected) {
505 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 505 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
506 SHORT_H_SIZE, 0, dnode, tipc_own_addr, 506 SHORT_H_SIZE, 0, dnode, tipc_own_addr,
507 tsk_peer_port(tsk), 507 tsk_peer_port(tsk),
508 tsk->ref, TIPC_ERR_NO_PORT); 508 tsk->ref, TIPC_ERR_NO_PORT);
509 if (buf) 509 if (skb)
510 tipc_link_xmit(buf, dnode, tsk->ref); 510 tipc_link_xmit_skb(skb, dnode, tsk->ref);
511 tipc_node_remove_conn(dnode, tsk->ref); 511 tipc_node_remove_conn(dnode, tsk->ref);
512 } 512 }
513 k_term_timer(&tsk->timer); 513 k_term_timer(&tsk->timer);
@@ -712,7 +712,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
712{ 712{
713 struct sock *sk = sock->sk; 713 struct sock *sk = sock->sk;
714 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; 714 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
715 struct sk_buff *buf; 715 struct sk_buff_head head;
716 uint mtu; 716 uint mtu;
717 int rc; 717 int rc;
718 718
@@ -727,12 +727,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
727 727
728new_mtu: 728new_mtu:
729 mtu = tipc_bclink_get_mtu(); 729 mtu = tipc_bclink_get_mtu();
730 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &buf); 730 __skb_queue_head_init(&head);
731 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
731 if (unlikely(rc < 0)) 732 if (unlikely(rc < 0))
732 return rc; 733 return rc;
733 734
734 do { 735 do {
735 rc = tipc_bclink_xmit(buf); 736 rc = tipc_bclink_xmit(&head);
736 if (likely(rc >= 0)) { 737 if (likely(rc >= 0)) {
737 rc = dsz; 738 rc = dsz;
738 break; 739 break;
@@ -744,7 +745,7 @@ new_mtu:
744 tipc_sk(sk)->link_cong = 1; 745 tipc_sk(sk)->link_cong = 1;
745 rc = tipc_wait_for_sndmsg(sock, &timeo); 746 rc = tipc_wait_for_sndmsg(sock, &timeo);
746 if (rc) 747 if (rc)
747 kfree_skb_list(buf); 748 __skb_queue_purge(&head);
748 } while (!rc); 749 } while (!rc);
749 return rc; 750 return rc;
750} 751}
@@ -906,7 +907,8 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
906 struct tipc_sock *tsk = tipc_sk(sk); 907 struct tipc_sock *tsk = tipc_sk(sk);
907 struct tipc_msg *mhdr = &tsk->phdr; 908 struct tipc_msg *mhdr = &tsk->phdr;
908 u32 dnode, dport; 909 u32 dnode, dport;
909 struct sk_buff *buf; 910 struct sk_buff_head head;
911 struct sk_buff *skb;
910 struct tipc_name_seq *seq = &dest->addr.nameseq; 912 struct tipc_name_seq *seq = &dest->addr.nameseq;
911 u32 mtu; 913 u32 mtu;
912 long timeo; 914 long timeo;
@@ -981,13 +983,15 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
981 983
982new_mtu: 984new_mtu:
983 mtu = tipc_node_get_mtu(dnode, tsk->ref); 985 mtu = tipc_node_get_mtu(dnode, tsk->ref);
984 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &buf); 986 __skb_queue_head_init(&head);
987 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
985 if (rc < 0) 988 if (rc < 0)
986 goto exit; 989 goto exit;
987 990
988 do { 991 do {
989 TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong; 992 skb = skb_peek(&head);
990 rc = tipc_link_xmit(buf, dnode, tsk->ref); 993 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
994 rc = tipc_link_xmit(&head, dnode, tsk->ref);
991 if (likely(rc >= 0)) { 995 if (likely(rc >= 0)) {
992 if (sock->state != SS_READY) 996 if (sock->state != SS_READY)
993 sock->state = SS_CONNECTING; 997 sock->state = SS_CONNECTING;
@@ -1001,7 +1005,7 @@ new_mtu:
1001 tsk->link_cong = 1; 1005 tsk->link_cong = 1;
1002 rc = tipc_wait_for_sndmsg(sock, &timeo); 1006 rc = tipc_wait_for_sndmsg(sock, &timeo);
1003 if (rc) 1007 if (rc)
1004 kfree_skb_list(buf); 1008 __skb_queue_purge(&head);
1005 } while (!rc); 1009 } while (!rc);
1006exit: 1010exit:
1007 if (iocb) 1011 if (iocb)
@@ -1058,7 +1062,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1058 struct sock *sk = sock->sk; 1062 struct sock *sk = sock->sk;
1059 struct tipc_sock *tsk = tipc_sk(sk); 1063 struct tipc_sock *tsk = tipc_sk(sk);
1060 struct tipc_msg *mhdr = &tsk->phdr; 1064 struct tipc_msg *mhdr = &tsk->phdr;
1061 struct sk_buff *buf; 1065 struct sk_buff_head head;
1062 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1066 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1063 u32 ref = tsk->ref; 1067 u32 ref = tsk->ref;
1064 int rc = -EINVAL; 1068 int rc = -EINVAL;
@@ -1093,12 +1097,13 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1093next: 1097next:
1094 mtu = tsk->max_pkt; 1098 mtu = tsk->max_pkt;
1095 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1099 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1096 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &buf); 1100 __skb_queue_head_init(&head);
1101 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1097 if (unlikely(rc < 0)) 1102 if (unlikely(rc < 0))
1098 goto exit; 1103 goto exit;
1099 do { 1104 do {
1100 if (likely(!tsk_conn_cong(tsk))) { 1105 if (likely(!tsk_conn_cong(tsk))) {
1101 rc = tipc_link_xmit(buf, dnode, ref); 1106 rc = tipc_link_xmit(&head, dnode, ref);
1102 if (likely(!rc)) { 1107 if (likely(!rc)) {
1103 tsk->sent_unacked++; 1108 tsk->sent_unacked++;
1104 sent += send; 1109 sent += send;
@@ -1116,7 +1121,7 @@ next:
1116 } 1121 }
1117 rc = tipc_wait_for_sndpkt(sock, &timeo); 1122 rc = tipc_wait_for_sndpkt(sock, &timeo);
1118 if (rc) 1123 if (rc)
1119 kfree_skb_list(buf); 1124 __skb_queue_purge(&head);
1120 } while (!rc); 1125 } while (!rc);
1121exit: 1126exit:
1122 if (iocb) 1127 if (iocb)
@@ -1261,20 +1266,20 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1261 1266
1262static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) 1267static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1263{ 1268{
1264 struct sk_buff *buf = NULL; 1269 struct sk_buff *skb = NULL;
1265 struct tipc_msg *msg; 1270 struct tipc_msg *msg;
1266 u32 peer_port = tsk_peer_port(tsk); 1271 u32 peer_port = tsk_peer_port(tsk);
1267 u32 dnode = tsk_peer_node(tsk); 1272 u32 dnode = tsk_peer_node(tsk);
1268 1273
1269 if (!tsk->connected) 1274 if (!tsk->connected)
1270 return; 1275 return;
1271 buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, 1276 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
1272 tipc_own_addr, peer_port, tsk->ref, TIPC_OK); 1277 tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
1273 if (!buf) 1278 if (!skb)
1274 return; 1279 return;
1275 msg = buf_msg(buf); 1280 msg = buf_msg(skb);
1276 msg_set_msgcnt(msg, ack); 1281 msg_set_msgcnt(msg, ack);
1277 tipc_link_xmit(buf, dnode, msg_link_selector(msg)); 1282 tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
1278} 1283}
1279 1284
1280static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) 1285static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1729,20 +1734,20 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1729/** 1734/**
1730 * tipc_backlog_rcv - handle incoming message from backlog queue 1735 * tipc_backlog_rcv - handle incoming message from backlog queue
1731 * @sk: socket 1736 * @sk: socket
1732 * @buf: message 1737 * @skb: message
1733 * 1738 *
1734 * Caller must hold socket lock, but not port lock. 1739 * Caller must hold socket lock, but not port lock.
1735 * 1740 *
1736 * Returns 0 1741 * Returns 0
1737 */ 1742 */
1738static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) 1743static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1739{ 1744{
1740 int rc; 1745 int rc;
1741 u32 onode; 1746 u32 onode;
1742 struct tipc_sock *tsk = tipc_sk(sk); 1747 struct tipc_sock *tsk = tipc_sk(sk);
1743 uint truesize = buf->truesize; 1748 uint truesize = skb->truesize;
1744 1749
1745 rc = filter_rcv(sk, buf); 1750 rc = filter_rcv(sk, skb);
1746 1751
1747 if (likely(!rc)) { 1752 if (likely(!rc)) {
1748 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) 1753 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
@@ -1750,25 +1755,25 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1750 return 0; 1755 return 0;
1751 } 1756 }
1752 1757
1753 if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) 1758 if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
1754 return 0; 1759 return 0;
1755 1760
1756 tipc_link_xmit(buf, onode, 0); 1761 tipc_link_xmit_skb(skb, onode, 0);
1757 1762
1758 return 0; 1763 return 0;
1759} 1764}
1760 1765
1761/** 1766/**
1762 * tipc_sk_rcv - handle incoming message 1767 * tipc_sk_rcv - handle incoming message
1763 * @buf: buffer containing arriving message 1768 * @skb: buffer containing arriving message
1764 * Consumes buffer 1769 * Consumes buffer
1765 * Returns 0 if success, or errno: -EHOSTUNREACH 1770 * Returns 0 if success, or errno: -EHOSTUNREACH
1766 */ 1771 */
1767int tipc_sk_rcv(struct sk_buff *buf) 1772int tipc_sk_rcv(struct sk_buff *skb)
1768{ 1773{
1769 struct tipc_sock *tsk; 1774 struct tipc_sock *tsk;
1770 struct sock *sk; 1775 struct sock *sk;
1771 u32 dport = msg_destport(buf_msg(buf)); 1776 u32 dport = msg_destport(buf_msg(skb));
1772 int rc = TIPC_OK; 1777 int rc = TIPC_OK;
1773 uint limit; 1778 uint limit;
1774 u32 dnode; 1779 u32 dnode;
@@ -1776,7 +1781,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
1776 /* Validate destination and message */ 1781 /* Validate destination and message */
1777 tsk = tipc_sk_get(dport); 1782 tsk = tipc_sk_get(dport);
1778 if (unlikely(!tsk)) { 1783 if (unlikely(!tsk)) {
1779 rc = tipc_msg_eval(buf, &dnode); 1784 rc = tipc_msg_eval(skb, &dnode);
1780 goto exit; 1785 goto exit;
1781 } 1786 }
1782 sk = &tsk->sk; 1787 sk = &tsk->sk;
@@ -1785,12 +1790,12 @@ int tipc_sk_rcv(struct sk_buff *buf)
1785 spin_lock_bh(&sk->sk_lock.slock); 1790 spin_lock_bh(&sk->sk_lock.slock);
1786 1791
1787 if (!sock_owned_by_user(sk)) { 1792 if (!sock_owned_by_user(sk)) {
1788 rc = filter_rcv(sk, buf); 1793 rc = filter_rcv(sk, skb);
1789 } else { 1794 } else {
1790 if (sk->sk_backlog.len == 0) 1795 if (sk->sk_backlog.len == 0)
1791 atomic_set(&tsk->dupl_rcvcnt, 0); 1796 atomic_set(&tsk->dupl_rcvcnt, 0);
1792 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); 1797 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1793 if (sk_add_backlog(sk, buf, limit)) 1798 if (sk_add_backlog(sk, skb, limit))
1794 rc = -TIPC_ERR_OVERLOAD; 1799 rc = -TIPC_ERR_OVERLOAD;
1795 } 1800 }
1796 spin_unlock_bh(&sk->sk_lock.slock); 1801 spin_unlock_bh(&sk->sk_lock.slock);
@@ -1798,10 +1803,10 @@ int tipc_sk_rcv(struct sk_buff *buf)
1798 if (likely(!rc)) 1803 if (likely(!rc))
1799 return 0; 1804 return 0;
1800exit: 1805exit:
1801 if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) 1806 if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
1802 return -EHOSTUNREACH; 1807 return -EHOSTUNREACH;
1803 1808
1804 tipc_link_xmit(buf, dnode, 0); 1809 tipc_link_xmit_skb(skb, dnode, 0);
1805 return (rc < 0) ? -EHOSTUNREACH : 0; 1810 return (rc < 0) ? -EHOSTUNREACH : 0;
1806} 1811}
1807 1812
@@ -2059,7 +2064,7 @@ static int tipc_shutdown(struct socket *sock, int how)
2059{ 2064{
2060 struct sock *sk = sock->sk; 2065 struct sock *sk = sock->sk;
2061 struct tipc_sock *tsk = tipc_sk(sk); 2066 struct tipc_sock *tsk = tipc_sk(sk);
2062 struct sk_buff *buf; 2067 struct sk_buff *skb;
2063 u32 dnode; 2068 u32 dnode;
2064 int res; 2069 int res;
2065 2070
@@ -2074,23 +2079,23 @@ static int tipc_shutdown(struct socket *sock, int how)
2074 2079
2075restart: 2080restart:
2076 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ 2081 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2077 buf = __skb_dequeue(&sk->sk_receive_queue); 2082 skb = __skb_dequeue(&sk->sk_receive_queue);
2078 if (buf) { 2083 if (skb) {
2079 if (TIPC_SKB_CB(buf)->handle != NULL) { 2084 if (TIPC_SKB_CB(skb)->handle != NULL) {
2080 kfree_skb(buf); 2085 kfree_skb(skb);
2081 goto restart; 2086 goto restart;
2082 } 2087 }
2083 if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN)) 2088 if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
2084 tipc_link_xmit(buf, dnode, tsk->ref); 2089 tipc_link_xmit_skb(skb, dnode, tsk->ref);
2085 tipc_node_remove_conn(dnode, tsk->ref); 2090 tipc_node_remove_conn(dnode, tsk->ref);
2086 } else { 2091 } else {
2087 dnode = tsk_peer_node(tsk); 2092 dnode = tsk_peer_node(tsk);
2088 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 2093 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2089 TIPC_CONN_MSG, SHORT_H_SIZE, 2094 TIPC_CONN_MSG, SHORT_H_SIZE,
2090 0, dnode, tipc_own_addr, 2095 0, dnode, tipc_own_addr,
2091 tsk_peer_port(tsk), 2096 tsk_peer_port(tsk),
2092 tsk->ref, TIPC_CONN_SHUTDOWN); 2097 tsk->ref, TIPC_CONN_SHUTDOWN);
2093 tipc_link_xmit(buf, dnode, tsk->ref); 2098 tipc_link_xmit_skb(skb, dnode, tsk->ref);
2094 } 2099 }
2095 tsk->connected = 0; 2100 tsk->connected = 0;
2096 sock->state = SS_DISCONNECTING; 2101 sock->state = SS_DISCONNECTING;
@@ -2119,7 +2124,7 @@ static void tipc_sk_timeout(unsigned long ref)
2119{ 2124{
2120 struct tipc_sock *tsk; 2125 struct tipc_sock *tsk;
2121 struct sock *sk; 2126 struct sock *sk;
2122 struct sk_buff *buf = NULL; 2127 struct sk_buff *skb = NULL;
2123 u32 peer_port, peer_node; 2128 u32 peer_port, peer_node;
2124 2129
2125 tsk = tipc_sk_get(ref); 2130 tsk = tipc_sk_get(ref);
@@ -2137,20 +2142,20 @@ static void tipc_sk_timeout(unsigned long ref)
2137 2142
2138 if (tsk->probing_state == TIPC_CONN_PROBING) { 2143 if (tsk->probing_state == TIPC_CONN_PROBING) {
2139 /* Previous probe not answered -> self abort */ 2144 /* Previous probe not answered -> self abort */
2140 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 2145 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
2141 SHORT_H_SIZE, 0, tipc_own_addr, 2146 SHORT_H_SIZE, 0, tipc_own_addr,
2142 peer_node, ref, peer_port, 2147 peer_node, ref, peer_port,
2143 TIPC_ERR_NO_PORT); 2148 TIPC_ERR_NO_PORT);
2144 } else { 2149 } else {
2145 buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 2150 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2146 0, peer_node, tipc_own_addr, 2151 0, peer_node, tipc_own_addr,
2147 peer_port, ref, TIPC_OK); 2152 peer_port, ref, TIPC_OK);
2148 tsk->probing_state = TIPC_CONN_PROBING; 2153 tsk->probing_state = TIPC_CONN_PROBING;
2149 k_start_timer(&tsk->timer, tsk->probing_interval); 2154 k_start_timer(&tsk->timer, tsk->probing_interval);
2150 } 2155 }
2151 bh_unlock_sock(sk); 2156 bh_unlock_sock(sk);
2152 if (buf) 2157 if (skb)
2153 tipc_link_xmit(buf, peer_node, ref); 2158 tipc_link_xmit_skb(skb, peer_node, ref);
2154exit: 2159exit:
2155 tipc_sk_put(tsk); 2160 tipc_sk_put(tsk);
2156} 2161}