aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2014-11-25 22:41:55 -0500
committerDavid S. Miller <davem@davemloft.net>2014-11-26 12:30:17 -0500
commita6ca109443842e7251c68451f8137ae68ae6d8a6 (patch)
tree82658ff3ecd103abdad794b9b0833e45160b235e /net/tipc/link.c
parentf03273f1e2fc8a59c3831200dd1532cf29e37e35 (diff)
tipc: use generic SKB list APIs to manage TIPC outgoing packet chains
Use standard SKB list APIs associated with struct sk_buff_head to manage socket outgoing packet chain and name table outgoing packet chain, having relevant code simpler and more readable. Signed-off-by: Ying Xue <ying.xue@windriver.com> Reviewed-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c98
1 files changed, 62 insertions, 36 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 0e04508cdba4..34bf15c90c78 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -664,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
664 * - For all other messages we discard the buffer and return -EHOSTUNREACH 664 * - For all other messages we discard the buffer and return -EHOSTUNREACH
665 * - For TIPC internal messages we also reset the link 665 * - For TIPC internal messages we also reset the link
666 */ 666 */
667static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) 667static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
668{ 668{
669 struct tipc_msg *msg = buf_msg(buf); 669 struct sk_buff *skb = skb_peek(list);
670 struct tipc_msg *msg = buf_msg(skb);
670 uint imp = tipc_msg_tot_importance(msg); 671 uint imp = tipc_msg_tot_importance(msg);
671 u32 oport = msg_tot_origport(msg); 672 u32 oport = msg_tot_origport(msg);
672 673
@@ -679,28 +680,29 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
679 goto drop; 680 goto drop;
680 if (unlikely(msg_reroute_cnt(msg))) 681 if (unlikely(msg_reroute_cnt(msg)))
681 goto drop; 682 goto drop;
682 if (TIPC_SKB_CB(buf)->wakeup_pending) 683 if (TIPC_SKB_CB(skb)->wakeup_pending)
683 return -ELINKCONG; 684 return -ELINKCONG;
684 if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp)) 685 if (link_schedule_user(link, oport, skb_queue_len(list), imp))
685 return -ELINKCONG; 686 return -ELINKCONG;
686drop: 687drop:
687 kfree_skb_list(buf); 688 __skb_queue_purge(list);
688 return -EHOSTUNREACH; 689 return -EHOSTUNREACH;
689} 690}
690 691
691/** 692/**
692 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked 693 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
693 * @link: link to use 694 * @link: link to use
694 * @skb: chain of buffers containing message 695 * @list: chain of buffers containing message
696 *
695 * Consumes the buffer chain, except when returning -ELINKCONG 697 * Consumes the buffer chain, except when returning -ELINKCONG
696 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket 698 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
697 * user data messages) or -EHOSTUNREACH (all other messages/senders) 699 * user data messages) or -EHOSTUNREACH (all other messages/senders)
698 * Only the socket functions tipc_send_stream() and tipc_send_packet() need 700 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
699 * to act on the return value, since they may need to do more send attempts. 701 * to act on the return value, since they may need to do more send attempts.
700 */ 702 */
701int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb) 703int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
702{ 704{
703 struct tipc_msg *msg = buf_msg(skb); 705 struct tipc_msg *msg = buf_msg(skb_peek(list));
704 uint psz = msg_size(msg); 706 uint psz = msg_size(msg);
705 uint sndlim = link->queue_limit[0]; 707 uint sndlim = link->queue_limit[0];
706 uint imp = tipc_msg_tot_importance(msg); 708 uint imp = tipc_msg_tot_importance(msg);
@@ -710,21 +712,21 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
710 uint bc_last_in = link->owner->bclink.last_in; 712 uint bc_last_in = link->owner->bclink.last_in;
711 struct tipc_media_addr *addr = &link->media_addr; 713 struct tipc_media_addr *addr = &link->media_addr;
712 struct sk_buff_head *outqueue = &link->outqueue; 714 struct sk_buff_head *outqueue = &link->outqueue;
713 struct sk_buff *next; 715 struct sk_buff *skb, *tmp;
714 716
715 /* Match queue limits against msg importance: */ 717 /* Match queue limits against msg importance: */
716 if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) 718 if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
717 return tipc_link_cong(link, skb); 719 return tipc_link_cong(link, list);
718 720
719 /* Has valid packet limit been used ? */ 721 /* Has valid packet limit been used ? */
720 if (unlikely(psz > mtu)) { 722 if (unlikely(psz > mtu)) {
721 kfree_skb_list(skb); 723 __skb_queue_purge(list);
722 return -EMSGSIZE; 724 return -EMSGSIZE;
723 } 725 }
724 726
725 /* Prepare each packet for sending, and add to outqueue: */ 727 /* Prepare each packet for sending, and add to outqueue: */
726 while (skb) { 728 skb_queue_walk_safe(list, skb, tmp) {
727 next = skb->next; 729 __skb_unlink(skb, list);
728 msg = buf_msg(skb); 730 msg = buf_msg(skb);
729 msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); 731 msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
730 msg_set_bcast_ack(msg, bc_last_in); 732 msg_set_bcast_ack(msg, bc_last_in);
@@ -736,7 +738,6 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
736 link->unacked_window = 0; 738 link->unacked_window = 0;
737 } else if (tipc_msg_bundle(outqueue, skb, mtu)) { 739 } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
738 link->stats.sent_bundled++; 740 link->stats.sent_bundled++;
739 skb = next;
740 continue; 741 continue;
741 } else if (tipc_msg_make_bundle(outqueue, skb, mtu, 742 } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
742 link->addr)) { 743 link->addr)) {
@@ -750,22 +751,43 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
750 link->next_out = skb; 751 link->next_out = skb;
751 } 752 }
752 seqno++; 753 seqno++;
753 skb = next;
754 } 754 }
755 link->next_out_no = seqno; 755 link->next_out_no = seqno;
756 return 0; 756 return 0;
757} 757}
758 758
759static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
760{
761 __skb_queue_head_init(list);
762 __skb_queue_tail(list, skb);
763}
764
765static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
766{
767 struct sk_buff_head head;
768
769 skb2list(skb, &head);
770 return __tipc_link_xmit(link, &head);
771}
772
773int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
774{
775 struct sk_buff_head head;
776
777 skb2list(skb, &head);
778 return tipc_link_xmit(&head, dnode, selector);
779}
780
759/** 781/**
760 * tipc_link_xmit() is the general link level function for message sending 782 * tipc_link_xmit() is the general link level function for message sending
761 * @buf: chain of buffers containing message 783 * @list: chain of buffers containing message
762 * @dsz: amount of user data to be sent 784 * @dsz: amount of user data to be sent
763 * @dnode: address of destination node 785 * @dnode: address of destination node
764 * @selector: a number used for deterministic link selection 786 * @selector: a number used for deterministic link selection
765 * Consumes the buffer chain, except when returning -ELINKCONG 787 * Consumes the buffer chain, except when returning -ELINKCONG
766 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 788 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
767 */ 789 */
768int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) 790int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
769{ 791{
770 struct tipc_link *link = NULL; 792 struct tipc_link *link = NULL;
771 struct tipc_node *node; 793 struct tipc_node *node;
@@ -776,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
776 tipc_node_lock(node); 798 tipc_node_lock(node);
777 link = node->active_links[selector & 1]; 799 link = node->active_links[selector & 1];
778 if (link) 800 if (link)
779 rc = __tipc_link_xmit(link, buf); 801 rc = __tipc_link_xmit(link, list);
780 tipc_node_unlock(node); 802 tipc_node_unlock(node);
781 } 803 }
782 804
783 if (link) 805 if (link)
784 return rc; 806 return rc;
785 807
786 if (likely(in_own_node(dnode))) 808 if (likely(in_own_node(dnode))) {
787 return tipc_sk_rcv(buf); 809 /* As a node local message chain never contains more than one
810 * buffer, we just need to dequeue one SKB buffer from the
811 * head list.
812 */
813 return tipc_sk_rcv(__skb_dequeue(list));
814 }
815 __skb_queue_purge(list);
788 816
789 kfree_skb_list(buf);
790 return rc; 817 return rc;
791} 818}
792 819
@@ -800,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
800 */ 827 */
801static void tipc_link_sync_xmit(struct tipc_link *link) 828static void tipc_link_sync_xmit(struct tipc_link *link)
802{ 829{
803 struct sk_buff *buf; 830 struct sk_buff *skb;
804 struct tipc_msg *msg; 831 struct tipc_msg *msg;
805 832
806 buf = tipc_buf_acquire(INT_H_SIZE); 833 skb = tipc_buf_acquire(INT_H_SIZE);
807 if (!buf) 834 if (!skb)
808 return; 835 return;
809 836
810 msg = buf_msg(buf); 837 msg = buf_msg(skb);
811 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); 838 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
812 msg_set_last_bcast(msg, link->owner->bclink.acked); 839 msg_set_last_bcast(msg, link->owner->bclink.acked);
813 __tipc_link_xmit(link, buf); 840 __tipc_link_xmit_skb(link, skb);
814} 841}
815 842
816/* 843/*
@@ -1053,8 +1080,7 @@ void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
1053 u32 ackd; 1080 u32 ackd;
1054 u32 released; 1081 u32 released;
1055 1082
1056 __skb_queue_head_init(&head); 1083 skb2list(skb, &head);
1057 __skb_queue_tail(&head, skb);
1058 1084
1059 while ((skb = __skb_dequeue(&head))) { 1085 while ((skb = __skb_dequeue(&head))) {
1060 /* Ensure message is well-formed */ 1086 /* Ensure message is well-formed */
@@ -1573,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1573 u32 selector) 1599 u32 selector)
1574{ 1600{
1575 struct tipc_link *tunnel; 1601 struct tipc_link *tunnel;
1576 struct sk_buff *buf; 1602 struct sk_buff *skb;
1577 u32 length = msg_size(msg); 1603 u32 length = msg_size(msg);
1578 1604
1579 tunnel = l_ptr->owner->active_links[selector & 1]; 1605 tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1582,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1582 return; 1608 return;
1583 } 1609 }
1584 msg_set_size(tunnel_hdr, length + INT_H_SIZE); 1610 msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1585 buf = tipc_buf_acquire(length + INT_H_SIZE); 1611 skb = tipc_buf_acquire(length + INT_H_SIZE);
1586 if (!buf) { 1612 if (!skb) {
1587 pr_warn("%sunable to send tunnel msg\n", link_co_err); 1613 pr_warn("%sunable to send tunnel msg\n", link_co_err);
1588 return; 1614 return;
1589 } 1615 }
1590 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 1616 skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1591 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 1617 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1592 __tipc_link_xmit(tunnel, buf); 1618 __tipc_link_xmit_skb(tunnel, skb);
1593} 1619}
1594 1620
1595 1621
@@ -1620,7 +1646,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1620 if (skb) { 1646 if (skb) {
1621 skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); 1647 skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1622 msg_set_size(&tunnel_hdr, INT_H_SIZE); 1648 msg_set_size(&tunnel_hdr, INT_H_SIZE);
1623 __tipc_link_xmit(tunnel, skb); 1649 __tipc_link_xmit_skb(tunnel, skb);
1624 } else { 1650 } else {
1625 pr_warn("%sunable to send changeover msg\n", 1651 pr_warn("%sunable to send changeover msg\n",
1626 link_co_err); 1652 link_co_err);
@@ -1691,7 +1717,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1691 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); 1717 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1692 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, 1718 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1693 length); 1719 length);
1694 __tipc_link_xmit(tunnel, outskb); 1720 __tipc_link_xmit_skb(tunnel, outskb);
1695 if (!tipc_link_is_up(l_ptr)) 1721 if (!tipc_link_is_up(l_ptr))
1696 return; 1722 return;
1697 } 1723 }