aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2015-01-07 00:41:58 -0500
committerDavid S. Miller <davem@davemloft.net>2015-01-08 22:47:14 -0500
commit07f6c4bc048a7a8939c68a668bf77474890794c5 (patch)
treea90781d92d4e268479aebe1fb3375ac20cee7f13 /net/tipc
parent545a148e43bed67618cc90b66f9864fba0878890 (diff)
tipc: convert tipc reference table to use generic rhashtable
Because the TIPC reference table is statically allocated, the memory it requests when the TIPC stack is initialized is quite large, even though the maximum number of ports is currently restricted to just 8191; in practice, that limit has already become insufficient. If the maximum number of ports were instead raised to its theoretical value of 2^32, the memory consumed would reach a ridiculously unacceptable size. Apart from this, heavy TIPC users spend a considerable amount of time in tipc_sk_get() because of the read-lock on ref_table_lock.
If the TIPC reference table is converted to the generic rhashtable, both of the disadvantages mentioned above are resolved: the new resizable hash table avoids locking on lookup, and less memory is required at the initial stage. For example, 256 hash bucket slots are requested at the beginning, instead of allocating all 8191 slots as in the old mode. The hash table will grow if the number of entries exceeds 75% of the table size, up to a total table size of 1M, and it will automatically shrink if usage falls below 30%, but the table size is never allowed to drop below the minimum of 256.
This commit also converts ref_table_lock to a separate mutex that protects hash table mutations on the write side. Lastly, it defers the release of the socket reference using call_rcu(), which allows an RCU read-side protected call to rhashtable_lookup().
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Erik Hugne <erik.hugne@ericsson.com>
Cc: Thomas Graf <tgraf@suug.ch>
Acked-by: Thomas Graf <tgraf@suug.ch>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Kconfig12
-rw-r--r--net/tipc/config.c24
-rw-r--r--net/tipc/core.c10
-rw-r--r--net/tipc/core.h3
-rw-r--r--net/tipc/socket.c480
-rw-r--r--net/tipc/socket.h4
6 files changed, 180 insertions, 353 deletions
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig
index c890848f9d56..91c8a8e031db 100644
--- a/net/tipc/Kconfig
+++ b/net/tipc/Kconfig
@@ -20,18 +20,6 @@ menuconfig TIPC
20 20
21 If in doubt, say N. 21 If in doubt, say N.
22 22
23config TIPC_PORTS
24 int "Maximum number of ports in a node"
25 depends on TIPC
26 range 127 65535
27 default "8191"
28 help
29 Specifies how many ports can be supported by a node.
30 Can range from 127 to 65535 ports; default is 8191.
31
32 Setting this to a smaller value saves some memory,
33 setting it to higher allows for more ports.
34
35config TIPC_MEDIA_IB 23config TIPC_MEDIA_IB
36 bool "InfiniBand media type support" 24 bool "InfiniBand media type support"
37 depends on TIPC && INFINIBAND_IPOIB 25 depends on TIPC && INFINIBAND_IPOIB
diff --git a/net/tipc/config.c b/net/tipc/config.c
index 876f4c6a2631..0b3a90ecab6d 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -183,22 +183,6 @@ static struct sk_buff *cfg_set_own_addr(void)
183 return tipc_cfg_reply_error_string("cannot change to network mode"); 183 return tipc_cfg_reply_error_string("cannot change to network mode");
184} 184}
185 185
186static struct sk_buff *cfg_set_max_ports(void)
187{
188 u32 value;
189
190 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED))
191 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
192 value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
193 if (value == tipc_max_ports)
194 return tipc_cfg_reply_none();
195 if (value < 127 || value > 65535)
196 return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
197 " (max ports must be 127-65535)");
198 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
199 " (cannot change max ports while TIPC is active)");
200}
201
202static struct sk_buff *cfg_set_netid(void) 186static struct sk_buff *cfg_set_netid(void)
203{ 187{
204 u32 value; 188 u32 value;
@@ -285,15 +269,9 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
285 case TIPC_CMD_SET_NODE_ADDR: 269 case TIPC_CMD_SET_NODE_ADDR:
286 rep_tlv_buf = cfg_set_own_addr(); 270 rep_tlv_buf = cfg_set_own_addr();
287 break; 271 break;
288 case TIPC_CMD_SET_MAX_PORTS:
289 rep_tlv_buf = cfg_set_max_ports();
290 break;
291 case TIPC_CMD_SET_NETID: 272 case TIPC_CMD_SET_NETID:
292 rep_tlv_buf = cfg_set_netid(); 273 rep_tlv_buf = cfg_set_netid();
293 break; 274 break;
294 case TIPC_CMD_GET_MAX_PORTS:
295 rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_ports);
296 break;
297 case TIPC_CMD_GET_NETID: 275 case TIPC_CMD_GET_NETID:
298 rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id); 276 rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id);
299 break; 277 break;
@@ -317,6 +295,8 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
317 case TIPC_CMD_SET_REMOTE_MNG: 295 case TIPC_CMD_SET_REMOTE_MNG:
318 case TIPC_CMD_GET_REMOTE_MNG: 296 case TIPC_CMD_GET_REMOTE_MNG:
319 case TIPC_CMD_DUMP_LOG: 297 case TIPC_CMD_DUMP_LOG:
298 case TIPC_CMD_SET_MAX_PORTS:
299 case TIPC_CMD_GET_MAX_PORTS:
320 rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED 300 rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
321 " (obsolete command)"); 301 " (obsolete command)");
322 break; 302 break;
diff --git a/net/tipc/core.c b/net/tipc/core.c
index a5737b8407dd..71b2ada0f5ab 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -34,6 +34,8 @@
34 * POSSIBILITY OF SUCH DAMAGE. 34 * POSSIBILITY OF SUCH DAMAGE.
35 */ 35 */
36 36
37#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
38
37#include "core.h" 39#include "core.h"
38#include "name_table.h" 40#include "name_table.h"
39#include "subscr.h" 41#include "subscr.h"
@@ -47,7 +49,6 @@ int tipc_random __read_mostly;
47 49
48/* configurable TIPC parameters */ 50/* configurable TIPC parameters */
49u32 tipc_own_addr __read_mostly; 51u32 tipc_own_addr __read_mostly;
50int tipc_max_ports __read_mostly;
51int tipc_net_id __read_mostly; 52int tipc_net_id __read_mostly;
52int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */ 53int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */
53 54
@@ -84,9 +85,9 @@ static void tipc_core_stop(void)
84 tipc_netlink_stop(); 85 tipc_netlink_stop();
85 tipc_subscr_stop(); 86 tipc_subscr_stop();
86 tipc_nametbl_stop(); 87 tipc_nametbl_stop();
87 tipc_sk_ref_table_stop();
88 tipc_socket_stop(); 88 tipc_socket_stop();
89 tipc_unregister_sysctl(); 89 tipc_unregister_sysctl();
90 tipc_sk_rht_destroy();
90} 91}
91 92
92/** 93/**
@@ -98,7 +99,7 @@ static int tipc_core_start(void)
98 99
99 get_random_bytes(&tipc_random, sizeof(tipc_random)); 100 get_random_bytes(&tipc_random, sizeof(tipc_random));
100 101
101 err = tipc_sk_ref_table_init(tipc_max_ports, tipc_random); 102 err = tipc_sk_rht_init();
102 if (err) 103 if (err)
103 goto out_reftbl; 104 goto out_reftbl;
104 105
@@ -138,7 +139,7 @@ out_socket:
138out_netlink: 139out_netlink:
139 tipc_nametbl_stop(); 140 tipc_nametbl_stop();
140out_nametbl: 141out_nametbl:
141 tipc_sk_ref_table_stop(); 142 tipc_sk_rht_destroy();
142out_reftbl: 143out_reftbl:
143 return err; 144 return err;
144} 145}
@@ -150,7 +151,6 @@ static int __init tipc_init(void)
150 pr_info("Activated (version " TIPC_MOD_VER ")\n"); 151 pr_info("Activated (version " TIPC_MOD_VER ")\n");
151 152
152 tipc_own_addr = 0; 153 tipc_own_addr = 0;
153 tipc_max_ports = CONFIG_TIPC_PORTS;
154 tipc_net_id = 4711; 154 tipc_net_id = 4711;
155 155
156 sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << 156 sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 84602137ce20..56fe4229fc5e 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -37,8 +37,6 @@
37#ifndef _TIPC_CORE_H 37#ifndef _TIPC_CORE_H
38#define _TIPC_CORE_H 38#define _TIPC_CORE_H
39 39
40#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
41
42#include <linux/tipc.h> 40#include <linux/tipc.h>
43#include <linux/tipc_config.h> 41#include <linux/tipc_config.h>
44#include <linux/tipc_netlink.h> 42#include <linux/tipc_netlink.h>
@@ -79,7 +77,6 @@ int tipc_snprintf(char *buf, int len, const char *fmt, ...);
79 * Global configuration variables 77 * Global configuration variables
80 */ 78 */
81extern u32 tipc_own_addr __read_mostly; 79extern u32 tipc_own_addr __read_mostly;
82extern int tipc_max_ports __read_mostly;
83extern int tipc_net_id __read_mostly; 80extern int tipc_net_id __read_mostly;
84extern int sysctl_tipc_rmem[3] __read_mostly; 81extern int sysctl_tipc_rmem[3] __read_mostly;
85extern int sysctl_tipc_named_timeout __read_mostly; 82extern int sysctl_tipc_named_timeout __read_mostly;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 4731cad99d1c..701f31bbbbfb 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -34,22 +34,25 @@
34 * POSSIBILITY OF SUCH DAMAGE. 34 * POSSIBILITY OF SUCH DAMAGE.
35 */ 35 */
36 36
37#include <linux/rhashtable.h>
38#include <linux/jhash.h>
37#include "core.h" 39#include "core.h"
38#include "name_table.h" 40#include "name_table.h"
39#include "node.h" 41#include "node.h"
40#include "link.h" 42#include "link.h"
41#include <linux/export.h>
42#include "config.h" 43#include "config.h"
43#include "socket.h" 44#include "socket.h"
44 45
45#define SS_LISTENING -1 /* socket is listening */ 46#define SS_LISTENING -1 /* socket is listening */
46#define SS_READY -2 /* socket is connectionless */ 47#define SS_READY -2 /* socket is connectionless */
47 48
48#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 49#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
49#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 50#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */
50#define TIPC_FWD_MSG 1 51#define TIPC_FWD_MSG 1
51#define TIPC_CONN_OK 0 52#define TIPC_CONN_OK 0
52#define TIPC_CONN_PROBING 1 53#define TIPC_CONN_PROBING 1
54#define TIPC_MAX_PORT 0xffffffff
55#define TIPC_MIN_PORT 1
53 56
54/** 57/**
55 * struct tipc_sock - TIPC socket structure 58 * struct tipc_sock - TIPC socket structure
@@ -59,7 +62,7 @@
59 * @conn_instance: TIPC instance used when connection was established 62 * @conn_instance: TIPC instance used when connection was established
60 * @published: non-zero if port has one or more associated names 63 * @published: non-zero if port has one or more associated names
61 * @max_pkt: maximum packet size "hint" used when building messages sent by port 64 * @max_pkt: maximum packet size "hint" used when building messages sent by port
62 * @ref: unique reference to port in TIPC object registry 65 * @portid: unique port identity in TIPC socket hash table
63 * @phdr: preformatted message header used when sending messages 66 * @phdr: preformatted message header used when sending messages
64 * @port_list: adjacent ports in TIPC's global list of ports 67 * @port_list: adjacent ports in TIPC's global list of ports
65 * @publications: list of publications for port 68 * @publications: list of publications for port
@@ -74,6 +77,8 @@
74 * @link_cong: non-zero if owner must sleep because of link congestion 77 * @link_cong: non-zero if owner must sleep because of link congestion
75 * @sent_unacked: # messages sent by socket, and not yet acked by peer 78 * @sent_unacked: # messages sent by socket, and not yet acked by peer
76 * @rcv_unacked: # messages read by user, but not yet acked back to peer 79 * @rcv_unacked: # messages read by user, but not yet acked back to peer
80 * @node: hash table node
81 * @rcu: rcu struct for tipc_sock
77 */ 82 */
78struct tipc_sock { 83struct tipc_sock {
79 struct sock sk; 84 struct sock sk;
@@ -82,7 +87,7 @@ struct tipc_sock {
82 u32 conn_instance; 87 u32 conn_instance;
83 int published; 88 int published;
84 u32 max_pkt; 89 u32 max_pkt;
85 u32 ref; 90 u32 portid;
86 struct tipc_msg phdr; 91 struct tipc_msg phdr;
87 struct list_head sock_list; 92 struct list_head sock_list;
88 struct list_head publications; 93 struct list_head publications;
@@ -95,6 +100,8 @@ struct tipc_sock {
95 bool link_cong; 100 bool link_cong;
96 uint sent_unacked; 101 uint sent_unacked;
97 uint rcv_unacked; 102 uint rcv_unacked;
103 struct rhash_head node;
104 struct rcu_head rcu;
98}; 105};
99 106
100static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); 107static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -103,16 +110,14 @@ static void tipc_write_space(struct sock *sk);
103static int tipc_release(struct socket *sock); 110static int tipc_release(struct socket *sock);
104static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); 111static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
105static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); 112static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
106static void tipc_sk_timeout(unsigned long ref); 113static void tipc_sk_timeout(unsigned long portid);
107static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 114static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
108 struct tipc_name_seq const *seq); 115 struct tipc_name_seq const *seq);
109static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 116static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
110 struct tipc_name_seq const *seq); 117 struct tipc_name_seq const *seq);
111static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk); 118static struct tipc_sock *tipc_sk_lookup(u32 portid);
112static void tipc_sk_ref_discard(u32 ref); 119static int tipc_sk_insert(struct tipc_sock *tsk);
113static struct tipc_sock *tipc_sk_get(u32 ref); 120static void tipc_sk_remove(struct tipc_sock *tsk);
114static struct tipc_sock *tipc_sk_get_next(u32 *ref);
115static void tipc_sk_put(struct tipc_sock *tsk);
116 121
117static const struct proto_ops packet_ops; 122static const struct proto_ops packet_ops;
118static const struct proto_ops stream_ops; 123static const struct proto_ops stream_ops;
@@ -174,6 +179,9 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
174 * - port reference 179 * - port reference
175 */ 180 */
176 181
182/* TIPC socket hash table; sockets are looked up in it by port identity */
183static struct rhashtable tipc_sk_rht;
184
177static u32 tsk_peer_node(struct tipc_sock *tsk) 185static u32 tsk_peer_node(struct tipc_sock *tsk)
178{ 186{
179 return msg_destnode(&tsk->phdr); 187 return msg_destnode(&tsk->phdr);
@@ -305,7 +313,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
305 struct sock *sk; 313 struct sock *sk;
306 struct tipc_sock *tsk; 314 struct tipc_sock *tsk;
307 struct tipc_msg *msg; 315 struct tipc_msg *msg;
308 u32 ref;
309 316
310 /* Validate arguments */ 317 /* Validate arguments */
311 if (unlikely(protocol != 0)) 318 if (unlikely(protocol != 0))
@@ -339,24 +346,22 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
339 return -ENOMEM; 346 return -ENOMEM;
340 347
341 tsk = tipc_sk(sk); 348 tsk = tipc_sk(sk);
342 ref = tipc_sk_ref_acquire(tsk);
343 if (!ref) {
344 pr_warn("Socket create failed; reference table exhausted\n");
345 return -ENOMEM;
346 }
347 tsk->max_pkt = MAX_PKT_DEFAULT; 349 tsk->max_pkt = MAX_PKT_DEFAULT;
348 tsk->ref = ref;
349 INIT_LIST_HEAD(&tsk->publications); 350 INIT_LIST_HEAD(&tsk->publications);
350 msg = &tsk->phdr; 351 msg = &tsk->phdr;
351 tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, 352 tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
352 NAMED_H_SIZE, 0); 353 NAMED_H_SIZE, 0);
353 msg_set_origport(msg, ref);
354 354
355 /* Finish initializing socket data structures */ 355 /* Finish initializing socket data structures */
356 sock->ops = ops; 356 sock->ops = ops;
357 sock->state = state; 357 sock->state = state;
358 sock_init_data(sock, sk); 358 sock_init_data(sock, sk);
359 k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref); 359 if (tipc_sk_insert(tsk)) {
360 pr_warn("Socket create failed; port numbrer exhausted\n");
361 return -EINVAL;
362 }
363 msg_set_origport(msg, tsk->portid);
364 k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, tsk->portid);
360 sk->sk_backlog_rcv = tipc_backlog_rcv; 365 sk->sk_backlog_rcv = tipc_backlog_rcv;
361 sk->sk_rcvbuf = sysctl_tipc_rmem[1]; 366 sk->sk_rcvbuf = sysctl_tipc_rmem[1];
362 sk->sk_data_ready = tipc_data_ready; 367 sk->sk_data_ready = tipc_data_ready;
@@ -442,6 +447,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
442 return ret; 447 return ret;
443} 448}
444 449
450static void tipc_sk_callback(struct rcu_head *head)
451{
452 struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
453
454 sock_put(&tsk->sk);
455}
456
445/** 457/**
446 * tipc_release - destroy a TIPC socket 458 * tipc_release - destroy a TIPC socket
447 * @sock: socket to destroy 459 * @sock: socket to destroy
@@ -491,7 +503,7 @@ static int tipc_release(struct socket *sock)
491 (sock->state == SS_CONNECTED)) { 503 (sock->state == SS_CONNECTED)) {
492 sock->state = SS_DISCONNECTING; 504 sock->state = SS_DISCONNECTING;
493 tsk->connected = 0; 505 tsk->connected = 0;
494 tipc_node_remove_conn(dnode, tsk->ref); 506 tipc_node_remove_conn(dnode, tsk->portid);
495 } 507 }
496 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) 508 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
497 tipc_link_xmit_skb(skb, dnode, 0); 509 tipc_link_xmit_skb(skb, dnode, 0);
@@ -499,16 +511,16 @@ static int tipc_release(struct socket *sock)
499 } 511 }
500 512
501 tipc_sk_withdraw(tsk, 0, NULL); 513 tipc_sk_withdraw(tsk, 0, NULL);
502 tipc_sk_ref_discard(tsk->ref);
503 k_cancel_timer(&tsk->timer); 514 k_cancel_timer(&tsk->timer);
515 tipc_sk_remove(tsk);
504 if (tsk->connected) { 516 if (tsk->connected) {
505 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 517 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
506 SHORT_H_SIZE, 0, dnode, tipc_own_addr, 518 SHORT_H_SIZE, 0, dnode, tipc_own_addr,
507 tsk_peer_port(tsk), 519 tsk_peer_port(tsk),
508 tsk->ref, TIPC_ERR_NO_PORT); 520 tsk->portid, TIPC_ERR_NO_PORT);
509 if (skb) 521 if (skb)
510 tipc_link_xmit_skb(skb, dnode, tsk->ref); 522 tipc_link_xmit_skb(skb, dnode, tsk->portid);
511 tipc_node_remove_conn(dnode, tsk->ref); 523 tipc_node_remove_conn(dnode, tsk->portid);
512 } 524 }
513 k_term_timer(&tsk->timer); 525 k_term_timer(&tsk->timer);
514 526
@@ -518,7 +530,8 @@ static int tipc_release(struct socket *sock)
518 /* Reject any messages that accumulated in backlog queue */ 530 /* Reject any messages that accumulated in backlog queue */
519 sock->state = SS_DISCONNECTING; 531 sock->state = SS_DISCONNECTING;
520 release_sock(sk); 532 release_sock(sk);
521 sock_put(sk); 533
534 call_rcu(&tsk->rcu, tipc_sk_callback);
522 sock->sk = NULL; 535 sock->sk = NULL;
523 536
524 return 0; 537 return 0;
@@ -611,7 +624,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
611 addr->addr.id.ref = tsk_peer_port(tsk); 624 addr->addr.id.ref = tsk_peer_port(tsk);
612 addr->addr.id.node = tsk_peer_node(tsk); 625 addr->addr.id.node = tsk_peer_node(tsk);
613 } else { 626 } else {
614 addr->addr.id.ref = tsk->ref; 627 addr->addr.id.ref = tsk->portid;
615 addr->addr.id.node = tipc_own_addr; 628 addr->addr.id.node = tipc_own_addr;
616 } 629 }
617 630
@@ -946,7 +959,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
946 } 959 }
947 960
948new_mtu: 961new_mtu:
949 mtu = tipc_node_get_mtu(dnode, tsk->ref); 962 mtu = tipc_node_get_mtu(dnode, tsk->portid);
950 __skb_queue_head_init(&head); 963 __skb_queue_head_init(&head);
951 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head); 964 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
952 if (rc < 0) 965 if (rc < 0)
@@ -955,7 +968,7 @@ new_mtu:
955 do { 968 do {
956 skb = skb_peek(&head); 969 skb = skb_peek(&head);
957 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 970 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
958 rc = tipc_link_xmit(&head, dnode, tsk->ref); 971 rc = tipc_link_xmit(&head, dnode, tsk->portid);
959 if (likely(rc >= 0)) { 972 if (likely(rc >= 0)) {
960 if (sock->state != SS_READY) 973 if (sock->state != SS_READY)
961 sock->state = SS_CONNECTING; 974 sock->state = SS_CONNECTING;
@@ -1028,7 +1041,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1028 struct tipc_msg *mhdr = &tsk->phdr; 1041 struct tipc_msg *mhdr = &tsk->phdr;
1029 struct sk_buff_head head; 1042 struct sk_buff_head head;
1030 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1043 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1031 u32 ref = tsk->ref; 1044 u32 portid = tsk->portid;
1032 int rc = -EINVAL; 1045 int rc = -EINVAL;
1033 long timeo; 1046 long timeo;
1034 u32 dnode; 1047 u32 dnode;
@@ -1067,7 +1080,7 @@ next:
1067 goto exit; 1080 goto exit;
1068 do { 1081 do {
1069 if (likely(!tsk_conn_cong(tsk))) { 1082 if (likely(!tsk_conn_cong(tsk))) {
1070 rc = tipc_link_xmit(&head, dnode, ref); 1083 rc = tipc_link_xmit(&head, dnode, portid);
1071 if (likely(!rc)) { 1084 if (likely(!rc)) {
1072 tsk->sent_unacked++; 1085 tsk->sent_unacked++;
1073 sent += send; 1086 sent += send;
@@ -1076,7 +1089,7 @@ next:
1076 goto next; 1089 goto next;
1077 } 1090 }
1078 if (rc == -EMSGSIZE) { 1091 if (rc == -EMSGSIZE) {
1079 tsk->max_pkt = tipc_node_get_mtu(dnode, ref); 1092 tsk->max_pkt = tipc_node_get_mtu(dnode, portid);
1080 goto next; 1093 goto next;
1081 } 1094 }
1082 if (rc != -ELINKCONG) 1095 if (rc != -ELINKCONG)
@@ -1130,8 +1143,8 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1130 tsk->probing_state = TIPC_CONN_OK; 1143 tsk->probing_state = TIPC_CONN_OK;
1131 tsk->connected = 1; 1144 tsk->connected = 1;
1132 k_start_timer(&tsk->timer, tsk->probing_interval); 1145 k_start_timer(&tsk->timer, tsk->probing_interval);
1133 tipc_node_add_conn(peer_node, tsk->ref, peer_port); 1146 tipc_node_add_conn(peer_node, tsk->portid, peer_port);
1134 tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref); 1147 tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->portid);
1135} 1148}
1136 1149
1137/** 1150/**
@@ -1238,7 +1251,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1238 if (!tsk->connected) 1251 if (!tsk->connected)
1239 return; 1252 return;
1240 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, 1253 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
1241 tipc_own_addr, peer_port, tsk->ref, TIPC_OK); 1254 tipc_own_addr, peer_port, tsk->portid, TIPC_OK);
1242 if (!skb) 1255 if (!skb)
1243 return; 1256 return;
1244 msg = buf_msg(skb); 1257 msg = buf_msg(skb);
@@ -1552,7 +1565,7 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1552 tsk->connected = 0; 1565 tsk->connected = 0;
1553 /* let timer expire on it's own */ 1566 /* let timer expire on it's own */
1554 tipc_node_remove_conn(tsk_peer_node(tsk), 1567 tipc_node_remove_conn(tsk_peer_node(tsk),
1555 tsk->ref); 1568 tsk->portid);
1556 } 1569 }
1557 retval = TIPC_OK; 1570 retval = TIPC_OK;
1558 } 1571 }
@@ -1743,7 +1756,7 @@ int tipc_sk_rcv(struct sk_buff *skb)
1743 u32 dnode; 1756 u32 dnode;
1744 1757
1745 /* Validate destination and message */ 1758 /* Validate destination and message */
1746 tsk = tipc_sk_get(dport); 1759 tsk = tipc_sk_lookup(dport);
1747 if (unlikely(!tsk)) { 1760 if (unlikely(!tsk)) {
1748 rc = tipc_msg_eval(skb, &dnode); 1761 rc = tipc_msg_eval(skb, &dnode);
1749 goto exit; 1762 goto exit;
@@ -1763,7 +1776,7 @@ int tipc_sk_rcv(struct sk_buff *skb)
1763 rc = -TIPC_ERR_OVERLOAD; 1776 rc = -TIPC_ERR_OVERLOAD;
1764 } 1777 }
1765 spin_unlock_bh(&sk->sk_lock.slock); 1778 spin_unlock_bh(&sk->sk_lock.slock);
1766 tipc_sk_put(tsk); 1779 sock_put(sk);
1767 if (likely(!rc)) 1780 if (likely(!rc))
1768 return 0; 1781 return 0;
1769exit: 1782exit:
@@ -2050,20 +2063,20 @@ restart:
2050 goto restart; 2063 goto restart;
2051 } 2064 }
2052 if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN)) 2065 if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
2053 tipc_link_xmit_skb(skb, dnode, tsk->ref); 2066 tipc_link_xmit_skb(skb, dnode, tsk->portid);
2054 tipc_node_remove_conn(dnode, tsk->ref); 2067 tipc_node_remove_conn(dnode, tsk->portid);
2055 } else { 2068 } else {
2056 dnode = tsk_peer_node(tsk); 2069 dnode = tsk_peer_node(tsk);
2057 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 2070 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2058 TIPC_CONN_MSG, SHORT_H_SIZE, 2071 TIPC_CONN_MSG, SHORT_H_SIZE,
2059 0, dnode, tipc_own_addr, 2072 0, dnode, tipc_own_addr,
2060 tsk_peer_port(tsk), 2073 tsk_peer_port(tsk),
2061 tsk->ref, TIPC_CONN_SHUTDOWN); 2074 tsk->portid, TIPC_CONN_SHUTDOWN);
2062 tipc_link_xmit_skb(skb, dnode, tsk->ref); 2075 tipc_link_xmit_skb(skb, dnode, tsk->portid);
2063 } 2076 }
2064 tsk->connected = 0; 2077 tsk->connected = 0;
2065 sock->state = SS_DISCONNECTING; 2078 sock->state = SS_DISCONNECTING;
2066 tipc_node_remove_conn(dnode, tsk->ref); 2079 tipc_node_remove_conn(dnode, tsk->portid);
2067 /* fall through */ 2080 /* fall through */
2068 2081
2069 case SS_DISCONNECTING: 2082 case SS_DISCONNECTING:
@@ -2084,14 +2097,14 @@ restart:
2084 return res; 2097 return res;
2085} 2098}
2086 2099
2087static void tipc_sk_timeout(unsigned long ref) 2100static void tipc_sk_timeout(unsigned long portid)
2088{ 2101{
2089 struct tipc_sock *tsk; 2102 struct tipc_sock *tsk;
2090 struct sock *sk; 2103 struct sock *sk;
2091 struct sk_buff *skb = NULL; 2104 struct sk_buff *skb = NULL;
2092 u32 peer_port, peer_node; 2105 u32 peer_port, peer_node;
2093 2106
2094 tsk = tipc_sk_get(ref); 2107 tsk = tipc_sk_lookup(portid);
2095 if (!tsk) 2108 if (!tsk)
2096 return; 2109 return;
2097 2110
@@ -2108,20 +2121,20 @@ static void tipc_sk_timeout(unsigned long ref)
2108 /* Previous probe not answered -> self abort */ 2121 /* Previous probe not answered -> self abort */
2109 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 2122 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
2110 SHORT_H_SIZE, 0, tipc_own_addr, 2123 SHORT_H_SIZE, 0, tipc_own_addr,
2111 peer_node, ref, peer_port, 2124 peer_node, portid, peer_port,
2112 TIPC_ERR_NO_PORT); 2125 TIPC_ERR_NO_PORT);
2113 } else { 2126 } else {
2114 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 2127 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2115 0, peer_node, tipc_own_addr, 2128 0, peer_node, tipc_own_addr,
2116 peer_port, ref, TIPC_OK); 2129 peer_port, portid, TIPC_OK);
2117 tsk->probing_state = TIPC_CONN_PROBING; 2130 tsk->probing_state = TIPC_CONN_PROBING;
2118 k_start_timer(&tsk->timer, tsk->probing_interval); 2131 k_start_timer(&tsk->timer, tsk->probing_interval);
2119 } 2132 }
2120 bh_unlock_sock(sk); 2133 bh_unlock_sock(sk);
2121 if (skb) 2134 if (skb)
2122 tipc_link_xmit_skb(skb, peer_node, ref); 2135 tipc_link_xmit_skb(skb, peer_node, portid);
2123exit: 2136exit:
2124 tipc_sk_put(tsk); 2137 sock_put(sk);
2125} 2138}
2126 2139
2127static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 2140static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
@@ -2132,12 +2145,12 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2132 2145
2133 if (tsk->connected) 2146 if (tsk->connected)
2134 return -EINVAL; 2147 return -EINVAL;
2135 key = tsk->ref + tsk->pub_count + 1; 2148 key = tsk->portid + tsk->pub_count + 1;
2136 if (key == tsk->ref) 2149 if (key == tsk->portid)
2137 return -EADDRINUSE; 2150 return -EADDRINUSE;
2138 2151
2139 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 2152 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
2140 scope, tsk->ref, key); 2153 scope, tsk->portid, key);
2141 if (unlikely(!publ)) 2154 if (unlikely(!publ))
2142 return -EINVAL; 2155 return -EINVAL;
2143 2156
@@ -2188,9 +2201,9 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2188 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", 2201 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2189 tipc_zone(tipc_own_addr), 2202 tipc_zone(tipc_own_addr),
2190 tipc_cluster(tipc_own_addr), 2203 tipc_cluster(tipc_own_addr),
2191 tipc_node(tipc_own_addr), tsk->ref); 2204 tipc_node(tipc_own_addr), tsk->portid);
2192 else 2205 else
2193 ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref); 2206 ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
2194 2207
2195 if (tsk->connected) { 2208 if (tsk->connected) {
2196 u32 dport = tsk_peer_port(tsk); 2209 u32 dport = tsk_peer_port(tsk);
@@ -2224,13 +2237,15 @@ static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2224 2237
2225struct sk_buff *tipc_sk_socks_show(void) 2238struct sk_buff *tipc_sk_socks_show(void)
2226{ 2239{
2240 const struct bucket_table *tbl;
2241 struct rhash_head *pos;
2227 struct sk_buff *buf; 2242 struct sk_buff *buf;
2228 struct tlv_desc *rep_tlv; 2243 struct tlv_desc *rep_tlv;
2229 char *pb; 2244 char *pb;
2230 int pb_len; 2245 int pb_len;
2231 struct tipc_sock *tsk; 2246 struct tipc_sock *tsk;
2232 int str_len = 0; 2247 int str_len = 0;
2233 u32 ref = 0; 2248 int i;
2234 2249
2235 buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); 2250 buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2236 if (!buf) 2251 if (!buf)
@@ -2239,14 +2254,18 @@ struct sk_buff *tipc_sk_socks_show(void)
2239 pb = TLV_DATA(rep_tlv); 2254 pb = TLV_DATA(rep_tlv);
2240 pb_len = ULTRA_STRING_MAX_LEN; 2255 pb_len = ULTRA_STRING_MAX_LEN;
2241 2256
2242 tsk = tipc_sk_get_next(&ref); 2257 rcu_read_lock();
2243 for (; tsk; tsk = tipc_sk_get_next(&ref)) { 2258 tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
2244 lock_sock(&tsk->sk); 2259 for (i = 0; i < tbl->size; i++) {
2245 str_len += tipc_sk_show(tsk, pb + str_len, 2260 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2246 pb_len - str_len, 0); 2261 spin_lock_bh(&tsk->sk.sk_lock.slock);
2247 release_sock(&tsk->sk); 2262 str_len += tipc_sk_show(tsk, pb + str_len,
2248 tipc_sk_put(tsk); 2263 pb_len - str_len, 0);
2264 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2265 }
2249 } 2266 }
2267 rcu_read_unlock();
2268
2250 str_len += 1; /* for "\0" */ 2269 str_len += 1; /* for "\0" */
2251 skb_put(buf, TLV_SPACE(str_len)); 2270 skb_put(buf, TLV_SPACE(str_len));
2252 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 2271 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
@@ -2259,255 +2278,91 @@ struct sk_buff *tipc_sk_socks_show(void)
2259 */ 2278 */
2260void tipc_sk_reinit(void) 2279void tipc_sk_reinit(void)
2261{ 2280{
2281 const struct bucket_table *tbl;
2282 struct rhash_head *pos;
2283 struct tipc_sock *tsk;
2262 struct tipc_msg *msg; 2284 struct tipc_msg *msg;
2263 u32 ref = 0; 2285 int i;
2264 struct tipc_sock *tsk = tipc_sk_get_next(&ref);
2265 2286
2266 for (; tsk; tsk = tipc_sk_get_next(&ref)) { 2287 rcu_read_lock();
2267 lock_sock(&tsk->sk); 2288 tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
2268 msg = &tsk->phdr; 2289 for (i = 0; i < tbl->size; i++) {
2269 msg_set_prevnode(msg, tipc_own_addr); 2290 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2270 msg_set_orignode(msg, tipc_own_addr); 2291 spin_lock_bh(&tsk->sk.sk_lock.slock);
2271 release_sock(&tsk->sk); 2292 msg = &tsk->phdr;
2272 tipc_sk_put(tsk); 2293 msg_set_prevnode(msg, tipc_own_addr);
2294 msg_set_orignode(msg, tipc_own_addr);
2295 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2296 }
2273 } 2297 }
2298 rcu_read_unlock();
2274} 2299}
2275 2300
2276/** 2301static struct tipc_sock *tipc_sk_lookup(u32 portid)
2277 * struct reference - TIPC socket reference entry
2278 * @tsk: pointer to socket associated with reference entry
2279 * @ref: reference value for socket (combines instance & array index info)
2280 */
2281struct reference {
2282 struct tipc_sock *tsk;
2283 u32 ref;
2284};
2285
2286/**
2287 * struct tipc_ref_table - table of TIPC socket reference entries
2288 * @entries: pointer to array of reference entries
2289 * @capacity: array index of first unusable entry
2290 * @init_point: array index of first uninitialized entry
2291 * @first_free: array index of first unused socket reference entry
2292 * @last_free: array index of last unused socket reference entry
2293 * @index_mask: bitmask for array index portion of reference values
2294 * @start_mask: initial value for instance value portion of reference values
2295 */
2296struct ref_table {
2297 struct reference *entries;
2298 u32 capacity;
2299 u32 init_point;
2300 u32 first_free;
2301 u32 last_free;
2302 u32 index_mask;
2303 u32 start_mask;
2304};
2305
2306/* Socket reference table consists of 2**N entries.
2307 *
2308 * State Socket ptr Reference
2309 * ----- ---------- ---------
2310 * In use non-NULL XXXX|own index
2311 * (XXXX changes each time entry is acquired)
2312 * Free NULL YYYY|next free index
2313 * (YYYY is one more than last used XXXX)
2314 * Uninitialized NULL 0
2315 *
2316 * Entry 0 is not used; this allows index 0 to denote the end of the free list.
2317 *
2318 * Note that a reference value of 0 does not necessarily indicate that an
2319 * entry is uninitialized, since the last entry in the free list could also
2320 * have a reference value of 0 (although this is unlikely).
2321 */
2322
2323static struct ref_table tipc_ref_table;
2324
2325static DEFINE_RWLOCK(ref_table_lock);
2326
2327/**
2328 * tipc_ref_table_init - create reference table for sockets
2329 */
2330int tipc_sk_ref_table_init(u32 req_sz, u32 start)
2331{ 2302{
2332 struct reference *table; 2303 struct tipc_sock *tsk;
2333 u32 actual_sz;
2334
2335 /* account for unused entry, then round up size to a power of 2 */
2336
2337 req_sz++;
2338 for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) {
2339 /* do nothing */
2340 };
2341
2342 /* allocate table & mark all entries as uninitialized */
2343 table = vzalloc(actual_sz * sizeof(struct reference));
2344 if (table == NULL)
2345 return -ENOMEM;
2346
2347 tipc_ref_table.entries = table;
2348 tipc_ref_table.capacity = req_sz;
2349 tipc_ref_table.init_point = 1;
2350 tipc_ref_table.first_free = 0;
2351 tipc_ref_table.last_free = 0;
2352 tipc_ref_table.index_mask = actual_sz - 1;
2353 tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
2354 2304
2355 return 0; 2305 rcu_read_lock();
2356} 2306 tsk = rhashtable_lookup(&tipc_sk_rht, &portid);
2307 if (tsk)
2308 sock_hold(&tsk->sk);
2309 rcu_read_unlock();
2357 2310
2358/** 2311 return tsk;
2359 * tipc_ref_table_stop - destroy reference table for sockets
2360 */
2361void tipc_sk_ref_table_stop(void)
2362{
2363 if (!tipc_ref_table.entries)
2364 return;
2365 vfree(tipc_ref_table.entries);
2366 tipc_ref_table.entries = NULL;
2367} 2312}
2368 2313
2369/* tipc_ref_acquire - create reference to a socket 2314static int tipc_sk_insert(struct tipc_sock *tsk)
2370 *
2371 * Register an socket pointer in the reference table.
2372 * Returns a unique reference value that is used from then on to retrieve the
2373 * socket pointer, or to determine if the socket has been deregistered.
2374 */
2375u32 tipc_sk_ref_acquire(struct tipc_sock *tsk)
2376{ 2315{
2377 u32 index; 2316 u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2378 u32 index_mask; 2317 u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2379 u32 next_plus_upper;
2380 u32 ref = 0;
2381 struct reference *entry;
2382
2383 if (unlikely(!tsk)) {
2384 pr_err("Attempt to acquire ref. to non-existent obj\n");
2385 return 0;
2386 }
2387 if (unlikely(!tipc_ref_table.entries)) {
2388 pr_err("Ref. table not found in acquisition attempt\n");
2389 return 0;
2390 }
2391
2392 /* Take a free entry, if available; otherwise initialize a new one */
2393 write_lock_bh(&ref_table_lock);
2394 index = tipc_ref_table.first_free;
2395 entry = &tipc_ref_table.entries[index];
2396 2318
2397 if (likely(index)) { 2319 while (remaining--) {
2398 index = tipc_ref_table.first_free; 2320 portid++;
2399 entry = &tipc_ref_table.entries[index]; 2321 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2400 index_mask = tipc_ref_table.index_mask; 2322 portid = TIPC_MIN_PORT;
2401 next_plus_upper = entry->ref; 2323 tsk->portid = portid;
2402 tipc_ref_table.first_free = next_plus_upper & index_mask; 2324 sock_hold(&tsk->sk);
2403 ref = (next_plus_upper & ~index_mask) + index; 2325 if (rhashtable_lookup_insert(&tipc_sk_rht, &tsk->node))
2404 entry->tsk = tsk; 2326 return 0;
2405 } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) { 2327 sock_put(&tsk->sk);
2406 index = tipc_ref_table.init_point++;
2407 entry = &tipc_ref_table.entries[index];
2408 ref = tipc_ref_table.start_mask + index;
2409 } 2328 }
2410 2329
2411 if (ref) { 2330 return -1;
2412 entry->ref = ref;
2413 entry->tsk = tsk;
2414 }
2415 write_unlock_bh(&ref_table_lock);
2416 return ref;
2417} 2331}
2418 2332
2419/* tipc_sk_ref_discard - invalidate reference to an socket 2333static void tipc_sk_remove(struct tipc_sock *tsk)
2420 *
2421 * Disallow future references to an socket and free up the entry for re-use.
2422 */
2423void tipc_sk_ref_discard(u32 ref)
2424{ 2334{
2425 struct reference *entry; 2335 struct sock *sk = &tsk->sk;
2426 u32 index;
2427 u32 index_mask;
2428
2429 if (unlikely(!tipc_ref_table.entries)) {
2430 pr_err("Ref. table not found during discard attempt\n");
2431 return;
2432 }
2433
2434 index_mask = tipc_ref_table.index_mask;
2435 index = ref & index_mask;
2436 entry = &tipc_ref_table.entries[index];
2437
2438 write_lock_bh(&ref_table_lock);
2439 2336
2440 if (unlikely(!entry->tsk)) { 2337 if (rhashtable_remove(&tipc_sk_rht, &tsk->node)) {
2441 pr_err("Attempt to discard ref. to non-existent socket\n"); 2338 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2442 goto exit; 2339 __sock_put(sk);
2443 }
2444 if (unlikely(entry->ref != ref)) {
2445 pr_err("Attempt to discard non-existent reference\n");
2446 goto exit;
2447 } 2340 }
2448
2449 /* Mark entry as unused; increment instance part of entry's
2450 * reference to invalidate any subsequent references
2451 */
2452
2453 entry->tsk = NULL;
2454 entry->ref = (ref & ~index_mask) + (index_mask + 1);
2455
2456 /* Append entry to free entry list */
2457 if (unlikely(tipc_ref_table.first_free == 0))
2458 tipc_ref_table.first_free = index;
2459 else
2460 tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
2461 tipc_ref_table.last_free = index;
2462exit:
2463 write_unlock_bh(&ref_table_lock);
2464} 2341}
2465 2342
2466/* tipc_sk_get - find referenced socket and return pointer to it 2343int tipc_sk_rht_init(void)
2467 */
2468struct tipc_sock *tipc_sk_get(u32 ref)
2469{ 2344{
2470 struct reference *entry; 2345 struct rhashtable_params rht_params = {
2471 struct tipc_sock *tsk; 2346 .nelem_hint = 192,
2347 .head_offset = offsetof(struct tipc_sock, node),
2348 .key_offset = offsetof(struct tipc_sock, portid),
2349 .key_len = sizeof(u32), /* portid */
2350 .hashfn = jhash,
2351 .max_shift = 20, /* 1M */
2352 .min_shift = 8, /* 256 */
2353 .grow_decision = rht_grow_above_75,
2354 .shrink_decision = rht_shrink_below_30,
2355 };
2472 2356
2473 if (unlikely(!tipc_ref_table.entries)) 2357 return rhashtable_init(&tipc_sk_rht, &rht_params);
2474 return NULL;
2475 read_lock_bh(&ref_table_lock);
2476 entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
2477 tsk = entry->tsk;
2478 if (likely(tsk && (entry->ref == ref)))
2479 sock_hold(&tsk->sk);
2480 else
2481 tsk = NULL;
2482 read_unlock_bh(&ref_table_lock);
2483 return tsk;
2484} 2358}
2485 2359
2486/* tipc_sk_get_next - lock & return next socket after referenced one 2360void tipc_sk_rht_destroy(void)
2487*/
2488struct tipc_sock *tipc_sk_get_next(u32 *ref)
2489{ 2361{
2490 struct reference *entry; 2362 /* Wait for socket readers to complete */
2491 struct tipc_sock *tsk = NULL; 2363 synchronize_net();
2492 uint index = *ref & tipc_ref_table.index_mask;
2493 2364
2494 read_lock_bh(&ref_table_lock); 2365 rhashtable_destroy(&tipc_sk_rht);
2495 while (++index < tipc_ref_table.capacity) {
2496 entry = &tipc_ref_table.entries[index];
2497 if (!entry->tsk)
2498 continue;
2499 tsk = entry->tsk;
2500 sock_hold(&tsk->sk);
2501 *ref = entry->ref;
2502 break;
2503 }
2504 read_unlock_bh(&ref_table_lock);
2505 return tsk;
2506}
2507
2508static void tipc_sk_put(struct tipc_sock *tsk)
2509{
2510 sock_put(&tsk->sk);
2511} 2366}
2512 2367
2513/** 2368/**
@@ -2829,7 +2684,7 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2829 attrs = nla_nest_start(skb, TIPC_NLA_SOCK); 2684 attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2830 if (!attrs) 2685 if (!attrs)
2831 goto genlmsg_cancel; 2686 goto genlmsg_cancel;
2832 if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref)) 2687 if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2833 goto attr_msg_cancel; 2688 goto attr_msg_cancel;
2834 if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr)) 2689 if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
2835 goto attr_msg_cancel; 2690 goto attr_msg_cancel;
@@ -2859,22 +2714,29 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2859{ 2714{
2860 int err; 2715 int err;
2861 struct tipc_sock *tsk; 2716 struct tipc_sock *tsk;
2862 u32 prev_ref = cb->args[0]; 2717 const struct bucket_table *tbl;
2863 u32 ref = prev_ref; 2718 struct rhash_head *pos;
2864 2719 u32 prev_portid = cb->args[0];
2865 tsk = tipc_sk_get_next(&ref); 2720 u32 portid = prev_portid;
2866 for (; tsk; tsk = tipc_sk_get_next(&ref)) { 2721 int i;
2867 lock_sock(&tsk->sk);
2868 err = __tipc_nl_add_sk(skb, cb, tsk);
2869 release_sock(&tsk->sk);
2870 tipc_sk_put(tsk);
2871 if (err)
2872 break;
2873 2722
2874 prev_ref = ref; 2723 rcu_read_lock();
2724 tbl = rht_dereference_rcu((&tipc_sk_rht)->tbl, &tipc_sk_rht);
2725 for (i = 0; i < tbl->size; i++) {
2726 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2727 spin_lock_bh(&tsk->sk.sk_lock.slock);
2728 portid = tsk->portid;
2729 err = __tipc_nl_add_sk(skb, cb, tsk);
2730 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2731 if (err)
2732 break;
2733
2734 prev_portid = portid;
2735 }
2875 } 2736 }
2737 rcu_read_unlock();
2876 2738
2877 cb->args[0] = prev_ref; 2739 cb->args[0] = prev_portid;
2878 2740
2879 return skb->len; 2741 return skb->len;
2880} 2742}
@@ -2962,12 +2824,12 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2962int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) 2824int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2963{ 2825{
2964 int err; 2826 int err;
2965 u32 tsk_ref = cb->args[0]; 2827 u32 tsk_portid = cb->args[0];
2966 u32 last_publ = cb->args[1]; 2828 u32 last_publ = cb->args[1];
2967 u32 done = cb->args[2]; 2829 u32 done = cb->args[2];
2968 struct tipc_sock *tsk; 2830 struct tipc_sock *tsk;
2969 2831
2970 if (!tsk_ref) { 2832 if (!tsk_portid) {
2971 struct nlattr **attrs; 2833 struct nlattr **attrs;
2972 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; 2834 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2973 2835
@@ -2984,13 +2846,13 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2984 if (!sock[TIPC_NLA_SOCK_REF]) 2846 if (!sock[TIPC_NLA_SOCK_REF])
2985 return -EINVAL; 2847 return -EINVAL;
2986 2848
2987 tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]); 2849 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2988 } 2850 }
2989 2851
2990 if (done) 2852 if (done)
2991 return 0; 2853 return 0;
2992 2854
2993 tsk = tipc_sk_get(tsk_ref); 2855 tsk = tipc_sk_lookup(tsk_portid);
2994 if (!tsk) 2856 if (!tsk)
2995 return -EINVAL; 2857 return -EINVAL;
2996 2858
@@ -2999,9 +2861,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2999 if (!err) 2861 if (!err)
3000 done = 1; 2862 done = 1;
3001 release_sock(&tsk->sk); 2863 release_sock(&tsk->sk);
3002 tipc_sk_put(tsk); 2864 sock_put(&tsk->sk);
3003 2865
3004 cb->args[0] = tsk_ref; 2866 cb->args[0] = tsk_portid;
3005 cb->args[1] = last_publ; 2867 cb->args[1] = last_publ;
3006 cb->args[2] = done; 2868 cb->args[2] = done;
3007 2869
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index d34089387006..c7d46d069d89 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -46,8 +46,8 @@ int tipc_sk_rcv(struct sk_buff *buf);
46struct sk_buff *tipc_sk_socks_show(void); 46struct sk_buff *tipc_sk_socks_show(void);
47void tipc_sk_mcast_rcv(struct sk_buff *buf); 47void tipc_sk_mcast_rcv(struct sk_buff *buf);
48void tipc_sk_reinit(void); 48void tipc_sk_reinit(void);
49int tipc_sk_ref_table_init(u32 requested_size, u32 start); 49int tipc_sk_rht_init(void);
50void tipc_sk_ref_table_stop(void); 50void tipc_sk_rht_destroy(void);
51int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb); 51int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb);
52int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb); 52int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb);
53 53