aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c132
1 files changed, 85 insertions, 47 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 611a04fb0ddc..c1a4611649ab 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -41,6 +41,7 @@
41#include "node.h" 41#include "node.h"
42#include "link.h" 42#include "link.h"
43#include "config.h" 43#include "config.h"
44#include "name_distr.h"
44#include "socket.h" 45#include "socket.h"
45 46
46#define SS_LISTENING -1 /* socket is listening */ 47#define SS_LISTENING -1 /* socket is listening */
@@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
785 struct sk_buff *b; 786 struct sk_buff *b;
786 uint i, last, dst = 0; 787 uint i, last, dst = 0;
787 u32 scope = TIPC_CLUSTER_SCOPE; 788 u32 scope = TIPC_CLUSTER_SCOPE;
789 struct sk_buff_head msgs;
788 790
789 if (in_own_node(net, msg_orignode(msg))) 791 if (in_own_node(net, msg_orignode(msg)))
790 scope = TIPC_NODE_SCOPE; 792 scope = TIPC_NODE_SCOPE;
791 793
794 if (unlikely(!msg_mcast(msg))) {
795 pr_warn("Received non-multicast msg in multicast\n");
796 kfree_skb(buf);
797 goto exit;
798 }
792 /* Create destination port list: */ 799 /* Create destination port list: */
793 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 800 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
794 msg_nameupper(msg), scope, &dports); 801 msg_nameupper(msg), scope, &dports);
@@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
806 continue; 813 continue;
807 } 814 }
808 msg_set_destport(msg, item->ports[i]); 815 msg_set_destport(msg, item->ports[i]);
809 tipc_sk_rcv(net, b); 816 skb_queue_head_init(&msgs);
817 skb_queue_tail(&msgs, b);
818 tipc_sk_rcv(net, &msgs);
810 } 819 }
811 } 820 }
821exit:
812 tipc_port_list_free(&dports); 822 tipc_port_list_free(&dports);
813} 823}
814 824
@@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1760} 1770}
1761 1771
1762/** 1772/**
1763 * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue 1773 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1764 * @sk: socket 1774 * inputq and try adding them to socket or backlog queue
1765 * @skb: pointer to message. Set to NULL if buffer is consumed. 1775 * @inputq: list of incoming buffers with potentially different destinations
1766 * @dnode: if buffer should be forwarded/returned, send to this node 1776 * @sk: socket where the buffers should be enqueued
1777 * @dport: port number for the socket
1778 * @_skb: returned buffer to be forwarded or rejected, if applicable
1767 * 1779 *
1768 * Caller must hold socket lock 1780 * Caller must hold socket lock
1769 * 1781 *
1770 * Returns TIPC_OK (0) or -tipc error code 1782 * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1783 * or -TIPC_ERR_NO_PORT
1771 */ 1784 */
1772static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb) 1785static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1786 u32 dport, struct sk_buff **_skb)
1773{ 1787{
1774 unsigned int lim; 1788 unsigned int lim;
1775 atomic_t *dcnt; 1789 atomic_t *dcnt;
1776 1790 int err;
1777 if (unlikely(!*skb)) 1791 struct sk_buff *skb;
1778 return TIPC_OK; 1792 unsigned long time_limit = jiffies + 2;
1779 if (!sock_owned_by_user(sk)) 1793
1780 return filter_rcv(sk, skb); 1794 while (skb_queue_len(inputq)) {
1781 dcnt = &tipc_sk(sk)->dupl_rcvcnt; 1795 skb = tipc_skb_dequeue(inputq, dport);
1782 if (sk->sk_backlog.len) 1796 if (unlikely(!skb))
1783 atomic_set(dcnt, 0); 1797 return TIPC_OK;
1784 lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt); 1798 /* Return if softirq window exhausted */
1785 if (unlikely(sk_add_backlog(sk, *skb, lim))) 1799 if (unlikely(time_after_eq(jiffies, time_limit)))
1800 return TIPC_OK;
1801 if (!sock_owned_by_user(sk)) {
1802 err = filter_rcv(sk, &skb);
1803 if (likely(!skb))
1804 continue;
1805 *_skb = skb;
1806 return err;
1807 }
1808 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1809 if (sk->sk_backlog.len)
1810 atomic_set(dcnt, 0);
1811 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1812 if (likely(!sk_add_backlog(sk, skb, lim)))
1813 continue;
1814 *_skb = skb;
1786 return -TIPC_ERR_OVERLOAD; 1815 return -TIPC_ERR_OVERLOAD;
1787 *skb = NULL; 1816 }
1788 return TIPC_OK; 1817 return TIPC_OK;
1789} 1818}
1790 1819
1791/** 1820/**
1792 * tipc_sk_rcv - handle incoming message 1821 * tipc_sk_rcv - handle a chain of incoming buffers
1793 * @skb: buffer containing arriving message 1822 * @inputq: buffer list containing the buffers
1794 * Consumes buffer 1823 * Consumes all buffers in list until inputq is empty
1795 * Returns 0 if success, or errno: -EHOSTUNREACH 1824 * Note: may be called in multiple threads referring to the same queue
1825 * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1826 * Only node local calls check the return value, sending single-buffer queues
1796 */ 1827 */
1797int tipc_sk_rcv(struct net *net, struct sk_buff *skb) 1828int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1798{ 1829{
1830 u32 dnode, dport = 0;
1831 int err = -TIPC_ERR_NO_PORT;
1832 struct sk_buff *skb;
1799 struct tipc_sock *tsk; 1833 struct tipc_sock *tsk;
1800 struct tipc_net *tn; 1834 struct tipc_net *tn;
1801 struct sock *sk; 1835 struct sock *sk;
1802 u32 dport = msg_destport(buf_msg(skb));
1803 int err = -TIPC_ERR_NO_PORT;
1804 u32 dnode;
1805 1836
1806 /* Find destination */ 1837 while (skb_queue_len(inputq)) {
1807 tsk = tipc_sk_lookup(net, dport); 1838 skb = NULL;
1808 if (likely(tsk)) { 1839 dport = tipc_skb_peek_port(inputq, dport);
1809 sk = &tsk->sk; 1840 tsk = tipc_sk_lookup(net, dport);
1810 spin_lock_bh(&sk->sk_lock.slock); 1841 if (likely(tsk)) {
1811 err = tipc_sk_enqueue_skb(sk, &skb); 1842 sk = &tsk->sk;
1812 spin_unlock_bh(&sk->sk_lock.slock); 1843 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1813 sock_put(sk); 1844 err = tipc_sk_enqueue(inputq, sk, dport, &skb);
1814 } 1845 spin_unlock_bh(&sk->sk_lock.slock);
1815 if (likely(!skb)) 1846 dport = 0;
1816 return 0; 1847 }
1817 if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) 1848 sock_put(sk);
1818 goto xmit; 1849 } else {
1819 if (!err) { 1850 skb = tipc_skb_dequeue(inputq, dport);
1820 dnode = msg_destnode(buf_msg(skb)); 1851 }
1821 goto xmit; 1852 if (likely(!skb))
1822 } 1853 continue;
1823 tn = net_generic(net, tipc_net_id); 1854 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1824 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) 1855 goto xmit;
1825 return -EHOSTUNREACH; 1856 if (!err) {
1857 dnode = msg_destnode(buf_msg(skb));
1858 goto xmit;
1859 }
1860 tn = net_generic(net, tipc_net_id);
1861 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1862 continue;
1826xmit: 1863xmit:
1827 tipc_link_xmit_skb(net, skb, dnode, dport); 1864 tipc_link_xmit_skb(net, skb, dnode, dport);
1865 }
1828 return err ? -EHOSTUNREACH : 0; 1866 return err ? -EHOSTUNREACH : 0;
1829} 1867}
1830 1868