aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:44 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 19:00:03 -0500
commitcb1b728096f54e7408d60fb571944bed00c5b771 (patch)
tree1b1e50e705f4ddd3b36a43ac9067538db3e8233f
parent3c724acdd5049907555a831f814bfd5927c3350c (diff)
tipc: eliminate race condition at multicast reception
In a previous commit in this series we resolved a race problem during unicast message reception. Here, we resolve the same problem at multicast reception. We apply the same technique: an input queue serializing the delivery of arriving buffers. The main difference is that here we do it in two steps. First, the broadcast link feeds arriving buffers into the tail of an arrival queue, which head is consumed at the socket level, and where destination lookup is performed. Second, if the lookup is successful, the resulting buffer clones are fed into a second queue, the input queue. This queue is consumed at reception in the socket just like in the unicast case. Both queues are protected by the same lock, -the one of the input queue. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/bcast.c57
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/msg.h17
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/node.c11
-rw-r--r--net/tipc/node.h7
-rw-r--r--net/tipc/socket.c72
-rw-r--r--net/tipc/socket.h4
8 files changed, 114 insertions, 59 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
79 tipc_link_reset_all(node); 79 tipc_link_reset_all(node);
80} 80}
81 81
82void tipc_bclink_input(struct net *net)
83{
84 struct tipc_net *tn = net_generic(net, tipc_net_id);
85
86 tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
87}
88
82uint tipc_bclink_get_mtu(void) 89uint tipc_bclink_get_mtu(void)
83{ 90{
84 return MAX_PKT_DEFAULT_MCAST; 91 return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
356 tipc_node_unlock(n_ptr); 363 tipc_node_unlock(n_ptr);
357} 364}
358 365
359/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster 366/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
360 * and to identified node local sockets 367 * and to identified node local sockets
361 * @net: the applicable net namespace 368 * @net: the applicable net namespace
362 * @list: chain of buffers containing message 369 * @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
371 int rc = 0; 378 int rc = 0;
372 int bc = 0; 379 int bc = 0;
373 struct sk_buff *skb; 380 struct sk_buff *skb;
381 struct sk_buff_head arrvq;
382 struct sk_buff_head inputq;
374 383
375 /* Prepare clone of message for local node */ 384 /* Prepare clone of message for local node */
376 skb = tipc_msg_reassemble(list); 385 skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
379 return -EHOSTUNREACH; 388 return -EHOSTUNREACH;
380 } 389 }
381 390
382 /* Broadcast to all other nodes */ 391 /* Broadcast to all nodes */
383 if (likely(bclink)) { 392 if (likely(bclink)) {
384 tipc_bclink_lock(net); 393 tipc_bclink_lock(net);
385 if (likely(bclink->bcast_nodes.count)) { 394 if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
399 if (unlikely(!bc)) 408 if (unlikely(!bc))
400 __skb_queue_purge(list); 409 __skb_queue_purge(list);
401 410
402 /* Deliver message clone */ 411 if (unlikely(rc)) {
403 if (likely(!rc))
404 tipc_sk_mcast_rcv(net, skb);
405 else
406 kfree_skb(skb); 412 kfree_skb(skb);
407 413 return rc;
414 }
415 /* Deliver message clone */
416 __skb_queue_head_init(&arrvq);
417 skb_queue_head_init(&inputq);
418 __skb_queue_tail(&arrvq, skb);
419 tipc_sk_mcast_rcv(net, &arrvq, &inputq);
408 return rc; 420 return rc;
409} 421}
410 422
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
449 int deferred = 0; 461 int deferred = 0;
450 int pos = 0; 462 int pos = 0;
451 struct sk_buff *iskb; 463 struct sk_buff *iskb;
452 struct sk_buff_head msgs; 464 struct sk_buff_head *arrvq, *inputq;
453 465
454 /* Screen out unwanted broadcast messages */ 466 /* Screen out unwanted broadcast messages */
455 if (msg_mc_netid(msg) != tn->net_id) 467 if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
486 /* Handle in-sequence broadcast message */ 498 /* Handle in-sequence broadcast message */
487 seqno = msg_seqno(msg); 499 seqno = msg_seqno(msg);
488 next_in = mod(node->bclink.last_in + 1); 500 next_in = mod(node->bclink.last_in + 1);
501 arrvq = &tn->bclink->arrvq;
502 inputq = &tn->bclink->inputq;
489 503
490 if (likely(seqno == next_in)) { 504 if (likely(seqno == next_in)) {
491receive: 505receive:
@@ -493,21 +507,26 @@ receive:
493 if (likely(msg_isdata(msg))) { 507 if (likely(msg_isdata(msg))) {
494 tipc_bclink_lock(net); 508 tipc_bclink_lock(net);
495 bclink_accept_pkt(node, seqno); 509 bclink_accept_pkt(node, seqno);
510 spin_lock_bh(&inputq->lock);
511 __skb_queue_tail(arrvq, buf);
512 spin_unlock_bh(&inputq->lock);
513 node->action_flags |= TIPC_BCAST_MSG_EVT;
496 tipc_bclink_unlock(net); 514 tipc_bclink_unlock(net);
497 tipc_node_unlock(node); 515 tipc_node_unlock(node);
498 if (likely(msg_mcast(msg)))
499 tipc_sk_mcast_rcv(net, buf);
500 else
501 kfree_skb(buf);
502 } else if (msg_user(msg) == MSG_BUNDLER) { 516 } else if (msg_user(msg) == MSG_BUNDLER) {
503 tipc_bclink_lock(net); 517 tipc_bclink_lock(net);
504 bclink_accept_pkt(node, seqno); 518 bclink_accept_pkt(node, seqno);
505 bcl->stats.recv_bundles++; 519 bcl->stats.recv_bundles++;
506 bcl->stats.recv_bundled += msg_msgcnt(msg); 520 bcl->stats.recv_bundled += msg_msgcnt(msg);
521 pos = 0;
522 while (tipc_msg_extract(buf, &iskb, &pos)) {
523 spin_lock_bh(&inputq->lock);
524 __skb_queue_tail(arrvq, iskb);
525 spin_unlock_bh(&inputq->lock);
526 }
527 node->action_flags |= TIPC_BCAST_MSG_EVT;
507 tipc_bclink_unlock(net); 528 tipc_bclink_unlock(net);
508 tipc_node_unlock(node); 529 tipc_node_unlock(node);
509 while (tipc_msg_extract(buf, &iskb, &pos))
510 tipc_sk_mcast_rcv(net, iskb);
511 } else if (msg_user(msg) == MSG_FRAGMENTER) { 530 } else if (msg_user(msg) == MSG_FRAGMENTER) {
512 tipc_buf_append(&node->bclink.reasm_buf, &buf); 531 tipc_buf_append(&node->bclink.reasm_buf, &buf);
513 if (unlikely(!buf && !node->bclink.reasm_buf)) 532 if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
523 } 542 }
524 tipc_bclink_unlock(net); 543 tipc_bclink_unlock(net);
525 tipc_node_unlock(node); 544 tipc_node_unlock(node);
526 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
527 tipc_bclink_lock(net);
528 bclink_accept_pkt(node, seqno);
529 tipc_bclink_unlock(net);
530 tipc_node_unlock(node);
531 skb_queue_head_init(&msgs);
532 skb_queue_tail(&msgs, buf);
533 tipc_named_rcv(net, &msgs);
534 } else { 545 } else {
535 tipc_bclink_lock(net); 546 tipc_bclink_lock(net);
536 bclink_accept_pkt(node, seqno); 547 bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
950 skb_queue_head_init(&bcl->wakeupq); 961 skb_queue_head_init(&bcl->wakeupq);
951 bcl->next_out_no = 1; 962 bcl->next_out_no = 1;
952 spin_lock_init(&bclink->node.lock); 963 spin_lock_init(&bclink->node.lock);
964 __skb_queue_head_init(&bclink->arrvq);
965 skb_queue_head_init(&bclink->inputq);
953 bcl->owner = &bclink->node; 966 bcl->owner = &bclink->node;
954 bcl->owner->net = net; 967 bcl->owner->net = net;
955 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 968 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 8f4d4dc38e11..a910c0b9f249 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -97,6 +97,8 @@ struct tipc_bclink {
97 struct tipc_link link; 97 struct tipc_link link;
98 struct tipc_node node; 98 struct tipc_node node;
99 unsigned int flags; 99 unsigned int flags;
100 struct sk_buff_head arrvq;
101 struct sk_buff_head inputq;
100 struct tipc_node_map bcast_nodes; 102 struct tipc_node_map bcast_nodes;
101 struct tipc_node *retransmit_to; 103 struct tipc_node *retransmit_to;
102}; 104};
@@ -134,5 +136,6 @@ uint tipc_bclink_get_mtu(void);
134int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list); 136int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
135void tipc_bclink_wakeup_users(struct net *net); 137void tipc_bclink_wakeup_users(struct net *net);
136int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); 138int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
139void tipc_bclink_input(struct net *net);
137 140
138#endif 141#endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index ab467261bd9d..9ace47f44a69 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -767,6 +767,23 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
767 int *err); 767 int *err);
768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
769 769
770/* tipc_skb_peek(): peek and reserve first buffer in list
771 * @list: list to be peeked in
772 * Returns pointer to first buffer in list, if any
773 */
774static inline struct sk_buff *tipc_skb_peek(struct sk_buff_head *list,
775 spinlock_t *lock)
776{
777 struct sk_buff *skb;
778
779 spin_lock_bh(lock);
780 skb = skb_peek(list);
781 if (skb)
782 skb_get(skb);
783 spin_unlock_bh(lock);
784 return skb;
785}
786
770/* tipc_skb_peek_port(): find a destination port, ignoring all destinations 787/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
771 * up to and including 'filter'. 788 * up to and including 'filter'.
772 * Note: ignoring previously tried destinations minimizes the risk of 789 * Note: ignoring previously tried destinations minimizes the risk of
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 52501fdaafa5..0304ddc6b101 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/name_table.h: Include file for TIPC name table code 2 * net/tipc/name_table.h: Include file for TIPC name table code
3 * 3 *
4 * Copyright (c) 2000-2006, 2014, Ericsson AB 4 * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
diff --git a/net/tipc/node.c b/net/tipc/node.c
index c7fdf3dec92c..52308498f208 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -582,10 +582,10 @@ void tipc_node_unlock(struct tipc_node *node)
582 namedq = node->namedq; 582 namedq = node->namedq;
583 publ_list = &node->publ_list; 583 publ_list = &node->publ_list;
584 584
585 node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN | 585 node->action_flags &= ~(TIPC_MSG_EVT |
586 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP | 586 TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
587 TIPC_NOTIFY_LINK_DOWN | 587 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
588 TIPC_WAKEUP_BCAST_USERS | 588 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
589 TIPC_NAMED_MSG_EVT); 589 TIPC_NAMED_MSG_EVT);
590 590
591 spin_unlock_bh(&node->lock); 591 spin_unlock_bh(&node->lock);
@@ -612,6 +612,9 @@ void tipc_node_unlock(struct tipc_node *node)
612 612
613 if (flags & TIPC_NAMED_MSG_EVT) 613 if (flags & TIPC_NAMED_MSG_EVT)
614 tipc_named_rcv(net, namedq); 614 tipc_named_rcv(net, namedq);
615
616 if (flags & TIPC_BCAST_MSG_EVT)
617 tipc_bclink_input(net);
615} 618}
616 619
617/* Caller should hold node lock for the passed node */ 620/* Caller should hold node lock for the passed node */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index c2b0fcf4042b..20ec13f9bede 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/node.h: Include file for TIPC node management routines 2 * net/tipc/node.h: Include file for TIPC node management routines
3 * 3 *
4 * Copyright (c) 2000-2006, 2014, Ericsson AB 4 * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
5 * Copyright (c) 2005, 2010-2014, Wind River Systems 5 * Copyright (c) 2005, 2010-2014, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -63,7 +63,8 @@ enum {
63 TIPC_WAKEUP_BCAST_USERS = (1 << 5), 63 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
64 TIPC_NOTIFY_LINK_UP = (1 << 6), 64 TIPC_NOTIFY_LINK_UP = (1 << 6),
65 TIPC_NOTIFY_LINK_DOWN = (1 << 7), 65 TIPC_NOTIFY_LINK_DOWN = (1 << 7),
66 TIPC_NAMED_MSG_EVT = (1 << 8) 66 TIPC_NAMED_MSG_EVT = (1 << 8),
67 TIPC_BCAST_MSG_EVT = (1 << 9)
67}; 68};
68 69
69/** 70/**
@@ -74,6 +75,7 @@ enum {
74 * @oos_state: state tracker for handling OOS b'cast messages 75 * @oos_state: state tracker for handling OOS b'cast messages
75 * @deferred_queue: deferred queue saved OOS b'cast message received from node 76 * @deferred_queue: deferred queue saved OOS b'cast message received from node
76 * @reasm_buf: broadcast reassembly queue head from node 77 * @reasm_buf: broadcast reassembly queue head from node
78 * @inputq_map: bitmap indicating which inqueues should be kicked
77 * @recv_permitted: true if node is allowed to receive b'cast messages 79 * @recv_permitted: true if node is allowed to receive b'cast messages
78 */ 80 */
79struct tipc_node_bclink { 81struct tipc_node_bclink {
@@ -84,6 +86,7 @@ struct tipc_node_bclink {
84 u32 deferred_size; 86 u32 deferred_size;
85 struct sk_buff_head deferred_queue; 87 struct sk_buff_head deferred_queue;
86 struct sk_buff *reasm_buf; 88 struct sk_buff *reasm_buf;
89 int inputq_map;
87 bool recv_permitted; 90 bool recv_permitted;
88}; 91};
89 92
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec8414ac1..66666805b53c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@ new_mtu:
776 return rc; 776 return rc;
777} 777}
778 778
779/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets 779/**
780 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
781 * @arrvq: queue with arriving messages, to be cloned after destination lookup
782 * @inputq: queue with cloned messages, delivered to socket after dest lookup
783 *
784 * Multi-threaded: parallel calls with reference to same queues may occur
780 */ 785 */
781void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb) 786void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
787 struct sk_buff_head *inputq)
782{ 788{
783 struct tipc_msg *msg = buf_msg(skb); 789 struct tipc_msg *msg;
784 struct tipc_plist dports; 790 struct tipc_plist dports;
785 struct sk_buff *cskb;
786 u32 portid; 791 u32 portid;
787 u32 scope = TIPC_CLUSTER_SCOPE; 792 u32 scope = TIPC_CLUSTER_SCOPE;
788 struct sk_buff_head msgq; 793 struct sk_buff_head tmpq;
789 uint hsz = skb_headroom(skb) + msg_hdr_sz(msg); 794 uint hsz;
795 struct sk_buff *skb, *_skb;
790 796
791 skb_queue_head_init(&msgq); 797 __skb_queue_head_init(&tmpq);
792 tipc_plist_init(&dports); 798 tipc_plist_init(&dports);
793 799
794 if (in_own_node(net, msg_orignode(msg))) 800 skb = tipc_skb_peek(arrvq, &inputq->lock);
795 scope = TIPC_NODE_SCOPE; 801 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
796 802 msg = buf_msg(skb);
797 if (unlikely(!msg_mcast(msg))) { 803 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
798 pr_warn("Received non-multicast msg in multicast\n"); 804
799 goto exit; 805 if (in_own_node(net, msg_orignode(msg)))
800 } 806 scope = TIPC_NODE_SCOPE;
801 /* Create destination port list: */ 807
802 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 808 /* Create destination port list and message clones: */
803 msg_nameupper(msg), scope, &dports); 809 tipc_nametbl_mc_translate(net,
804 portid = tipc_plist_pop(&dports); 810 msg_nametype(msg), msg_namelower(msg),
805 for (; portid; portid = tipc_plist_pop(&dports)) { 811 msg_nameupper(msg), scope, &dports);
806 cskb = __pskb_copy(skb, hsz, GFP_ATOMIC); 812 portid = tipc_plist_pop(&dports);
807 if (!cskb) { 813 for (; portid; portid = tipc_plist_pop(&dports)) {
808 pr_warn("Failed do clone mcast rcv buffer\n"); 814 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
809 continue; 815 if (_skb) {
816 msg_set_destport(buf_msg(_skb), portid);
817 __skb_queue_tail(&tmpq, _skb);
818 continue;
819 }
820 pr_warn("Failed to clone mcast rcv buffer\n");
810 } 821 }
811 msg_set_destport(buf_msg(cskb), portid); 822 /* Append to inputq if not already done by other thread */
812 skb_queue_tail(&msgq, cskb); 823 spin_lock_bh(&inputq->lock);
824 if (skb_peek(arrvq) == skb) {
825 skb_queue_splice_tail_init(&tmpq, inputq);
826 kfree_skb(__skb_dequeue(arrvq));
827 }
828 spin_unlock_bh(&inputq->lock);
829 __skb_queue_purge(&tmpq);
830 kfree_skb(skb);
813 } 831 }
814 tipc_sk_rcv(net, &msgq); 832 tipc_sk_rcv(net, inputq);
815exit:
816 kfree_skb(skb);
817} 833}
818 834
819/** 835/**
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 95b015909ac1..8be0da7df8fc 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -42,7 +42,6 @@
42#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) 42#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
43#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ 43#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) 44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
45
46int tipc_socket_init(void); 45int tipc_socket_init(void);
47void tipc_socket_stop(void); 46void tipc_socket_stop(void);
48int tipc_sock_create_local(struct net *net, int type, struct socket **res); 47int tipc_sock_create_local(struct net *net, int type, struct socket **res);
@@ -51,7 +50,8 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
51 int flags); 50 int flags);
52int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); 51int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
53struct sk_buff *tipc_sk_socks_show(struct net *net); 52struct sk_buff *tipc_sk_socks_show(struct net *net);
54void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf); 53void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
54 struct sk_buff_head *inputq);
55void tipc_sk_reinit(struct net *net); 55void tipc_sk_reinit(struct net *net);
56int tipc_sk_rht_init(struct net *net); 56int tipc_sk_rht_init(struct net *net);
57void tipc_sk_rht_destroy(struct net *net); 57void tipc_sk_rht_destroy(struct net *net);