aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-07-22 10:11:20 -0400
committerDavid S. Miller <davem@davemloft.net>2015-07-26 19:31:50 -0400
commitcda3696d3d26eb798c94de0dab5bd66ddb5627cb (patch)
treea76895b3294564437b327e95dd1742daebbd56a9 /net/tipc
parentbcd3ffd4f6d7c994c93be2ab8598fdfb2952a1f1 (diff)
tipc: clean up socket layer message reception
When a message is received in a socket, one of the call chains tipc_sk_rcv()->tipc_sk_enqueue()->filter_rcv()(->tipc_sk_proto_rcv()) or tipc_sk_backlog_rcv()->filter_rcv()(->tipc_sk_proto_rcv()) are followed. At each of these levels we may encounter situations where the message may need to be rejected, or a new message produced for transfer back to the sender. Despite recent improvements, the current code for doing this is perceived as awkward and hard to follow. Leveraging the two previous commits in this series, we now introduce a more uniform handling of such situations. We let each of the functions in the chain itself produce/reverse the message to be returned to the sender, but also perform the actual forwarding. This simplifies the necessary logics within each function. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/msg.c20
-rw-r--r--net/tipc/msg.h3
-rw-r--r--net/tipc/socket.c255
-rw-r--r--net/tipc/socket.h2
4 files changed, 134 insertions, 146 deletions
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b6cc58ec7346..562c926a51cc 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -520,17 +520,15 @@ exit:
520/** 520/**
521 * tipc_msg_lookup_dest(): try to find new destination for named message 521 * tipc_msg_lookup_dest(): try to find new destination for named message
522 * @skb: the buffer containing the message. 522 * @skb: the buffer containing the message.
523 * @dnode: return value: next-hop node, if destination found 523 * @err: error code to be used by caller if lookup fails
524 * @err: return value: error code to use, if message to be rejected
525 * Does not consume buffer 524 * Does not consume buffer
526 * Returns true if a destination is found, false otherwise 525 * Returns true if a destination is found, false otherwise
527 */ 526 */
528bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, 527bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
529 u32 *dnode, int *err)
530{ 528{
531 struct tipc_msg *msg = buf_msg(skb); 529 struct tipc_msg *msg = buf_msg(skb);
532 u32 dport; 530 u32 dport, dnode;
533 u32 own_addr = tipc_own_addr(net); 531 u32 onode = tipc_own_addr(net);
534 532
535 if (!msg_isdata(msg)) 533 if (!msg_isdata(msg))
536 return false; 534 return false;
@@ -543,15 +541,15 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
543 return false; 541 return false;
544 if (msg_reroute_cnt(msg)) 542 if (msg_reroute_cnt(msg))
545 return false; 543 return false;
546 *dnode = addr_domain(net, msg_lookup_scope(msg)); 544 dnode = addr_domain(net, msg_lookup_scope(msg));
547 dport = tipc_nametbl_translate(net, msg_nametype(msg), 545 dport = tipc_nametbl_translate(net, msg_nametype(msg),
548 msg_nameinst(msg), dnode); 546 msg_nameinst(msg), &dnode);
549 if (!dport) 547 if (!dport)
550 return false; 548 return false;
551 msg_incr_reroute_cnt(msg); 549 msg_incr_reroute_cnt(msg);
552 if (*dnode != own_addr) 550 if (dnode != onode)
553 msg_set_prevnode(msg, own_addr); 551 msg_set_prevnode(msg, onode);
554 msg_set_destnode(msg, *dnode); 552 msg_set_destnode(msg, dnode);
555 msg_set_destport(msg, dport); 553 msg_set_destport(msg, dport);
556 *err = TIPC_OK; 554 *err = TIPC_OK;
557 return true; 555 return true;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d0834bc519aa..234fb0531d1d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -798,8 +798,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
798bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); 798bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
799int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 799int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
800 int offset, int dsz, int mtu, struct sk_buff_head *list); 800 int offset, int dsz, int mtu, struct sk_buff_head *list);
801bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, 801bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
802 int *err);
803struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 802struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
804 803
805static inline u16 buf_seqno(struct sk_buff *skb) 804static inline u16 buf_seqno(struct sk_buff *skb)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 71d88adadb18..1060d52ff23e 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1520,82 +1520,81 @@ static void tipc_data_ready(struct sock *sk)
1520 * @tsk: TIPC socket 1520 * @tsk: TIPC socket
1521 * @skb: pointer to message buffer. Set to NULL if buffer is consumed 1521 * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1522 * 1522 *
1523 * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise 1523 * Returns true if everything ok, false otherwise
1524 */ 1524 */
1525static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb) 1525static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1526{ 1526{
1527 struct sock *sk = &tsk->sk; 1527 struct sock *sk = &tsk->sk;
1528 struct net *net = sock_net(sk); 1528 struct net *net = sock_net(sk);
1529 struct socket *sock = sk->sk_socket; 1529 struct socket *sock = sk->sk_socket;
1530 struct tipc_msg *msg = buf_msg(*skb); 1530 struct tipc_msg *hdr = buf_msg(skb);
1531 int retval = -TIPC_ERR_NO_PORT;
1532 1531
1533 if (msg_mcast(msg)) 1532 if (unlikely(msg_mcast(hdr)))
1534 return retval; 1533 return false;
1535 1534
1536 switch ((int)sock->state) { 1535 switch ((int)sock->state) {
1537 case SS_CONNECTED: 1536 case SS_CONNECTED:
1537
1538 /* Accept only connection-based messages sent by peer */ 1538 /* Accept only connection-based messages sent by peer */
1539 if (tsk_peer_msg(tsk, msg)) { 1539 if (unlikely(!tsk_peer_msg(tsk, hdr)))
1540 if (unlikely(msg_errcode(msg))) { 1540 return false;
1541 sock->state = SS_DISCONNECTING; 1541
1542 tsk->connected = 0; 1542 if (unlikely(msg_errcode(hdr))) {
1543 /* let timer expire on it's own */ 1543 sock->state = SS_DISCONNECTING;
1544 tipc_node_remove_conn(net, tsk_peer_node(tsk), 1544 tsk->connected = 0;
1545 tsk->portid); 1545 /* Let timer expire on it's own */
1546 } 1546 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1547 retval = TIPC_OK; 1547 tsk->portid);
1548 } 1548 }
1549 break; 1549 return true;
1550
1550 case SS_CONNECTING: 1551 case SS_CONNECTING:
1551 /* Accept only ACK or NACK message */
1552 1552
1553 if (unlikely(!msg_connected(msg))) 1553 /* Accept only ACK or NACK message */
1554 break; 1554 if (unlikely(!msg_connected(hdr)))
1555 return false;
1555 1556
1556 if (unlikely(msg_errcode(msg))) { 1557 if (unlikely(msg_errcode(hdr))) {
1557 sock->state = SS_DISCONNECTING; 1558 sock->state = SS_DISCONNECTING;
1558 sk->sk_err = ECONNREFUSED; 1559 sk->sk_err = ECONNREFUSED;
1559 retval = TIPC_OK; 1560 return true;
1560 break;
1561 } 1561 }
1562 1562
1563 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { 1563 if (unlikely(!msg_isdata(hdr))) {
1564 sock->state = SS_DISCONNECTING; 1564 sock->state = SS_DISCONNECTING;
1565 sk->sk_err = EINVAL; 1565 sk->sk_err = EINVAL;
1566 retval = TIPC_OK; 1566 return true;
1567 break;
1568 } 1567 }
1569 1568
1570 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); 1569 tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
1571 msg_set_importance(&tsk->phdr, msg_importance(msg)); 1570 msg_set_importance(&tsk->phdr, msg_importance(hdr));
1572 sock->state = SS_CONNECTED; 1571 sock->state = SS_CONNECTED;
1573 1572
1574 /* If an incoming message is an 'ACK-', it should be 1573 /* If 'ACK+' message, add to socket receive queue */
1575 * discarded here because it doesn't contain useful 1574 if (msg_data_sz(hdr))
1576 * data. In addition, we should try to wake up 1575 return true;
1577 * connect() routine if sleeping. 1576
1578 */ 1577 /* If empty 'ACK-' message, wake up sleeping connect() */
1579 if (msg_data_sz(msg) == 0) { 1578 if (waitqueue_active(sk_sleep(sk)))
1580 kfree_skb(*skb); 1579 wake_up_interruptible(sk_sleep(sk));
1581 *skb = NULL; 1580
1582 if (waitqueue_active(sk_sleep(sk))) 1581 /* 'ACK-' message is neither accepted nor rejected: */
1583 wake_up_interruptible(sk_sleep(sk)); 1582 msg_set_dest_droppable(hdr, 1);
1584 } 1583 return false;
1585 retval = TIPC_OK; 1584
1586 break;
1587 case SS_LISTENING: 1585 case SS_LISTENING:
1588 case SS_UNCONNECTED: 1586 case SS_UNCONNECTED:
1587
1589 /* Accept only SYN message */ 1588 /* Accept only SYN message */
1590 if (!msg_connected(msg) && !(msg_errcode(msg))) 1589 if (!msg_connected(hdr) && !(msg_errcode(hdr)))
1591 retval = TIPC_OK; 1590 return true;
1592 break; 1591 break;
1593 case SS_DISCONNECTING: 1592 case SS_DISCONNECTING:
1594 break; 1593 break;
1595 default: 1594 default:
1596 pr_err("Unknown socket state %u\n", sock->state); 1595 pr_err("Unknown socket state %u\n", sock->state);
1597 } 1596 }
1598 return retval; 1597 return false;
1599} 1598}
1600 1599
1601/** 1600/**
@@ -1630,61 +1629,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1630/** 1629/**
1631 * filter_rcv - validate incoming message 1630 * filter_rcv - validate incoming message
1632 * @sk: socket 1631 * @sk: socket
1633 * @skb: pointer to message. Set to NULL if buffer is consumed. 1632 * @skb: pointer to message.
1634 * 1633 *
1635 * Enqueues message on receive queue if acceptable; optionally handles 1634 * Enqueues message on receive queue if acceptable; optionally handles
1636 * disconnect indication for a connected socket. 1635 * disconnect indication for a connected socket.
1637 * 1636 *
1638 * Called with socket lock already taken 1637 * Called with socket lock already taken
1639 * 1638 *
1640 * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected 1639 * Returns true if message was added to socket receive queue, otherwise false
1641 */ 1640 */
1642static int filter_rcv(struct sock *sk, struct sk_buff **skb) 1641static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
1643{ 1642{
1644 struct socket *sock = sk->sk_socket; 1643 struct socket *sock = sk->sk_socket;
1645 struct tipc_sock *tsk = tipc_sk(sk); 1644 struct tipc_sock *tsk = tipc_sk(sk);
1646 struct tipc_msg *msg = buf_msg(*skb); 1645 struct tipc_msg *hdr = buf_msg(skb);
1647 unsigned int limit = rcvbuf_limit(sk, *skb); 1646 unsigned int limit = rcvbuf_limit(sk, skb);
1648 int rc = TIPC_OK; 1647 int err = TIPC_OK;
1648 int usr = msg_user(hdr);
1649 1649
1650 if (unlikely(msg_user(msg) == CONN_MANAGER)) { 1650 if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1651 tipc_sk_proto_rcv(tsk, *skb); 1651 tipc_sk_proto_rcv(tsk, skb);
1652 return TIPC_OK; 1652 return false;
1653 } 1653 }
1654 1654
1655 if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { 1655 if (unlikely(usr == SOCK_WAKEUP)) {
1656 kfree_skb(*skb); 1656 kfree_skb(skb);
1657 tsk->link_cong = 0; 1657 tsk->link_cong = 0;
1658 sk->sk_write_space(sk); 1658 sk->sk_write_space(sk);
1659 *skb = NULL; 1659 return false;
1660 return TIPC_OK;
1661 } 1660 }
1662 1661
1663 /* Reject message if it is wrong sort of message for socket */ 1662 /* Drop if illegal message type */
1664 if (msg_type(msg) > TIPC_DIRECT_MSG) 1663 if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1665 return -TIPC_ERR_NO_PORT; 1664 kfree_skb(skb);
1665 return false;
1666 }
1666 1667
1667 if (sock->state == SS_READY) { 1668 /* Reject if wrong message type for current socket state */
1668 if (msg_connected(msg)) 1669 if (unlikely(sock->state == SS_READY)) {
1669 return -TIPC_ERR_NO_PORT; 1670 if (msg_connected(hdr)) {
1670 } else { 1671 err = TIPC_ERR_NO_PORT;
1671 rc = filter_connect(tsk, skb); 1672 goto reject;
1672 if (rc != TIPC_OK || !*skb) 1673 }
1673 return rc; 1674 } else if (unlikely(!filter_connect(tsk, skb))) {
1675 err = TIPC_ERR_NO_PORT;
1676 goto reject;
1674 } 1677 }
1675 1678
1676 /* Reject message if there isn't room to queue it */ 1679 /* Reject message if there isn't room to queue it */
1677 if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit) 1680 if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1678 return -TIPC_ERR_OVERLOAD; 1681 err = TIPC_ERR_OVERLOAD;
1682 goto reject;
1683 }
1679 1684
1680 /* Enqueue message */ 1685 /* Enqueue message */
1681 TIPC_SKB_CB(*skb)->handle = NULL; 1686 TIPC_SKB_CB(skb)->handle = NULL;
1682 __skb_queue_tail(&sk->sk_receive_queue, *skb); 1687 __skb_queue_tail(&sk->sk_receive_queue, skb);
1683 skb_set_owner_r(*skb, sk); 1688 skb_set_owner_r(skb, sk);
1684 1689
1685 sk->sk_data_ready(sk); 1690 sk->sk_data_ready(sk);
1686 *skb = NULL; 1691 return true;
1687 return TIPC_OK; 1692
1693reject:
1694 tipc_sk_respond(sk, skb, err);
1695 return false;
1688} 1696}
1689 1697
1690/** 1698/**
@@ -1698,22 +1706,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)
1698 */ 1706 */
1699static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) 1707static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1700{ 1708{
1701 int err; 1709 unsigned int truesize = skb->truesize;
1702 atomic_t *dcnt;
1703 u32 dnode = msg_prevnode(buf_msg(skb));
1704 struct tipc_sock *tsk = tipc_sk(sk);
1705 struct net *net = sock_net(sk);
1706 uint truesize = skb->truesize;
1707 1710
1708 err = filter_rcv(sk, &skb); 1711 if (likely(filter_rcv(sk, skb)))
1709 if (likely(!skb)) { 1712 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1710 dcnt = &tsk->dupl_rcvcnt;
1711 if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1712 atomic_add(truesize, dcnt);
1713 return 0;
1714 }
1715 if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, -err))
1716 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
1717 return 0; 1713 return 0;
1718} 1714}
1719 1715
@@ -1723,45 +1719,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1723 * @inputq: list of incoming buffers with potentially different destinations 1719 * @inputq: list of incoming buffers with potentially different destinations
1724 * @sk: socket where the buffers should be enqueued 1720 * @sk: socket where the buffers should be enqueued
1725 * @dport: port number for the socket 1721 * @dport: port number for the socket
1726 * @_skb: returned buffer to be forwarded or rejected, if applicable
1727 * 1722 *
1728 * Caller must hold socket lock 1723 * Caller must hold socket lock
1729 *
1730 * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1731 * or -TIPC_ERR_NO_PORT
1732 */ 1724 */
1733static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, 1725static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1734 u32 dport, struct sk_buff **_skb) 1726 u32 dport)
1735{ 1727{
1736 unsigned int lim; 1728 unsigned int lim;
1737 atomic_t *dcnt; 1729 atomic_t *dcnt;
1738 int err;
1739 struct sk_buff *skb; 1730 struct sk_buff *skb;
1740 unsigned long time_limit = jiffies + 2; 1731 unsigned long time_limit = jiffies + 2;
1741 1732
1742 while (skb_queue_len(inputq)) { 1733 while (skb_queue_len(inputq)) {
1743 if (unlikely(time_after_eq(jiffies, time_limit))) 1734 if (unlikely(time_after_eq(jiffies, time_limit)))
1744 return TIPC_OK; 1735 return;
1736
1745 skb = tipc_skb_dequeue(inputq, dport); 1737 skb = tipc_skb_dequeue(inputq, dport);
1746 if (unlikely(!skb)) 1738 if (unlikely(!skb))
1747 return TIPC_OK; 1739 return;
1740
1741 /* Add message directly to receive queue if possible */
1748 if (!sock_owned_by_user(sk)) { 1742 if (!sock_owned_by_user(sk)) {
1749 err = filter_rcv(sk, &skb); 1743 filter_rcv(sk, skb);
1750 if (likely(!skb)) 1744 continue;
1751 continue;
1752 *_skb = skb;
1753 return err;
1754 } 1745 }
1746
1747 /* Try backlog, compensating for double-counted bytes */
1755 dcnt = &tipc_sk(sk)->dupl_rcvcnt; 1748 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1756 if (sk->sk_backlog.len) 1749 if (sk->sk_backlog.len)
1757 atomic_set(dcnt, 0); 1750 atomic_set(dcnt, 0);
1758 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); 1751 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1759 if (likely(!sk_add_backlog(sk, skb, lim))) 1752 if (likely(!sk_add_backlog(sk, skb, lim)))
1760 continue; 1753 continue;
1761 *_skb = skb; 1754
1762 return -TIPC_ERR_OVERLOAD; 1755 /* Overload => reject message back to sender */
1756 tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
1757 break;
1763 } 1758 }
1764 return TIPC_OK;
1765} 1759}
1766 1760
1767/** 1761/**
@@ -1769,51 +1763,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1769 * @inputq: buffer list containing the buffers 1763 * @inputq: buffer list containing the buffers
1770 * Consumes all buffers in list until inputq is empty 1764 * Consumes all buffers in list until inputq is empty
1771 * Note: may be called in multiple threads referring to the same queue 1765 * Note: may be called in multiple threads referring to the same queue
1772 * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1773 * Only node local calls check the return value, sending single-buffer queues
1774 */ 1766 */
1775int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) 1767void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1776{ 1768{
1777 u32 dnode, dport = 0; 1769 u32 dnode, dport = 0;
1778 int err; 1770 int err;
1779 struct sk_buff *skb;
1780 struct tipc_sock *tsk; 1771 struct tipc_sock *tsk;
1781 struct tipc_net *tn;
1782 struct sock *sk; 1772 struct sock *sk;
1773 struct sk_buff *skb;
1783 1774
1784 while (skb_queue_len(inputq)) { 1775 while (skb_queue_len(inputq)) {
1785 err = -TIPC_ERR_NO_PORT;
1786 skb = NULL;
1787 dport = tipc_skb_peek_port(inputq, dport); 1776 dport = tipc_skb_peek_port(inputq, dport);
1788 tsk = tipc_sk_lookup(net, dport); 1777 tsk = tipc_sk_lookup(net, dport);
1778
1789 if (likely(tsk)) { 1779 if (likely(tsk)) {
1790 sk = &tsk->sk; 1780 sk = &tsk->sk;
1791 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { 1781 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1792 err = tipc_sk_enqueue(inputq, sk, dport, &skb); 1782 tipc_sk_enqueue(inputq, sk, dport);
1793 spin_unlock_bh(&sk->sk_lock.slock); 1783 spin_unlock_bh(&sk->sk_lock.slock);
1794 dport = 0;
1795 } 1784 }
1796 sock_put(sk); 1785 sock_put(sk);
1797 } else {
1798 skb = tipc_skb_dequeue(inputq, dport);
1799 }
1800 if (likely(!skb))
1801 continue; 1786 continue;
1802 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1803 goto xmit;
1804 if (!err) {
1805 dnode = msg_destnode(buf_msg(skb));
1806 goto xmit;
1807 } else {
1808 dnode = msg_prevnode(buf_msg(skb));
1809 } 1787 }
1810 tn = net_generic(net, tipc_net_id); 1788
1811 if (!tipc_msg_reverse(tn->own_addr, &skb, -err)) 1789 /* No destination socket => dequeue skb if still there */
1790 skb = tipc_skb_dequeue(inputq, dport);
1791 if (!skb)
1792 return;
1793
1794 /* Try secondary lookup if unresolved named message */
1795 err = TIPC_ERR_NO_PORT;
1796 if (tipc_msg_lookup_dest(net, skb, &err))
1797 goto xmit;
1798
1799 /* Prepare for message rejection */
1800 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1812 continue; 1801 continue;
1813xmit: 1802xmit:
1803 dnode = msg_destnode(buf_msg(skb));
1814 tipc_node_xmit_skb(net, skb, dnode, dport); 1804 tipc_node_xmit_skb(net, skb, dnode, dport);
1815 } 1805 }
1816 return err ? -EHOSTUNREACH : 0;
1817} 1806}
1818 1807
1819static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) 1808static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2082,7 +2071,10 @@ static int tipc_shutdown(struct socket *sock, int how)
2082 struct net *net = sock_net(sk); 2071 struct net *net = sock_net(sk);
2083 struct tipc_sock *tsk = tipc_sk(sk); 2072 struct tipc_sock *tsk = tipc_sk(sk);
2084 struct sk_buff *skb; 2073 struct sk_buff *skb;
2085 u32 dnode; 2074 u32 dnode = tsk_peer_node(tsk);
2075 u32 dport = tsk_peer_port(tsk);
2076 u32 onode = tipc_own_addr(net);
2077 u32 oport = tsk->portid;
2086 int res; 2078 int res;
2087 2079
2088 if (how != SHUT_RDWR) 2080 if (how != SHUT_RDWR)
@@ -2108,9 +2100,8 @@ restart:
2108 } else { 2100 } else {
2109 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 2101 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2110 TIPC_CONN_MSG, SHORT_H_SIZE, 2102 TIPC_CONN_MSG, SHORT_H_SIZE,
2111 0, dnode, tsk_own_node(tsk), 2103 0, dnode, onode, dport, oport,
2112 tsk_peer_port(tsk), 2104 TIPC_CONN_SHUTDOWN);
2113 tsk->portid, TIPC_CONN_SHUTDOWN);
2114 tipc_node_xmit_skb(net, skb, dnode, tsk->portid); 2105 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2115 } 2106 }
2116 tsk->connected = 0; 2107 tsk->connected = 0;
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index bf6551389522..4241f22069dc 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -44,7 +44,7 @@
44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) 44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
45int tipc_socket_init(void); 45int tipc_socket_init(void);
46void tipc_socket_stop(void); 46void tipc_socket_stop(void);
47int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); 47void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
48void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 48void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
49 struct sk_buff_head *inputq); 49 struct sk_buff_head *inputq);
50void tipc_sk_reinit(struct net *net); 50void tipc_sk_reinit(struct net *net);