aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/bearer.c26
-rw-r--r--net/tipc/bearer.h3
-rw-r--r--net/tipc/link.c132
-rw-r--r--net/tipc/link.h6
-rw-r--r--net/tipc/name_distr.c4
-rw-r--r--net/tipc/node.c78
-rw-r--r--net/tipc/node.h4
-rw-r--r--net/tipc/socket.c22
8 files changed, 198 insertions, 77 deletions
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 00bc0e620532..eae58a6b121c 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
470 rcu_read_unlock(); 470 rcu_read_unlock();
471} 471}
472 472
473/* tipc_bearer_xmit() -send buffer to destination over bearer
474 */
475void tipc_bearer_xmit(struct net *net, u32 bearer_id,
476 struct sk_buff_head *xmitq,
477 struct tipc_media_addr *dst)
478{
479 struct tipc_net *tn = net_generic(net, tipc_net_id);
480 struct tipc_bearer *b;
481 struct sk_buff *skb, *tmp;
482
483 if (skb_queue_empty(xmitq))
484 return;
485
486 rcu_read_lock();
487 b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
488 if (likely(b)) {
489 skb_queue_walk_safe(xmitq, skb, tmp) {
490 __skb_dequeue(xmitq);
491 b->media->send_msg(net, skb, b, dst);
492 /* Until we remove cloning in tipc_l2_send_msg(): */
493 kfree_skb(skb);
494 }
495 }
496 rcu_read_unlock();
497}
498
473/** 499/**
474 * tipc_l2_rcv_msg - handle incoming TIPC message from an interface 500 * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
475 * @buf: the received packet 501 * @buf: the received packet
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dc714d977768..6426f242f626 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
217void tipc_bearer_stop(struct net *net); 217void tipc_bearer_stop(struct net *net);
218void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, 218void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
219 struct tipc_media_addr *dest); 219 struct tipc_media_addr *dest);
220void tipc_bearer_xmit(struct net *net, u32 bearer_id,
221 struct sk_buff_head *xmitq,
222 struct tipc_media_addr *dst);
220 223
221#endif /* _TIPC_BEARER_H */ 224#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ea32679b6737..c052437a7cfa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
353 /* This really cannot happen... */ 353 /* This really cannot happen... */
354 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { 354 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
355 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); 355 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
356 tipc_link_reset(link);
357 return -ENOBUFS; 356 return -ENOBUFS;
358 } 357 }
359 /* Non-blocking sender: */ 358 /* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
701 return 0; 700 return 0;
702} 701}
703 702
703/**
704 * tipc_link_xmit(): enqueue buffer list according to queue situation
705 * @link: link to use
706 * @list: chain of buffers containing message
707 * @xmitq: returned list of packets to be sent by caller
708 *
709 * Consumes the buffer chain, except when returning -ELINKCONG,
710 * since the caller then may want to make more send attempts.
711 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
712 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
713 */
714int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
715 struct sk_buff_head *xmitq)
716{
717 struct tipc_msg *hdr = buf_msg(skb_peek(list));
718 unsigned int maxwin = l->window;
719 unsigned int i, imp = msg_importance(hdr);
720 unsigned int mtu = l->mtu;
721 u16 ack = l->rcv_nxt - 1;
722 u16 seqno = l->snd_nxt;
723 u16 bc_last_in = l->owner->bclink.last_in;
724 struct sk_buff_head *transmq = &l->transmq;
725 struct sk_buff_head *backlogq = &l->backlogq;
726 struct sk_buff *skb, *_skb, *bskb;
727
728 /* Match msg importance against this and all higher backlog limits: */
729 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
730 if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
731 return link_schedule_user(l, list);
732 }
733 if (unlikely(msg_size(hdr) > mtu))
734 return -EMSGSIZE;
735
736 /* Prepare each packet for sending, and add to relevant queue: */
737 while (skb_queue_len(list)) {
738 skb = skb_peek(list);
739 hdr = buf_msg(skb);
740 msg_set_seqno(hdr, seqno);
741 msg_set_ack(hdr, ack);
742 msg_set_bcast_ack(hdr, bc_last_in);
743
744 if (likely(skb_queue_len(transmq) < maxwin)) {
745 _skb = skb_clone(skb, GFP_ATOMIC);
746 if (!_skb)
747 return -ENOBUFS;
748 __skb_dequeue(list);
749 __skb_queue_tail(transmq, skb);
750 __skb_queue_tail(xmitq, _skb);
751 l->rcv_unacked = 0;
752 seqno++;
753 continue;
754 }
755 if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
756 kfree_skb(__skb_dequeue(list));
757 l->stats.sent_bundled++;
758 continue;
759 }
760 if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
761 kfree_skb(__skb_dequeue(list));
762 __skb_queue_tail(backlogq, bskb);
763 l->backlog[msg_importance(buf_msg(bskb))].len++;
764 l->stats.sent_bundled++;
765 l->stats.sent_bundles++;
766 continue;
767 }
768 l->backlog[imp].len += skb_queue_len(list);
769 skb_queue_splice_tail_init(list, backlogq);
770 }
771 l->snd_nxt = seqno;
772 return 0;
773}
774
704static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) 775static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
705{ 776{
706 skb_queue_head_init(list); 777 skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
715 return __tipc_link_xmit(link->owner->net, link, &head); 786 return __tipc_link_xmit(link->owner->net, link, &head);
716} 787}
717 788
718/* tipc_link_xmit_skb(): send single buffer to destination
719 * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
720 * messages, which will not cause link congestion
721 * The only exception is datagram messages rerouted after secondary
722 * lookup, which are rare and safe to dispose of anyway.
723 * TODO: Return real return value, and let callers use
724 * tipc_wait_for_sendpkt() where applicable
725 */
726int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
727 u32 selector)
728{
729 struct sk_buff_head head;
730 int rc;
731
732 skb2list(skb, &head);
733 rc = tipc_link_xmit(net, &head, dnode, selector);
734 if (rc)
735 kfree_skb(skb);
736 return 0;
737}
738
739/**
740 * tipc_link_xmit() is the general link level function for message sending
741 * @net: the applicable net namespace
742 * @list: chain of buffers containing message
743 * @dsz: amount of user data to be sent
744 * @dnode: address of destination node
745 * @selector: a number used for deterministic link selection
746 * Consumes the buffer chain, except when returning error
747 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
748 */
749int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
750 u32 selector)
751{
752 struct tipc_link *link = NULL;
753 struct tipc_node *node;
754 int rc = -EHOSTUNREACH;
755
756 node = tipc_node_find(net, dnode);
757 if (node) {
758 tipc_node_lock(node);
759 link = node_active_link(node, selector & 1);
760 if (link)
761 rc = __tipc_link_xmit(net, link, list);
762 tipc_node_unlock(node);
763 tipc_node_put(node);
764 }
765 if (link)
766 return rc;
767
768 if (likely(in_own_node(net, dnode))) {
769 tipc_sk_rcv(net, list);
770 return 0;
771 }
772
773 __skb_queue_purge(list);
774 return rc;
775}
776
777/* 789/*
778 * tipc_link_sync_xmit - synchronize broadcast link endpoints. 790 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
779 * 791 *
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 9c71d9e42e93..7add2b90361d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
223void tipc_link_purge_backlog(struct tipc_link *l); 223void tipc_link_purge_backlog(struct tipc_link *l);
224void tipc_link_reset_all(struct tipc_node *node); 224void tipc_link_reset_all(struct tipc_node *node);
225void tipc_link_reset(struct tipc_link *l_ptr); 225void tipc_link_reset(struct tipc_link *l_ptr);
226int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
227 u32 selector);
228int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
229 u32 selector);
230int __tipc_link_xmit(struct net *net, struct tipc_link *link, 226int __tipc_link_xmit(struct net *net, struct tipc_link *link,
231 struct sk_buff_head *list); 227 struct sk_buff_head *list);
228int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
229 struct sk_buff_head *xmitq);
232void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 230void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
233 u32 gap, u32 tolerance, u32 priority); 231 u32 gap, u32 tolerance, u32 priority);
234void tipc_link_push_packets(struct tipc_link *l_ptr); 232void tipc_link_push_packets(struct tipc_link *l_ptr);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 3a1539e96294..e6018b7eb197 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
102 if (!oskb) 102 if (!oskb)
103 break; 103 break;
104 msg_set_destnode(buf_msg(oskb), dnode); 104 msg_set_destnode(buf_msg(oskb), dnode);
105 tipc_link_xmit_skb(net, oskb, dnode, dnode); 105 tipc_node_xmit_skb(net, oskb, dnode, dnode);
106 } 106 }
107 rcu_read_unlock(); 107 rcu_read_unlock();
108 108
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
223 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]); 223 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
224 rcu_read_unlock(); 224 rcu_read_unlock();
225 225
226 tipc_link_xmit(net, &head, dnode, dnode); 226 tipc_node_xmit(net, &head, dnode, dnode);
227} 227}
228 228
229static void tipc_publ_subscribe(struct net *net, struct publication *publ, 229static void tipc_publ_subscribe(struct net *net, struct publication *publ,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 19729645d494..ad759bb034e7 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -563,6 +563,84 @@ msg_full:
563 return -EMSGSIZE; 563 return -EMSGSIZE;
564} 564}
565 565
566static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
567 int *bearer_id,
568 struct tipc_media_addr **maddr)
569{
570 int id = n->active_links[sel & 1];
571
572 if (unlikely(id < 0))
573 return NULL;
574
575 *bearer_id = id;
576 *maddr = &n->links[id].maddr;
577 return n->links[id].link;
578}
579
580/**
581 * tipc_node_xmit() is the general link level function for message sending
582 * @net: the applicable net namespace
583 * @list: chain of buffers containing message
584 * @dnode: address of destination node
585 * @selector: a number used for deterministic link selection
586 * Consumes the buffer chain, except when returning -ELINKCONG
587 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
588 */
589int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
590 u32 dnode, int selector)
591{
592 struct tipc_link *l = NULL;
593 struct tipc_node *n;
594 struct sk_buff_head xmitq;
595 struct tipc_media_addr *maddr;
596 int bearer_id;
597 int rc = -EHOSTUNREACH;
598
599 __skb_queue_head_init(&xmitq);
600 n = tipc_node_find(net, dnode);
601 if (likely(n)) {
602 tipc_node_lock(n);
603 l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
604 if (likely(l))
605 rc = tipc_link_xmit(l, list, &xmitq);
606 if (unlikely(rc == -ENOBUFS))
607 tipc_link_reset(l);
608 tipc_node_unlock(n);
609 tipc_node_put(n);
610 }
611 if (likely(!rc)) {
612 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
613 return 0;
614 }
615 if (likely(in_own_node(net, dnode))) {
616 tipc_sk_rcv(net, list);
617 return 0;
618 }
619 return rc;
620}
621
622/* tipc_node_xmit_skb(): send single buffer to destination
623 * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
624 * messages, which will not be rejected
625 * The only exception is datagram messages rerouted after secondary
626 * lookup, which are rare and safe to dispose of anyway.
627 * TODO: Return real return value, and let callers use
628 * tipc_wait_for_sendpkt() where applicable
629 */
630int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
631 u32 selector)
632{
633 struct sk_buff_head head;
634 int rc;
635
636 skb_queue_head_init(&head);
637 __skb_queue_tail(&head, skb);
638 rc = tipc_node_xmit(net, &head, dnode, selector);
639 if (rc == -ELINKCONG)
640 kfree_skb(skb);
641 return 0;
642}
643
566int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) 644int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
567{ 645{
568 int err; 646 int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 74f278adada3..86b7c740cf84 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
160int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, 160int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
161 char *linkname, size_t len); 161 char *linkname, size_t len);
162void tipc_node_unlock(struct tipc_node *node); 162void tipc_node_unlock(struct tipc_node *node);
163int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
164 int selector);
165int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
166 u32 selector);
163int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); 167int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
164void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); 168void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
165 169
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 87fef25f6519..5b0b08d58fcc 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
261 261
262 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { 262 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
263 if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) 263 if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
264 tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0); 264 tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
265 } 265 }
266} 266}
267 267
@@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock)
443 } 443 }
444 if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, 444 if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
445 TIPC_ERR_NO_PORT)) 445 TIPC_ERR_NO_PORT))
446 tipc_link_xmit_skb(net, skb, dnode, 0); 446 tipc_node_xmit_skb(net, skb, dnode, 0);
447 } 447 }
448 } 448 }
449 449
@@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock)
456 tsk_own_node(tsk), tsk_peer_port(tsk), 456 tsk_own_node(tsk), tsk_peer_port(tsk),
457 tsk->portid, TIPC_ERR_NO_PORT); 457 tsk->portid, TIPC_ERR_NO_PORT);
458 if (skb) 458 if (skb)
459 tipc_link_xmit_skb(net, skb, dnode, tsk->portid); 459 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
460 tipc_node_remove_conn(net, dnode, tsk->portid); 460 tipc_node_remove_conn(net, dnode, tsk->portid);
461 } 461 }
462 462
@@ -925,7 +925,7 @@ new_mtu:
925 do { 925 do {
926 skb = skb_peek(pktchain); 926 skb = skb_peek(pktchain);
927 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 927 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
928 rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid); 928 rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
929 if (likely(!rc)) { 929 if (likely(!rc)) {
930 if (sock->state != SS_READY) 930 if (sock->state != SS_READY)
931 sock->state = SS_CONNECTING; 931 sock->state = SS_CONNECTING;
@@ -1045,7 +1045,7 @@ next:
1045 return rc; 1045 return rc;
1046 do { 1046 do {
1047 if (likely(!tsk_conn_cong(tsk))) { 1047 if (likely(!tsk_conn_cong(tsk))) {
1048 rc = tipc_link_xmit(net, pktchain, dnode, portid); 1048 rc = tipc_node_xmit(net, pktchain, dnode, portid);
1049 if (likely(!rc)) { 1049 if (likely(!rc)) {
1050 tsk->sent_unacked++; 1050 tsk->sent_unacked++;
1051 sent += send; 1051 sent += send;
@@ -1224,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1224 return; 1224 return;
1225 msg = buf_msg(skb); 1225 msg = buf_msg(skb);
1226 msg_set_msgcnt(msg, ack); 1226 msg_set_msgcnt(msg, ack);
1227 tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg)); 1227 tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1228} 1228}
1229 1229
1230static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) 1230static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1703,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1703 return 0; 1703 return 0;
1704 } 1704 }
1705 if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) 1705 if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
1706 tipc_link_xmit_skb(net, skb, dnode, tsk->portid); 1706 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
1707 return 0; 1707 return 0;
1708} 1708}
1709 1709
@@ -1799,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1799 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) 1799 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1800 continue; 1800 continue;
1801xmit: 1801xmit:
1802 tipc_link_xmit_skb(net, skb, dnode, dport); 1802 tipc_node_xmit_skb(net, skb, dnode, dport);
1803 } 1803 }
1804 return err ? -EHOSTUNREACH : 0; 1804 return err ? -EHOSTUNREACH : 0;
1805} 1805}
@@ -2092,7 +2092,7 @@ restart:
2092 } 2092 }
2093 if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, 2093 if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
2094 TIPC_CONN_SHUTDOWN)) 2094 TIPC_CONN_SHUTDOWN))
2095 tipc_link_xmit_skb(net, skb, dnode, 2095 tipc_node_xmit_skb(net, skb, dnode,
2096 tsk->portid); 2096 tsk->portid);
2097 } else { 2097 } else {
2098 dnode = tsk_peer_node(tsk); 2098 dnode = tsk_peer_node(tsk);
@@ -2102,7 +2102,7 @@ restart:
2102 0, dnode, tsk_own_node(tsk), 2102 0, dnode, tsk_own_node(tsk),
2103 tsk_peer_port(tsk), 2103 tsk_peer_port(tsk),
2104 tsk->portid, TIPC_CONN_SHUTDOWN); 2104 tsk->portid, TIPC_CONN_SHUTDOWN);
2105 tipc_link_xmit_skb(net, skb, dnode, tsk->portid); 2105 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2106 } 2106 }
2107 tsk->connected = 0; 2107 tsk->connected = 0;
2108 sock->state = SS_DISCONNECTING; 2108 sock->state = SS_DISCONNECTING;
@@ -2164,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)
2164 } 2164 }
2165 bh_unlock_sock(sk); 2165 bh_unlock_sock(sk);
2166 if (skb) 2166 if (skb)
2167 tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); 2167 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2168exit: 2168exit:
2169 sock_put(sk); 2169 sock_put(sk);
2170} 2170}