author		Jon Paul Maloy <jon.maloy@ericsson.com>	2015-02-05 08:36:41 -0500
committer	David S. Miller <davem@davemloft.net>	2015-02-05 19:00:02 -0500
commit		c637c1035534867b85b78b453c38c495b58e2c5a
tree		77cd2a48a5b04e43b014da64168a6c1e209a1d40	/net/tipc/socket.c
parent		94153e36e709e78fc4e1f93dc4e4da785690c7d1
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before
passing messages upwards to the destination sockets. During the upcall from
link to socket no locks are held. It is therefore possible, and we see it
happen occasionally, that messages arriving in different threads and
delivered in sequence still bypass each other before they reach the
destination socket. This must not happen, since it violates the
sequentiality guarantee.

We solve this by adding a new input buffer queue to the link structure.
Arriving messages are added safely to the tail of that queue by the link,
while the head of the queue is consumed, also safely, by the receiving
socket. Sequentiality is secured per socket by only allowing buffers to be
dequeued inside the socket lock. Since there may be multiple simultaneous
readers of the queue, we use a 'filter' parameter to reduce the risk that
they peek the same buffer from the queue, hence also reducing the risk of
contention on the receiving socket locks. This solves the sequentiality
problem, and seems to cause no measurable performance degradation.

A nice side effect of this change is that lock handling in the functions
tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will
enable future simplifications of those functions.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
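[Editor's note] To make the 'filter' idea above concrete, here is a minimal
sketch of how a reader can peek a destination port while skipping buffers
that another reader is already draining. This is an illustration only,
loosely modeled on the tipc_skb_peek_port()/tipc_skb_dequeue() helpers this
patch relies on; the function name and the exact walk logic below are
assumptions, not the patch's code.

	/* Illustrative sketch (not the patch's actual helper): return the
	 * destination port of the first queued buffer whose port differs
	 * from 'filter', so concurrent readers tend to claim different
	 * sockets. Uses TIPC's buf_msg()/msg_destport() accessors.
	 */
	static u32 sketch_peek_port(struct sk_buff_head *list, u32 filter)
	{
		struct sk_buff *skb;
		u32 dport = 0;

		spin_lock_bh(&list->lock);
		skb_queue_walk(list, skb) {
			dport = msg_destport(buf_msg(skb));
			if (dport != filter)
				break;	/* prefer a port no other reader holds */
		}
		spin_unlock_bh(&list->lock);
		return dport;	/* 0 if the list was empty */
	}

In tipc_sk_rcv() below, a reader that fails to trylock a socket keeps that
port as its filter for the next peek, so it moves on to buffers destined
for other sockets instead of spinning on a contended lock.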
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--	net/tipc/socket.c	132
1 file changed, 85 insertions(+), 47 deletions(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 611a04fb0ddc..c1a4611649ab 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -41,6 +41,7 @@
 #include "node.h"
 #include "link.h"
 #include "config.h"
+#include "name_distr.h"
 #include "socket.h"
 
 #define SS_LISTENING -1 /* socket is listening */
@@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
 	struct sk_buff *b;
 	uint i, last, dst = 0;
 	u32 scope = TIPC_CLUSTER_SCOPE;
+	struct sk_buff_head msgs;
 
 	if (in_own_node(net, msg_orignode(msg)))
 		scope = TIPC_NODE_SCOPE;
 
+	if (unlikely(!msg_mcast(msg))) {
+		pr_warn("Received non-multicast msg in multicast\n");
+		kfree_skb(buf);
+		goto exit;
+	}
 	/* Create destination port list: */
 	tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
 				  msg_nameupper(msg), scope, &dports);
@@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
 				continue;
 			}
 			msg_set_destport(msg, item->ports[i]);
-			tipc_sk_rcv(net, b);
+			skb_queue_head_init(&msgs);
+			skb_queue_tail(&msgs, b);
+			tipc_sk_rcv(net, &msgs);
 		}
 	}
+exit:
 	tipc_port_list_free(&dports);
 }
 
@@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 }
 
 /**
- * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue
- * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
- * @dnode: if buffer should be forwarded/returned, send to this node
+ * tipc_sk_enqueue - extract all buffers with destination 'dport' from
+ *                   inputq and try adding them to socket or backlog queue
+ * @inputq: list of incoming buffers with potentially different destinations
+ * @sk: socket where the buffers should be enqueued
+ * @dport: port number for the socket
+ * @_skb: returned buffer to be forwarded or rejected, if applicable
  *
  * Caller must hold socket lock
  *
- * Returns TIPC_OK (0) or -tipc error code
+ * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
+ * or -TIPC_ERR_NO_PORT
  */
-static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb)
+static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+			   u32 dport, struct sk_buff **_skb)
 {
 	unsigned int lim;
 	atomic_t *dcnt;
+	int err;
+	struct sk_buff *skb;
+	unsigned long time_limit = jiffies + 2;
 
-	if (unlikely(!*skb))
-		return TIPC_OK;
-	if (!sock_owned_by_user(sk))
-		return filter_rcv(sk, skb);
-	dcnt = &tipc_sk(sk)->dupl_rcvcnt;
-	if (sk->sk_backlog.len)
-		atomic_set(dcnt, 0);
-	lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt);
-	if (unlikely(sk_add_backlog(sk, *skb, lim)))
+	while (skb_queue_len(inputq)) {
+		skb = tipc_skb_dequeue(inputq, dport);
+		if (unlikely(!skb))
+			return TIPC_OK;
+		/* Return if softirq window exhausted */
+		if (unlikely(time_after_eq(jiffies, time_limit)))
+			return TIPC_OK;
+		if (!sock_owned_by_user(sk)) {
+			err = filter_rcv(sk, &skb);
+			if (likely(!skb))
+				continue;
+			*_skb = skb;
+			return err;
+		}
+		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+		if (sk->sk_backlog.len)
+			atomic_set(dcnt, 0);
+		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+		if (likely(!sk_add_backlog(sk, skb, lim)))
+			continue;
+		*_skb = skb;
 		return -TIPC_ERR_OVERLOAD;
-	*skb = NULL;
+	}
 	return TIPC_OK;
 }
 
 /**
- * tipc_sk_rcv - handle incoming message
- * @skb: buffer containing arriving message
- * Consumes buffer
- * Returns 0 if success, or errno: -EHOSTUNREACH
+ * tipc_sk_rcv - handle a chain of incoming buffers
+ * @inputq: buffer list containing the buffers
+ * Consumes all buffers in list until inputq is empty
+ * Note: may be called in multiple threads referring to the same queue
+ * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
+ * Only node local calls check the return value, sending single-buffer queues
  */
-int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
+	u32 dnode, dport = 0;
+	int err = -TIPC_ERR_NO_PORT;
+	struct sk_buff *skb;
 	struct tipc_sock *tsk;
 	struct tipc_net *tn;
 	struct sock *sk;
-	u32 dport = msg_destport(buf_msg(skb));
-	int err = -TIPC_ERR_NO_PORT;
-	u32 dnode;
 
-	/* Find destination */
-	tsk = tipc_sk_lookup(net, dport);
-	if (likely(tsk)) {
-		sk = &tsk->sk;
-		spin_lock_bh(&sk->sk_lock.slock);
-		err = tipc_sk_enqueue_skb(sk, &skb);
-		spin_unlock_bh(&sk->sk_lock.slock);
-		sock_put(sk);
-	}
-	if (likely(!skb))
-		return 0;
-	if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
-		goto xmit;
-	if (!err) {
-		dnode = msg_destnode(buf_msg(skb));
-		goto xmit;
-	}
-	tn = net_generic(net, tipc_net_id);
-	if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
-		return -EHOSTUNREACH;
+	while (skb_queue_len(inputq)) {
+		skb = NULL;
+		dport = tipc_skb_peek_port(inputq, dport);
+		tsk = tipc_sk_lookup(net, dport);
+		if (likely(tsk)) {
+			sk = &tsk->sk;
+			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+				err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+				spin_unlock_bh(&sk->sk_lock.slock);
+				dport = 0;
+			}
+			sock_put(sk);
+		} else {
+			skb = tipc_skb_dequeue(inputq, dport);
+		}
+		if (likely(!skb))
+			continue;
+		if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
+			goto xmit;
+		if (!err) {
+			dnode = msg_destnode(buf_msg(skb));
+			goto xmit;
+		}
+		tn = net_generic(net, tipc_net_id);
+		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+			continue;
 xmit:
-	tipc_link_xmit_skb(net, skb, dnode, dport);
+		tipc_link_xmit_skb(net, skb, dnode, dport);
+	}
 	return err ? -EHOSTUNREACH : 0;
 }
 
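[Editor's note] Since tipc_sk_rcv() now consumes a buffer list rather than a
single skb, a caller holding one buffer wraps it in a one-element queue,
exactly as the tipc_sk_mcast_rcv() hunk above does. A minimal sketch of that
calling convention (the wrapper function name is illustrative, not from the
patch):

	/* Wrap a single buffer in a one-element queue before handing it to
	 * tipc_sk_rcv(), mirroring the skb_queue_head_init()/skb_queue_tail()
	 * pattern used in tipc_sk_mcast_rcv() above.
	 */
	static void sketch_deliver_one(struct net *net, struct sk_buff *skb)
	{
		struct sk_buff_head head;

		skb_queue_head_init(&head);
		skb_queue_tail(&head, skb);
		tipc_sk_rcv(net, &head);	/* drains the whole queue */
	}

Because tipc_sk_rcv() loops until the queue is empty and retries ports whose
socket lock it could not take, every buffer is either delivered, rejected
back to the sender, or forwarded, regardless of how many threads call into
the same queue concurrently.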