aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-04-02 09:33:00 -0400
committerDavid S. Miller <davem@davemloft.net>2015-04-02 16:27:12 -0400
commit2da7142516527a5213588f47ed302e79a5d9527a (patch)
tree6a01a2fbb9828c072ad2518bb0f105b13b35f189 /net/tipc/link.c
parentc5531ca2bf3de4e172d2dcc12b97b9f663ab0453 (diff)
tipc: drop tunneled packet duplicates at reception
In commit 8b4ed8634f8b3f9aacfc42b4a872d30c36b9e255 ("tipc: eliminate race condition at dual link establishment") we introduced a parallel link synchronization mechanism that guarentees sequential delivery even for users switching from an old to a newly established link. The new mechanism makes it unnecessary to deliver the tunneled duplicate packets back to the old link, as we are currently doing. It is now sufficient to use the last tunneled packet's inner sequence number as synchronization point between the two parallel links, whereafter it can be dropped. In this commit, we drop the duplicate packets arriving on the new link, after updating the synchronization point at each new arrival. Although it would now have been sufficient for the other endpoint to only tunnel the last packet in its send queue, and not the entire queue, we must still do this to maintain compatibility with older nodes. This commit makes it possible to get rid if some complex interaction between the two parallel links. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c132
1 files changed, 47 insertions, 85 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 514466efc25c..c697cf69da91 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -105,8 +105,6 @@ static void link_handle_out_of_seq_msg(struct tipc_link *link,
105 struct sk_buff *skb); 105 struct sk_buff *skb);
106static void tipc_link_proto_rcv(struct tipc_link *link, 106static void tipc_link_proto_rcv(struct tipc_link *link,
107 struct sk_buff *skb); 107 struct sk_buff *skb);
108static int tipc_link_tunnel_rcv(struct tipc_node *node,
109 struct sk_buff **skb);
110static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol); 108static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
111static void link_state_event(struct tipc_link *l_ptr, u32 event); 109static void link_state_event(struct tipc_link *l_ptr, u32 event);
112static void link_reset_statistics(struct tipc_link *l_ptr); 110static void link_reset_statistics(struct tipc_link *l_ptr);
@@ -115,7 +113,8 @@ static void tipc_link_sync_xmit(struct tipc_link *l);
115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 113static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
116static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); 114static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
117static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); 115static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
118 116static bool tipc_link_failover_rcv(struct tipc_node *node,
117 struct sk_buff **skb);
119/* 118/*
120 * Simple link routines 119 * Simple link routines
121 */ 120 */
@@ -1274,8 +1273,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1274 if (msg_dup(msg)) { 1273 if (msg_dup(msg)) {
1275 link->flags |= LINK_SYNCHING; 1274 link->flags |= LINK_SYNCHING;
1276 link->synch_point = msg_seqno(msg_get_wrapped(msg)); 1275 link->synch_point = msg_seqno(msg_get_wrapped(msg));
1276 kfree_skb(skb);
1277 break;
1277 } 1278 }
1278 if (!tipc_link_tunnel_rcv(node, &skb)) 1279 if (!tipc_link_failover_rcv(node, &skb))
1279 break; 1280 break;
1280 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { 1281 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1281 tipc_data_input(link, skb); 1282 tipc_data_input(link, skb);
@@ -1755,101 +1756,62 @@ tunnel_queue:
1755 goto tunnel_queue; 1756 goto tunnel_queue;
1756} 1757}
1757 1758
1758/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1759 * Owner node is locked.
1760 */
1761static void tipc_link_dup_rcv(struct tipc_link *link,
1762 struct sk_buff *skb)
1763{
1764 struct sk_buff *iskb;
1765 int pos = 0;
1766
1767 if (!tipc_link_is_up(link))
1768 return;
1769
1770 if (!tipc_msg_extract(skb, &iskb, &pos)) {
1771 pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
1772 return;
1773 }
1774 /* Append buffer to deferred queue, if applicable: */
1775 link_handle_out_of_seq_msg(link, iskb);
1776}
1777
1778/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet 1759/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
1779 * Owner node is locked. 1760 * Owner node is locked.
1780 */ 1761 */
1781static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, 1762static bool tipc_link_failover_rcv(struct tipc_node *node,
1782 struct sk_buff *t_buf) 1763 struct sk_buff **skb)
1783{ 1764{
1784 struct tipc_msg *t_msg = buf_msg(t_buf); 1765 struct tipc_msg *msg = buf_msg(*skb);
1785 struct sk_buff *buf = NULL; 1766 struct sk_buff *iskb = NULL;
1786 struct tipc_msg *msg; 1767 struct tipc_link *link = NULL;
1768 int bearer_id = msg_bearer_id(msg);
1787 int pos = 0; 1769 int pos = 0;
1788 1770
1789 if (tipc_link_is_up(l_ptr)) 1771 if (msg_type(msg) != ORIGINAL_MSG) {
1790 tipc_link_reset(l_ptr); 1772 pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1791 1773 goto exit;
1792 /* First failover packet? */
1793 if (l_ptr->exp_msg_count == START_CHANGEOVER)
1794 l_ptr->exp_msg_count = msg_msgcnt(t_msg);
1795
1796 /* Should there be an inner packet? */
1797 if (l_ptr->exp_msg_count) {
1798 l_ptr->exp_msg_count--;
1799 if (!tipc_msg_extract(t_buf, &buf, &pos)) {
1800 pr_warn("%sno inner failover pkt\n", link_co_err);
1801 goto exit;
1802 }
1803 msg = buf_msg(buf);
1804
1805 if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
1806 kfree_skb(buf);
1807 buf = NULL;
1808 goto exit;
1809 }
1810 if (msg_user(msg) == MSG_FRAGMENTER) {
1811 l_ptr->stats.recv_fragments++;
1812 tipc_buf_append(&l_ptr->reasm_buf, &buf);
1813 }
1814 } 1774 }
1815exit: 1775 if (bearer_id >= MAX_BEARERS)
1816 if ((!l_ptr->exp_msg_count) && (l_ptr->flags & LINK_STOPPED)) 1776 goto exit;
1817 tipc_link_delete(l_ptr); 1777 link = node->links[bearer_id];
1818 return buf; 1778 if (!link)
1819} 1779 goto exit;
1820 1780 if (tipc_link_is_up(link))
1821/* tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent 1781 tipc_link_reset(link);
1822 * via other link as result of a failover (ORIGINAL_MSG) or
1823 * a new active link (DUPLICATE_MSG). Failover packets are
1824 * returned to the active link for delivery upwards.
1825 * Owner node is locked.
1826 */
1827static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
1828 struct sk_buff **buf)
1829{
1830 struct sk_buff *t_buf = *buf;
1831 struct tipc_link *l_ptr;
1832 struct tipc_msg *t_msg = buf_msg(t_buf);
1833 u32 bearer_id = msg_bearer_id(t_msg);
1834 1782
1835 *buf = NULL; 1783 /* First failover packet? */
1784 if (link->exp_msg_count == START_CHANGEOVER)
1785 link->exp_msg_count = msg_msgcnt(msg);
1836 1786
1837 if (bearer_id >= MAX_BEARERS) 1787 /* Should we expect an inner packet? */
1788 if (!link->exp_msg_count)
1838 goto exit; 1789 goto exit;
1839 1790
1840 l_ptr = n_ptr->links[bearer_id]; 1791 if (!tipc_msg_extract(*skb, &iskb, &pos)) {
1841 if (!l_ptr) 1792 pr_warn("%sno inner failover pkt\n", link_co_err);
1793 *skb = NULL;
1842 goto exit; 1794 goto exit;
1795 }
1796 link->exp_msg_count--;
1797 *skb = NULL;
1843 1798
1844 if (msg_type(t_msg) == DUPLICATE_MSG) 1799 /* Was packet already delivered? */
1845 tipc_link_dup_rcv(l_ptr, t_buf); 1800 if (less(buf_seqno(iskb), link->reset_checkpoint)) {
1846 else if (msg_type(t_msg) == ORIGINAL_MSG) 1801 kfree_skb(iskb);
1847 *buf = tipc_link_failover_rcv(l_ptr, t_buf); 1802 iskb = NULL;
1848 else 1803 goto exit;
1849 pr_warn("%sunknown tunnel pkt received\n", link_co_err); 1804 }
1805 if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
1806 link->stats.recv_fragments++;
1807 tipc_buf_append(&link->reasm_buf, &iskb);
1808 }
1850exit: 1809exit:
1851 kfree_skb(t_buf); 1810 if (link && (!link->exp_msg_count) && (link->flags & LINK_STOPPED))
1852 return *buf != NULL; 1811 tipc_link_delete(link);
1812 kfree_skb(*skb);
1813 *skb = iskb;
1814 return *skb;
1853} 1815}
1854 1816
1855static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) 1817static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)