aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2016-06-17 06:35:57 -0400
committerDavid S. Miller <davem@davemloft.net>2016-06-18 00:38:10 -0400
commitf1d048f24e66ba85d3dabf3d076cefa5f2b546b0 (patch)
treec58a0cc946334eceba6c0dbafe95b5f30a9cd81f /net/tipc/socket.c
parent695ef16cd0510f3bc963967fd73a360989fe4ebf (diff)
tipc: fix socket timer deadlock
We sometimes observe a 'deadly embrace' type deadlock occurring between mutually connected sockets on the same node. This happens when the one-hour peer supervision timers happen to expire simultaneously in both sockets. The scenario is as follows: CPU 1: CPU 2: -------- -------- tipc_sk_timeout(sk1) tipc_sk_timeout(sk2) lock(sk1.slock) lock(sk2.slock) msg_create(probe) msg_create(probe) unlock(sk1.slock) unlock(sk2.slock) tipc_node_xmit_skb() tipc_node_xmit_skb() tipc_node_xmit() tipc_node_xmit() tipc_sk_rcv(sk2) tipc_sk_rcv(sk1) lock(sk2.slock) lock((sk1.slock) filter_rcv() filter_rcv() tipc_sk_proto_rcv() tipc_sk_proto_rcv() msg_create(probe_rsp) msg_create(probe_rsp) tipc_sk_respond() tipc_sk_respond() tipc_node_xmit_skb() tipc_node_xmit_skb() tipc_node_xmit() tipc_node_xmit() tipc_sk_rcv(sk1) tipc_sk_rcv(sk2) lock((sk1.slock) lock((sk2.slock) ===> DEADLOCK ===> DEADLOCK Further analysis reveals that there are three different locations in the socket code where tipc_sk_respond() is called within the context of the socket lock, with ensuing risk of similar deadlocks. We now solve this by passing a buffer queue along with all upcalls where sk_lock.slock may potentially be held. Response or rejected message buffers are accumulated into this queue instead of being sent out directly, and only sent once we know we are safely outside the slock context. Reported-by: GUNA <gbalasun@gmail.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c54
1 files changed, 42 insertions, 12 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 88bfcd707064..c49b8df438cb 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -796,9 +796,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
796 * @tsk: receiving socket 796 * @tsk: receiving socket
797 * @skb: pointer to message buffer. 797 * @skb: pointer to message buffer.
798 */ 798 */
799static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb) 799static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
800 struct sk_buff_head *xmitq)
800{ 801{
801 struct sock *sk = &tsk->sk; 802 struct sock *sk = &tsk->sk;
803 u32 onode = tsk_own_node(tsk);
802 struct tipc_msg *hdr = buf_msg(skb); 804 struct tipc_msg *hdr = buf_msg(skb);
803 int mtyp = msg_type(hdr); 805 int mtyp = msg_type(hdr);
804 bool conn_cong; 806 bool conn_cong;
@@ -811,7 +813,8 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
811 813
812 if (mtyp == CONN_PROBE) { 814 if (mtyp == CONN_PROBE) {
813 msg_set_type(hdr, CONN_PROBE_REPLY); 815 msg_set_type(hdr, CONN_PROBE_REPLY);
814 tipc_sk_respond(sk, skb, TIPC_OK); 816 if (tipc_msg_reverse(onode, &skb, TIPC_OK))
817 __skb_queue_tail(xmitq, skb);
815 return; 818 return;
816 } else if (mtyp == CONN_ACK) { 819 } else if (mtyp == CONN_ACK) {
817 conn_cong = tsk_conn_cong(tsk); 820 conn_cong = tsk_conn_cong(tsk);
@@ -1686,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1686 * 1689 *
1687 * Returns true if message was added to socket receive queue, otherwise false 1690 * Returns true if message was added to socket receive queue, otherwise false
1688 */ 1691 */
1689static bool filter_rcv(struct sock *sk, struct sk_buff *skb) 1692static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1693 struct sk_buff_head *xmitq)
1690{ 1694{
1691 struct socket *sock = sk->sk_socket; 1695 struct socket *sock = sk->sk_socket;
1692 struct tipc_sock *tsk = tipc_sk(sk); 1696 struct tipc_sock *tsk = tipc_sk(sk);
@@ -1696,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
1696 int usr = msg_user(hdr); 1700 int usr = msg_user(hdr);
1697 1701
1698 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 1702 if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1699 tipc_sk_proto_rcv(tsk, skb); 1703 tipc_sk_proto_rcv(tsk, skb, xmitq);
1700 return false; 1704 return false;
1701 } 1705 }
1702 1706
@@ -1739,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
1739 return true; 1743 return true;
1740 1744
1741reject: 1745reject:
1742 tipc_sk_respond(sk, skb, err); 1746 if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1747 __skb_queue_tail(xmitq, skb);
1743 return false; 1748 return false;
1744} 1749}
1745 1750
@@ -1755,9 +1760,24 @@ reject:
1755static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) 1760static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1756{ 1761{
1757 unsigned int truesize = skb->truesize; 1762 unsigned int truesize = skb->truesize;
1763 struct sk_buff_head xmitq;
1764 u32 dnode, selector;
1758 1765
1759 if (likely(filter_rcv(sk, skb))) 1766 __skb_queue_head_init(&xmitq);
1767
1768 if (likely(filter_rcv(sk, skb, &xmitq))) {
1760 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); 1769 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1770 return 0;
1771 }
1772
1773 if (skb_queue_empty(&xmitq))
1774 return 0;
1775
1776 /* Send response/rejected message */
1777 skb = __skb_dequeue(&xmitq);
1778 dnode = msg_destnode(buf_msg(skb));
1779 selector = msg_origport(buf_msg(skb));
1780 tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1761 return 0; 1781 return 0;
1762} 1782}
1763 1783
@@ -1771,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1771 * Caller must hold socket lock 1791 * Caller must hold socket lock
1772 */ 1792 */
1773static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, 1793static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1774 u32 dport) 1794 u32 dport, struct sk_buff_head *xmitq)
1775{ 1795{
1796 unsigned long time_limit = jiffies + 2;
1797 struct sk_buff *skb;
1776 unsigned int lim; 1798 unsigned int lim;
1777 atomic_t *dcnt; 1799 atomic_t *dcnt;
1778 struct sk_buff *skb; 1800 u32 onode;
1779 unsigned long time_limit = jiffies + 2;
1780 1801
1781 while (skb_queue_len(inputq)) { 1802 while (skb_queue_len(inputq)) {
1782 if (unlikely(time_after_eq(jiffies, time_limit))) 1803 if (unlikely(time_after_eq(jiffies, time_limit)))
@@ -1788,7 +1809,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1788 1809
1789 /* Add message directly to receive queue if possible */ 1810 /* Add message directly to receive queue if possible */
1790 if (!sock_owned_by_user(sk)) { 1811 if (!sock_owned_by_user(sk)) {
1791 filter_rcv(sk, skb); 1812 filter_rcv(sk, skb, xmitq);
1792 continue; 1813 continue;
1793 } 1814 }
1794 1815
@@ -1801,7 +1822,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1801 continue; 1822 continue;
1802 1823
1803 /* Overload => reject message back to sender */ 1824 /* Overload => reject message back to sender */
1804 tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD); 1825 onode = tipc_own_addr(sock_net(sk));
1826 if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
1827 __skb_queue_tail(xmitq, skb);
1805 break; 1828 break;
1806 } 1829 }
1807} 1830}
@@ -1814,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1814 */ 1837 */
1815void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) 1838void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1816{ 1839{
1840 struct sk_buff_head xmitq;
1817 u32 dnode, dport = 0; 1841 u32 dnode, dport = 0;
1818 int err; 1842 int err;
1819 struct tipc_sock *tsk; 1843 struct tipc_sock *tsk;
1820 struct sock *sk; 1844 struct sock *sk;
1821 struct sk_buff *skb; 1845 struct sk_buff *skb;
1822 1846
1847 __skb_queue_head_init(&xmitq);
1823 while (skb_queue_len(inputq)) { 1848 while (skb_queue_len(inputq)) {
1824 dport = tipc_skb_peek_port(inputq, dport); 1849 dport = tipc_skb_peek_port(inputq, dport);
1825 tsk = tipc_sk_lookup(net, dport); 1850 tsk = tipc_sk_lookup(net, dport);
@@ -1827,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1827 if (likely(tsk)) { 1852 if (likely(tsk)) {
1828 sk = &tsk->sk; 1853 sk = &tsk->sk;
1829 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { 1854 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1830 tipc_sk_enqueue(inputq, sk, dport); 1855 tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1831 spin_unlock_bh(&sk->sk_lock.slock); 1856 spin_unlock_bh(&sk->sk_lock.slock);
1832 } 1857 }
1858 /* Send pending response/rejected messages, if any */
1859 while ((skb = __skb_dequeue(&xmitq))) {
1860 dnode = msg_destnode(buf_msg(skb));
1861 tipc_node_xmit_skb(net, skb, dnode, dport);
1862 }
1833 sock_put(sk); 1863 sock_put(sk);
1834 continue; 1864 continue;
1835 } 1865 }