author		Jon Paul Maloy <jon.maloy@ericsson.com>		2015-07-16 16:54:24 -0400
committer	David S. Miller <davem@davemloft.net>		2015-07-20 23:41:15 -0400
commit		af9b028e270fda6fb812d70d17d902297df1ceb5
tree		1a204c6d10d597d5db18908dc2066e980a78120d /net
parent		22d85c79428b8ca9a01623aa3e3a1fe29a30a119
tipc: make media xmit call outside node spinlock context
Currently, message sending is performed through a deep call chain, where
the node spinlock is grabbed and held during a significant part of the
transmission time. This is clearly detrimental to overall throughput
performance; it would be better if we could send the message after the
spinlock has been released.

In this commit, we instead let the call revert on the stack after the
buffer chain has been added to the transmission queue, whereafter clones
of the buffers are transmitted to the device layer outside the spinlock
scope.

As a further step in our effort to separate the roles of the node and
link entities, we also move the function tipc_link_xmit() to node.c and
rename it tipc_node_xmit().

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
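The essence of the change is the narrowed lock scope: buffers are only moved onto a local xmitq while the node lock is held, and the actual media send happens after the lock is dropped. Below is a minimal userspace sketch of that pattern, assuming a toy intrusive packet list and illustrative helper names rather than the real TIPC API; in the patch itself, tipc_link_xmit() plays the role of the queue-filling step and tipc_bearer_xmit() the post-unlock send.

/*
 * Minimal userspace analogue of the pattern introduced by this commit.
 * All names here (pkt, node_xmit, device_send) are hypothetical
 * stand-ins, not TIPC functions.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct pkt {
	int seqno;
	struct pkt *next;
};

static pthread_mutex_t node_lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-in for the device-layer send done by tipc_bearer_xmit() */
static void device_send(struct pkt *p)
{
	printf("sending packet %d\n", p->seqno);
	free(p);
}

static void node_xmit(struct pkt *list)
{
	struct pkt *xmitq = NULL, **tail = &xmitq, *p, *next;

	/* Under the lock: only queue bookkeeping, no device calls */
	pthread_mutex_lock(&node_lock);
	for (p = list; p; p = next) {
		next = p->next;
		p->next = NULL;
		*tail = p;		/* move onto the local xmit queue */
		tail = &p->next;
	}
	pthread_mutex_unlock(&node_lock);

	/* Outside the lock: hand the queued packets to the device layer */
	for (p = xmitq; p; p = next) {
		next = p->next;
		device_send(p);
	}
}

int main(void)
{
	struct pkt *a = calloc(1, sizeof(*a));
	struct pkt *b = calloc(1, sizeof(*b));

	a->seqno = 1;
	a->next = b;
	b->seqno = 2;
	node_xmit(a);
	return 0;
}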
Diffstat (limited to 'net')
-rw-r--r--	net/tipc/bearer.c	26
-rw-r--r--	net/tipc/bearer.h	3
-rw-r--r--	net/tipc/link.c		132
-rw-r--r--	net/tipc/link.h		6
-rw-r--r--	net/tipc/name_distr.c	4
-rw-r--r--	net/tipc/node.c		78
-rw-r--r--	net/tipc/node.h		4
-rw-r--r--	net/tipc/socket.c	22
8 files changed, 198 insertions, 77 deletions
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 00bc0e620532..eae58a6b121c 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
 	rcu_read_unlock();
 }
 
+/* tipc_bearer_xmit() - send buffer to destination over bearer
+ */
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+		      struct sk_buff_head *xmitq,
+		      struct tipc_media_addr *dst)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_bearer *b;
+	struct sk_buff *skb, *tmp;
+
+	if (skb_queue_empty(xmitq))
+		return;
+
+	rcu_read_lock();
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (likely(b)) {
+		skb_queue_walk_safe(xmitq, skb, tmp) {
+			__skb_dequeue(xmitq);
+			b->media->send_msg(net, skb, b, dst);
+			/* Until we remove cloning in tipc_l2_send_msg(): */
+			kfree_skb(skb);
+		}
+	}
+	rcu_read_unlock();
+}
+
 /**
  * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
  * @buf: the received packet
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dc714d977768..6426f242f626 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
 void tipc_bearer_stop(struct net *net);
 void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
 		      struct tipc_media_addr *dest);
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+		      struct sk_buff_head *xmitq,
+		      struct tipc_media_addr *dst);
 
 #endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ea32679b6737..c052437a7cfa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 	/* This really cannot happen... */
 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
 		return -ENOBUFS;
 	}
 	/* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	return 0;
 }
 
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @link: link to use
+ * @list: chain of buffers containing message
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr = buf_msg(skb_peek(list));
+	unsigned int maxwin = l->window;
+	unsigned int i, imp = msg_importance(hdr);
+	unsigned int mtu = l->mtu;
+	u16 ack = l->rcv_nxt - 1;
+	u16 seqno = l->snd_nxt;
+	u16 bc_last_in = l->owner->bclink.last_in;
+	struct sk_buff_head *transmq = &l->transmq;
+	struct sk_buff_head *backlogq = &l->backlogq;
+	struct sk_buff *skb, *_skb, *bskb;
+
+	/* Match msg importance against this and all higher backlog limits: */
+	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+			return link_schedule_user(l, list);
+	}
+	if (unlikely(msg_size(hdr) > mtu))
+		return -EMSGSIZE;
+
+	/* Prepare each packet for sending, and add to relevant queue: */
+	while (skb_queue_len(list)) {
+		skb = skb_peek(list);
+		hdr = buf_msg(skb);
+		msg_set_seqno(hdr, seqno);
+		msg_set_ack(hdr, ack);
+		msg_set_bcast_ack(hdr, bc_last_in);
+
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			_skb = skb_clone(skb, GFP_ATOMIC);
+			if (!_skb)
+				return -ENOBUFS;
+			__skb_dequeue(list);
+			__skb_queue_tail(transmq, skb);
+			__skb_queue_tail(xmitq, _skb);
+			l->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+			kfree_skb(__skb_dequeue(list));
+			l->stats.sent_bundled++;
+			continue;
+		}
+		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+			kfree_skb(__skb_dequeue(list));
+			__skb_queue_tail(backlogq, bskb);
+			l->backlog[msg_importance(buf_msg(bskb))].len++;
+			l->stats.sent_bundled++;
+			l->stats.sent_bundles++;
+			continue;
+		}
+		l->backlog[imp].len += skb_queue_len(list);
+		skb_queue_splice_tail_init(list, backlogq);
+	}
+	l->snd_nxt = seqno;
+	return 0;
+}
+
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
 	skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
-		       u32 selector)
-{
-	struct sk_buff_head head;
-	int rc;
-
-	skb2list(skb, &head);
-	rc = tipc_link_xmit(net, &head, dnode, selector);
-	if (rc)
-		kfree_skb(skb);
-	return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
-		   u32 selector)
-{
-	struct tipc_link *link = NULL;
-	struct tipc_node *node;
-	int rc = -EHOSTUNREACH;
-
-	node = tipc_node_find(net, dnode);
-	if (node) {
-		tipc_node_lock(node);
-		link = node_active_link(node, selector & 1);
-		if (link)
-			rc = __tipc_link_xmit(net, link, list);
-		tipc_node_unlock(node);
-		tipc_node_put(node);
-	}
-	if (link)
-		return rc;
-
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-
-	__skb_queue_purge(list);
-	return rc;
-}
-
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 9c71d9e42e93..7add2b90361d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
 void tipc_link_purge_backlog(struct tipc_link *l);
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
-		       u32 selector);
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
-		   u32 selector);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list);
+int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority);
 void tipc_link_push_packets(struct tipc_link *l_ptr);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 3a1539e96294..e6018b7eb197 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
 		if (!oskb)
 			break;
 		msg_set_destnode(buf_msg(oskb), dnode);
-		tipc_link_xmit_skb(net, oskb, dnode, dnode);
+		tipc_node_xmit_skb(net, oskb, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
 				 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
 	rcu_read_unlock();
 
-	tipc_link_xmit(net, &head, dnode, dnode);
+	tipc_node_xmit(net, &head, dnode, dnode);
 }
 
 static void tipc_publ_subscribe(struct net *net, struct publication *publ,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 19729645d494..ad759bb034e7 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -563,6 +563,84 @@ msg_full:
 	return -EMSGSIZE;
 }
 
+static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
+					       int *bearer_id,
+					       struct tipc_media_addr **maddr)
+{
+	int id = n->active_links[sel & 1];
+
+	if (unlikely(id < 0))
+		return NULL;
+
+	*bearer_id = id;
+	*maddr = &n->links[id].maddr;
+	return n->links[id].link;
+}
+
+/**
+ * tipc_node_xmit() is the general link level function for message sending
+ * @net: the applicable net namespace
+ * @list: chain of buffers containing message
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
+		   u32 dnode, int selector)
+{
+	struct tipc_link *l = NULL;
+	struct tipc_node *n;
+	struct sk_buff_head xmitq;
+	struct tipc_media_addr *maddr;
+	int bearer_id;
+	int rc = -EHOSTUNREACH;
+
+	__skb_queue_head_init(&xmitq);
+	n = tipc_node_find(net, dnode);
+	if (likely(n)) {
+		tipc_node_lock(n);
+		l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
+		if (likely(l))
+			rc = tipc_link_xmit(l, list, &xmitq);
+		if (unlikely(rc == -ENOBUFS))
+			tipc_link_reset(l);
+		tipc_node_unlock(n);
+		tipc_node_put(n);
+	}
+	if (likely(!rc)) {
+		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+		return 0;
+	}
+	if (likely(in_own_node(net, dnode))) {
+		tipc_sk_rcv(net, list);
+		return 0;
+	}
+	return rc;
+}
+
+/* tipc_node_xmit_skb(): send single buffer to destination
+ * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
+		       u32 selector)
+{
+	struct sk_buff_head head;
+	int rc;
+
+	skb_queue_head_init(&head);
+	__skb_queue_tail(&head, skb);
+	rc = tipc_node_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
+}
+
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 74f278adada3..86b7c740cf84 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
 			   char *linkname, size_t len);
 void tipc_node_unlock(struct tipc_node *node);
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
+		   int selector);
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
+		       u32 selector);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 87fef25f6519..5b0b08d58fcc 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
 
 	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
 		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
-			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
+			tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
 	}
 }
 
@@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock)
 		}
 		if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
 				     TIPC_ERR_NO_PORT))
-			tipc_link_xmit_skb(net, skb, dnode, 0);
+			tipc_node_xmit_skb(net, skb, dnode, 0);
 	}
 }
 
@@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock)
 				      tsk_own_node(tsk), tsk_peer_port(tsk),
 				      tsk->portid, TIPC_ERR_NO_PORT);
 		if (skb)
-			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		tipc_node_remove_conn(net, dnode, tsk->portid);
 	}
 
@@ -925,7 +925,7 @@ new_mtu:
 	do {
 		skb = skb_peek(pktchain);
 		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
+		rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
 		if (likely(!rc)) {
 			if (sock->state != SS_READY)
 				sock->state = SS_CONNECTING;
@@ -1045,7 +1045,7 @@ next:
 		return rc;
 	do {
 		if (likely(!tsk_conn_cong(tsk))) {
-			rc = tipc_link_xmit(net, pktchain, dnode, portid);
+			rc = tipc_node_xmit(net, pktchain, dnode, portid);
 			if (likely(!rc)) {
 				tsk->sent_unacked++;
 				sent += send;
@@ -1224,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 		return;
 	msg = buf_msg(skb);
 	msg_set_msgcnt(msg, ack);
-	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1703,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 		return 0;
 	}
 	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
-		tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 	return 0;
 }
 
@@ -1799,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
 			continue;
 xmit:
-		tipc_link_xmit_skb(net, skb, dnode, dport);
+		tipc_node_xmit_skb(net, skb, dnode, dport);
 	}
 	return err ? -EHOSTUNREACH : 0;
 }
@@ -2092,7 +2092,7 @@ restart:
 		}
 		if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
 				     TIPC_CONN_SHUTDOWN))
-			tipc_link_xmit_skb(net, skb, dnode,
+			tipc_node_xmit_skb(net, skb, dnode,
 					   tsk->portid);
 	} else {
 		dnode = tsk_peer_node(tsk);
@@ -2102,7 +2102,7 @@ restart:
 				      0, dnode, tsk_own_node(tsk),
 				      tsk_peer_port(tsk),
 				      tsk->portid, TIPC_CONN_SHUTDOWN);
-		tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 	}
 	tsk->connected = 0;
 	sock->state = SS_DISCONNECTING;
@@ -2164,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)
 	}
 	bh_unlock_sock(sk);
 	if (skb)
-		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
+		tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
 exit:
 	sock_put(sk);
 }