aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c1013
1 files changed, 451 insertions, 562 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 4731cad99d1c..b4d4467d0bb0 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, 2012-2014, Ericsson AB 4 * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -34,22 +34,25 @@
34 * POSSIBILITY OF SUCH DAMAGE. 34 * POSSIBILITY OF SUCH DAMAGE.
35 */ 35 */
36 36
37#include <linux/rhashtable.h>
38#include <linux/jhash.h>
37#include "core.h" 39#include "core.h"
38#include "name_table.h" 40#include "name_table.h"
39#include "node.h" 41#include "node.h"
40#include "link.h" 42#include "link.h"
41#include <linux/export.h> 43#include "name_distr.h"
42#include "config.h"
43#include "socket.h" 44#include "socket.h"
44 45
45#define SS_LISTENING -1 /* socket is listening */ 46#define SS_LISTENING -1 /* socket is listening */
46#define SS_READY -2 /* socket is connectionless */ 47#define SS_READY -2 /* socket is connectionless */
47 48
48#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 49#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
49#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 50#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */
50#define TIPC_FWD_MSG 1 51#define TIPC_FWD_MSG 1
51#define TIPC_CONN_OK 0 52#define TIPC_CONN_OK 0
52#define TIPC_CONN_PROBING 1 53#define TIPC_CONN_PROBING 1
54#define TIPC_MAX_PORT 0xffffffff
55#define TIPC_MIN_PORT 1
53 56
54/** 57/**
55 * struct tipc_sock - TIPC socket structure 58 * struct tipc_sock - TIPC socket structure
@@ -59,21 +62,20 @@
59 * @conn_instance: TIPC instance used when connection was established 62 * @conn_instance: TIPC instance used when connection was established
60 * @published: non-zero if port has one or more associated names 63 * @published: non-zero if port has one or more associated names
61 * @max_pkt: maximum packet size "hint" used when building messages sent by port 64 * @max_pkt: maximum packet size "hint" used when building messages sent by port
62 * @ref: unique reference to port in TIPC object registry 65 * @portid: unique port identity in TIPC socket hash table
63 * @phdr: preformatted message header used when sending messages 66 * @phdr: preformatted message header used when sending messages
64 * @port_list: adjacent ports in TIPC's global list of ports 67 * @port_list: adjacent ports in TIPC's global list of ports
65 * @publications: list of publications for port 68 * @publications: list of publications for port
66 * @pub_count: total # of publications port has made during its lifetime 69 * @pub_count: total # of publications port has made during its lifetime
67 * @probing_state: 70 * @probing_state:
68 * @probing_interval: 71 * @probing_intv:
69 * @timer:
70 * @port: port - interacts with 'sk' and with the rest of the TIPC stack
71 * @peer_name: the peer of the connection, if any
72 * @conn_timeout: the time we can wait for an unresponded setup request 72 * @conn_timeout: the time we can wait for an unresponded setup request
73 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 73 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
74 * @link_cong: non-zero if owner must sleep because of link congestion 74 * @link_cong: non-zero if owner must sleep because of link congestion
75 * @sent_unacked: # messages sent by socket, and not yet acked by peer 75 * @sent_unacked: # messages sent by socket, and not yet acked by peer
76 * @rcv_unacked: # messages read by user, but not yet acked back to peer 76 * @rcv_unacked: # messages read by user, but not yet acked back to peer
77 * @node: hash table node
78 * @rcu: rcu struct for tipc_sock
77 */ 79 */
78struct tipc_sock { 80struct tipc_sock {
79 struct sock sk; 81 struct sock sk;
@@ -82,19 +84,20 @@ struct tipc_sock {
82 u32 conn_instance; 84 u32 conn_instance;
83 int published; 85 int published;
84 u32 max_pkt; 86 u32 max_pkt;
85 u32 ref; 87 u32 portid;
86 struct tipc_msg phdr; 88 struct tipc_msg phdr;
87 struct list_head sock_list; 89 struct list_head sock_list;
88 struct list_head publications; 90 struct list_head publications;
89 u32 pub_count; 91 u32 pub_count;
90 u32 probing_state; 92 u32 probing_state;
91 u32 probing_interval; 93 unsigned long probing_intv;
92 struct timer_list timer;
93 uint conn_timeout; 94 uint conn_timeout;
94 atomic_t dupl_rcvcnt; 95 atomic_t dupl_rcvcnt;
95 bool link_cong; 96 bool link_cong;
96 uint sent_unacked; 97 uint sent_unacked;
97 uint rcv_unacked; 98 uint rcv_unacked;
99 struct rhash_head node;
100 struct rcu_head rcu;
98}; 101};
99 102
100static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); 103static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -103,16 +106,14 @@ static void tipc_write_space(struct sock *sk);
103static int tipc_release(struct socket *sock); 106static int tipc_release(struct socket *sock);
104static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); 107static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
105static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); 108static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
106static void tipc_sk_timeout(unsigned long ref); 109static void tipc_sk_timeout(unsigned long data);
107static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 110static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
108 struct tipc_name_seq const *seq); 111 struct tipc_name_seq const *seq);
109static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 112static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
110 struct tipc_name_seq const *seq); 113 struct tipc_name_seq const *seq);
111static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk); 114static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
112static void tipc_sk_ref_discard(u32 ref); 115static int tipc_sk_insert(struct tipc_sock *tsk);
113static struct tipc_sock *tipc_sk_get(u32 ref); 116static void tipc_sk_remove(struct tipc_sock *tsk);
114static struct tipc_sock *tipc_sk_get_next(u32 *ref);
115static void tipc_sk_put(struct tipc_sock *tsk);
116 117
117static const struct proto_ops packet_ops; 118static const struct proto_ops packet_ops;
118static const struct proto_ops stream_ops; 119static const struct proto_ops stream_ops;
@@ -174,6 +175,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
174 * - port reference 175 * - port reference
175 */ 176 */
176 177
178static u32 tsk_own_node(struct tipc_sock *tsk)
179{
180 return msg_prevnode(&tsk->phdr);
181}
182
177static u32 tsk_peer_node(struct tipc_sock *tsk) 183static u32 tsk_peer_node(struct tipc_sock *tsk)
178{ 184{
179 return msg_destnode(&tsk->phdr); 185 return msg_destnode(&tsk->phdr);
@@ -246,10 +252,11 @@ static void tsk_rej_rx_queue(struct sock *sk)
246{ 252{
247 struct sk_buff *skb; 253 struct sk_buff *skb;
248 u32 dnode; 254 u32 dnode;
255 u32 own_node = tsk_own_node(tipc_sk(sk));
249 256
250 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { 257 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
251 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) 258 if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
252 tipc_link_xmit_skb(skb, dnode, 0); 259 tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
253 } 260 }
254} 261}
255 262
@@ -260,6 +267,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
260 */ 267 */
261static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) 268static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
262{ 269{
270 struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
263 u32 peer_port = tsk_peer_port(tsk); 271 u32 peer_port = tsk_peer_port(tsk);
264 u32 orig_node; 272 u32 orig_node;
265 u32 peer_node; 273 u32 peer_node;
@@ -276,10 +284,10 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
276 if (likely(orig_node == peer_node)) 284 if (likely(orig_node == peer_node))
277 return true; 285 return true;
278 286
279 if (!orig_node && (peer_node == tipc_own_addr)) 287 if (!orig_node && (peer_node == tn->own_addr))
280 return true; 288 return true;
281 289
282 if (!peer_node && (orig_node == tipc_own_addr)) 290 if (!peer_node && (orig_node == tn->own_addr))
283 return true; 291 return true;
284 292
285 return false; 293 return false;
@@ -300,12 +308,12 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
300static int tipc_sk_create(struct net *net, struct socket *sock, 308static int tipc_sk_create(struct net *net, struct socket *sock,
301 int protocol, int kern) 309 int protocol, int kern)
302{ 310{
311 struct tipc_net *tn;
303 const struct proto_ops *ops; 312 const struct proto_ops *ops;
304 socket_state state; 313 socket_state state;
305 struct sock *sk; 314 struct sock *sk;
306 struct tipc_sock *tsk; 315 struct tipc_sock *tsk;
307 struct tipc_msg *msg; 316 struct tipc_msg *msg;
308 u32 ref;
309 317
310 /* Validate arguments */ 318 /* Validate arguments */
311 if (unlikely(protocol != 0)) 319 if (unlikely(protocol != 0))
@@ -339,24 +347,23 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
339 return -ENOMEM; 347 return -ENOMEM;
340 348
341 tsk = tipc_sk(sk); 349 tsk = tipc_sk(sk);
342 ref = tipc_sk_ref_acquire(tsk);
343 if (!ref) {
344 pr_warn("Socket create failed; reference table exhausted\n");
345 return -ENOMEM;
346 }
347 tsk->max_pkt = MAX_PKT_DEFAULT; 350 tsk->max_pkt = MAX_PKT_DEFAULT;
348 tsk->ref = ref;
349 INIT_LIST_HEAD(&tsk->publications); 351 INIT_LIST_HEAD(&tsk->publications);
350 msg = &tsk->phdr; 352 msg = &tsk->phdr;
351 tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, 353 tn = net_generic(sock_net(sk), tipc_net_id);
354 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
352 NAMED_H_SIZE, 0); 355 NAMED_H_SIZE, 0);
353 msg_set_origport(msg, ref);
354 356
355 /* Finish initializing socket data structures */ 357 /* Finish initializing socket data structures */
356 sock->ops = ops; 358 sock->ops = ops;
357 sock->state = state; 359 sock->state = state;
358 sock_init_data(sock, sk); 360 sock_init_data(sock, sk);
359 k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref); 361 if (tipc_sk_insert(tsk)) {
362 pr_warn("Socket create failed; port numbrer exhausted\n");
363 return -EINVAL;
364 }
365 msg_set_origport(msg, tsk->portid);
366 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
360 sk->sk_backlog_rcv = tipc_backlog_rcv; 367 sk->sk_backlog_rcv = tipc_backlog_rcv;
361 sk->sk_rcvbuf = sysctl_tipc_rmem[1]; 368 sk->sk_rcvbuf = sysctl_tipc_rmem[1];
362 sk->sk_data_ready = tipc_data_ready; 369 sk->sk_data_ready = tipc_data_ready;
@@ -384,7 +391,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
384 * 391 *
385 * Returns 0 on success, errno otherwise 392 * Returns 0 on success, errno otherwise
386 */ 393 */
387int tipc_sock_create_local(int type, struct socket **res) 394int tipc_sock_create_local(struct net *net, int type, struct socket **res)
388{ 395{
389 int rc; 396 int rc;
390 397
@@ -393,7 +400,7 @@ int tipc_sock_create_local(int type, struct socket **res)
393 pr_err("Failed to create kernel socket\n"); 400 pr_err("Failed to create kernel socket\n");
394 return rc; 401 return rc;
395 } 402 }
396 tipc_sk_create(&init_net, *res, 0, 1); 403 tipc_sk_create(net, *res, 0, 1);
397 404
398 return 0; 405 return 0;
399} 406}
@@ -442,6 +449,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
442 return ret; 449 return ret;
443} 450}
444 451
452static void tipc_sk_callback(struct rcu_head *head)
453{
454 struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
455
456 sock_put(&tsk->sk);
457}
458
445/** 459/**
446 * tipc_release - destroy a TIPC socket 460 * tipc_release - destroy a TIPC socket
447 * @sock: socket to destroy 461 * @sock: socket to destroy
@@ -461,9 +475,10 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
461static int tipc_release(struct socket *sock) 475static int tipc_release(struct socket *sock)
462{ 476{
463 struct sock *sk = sock->sk; 477 struct sock *sk = sock->sk;
478 struct net *net;
464 struct tipc_sock *tsk; 479 struct tipc_sock *tsk;
465 struct sk_buff *skb; 480 struct sk_buff *skb;
466 u32 dnode; 481 u32 dnode, probing_state;
467 482
468 /* 483 /*
469 * Exit if socket isn't fully initialized (occurs when a failed accept() 484 * Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -472,6 +487,7 @@ static int tipc_release(struct socket *sock)
472 if (sk == NULL) 487 if (sk == NULL)
473 return 0; 488 return 0;
474 489
490 net = sock_net(sk);
475 tsk = tipc_sk(sk); 491 tsk = tipc_sk(sk);
476 lock_sock(sk); 492 lock_sock(sk);
477 493
@@ -491,26 +507,29 @@ static int tipc_release(struct socket *sock)
491 (sock->state == SS_CONNECTED)) { 507 (sock->state == SS_CONNECTED)) {
492 sock->state = SS_DISCONNECTING; 508 sock->state = SS_DISCONNECTING;
493 tsk->connected = 0; 509 tsk->connected = 0;
494 tipc_node_remove_conn(dnode, tsk->ref); 510 tipc_node_remove_conn(net, dnode, tsk->portid);
495 } 511 }
496 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) 512 if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
497 tipc_link_xmit_skb(skb, dnode, 0); 513 TIPC_ERR_NO_PORT))
514 tipc_link_xmit_skb(net, skb, dnode, 0);
498 } 515 }
499 } 516 }
500 517
501 tipc_sk_withdraw(tsk, 0, NULL); 518 tipc_sk_withdraw(tsk, 0, NULL);
502 tipc_sk_ref_discard(tsk->ref); 519 probing_state = tsk->probing_state;
503 k_cancel_timer(&tsk->timer); 520 if (del_timer_sync(&sk->sk_timer) &&
521 probing_state != TIPC_CONN_PROBING)
522 sock_put(sk);
523 tipc_sk_remove(tsk);
504 if (tsk->connected) { 524 if (tsk->connected) {
505 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 525 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
506 SHORT_H_SIZE, 0, dnode, tipc_own_addr, 526 TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
507 tsk_peer_port(tsk), 527 tsk_own_node(tsk), tsk_peer_port(tsk),
508 tsk->ref, TIPC_ERR_NO_PORT); 528 tsk->portid, TIPC_ERR_NO_PORT);
509 if (skb) 529 if (skb)
510 tipc_link_xmit_skb(skb, dnode, tsk->ref); 530 tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
511 tipc_node_remove_conn(dnode, tsk->ref); 531 tipc_node_remove_conn(net, dnode, tsk->portid);
512 } 532 }
513 k_term_timer(&tsk->timer);
514 533
515 /* Discard any remaining (connection-based) messages in receive queue */ 534 /* Discard any remaining (connection-based) messages in receive queue */
516 __skb_queue_purge(&sk->sk_receive_queue); 535 __skb_queue_purge(&sk->sk_receive_queue);
@@ -518,7 +537,8 @@ static int tipc_release(struct socket *sock)
518 /* Reject any messages that accumulated in backlog queue */ 537 /* Reject any messages that accumulated in backlog queue */
519 sock->state = SS_DISCONNECTING; 538 sock->state = SS_DISCONNECTING;
520 release_sock(sk); 539 release_sock(sk);
521 sock_put(sk); 540
541 call_rcu(&tsk->rcu, tipc_sk_callback);
522 sock->sk = NULL; 542 sock->sk = NULL;
523 543
524 return 0; 544 return 0;
@@ -602,6 +622,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
602{ 622{
603 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; 623 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
604 struct tipc_sock *tsk = tipc_sk(sock->sk); 624 struct tipc_sock *tsk = tipc_sk(sock->sk);
625 struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
605 626
606 memset(addr, 0, sizeof(*addr)); 627 memset(addr, 0, sizeof(*addr));
607 if (peer) { 628 if (peer) {
@@ -611,8 +632,8 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
611 addr->addr.id.ref = tsk_peer_port(tsk); 632 addr->addr.id.ref = tsk_peer_port(tsk);
612 addr->addr.id.node = tsk_peer_node(tsk); 633 addr->addr.id.node = tsk_peer_node(tsk);
613 } else { 634 } else {
614 addr->addr.id.ref = tsk->ref; 635 addr->addr.id.ref = tsk->portid;
615 addr->addr.id.node = tipc_own_addr; 636 addr->addr.id.node = tn->own_addr;
616 } 637 }
617 638
618 *uaddr_len = sizeof(*addr); 639 *uaddr_len = sizeof(*addr);
@@ -711,8 +732,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
711 struct msghdr *msg, size_t dsz, long timeo) 732 struct msghdr *msg, size_t dsz, long timeo)
712{ 733{
713 struct sock *sk = sock->sk; 734 struct sock *sk = sock->sk;
714 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; 735 struct tipc_sock *tsk = tipc_sk(sk);
715 struct sk_buff_head head; 736 struct net *net = sock_net(sk);
737 struct tipc_msg *mhdr = &tsk->phdr;
738 struct sk_buff_head *pktchain = &sk->sk_write_queue;
739 struct iov_iter save = msg->msg_iter;
716 uint mtu; 740 uint mtu;
717 int rc; 741 int rc;
718 742
@@ -727,83 +751,97 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
727 751
728new_mtu: 752new_mtu:
729 mtu = tipc_bclink_get_mtu(); 753 mtu = tipc_bclink_get_mtu();
730 __skb_queue_head_init(&head); 754 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
731 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
732 if (unlikely(rc < 0)) 755 if (unlikely(rc < 0))
733 return rc; 756 return rc;
734 757
735 do { 758 do {
736 rc = tipc_bclink_xmit(&head); 759 rc = tipc_bclink_xmit(net, pktchain);
737 if (likely(rc >= 0)) { 760 if (likely(rc >= 0)) {
738 rc = dsz; 761 rc = dsz;
739 break; 762 break;
740 } 763 }
741 if (rc == -EMSGSIZE) 764 if (rc == -EMSGSIZE) {
765 msg->msg_iter = save;
742 goto new_mtu; 766 goto new_mtu;
767 }
743 if (rc != -ELINKCONG) 768 if (rc != -ELINKCONG)
744 break; 769 break;
745 tipc_sk(sk)->link_cong = 1; 770 tipc_sk(sk)->link_cong = 1;
746 rc = tipc_wait_for_sndmsg(sock, &timeo); 771 rc = tipc_wait_for_sndmsg(sock, &timeo);
747 if (rc) 772 if (rc)
748 __skb_queue_purge(&head); 773 __skb_queue_purge(pktchain);
749 } while (!rc); 774 } while (!rc);
750 return rc; 775 return rc;
751} 776}
752 777
753/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets 778/**
779 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
780 * @arrvq: queue with arriving messages, to be cloned after destination lookup
781 * @inputq: queue with cloned messages, delivered to socket after dest lookup
782 *
783 * Multi-threaded: parallel calls with reference to same queues may occur
754 */ 784 */
755void tipc_sk_mcast_rcv(struct sk_buff *buf) 785void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
786 struct sk_buff_head *inputq)
756{ 787{
757 struct tipc_msg *msg = buf_msg(buf); 788 struct tipc_msg *msg;
758 struct tipc_port_list dports = {0, NULL, }; 789 struct tipc_plist dports;
759 struct tipc_port_list *item; 790 u32 portid;
760 struct sk_buff *b;
761 uint i, last, dst = 0;
762 u32 scope = TIPC_CLUSTER_SCOPE; 791 u32 scope = TIPC_CLUSTER_SCOPE;
763 792 struct sk_buff_head tmpq;
764 if (in_own_node(msg_orignode(msg))) 793 uint hsz;
765 scope = TIPC_NODE_SCOPE; 794 struct sk_buff *skb, *_skb;
766 795
767 /* Create destination port list: */ 796 __skb_queue_head_init(&tmpq);
768 tipc_nametbl_mc_translate(msg_nametype(msg), 797 tipc_plist_init(&dports);
769 msg_namelower(msg), 798
770 msg_nameupper(msg), 799 skb = tipc_skb_peek(arrvq, &inputq->lock);
771 scope, 800 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
772 &dports); 801 msg = buf_msg(skb);
773 last = dports.count; 802 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
774 if (!last) { 803
775 kfree_skb(buf); 804 if (in_own_node(net, msg_orignode(msg)))
776 return; 805 scope = TIPC_NODE_SCOPE;
777 } 806
778 807 /* Create destination port list and message clones: */
779 for (item = &dports; item; item = item->next) { 808 tipc_nametbl_mc_translate(net,
780 for (i = 0; i < PLSIZE && ++dst <= last; i++) { 809 msg_nametype(msg), msg_namelower(msg),
781 b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; 810 msg_nameupper(msg), scope, &dports);
782 if (!b) { 811 portid = tipc_plist_pop(&dports);
783 pr_warn("Failed do clone mcast rcv buffer\n"); 812 for (; portid; portid = tipc_plist_pop(&dports)) {
813 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
814 if (_skb) {
815 msg_set_destport(buf_msg(_skb), portid);
816 __skb_queue_tail(&tmpq, _skb);
784 continue; 817 continue;
785 } 818 }
786 msg_set_destport(msg, item->ports[i]); 819 pr_warn("Failed to clone mcast rcv buffer\n");
787 tipc_sk_rcv(b);
788 } 820 }
821 /* Append to inputq if not already done by other thread */
822 spin_lock_bh(&inputq->lock);
823 if (skb_peek(arrvq) == skb) {
824 skb_queue_splice_tail_init(&tmpq, inputq);
825 kfree_skb(__skb_dequeue(arrvq));
826 }
827 spin_unlock_bh(&inputq->lock);
828 __skb_queue_purge(&tmpq);
829 kfree_skb(skb);
789 } 830 }
790 tipc_port_list_free(&dports); 831 tipc_sk_rcv(net, inputq);
791} 832}
792 833
793/** 834/**
794 * tipc_sk_proto_rcv - receive a connection mng protocol message 835 * tipc_sk_proto_rcv - receive a connection mng protocol message
795 * @tsk: receiving socket 836 * @tsk: receiving socket
796 * @dnode: node to send response message to, if any 837 * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
797 * @buf: buffer containing protocol message
798 * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
799 * (CONN_PROBE_REPLY) message should be forwarded.
800 */ 838 */
801static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, 839static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
802 struct sk_buff *buf)
803{ 840{
804 struct tipc_msg *msg = buf_msg(buf); 841 struct tipc_msg *msg = buf_msg(*skb);
805 int conn_cong; 842 int conn_cong;
806 843 u32 dnode;
844 u32 own_node = tsk_own_node(tsk);
807 /* Ignore if connection cannot be validated: */ 845 /* Ignore if connection cannot be validated: */
808 if (!tsk_peer_msg(tsk, msg)) 846 if (!tsk_peer_msg(tsk, msg))
809 goto exit; 847 goto exit;
@@ -816,15 +854,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
816 if (conn_cong) 854 if (conn_cong)
817 tsk->sk.sk_write_space(&tsk->sk); 855 tsk->sk.sk_write_space(&tsk->sk);
818 } else if (msg_type(msg) == CONN_PROBE) { 856 } else if (msg_type(msg) == CONN_PROBE) {
819 if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) 857 if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
820 return TIPC_OK; 858 msg_set_type(msg, CONN_PROBE_REPLY);
821 msg_set_type(msg, CONN_PROBE_REPLY); 859 return;
822 return TIPC_FWD_MSG; 860 }
823 } 861 }
824 /* Do nothing if msg_type() == CONN_PROBE_REPLY */ 862 /* Do nothing if msg_type() == CONN_PROBE_REPLY */
825exit: 863exit:
826 kfree_skb(buf); 864 kfree_skb(*skb);
827 return TIPC_OK; 865 *skb = NULL;
828} 866}
829 867
830static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) 868static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
@@ -872,11 +910,13 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
872 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 910 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
873 struct sock *sk = sock->sk; 911 struct sock *sk = sock->sk;
874 struct tipc_sock *tsk = tipc_sk(sk); 912 struct tipc_sock *tsk = tipc_sk(sk);
913 struct net *net = sock_net(sk);
875 struct tipc_msg *mhdr = &tsk->phdr; 914 struct tipc_msg *mhdr = &tsk->phdr;
876 u32 dnode, dport; 915 u32 dnode, dport;
877 struct sk_buff_head head; 916 struct sk_buff_head *pktchain = &sk->sk_write_queue;
878 struct sk_buff *skb; 917 struct sk_buff *skb;
879 struct tipc_name_seq *seq = &dest->addr.nameseq; 918 struct tipc_name_seq *seq = &dest->addr.nameseq;
919 struct iov_iter save;
880 u32 mtu; 920 u32 mtu;
881 long timeo; 921 long timeo;
882 int rc; 922 int rc;
@@ -929,7 +969,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
929 msg_set_nametype(mhdr, type); 969 msg_set_nametype(mhdr, type);
930 msg_set_nameinst(mhdr, inst); 970 msg_set_nameinst(mhdr, inst);
931 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); 971 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
932 dport = tipc_nametbl_translate(type, inst, &dnode); 972 dport = tipc_nametbl_translate(net, type, inst, &dnode);
933 msg_set_destnode(mhdr, dnode); 973 msg_set_destnode(mhdr, dnode);
934 msg_set_destport(mhdr, dport); 974 msg_set_destport(mhdr, dport);
935 if (unlikely(!dport && !dnode)) { 975 if (unlikely(!dport && !dnode)) {
@@ -945,31 +985,33 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
945 msg_set_hdr_sz(mhdr, BASIC_H_SIZE); 985 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
946 } 986 }
947 987
988 save = m->msg_iter;
948new_mtu: 989new_mtu:
949 mtu = tipc_node_get_mtu(dnode, tsk->ref); 990 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
950 __skb_queue_head_init(&head); 991 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
951 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
952 if (rc < 0) 992 if (rc < 0)
953 goto exit; 993 goto exit;
954 994
955 do { 995 do {
956 skb = skb_peek(&head); 996 skb = skb_peek(pktchain);
957 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 997 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
958 rc = tipc_link_xmit(&head, dnode, tsk->ref); 998 rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
959 if (likely(rc >= 0)) { 999 if (likely(rc >= 0)) {
960 if (sock->state != SS_READY) 1000 if (sock->state != SS_READY)
961 sock->state = SS_CONNECTING; 1001 sock->state = SS_CONNECTING;
962 rc = dsz; 1002 rc = dsz;
963 break; 1003 break;
964 } 1004 }
965 if (rc == -EMSGSIZE) 1005 if (rc == -EMSGSIZE) {
1006 m->msg_iter = save;
966 goto new_mtu; 1007 goto new_mtu;
1008 }
967 if (rc != -ELINKCONG) 1009 if (rc != -ELINKCONG)
968 break; 1010 break;
969 tsk->link_cong = 1; 1011 tsk->link_cong = 1;
970 rc = tipc_wait_for_sndmsg(sock, &timeo); 1012 rc = tipc_wait_for_sndmsg(sock, &timeo);
971 if (rc) 1013 if (rc)
972 __skb_queue_purge(&head); 1014 __skb_queue_purge(pktchain);
973 } while (!rc); 1015 } while (!rc);
974exit: 1016exit:
975 if (iocb) 1017 if (iocb)
@@ -1024,15 +1066,17 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1024 struct msghdr *m, size_t dsz) 1066 struct msghdr *m, size_t dsz)
1025{ 1067{
1026 struct sock *sk = sock->sk; 1068 struct sock *sk = sock->sk;
1069 struct net *net = sock_net(sk);
1027 struct tipc_sock *tsk = tipc_sk(sk); 1070 struct tipc_sock *tsk = tipc_sk(sk);
1028 struct tipc_msg *mhdr = &tsk->phdr; 1071 struct tipc_msg *mhdr = &tsk->phdr;
1029 struct sk_buff_head head; 1072 struct sk_buff_head *pktchain = &sk->sk_write_queue;
1030 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1073 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1031 u32 ref = tsk->ref; 1074 u32 portid = tsk->portid;
1032 int rc = -EINVAL; 1075 int rc = -EINVAL;
1033 long timeo; 1076 long timeo;
1034 u32 dnode; 1077 u32 dnode;
1035 uint mtu, send, sent = 0; 1078 uint mtu, send, sent = 0;
1079 struct iov_iter save;
1036 1080
1037 /* Handle implied connection establishment */ 1081 /* Handle implied connection establishment */
1038 if (unlikely(dest)) { 1082 if (unlikely(dest)) {
@@ -1059,15 +1103,15 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1059 dnode = tsk_peer_node(tsk); 1103 dnode = tsk_peer_node(tsk);
1060 1104
1061next: 1105next:
1106 save = m->msg_iter;
1062 mtu = tsk->max_pkt; 1107 mtu = tsk->max_pkt;
1063 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1108 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1064 __skb_queue_head_init(&head); 1109 rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
1065 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1066 if (unlikely(rc < 0)) 1110 if (unlikely(rc < 0))
1067 goto exit; 1111 goto exit;
1068 do { 1112 do {
1069 if (likely(!tsk_conn_cong(tsk))) { 1113 if (likely(!tsk_conn_cong(tsk))) {
1070 rc = tipc_link_xmit(&head, dnode, ref); 1114 rc = tipc_link_xmit(net, pktchain, dnode, portid);
1071 if (likely(!rc)) { 1115 if (likely(!rc)) {
1072 tsk->sent_unacked++; 1116 tsk->sent_unacked++;
1073 sent += send; 1117 sent += send;
@@ -1076,7 +1120,9 @@ next:
1076 goto next; 1120 goto next;
1077 } 1121 }
1078 if (rc == -EMSGSIZE) { 1122 if (rc == -EMSGSIZE) {
1079 tsk->max_pkt = tipc_node_get_mtu(dnode, ref); 1123 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1124 portid);
1125 m->msg_iter = save;
1080 goto next; 1126 goto next;
1081 } 1127 }
1082 if (rc != -ELINKCONG) 1128 if (rc != -ELINKCONG)
@@ -1085,7 +1131,7 @@ next:
1085 } 1131 }
1086 rc = tipc_wait_for_sndpkt(sock, &timeo); 1132 rc = tipc_wait_for_sndpkt(sock, &timeo);
1087 if (rc) 1133 if (rc)
1088 __skb_queue_purge(&head); 1134 __skb_queue_purge(pktchain);
1089 } while (!rc); 1135 } while (!rc);
1090exit: 1136exit:
1091 if (iocb) 1137 if (iocb)
@@ -1118,6 +1164,8 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
1118static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, 1164static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1119 u32 peer_node) 1165 u32 peer_node)
1120{ 1166{
1167 struct sock *sk = &tsk->sk;
1168 struct net *net = sock_net(sk);
1121 struct tipc_msg *msg = &tsk->phdr; 1169 struct tipc_msg *msg = &tsk->phdr;
1122 1170
1123 msg_set_destnode(msg, peer_node); 1171 msg_set_destnode(msg, peer_node);
@@ -1126,12 +1174,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1126 msg_set_lookup_scope(msg, 0); 1174 msg_set_lookup_scope(msg, 0);
1127 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1175 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1128 1176
1129 tsk->probing_interval = CONN_PROBING_INTERVAL; 1177 tsk->probing_intv = CONN_PROBING_INTERVAL;
1130 tsk->probing_state = TIPC_CONN_OK; 1178 tsk->probing_state = TIPC_CONN_OK;
1131 tsk->connected = 1; 1179 tsk->connected = 1;
1132 k_start_timer(&tsk->timer, tsk->probing_interval); 1180 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1133 tipc_node_add_conn(peer_node, tsk->ref, peer_port); 1181 tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1134 tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref); 1182 tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1135} 1183}
1136 1184
1137/** 1185/**
@@ -1230,6 +1278,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1230 1278
1231static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) 1279static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1232{ 1280{
1281 struct net *net = sock_net(&tsk->sk);
1233 struct sk_buff *skb = NULL; 1282 struct sk_buff *skb = NULL;
1234 struct tipc_msg *msg; 1283 struct tipc_msg *msg;
1235 u32 peer_port = tsk_peer_port(tsk); 1284 u32 peer_port = tsk_peer_port(tsk);
@@ -1237,13 +1286,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1237 1286
1238 if (!tsk->connected) 1287 if (!tsk->connected)
1239 return; 1288 return;
1240 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, 1289 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1241 tipc_own_addr, peer_port, tsk->ref, TIPC_OK); 1290 dnode, tsk_own_node(tsk), peer_port,
1291 tsk->portid, TIPC_OK);
1242 if (!skb) 1292 if (!skb)
1243 return; 1293 return;
1244 msg = buf_msg(skb); 1294 msg = buf_msg(skb);
1245 msg_set_msgcnt(msg, ack); 1295 msg_set_msgcnt(msg, ack);
1246 tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg)); 1296 tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1247} 1297}
1248 1298
1249static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) 1299static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1529,15 +1579,16 @@ static void tipc_data_ready(struct sock *sk)
1529/** 1579/**
1530 * filter_connect - Handle all incoming messages for a connection-based socket 1580 * filter_connect - Handle all incoming messages for a connection-based socket
1531 * @tsk: TIPC socket 1581 * @tsk: TIPC socket
1532 * @msg: message 1582 * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1533 * 1583 *
1534 * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise 1584 * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1535 */ 1585 */
1536static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) 1586static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
1537{ 1587{
1538 struct sock *sk = &tsk->sk; 1588 struct sock *sk = &tsk->sk;
1589 struct net *net = sock_net(sk);
1539 struct socket *sock = sk->sk_socket; 1590 struct socket *sock = sk->sk_socket;
1540 struct tipc_msg *msg = buf_msg(*buf); 1591 struct tipc_msg *msg = buf_msg(*skb);
1541 int retval = -TIPC_ERR_NO_PORT; 1592 int retval = -TIPC_ERR_NO_PORT;
1542 1593
1543 if (msg_mcast(msg)) 1594 if (msg_mcast(msg))
@@ -1551,8 +1602,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1551 sock->state = SS_DISCONNECTING; 1602 sock->state = SS_DISCONNECTING;
1552 tsk->connected = 0; 1603 tsk->connected = 0;
1553 /* let timer expire on it's own */ 1604 /* let timer expire on it's own */
1554 tipc_node_remove_conn(tsk_peer_node(tsk), 1605 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1555 tsk->ref); 1606 tsk->portid);
1556 } 1607 }
1557 retval = TIPC_OK; 1608 retval = TIPC_OK;
1558 } 1609 }
@@ -1587,8 +1638,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1587 * connect() routine if sleeping. 1638 * connect() routine if sleeping.
1588 */ 1639 */
1589 if (msg_data_sz(msg) == 0) { 1640 if (msg_data_sz(msg) == 0) {
1590 kfree_skb(*buf); 1641 kfree_skb(*skb);
1591 *buf = NULL; 1642 *skb = NULL;
1592 if (waitqueue_active(sk_sleep(sk))) 1643 if (waitqueue_active(sk_sleep(sk)))
1593 wake_up_interruptible(sk_sleep(sk)); 1644 wake_up_interruptible(sk_sleep(sk));
1594 } 1645 }
@@ -1640,32 +1691,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1640/** 1691/**
1641 * filter_rcv - validate incoming message 1692 * filter_rcv - validate incoming message
1642 * @sk: socket 1693 * @sk: socket
1643 * @buf: message 1694 * @skb: pointer to message. Set to NULL if buffer is consumed.
1644 * 1695 *
1645 * Enqueues message on receive queue if acceptable; optionally handles 1696 * Enqueues message on receive queue if acceptable; optionally handles
1646 * disconnect indication for a connected socket. 1697 * disconnect indication for a connected socket.
1647 * 1698 *
1648 * Called with socket lock already taken; port lock may also be taken. 1699 * Called with socket lock already taken
1649 * 1700 *
1650 * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message 1701 * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
1651 * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
1652 */ 1702 */
1653static int filter_rcv(struct sock *sk, struct sk_buff *buf) 1703static int filter_rcv(struct sock *sk, struct sk_buff **skb)
1654{ 1704{
1655 struct socket *sock = sk->sk_socket; 1705 struct socket *sock = sk->sk_socket;
1656 struct tipc_sock *tsk = tipc_sk(sk); 1706 struct tipc_sock *tsk = tipc_sk(sk);
1657 struct tipc_msg *msg = buf_msg(buf); 1707 struct tipc_msg *msg = buf_msg(*skb);
1658 unsigned int limit = rcvbuf_limit(sk, buf); 1708 unsigned int limit = rcvbuf_limit(sk, *skb);
1659 u32 onode;
1660 int rc = TIPC_OK; 1709 int rc = TIPC_OK;
1661 1710
1662 if (unlikely(msg_user(msg) == CONN_MANAGER)) 1711 if (unlikely(msg_user(msg) == CONN_MANAGER)) {
1663 return tipc_sk_proto_rcv(tsk, &onode, buf); 1712 tipc_sk_proto_rcv(tsk, skb);
1713 return TIPC_OK;
1714 }
1664 1715
1665 if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { 1716 if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1666 kfree_skb(buf); 1717 kfree_skb(*skb);
1667 tsk->link_cong = 0; 1718 tsk->link_cong = 0;
1668 sk->sk_write_space(sk); 1719 sk->sk_write_space(sk);
1720 *skb = NULL;
1669 return TIPC_OK; 1721 return TIPC_OK;
1670 } 1722 }
1671 1723
@@ -1677,21 +1729,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1677 if (msg_connected(msg)) 1729 if (msg_connected(msg))
1678 return -TIPC_ERR_NO_PORT; 1730 return -TIPC_ERR_NO_PORT;
1679 } else { 1731 } else {
1680 rc = filter_connect(tsk, &buf); 1732 rc = filter_connect(tsk, skb);
1681 if (rc != TIPC_OK || buf == NULL) 1733 if (rc != TIPC_OK || !*skb)
1682 return rc; 1734 return rc;
1683 } 1735 }
1684 1736
1685 /* Reject message if there isn't room to queue it */ 1737 /* Reject message if there isn't room to queue it */
1686 if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) 1738 if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
1687 return -TIPC_ERR_OVERLOAD; 1739 return -TIPC_ERR_OVERLOAD;
1688 1740
1689 /* Enqueue message */ 1741 /* Enqueue message */
1690 TIPC_SKB_CB(buf)->handle = NULL; 1742 TIPC_SKB_CB(*skb)->handle = NULL;
1691 __skb_queue_tail(&sk->sk_receive_queue, buf); 1743 __skb_queue_tail(&sk->sk_receive_queue, *skb);
1692 skb_set_owner_r(buf, sk); 1744 skb_set_owner_r(*skb, sk);
1693 1745
1694 sk->sk_data_ready(sk); 1746 sk->sk_data_ready(sk);
1747 *skb = NULL;
1695 return TIPC_OK; 1748 return TIPC_OK;
1696} 1749}
1697 1750
@@ -1700,78 +1753,125 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1700 * @sk: socket 1753 * @sk: socket
1701 * @skb: message 1754 * @skb: message
1702 * 1755 *
1703 * Caller must hold socket lock, but not port lock. 1756 * Caller must hold socket lock
1704 * 1757 *
1705 * Returns 0 1758 * Returns 0
1706 */ 1759 */
1707static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) 1760static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1708{ 1761{
1709 int rc; 1762 int err;
1710 u32 onode; 1763 atomic_t *dcnt;
1764 u32 dnode;
1711 struct tipc_sock *tsk = tipc_sk(sk); 1765 struct tipc_sock *tsk = tipc_sk(sk);
1766 struct net *net = sock_net(sk);
1712 uint truesize = skb->truesize; 1767 uint truesize = skb->truesize;
1713 1768
1714 rc = filter_rcv(sk, skb); 1769 err = filter_rcv(sk, &skb);
1715 1770 if (likely(!skb)) {
1716 if (likely(!rc)) { 1771 dcnt = &tsk->dupl_rcvcnt;
1717 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) 1772 if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1718 atomic_add(truesize, &tsk->dupl_rcvcnt); 1773 atomic_add(truesize, dcnt);
1719 return 0; 1774 return 0;
1720 } 1775 }
1776 if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
1777 tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
1778 return 0;
1779}
1721 1780
1722 if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc)) 1781/**
1723 return 0; 1782 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1724 1783 * inputq and try adding them to socket or backlog queue
1725 tipc_link_xmit_skb(skb, onode, 0); 1784 * @inputq: list of incoming buffers with potentially different destinations
1785 * @sk: socket where the buffers should be enqueued
1786 * @dport: port number for the socket
1787 * @_skb: returned buffer to be forwarded or rejected, if applicable
1788 *
1789 * Caller must hold socket lock
1790 *
1791 * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1792 * or -TIPC_ERR_NO_PORT
1793 */
1794static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1795 u32 dport, struct sk_buff **_skb)
1796{
1797 unsigned int lim;
1798 atomic_t *dcnt;
1799 int err;
1800 struct sk_buff *skb;
1801 unsigned long time_limit = jiffies + 2;
1726 1802
1727 return 0; 1803 while (skb_queue_len(inputq)) {
1804 if (unlikely(time_after_eq(jiffies, time_limit)))
1805 return TIPC_OK;
1806 skb = tipc_skb_dequeue(inputq, dport);
1807 if (unlikely(!skb))
1808 return TIPC_OK;
1809 if (!sock_owned_by_user(sk)) {
1810 err = filter_rcv(sk, &skb);
1811 if (likely(!skb))
1812 continue;
1813 *_skb = skb;
1814 return err;
1815 }
1816 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1817 if (sk->sk_backlog.len)
1818 atomic_set(dcnt, 0);
1819 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1820 if (likely(!sk_add_backlog(sk, skb, lim)))
1821 continue;
1822 *_skb = skb;
1823 return -TIPC_ERR_OVERLOAD;
1824 }
1825 return TIPC_OK;
1728} 1826}
1729 1827
1730/** 1828/**
1731 * tipc_sk_rcv - handle incoming message 1829 * tipc_sk_rcv - handle a chain of incoming buffers
1732 * @skb: buffer containing arriving message 1830 * @inputq: buffer list containing the buffers
1733 * Consumes buffer 1831 * Consumes all buffers in list until inputq is empty
1734 * Returns 0 if success, or errno: -EHOSTUNREACH 1832 * Note: may be called in multiple threads referring to the same queue
1833 * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1834 * Only node local calls check the return value, sending single-buffer queues
1735 */ 1835 */
1736int tipc_sk_rcv(struct sk_buff *skb) 1836int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1737{ 1837{
1838 u32 dnode, dport = 0;
1839 int err = -TIPC_ERR_NO_PORT;
1840 struct sk_buff *skb;
1738 struct tipc_sock *tsk; 1841 struct tipc_sock *tsk;
1842 struct tipc_net *tn;
1739 struct sock *sk; 1843 struct sock *sk;
1740 u32 dport = msg_destport(buf_msg(skb));
1741 int rc = TIPC_OK;
1742 uint limit;
1743 u32 dnode;
1744 1844
1745 /* Validate destination and message */ 1845 while (skb_queue_len(inputq)) {
1746 tsk = tipc_sk_get(dport); 1846 skb = NULL;
1747 if (unlikely(!tsk)) { 1847 dport = tipc_skb_peek_port(inputq, dport);
1748 rc = tipc_msg_eval(skb, &dnode); 1848 tsk = tipc_sk_lookup(net, dport);
1749 goto exit; 1849 if (likely(tsk)) {
1850 sk = &tsk->sk;
1851 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1852 err = tipc_sk_enqueue(inputq, sk, dport, &skb);
1853 spin_unlock_bh(&sk->sk_lock.slock);
1854 dport = 0;
1855 }
1856 sock_put(sk);
1857 } else {
1858 skb = tipc_skb_dequeue(inputq, dport);
1859 }
1860 if (likely(!skb))
1861 continue;
1862 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1863 goto xmit;
1864 if (!err) {
1865 dnode = msg_destnode(buf_msg(skb));
1866 goto xmit;
1867 }
1868 tn = net_generic(net, tipc_net_id);
1869 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1870 continue;
1871xmit:
1872 tipc_link_xmit_skb(net, skb, dnode, dport);
1750 } 1873 }
1751 sk = &tsk->sk; 1874 return err ? -EHOSTUNREACH : 0;
1752
1753 /* Queue message */
1754 spin_lock_bh(&sk->sk_lock.slock);
1755
1756 if (!sock_owned_by_user(sk)) {
1757 rc = filter_rcv(sk, skb);
1758 } else {
1759 if (sk->sk_backlog.len == 0)
1760 atomic_set(&tsk->dupl_rcvcnt, 0);
1761 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1762 if (sk_add_backlog(sk, skb, limit))
1763 rc = -TIPC_ERR_OVERLOAD;
1764 }
1765 spin_unlock_bh(&sk->sk_lock.slock);
1766 tipc_sk_put(tsk);
1767 if (likely(!rc))
1768 return 0;
1769exit:
1770 if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
1771 return -EHOSTUNREACH;
1772
1773 tipc_link_xmit_skb(skb, dnode, 0);
1774 return (rc < 0) ? -EHOSTUNREACH : 0;
1775} 1875}
1776 1876
1777static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) 1877static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2027,6 +2127,7 @@ exit:
2027static int tipc_shutdown(struct socket *sock, int how) 2127static int tipc_shutdown(struct socket *sock, int how)
2028{ 2128{
2029 struct sock *sk = sock->sk; 2129 struct sock *sk = sock->sk;
2130 struct net *net = sock_net(sk);
2030 struct tipc_sock *tsk = tipc_sk(sk); 2131 struct tipc_sock *tsk = tipc_sk(sk);
2031 struct sk_buff *skb; 2132 struct sk_buff *skb;
2032 u32 dnode; 2133 u32 dnode;
@@ -2049,21 +2150,24 @@ restart:
2049 kfree_skb(skb); 2150 kfree_skb(skb);
2050 goto restart; 2151 goto restart;
2051 } 2152 }
2052 if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN)) 2153 if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
2053 tipc_link_xmit_skb(skb, dnode, tsk->ref); 2154 TIPC_CONN_SHUTDOWN))
2054 tipc_node_remove_conn(dnode, tsk->ref); 2155 tipc_link_xmit_skb(net, skb, dnode,
2156 tsk->portid);
2157 tipc_node_remove_conn(net, dnode, tsk->portid);
2055 } else { 2158 } else {
2056 dnode = tsk_peer_node(tsk); 2159 dnode = tsk_peer_node(tsk);
2160
2057 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 2161 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2058 TIPC_CONN_MSG, SHORT_H_SIZE, 2162 TIPC_CONN_MSG, SHORT_H_SIZE,
2059 0, dnode, tipc_own_addr, 2163 0, dnode, tsk_own_node(tsk),
2060 tsk_peer_port(tsk), 2164 tsk_peer_port(tsk),
2061 tsk->ref, TIPC_CONN_SHUTDOWN); 2165 tsk->portid, TIPC_CONN_SHUTDOWN);
2062 tipc_link_xmit_skb(skb, dnode, tsk->ref); 2166 tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
2063 } 2167 }
2064 tsk->connected = 0; 2168 tsk->connected = 0;
2065 sock->state = SS_DISCONNECTING; 2169 sock->state = SS_DISCONNECTING;
2066 tipc_node_remove_conn(dnode, tsk->ref); 2170 tipc_node_remove_conn(net, dnode, tsk->portid);
2067 /* fall through */ 2171 /* fall through */
2068 2172
2069 case SS_DISCONNECTING: 2173 case SS_DISCONNECTING:
@@ -2084,18 +2188,14 @@ restart:
2084 return res; 2188 return res;
2085} 2189}
2086 2190
2087static void tipc_sk_timeout(unsigned long ref) 2191static void tipc_sk_timeout(unsigned long data)
2088{ 2192{
2089 struct tipc_sock *tsk; 2193 struct tipc_sock *tsk = (struct tipc_sock *)data;
2090 struct sock *sk; 2194 struct sock *sk = &tsk->sk;
2091 struct sk_buff *skb = NULL; 2195 struct sk_buff *skb = NULL;
2092 u32 peer_port, peer_node; 2196 u32 peer_port, peer_node;
2197 u32 own_node = tsk_own_node(tsk);
2093 2198
2094 tsk = tipc_sk_get(ref);
2095 if (!tsk)
2096 return;
2097
2098 sk = &tsk->sk;
2099 bh_lock_sock(sk); 2199 bh_lock_sock(sk);
2100 if (!tsk->connected) { 2200 if (!tsk->connected) {
2101 bh_unlock_sock(sk); 2201 bh_unlock_sock(sk);
@@ -2106,38 +2206,39 @@ static void tipc_sk_timeout(unsigned long ref)
2106 2206
2107 if (tsk->probing_state == TIPC_CONN_PROBING) { 2207 if (tsk->probing_state == TIPC_CONN_PROBING) {
2108 /* Previous probe not answered -> self abort */ 2208 /* Previous probe not answered -> self abort */
2109 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 2209 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2110 SHORT_H_SIZE, 0, tipc_own_addr, 2210 TIPC_CONN_MSG, SHORT_H_SIZE, 0,
2111 peer_node, ref, peer_port, 2211 own_node, peer_node, tsk->portid,
2112 TIPC_ERR_NO_PORT); 2212 peer_port, TIPC_ERR_NO_PORT);
2113 } else { 2213 } else {
2114 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 2214 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2115 0, peer_node, tipc_own_addr, 2215 INT_H_SIZE, 0, peer_node, own_node,
2116 peer_port, ref, TIPC_OK); 2216 peer_port, tsk->portid, TIPC_OK);
2117 tsk->probing_state = TIPC_CONN_PROBING; 2217 tsk->probing_state = TIPC_CONN_PROBING;
2118 k_start_timer(&tsk->timer, tsk->probing_interval); 2218 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2119 } 2219 }
2120 bh_unlock_sock(sk); 2220 bh_unlock_sock(sk);
2121 if (skb) 2221 if (skb)
2122 tipc_link_xmit_skb(skb, peer_node, ref); 2222 tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2123exit: 2223exit:
2124 tipc_sk_put(tsk); 2224 sock_put(sk);
2125} 2225}
2126 2226
2127static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 2227static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2128 struct tipc_name_seq const *seq) 2228 struct tipc_name_seq const *seq)
2129{ 2229{
2230 struct net *net = sock_net(&tsk->sk);
2130 struct publication *publ; 2231 struct publication *publ;
2131 u32 key; 2232 u32 key;
2132 2233
2133 if (tsk->connected) 2234 if (tsk->connected)
2134 return -EINVAL; 2235 return -EINVAL;
2135 key = tsk->ref + tsk->pub_count + 1; 2236 key = tsk->portid + tsk->pub_count + 1;
2136 if (key == tsk->ref) 2237 if (key == tsk->portid)
2137 return -EADDRINUSE; 2238 return -EADDRINUSE;
2138 2239
2139 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 2240 publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2140 scope, tsk->ref, key); 2241 scope, tsk->portid, key);
2141 if (unlikely(!publ)) 2242 if (unlikely(!publ))
2142 return -EINVAL; 2243 return -EINVAL;
2143 2244
@@ -2150,6 +2251,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2150static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 2251static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2151 struct tipc_name_seq const *seq) 2252 struct tipc_name_seq const *seq)
2152{ 2253{
2254 struct net *net = sock_net(&tsk->sk);
2153 struct publication *publ; 2255 struct publication *publ;
2154 struct publication *safe; 2256 struct publication *safe;
2155 int rc = -EINVAL; 2257 int rc = -EINVAL;
@@ -2164,12 +2266,12 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2164 continue; 2266 continue;
2165 if (publ->upper != seq->upper) 2267 if (publ->upper != seq->upper)
2166 break; 2268 break;
2167 tipc_nametbl_withdraw(publ->type, publ->lower, 2269 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2168 publ->ref, publ->key); 2270 publ->ref, publ->key);
2169 rc = 0; 2271 rc = 0;
2170 break; 2272 break;
2171 } 2273 }
2172 tipc_nametbl_withdraw(publ->type, publ->lower, 2274 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2173 publ->ref, publ->key); 2275 publ->ref, publ->key);
2174 rc = 0; 2276 rc = 0;
2175 } 2277 }
@@ -2178,336 +2280,103 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2178 return rc; 2280 return rc;
2179} 2281}
2180 2282
2181static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2182 int len, int full_id)
2183{
2184 struct publication *publ;
2185 int ret;
2186
2187 if (full_id)
2188 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2189 tipc_zone(tipc_own_addr),
2190 tipc_cluster(tipc_own_addr),
2191 tipc_node(tipc_own_addr), tsk->ref);
2192 else
2193 ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref);
2194
2195 if (tsk->connected) {
2196 u32 dport = tsk_peer_port(tsk);
2197 u32 destnode = tsk_peer_node(tsk);
2198
2199 ret += tipc_snprintf(buf + ret, len - ret,
2200 " connected to <%u.%u.%u:%u>",
2201 tipc_zone(destnode),
2202 tipc_cluster(destnode),
2203 tipc_node(destnode), dport);
2204 if (tsk->conn_type != 0)
2205 ret += tipc_snprintf(buf + ret, len - ret,
2206 " via {%u,%u}", tsk->conn_type,
2207 tsk->conn_instance);
2208 } else if (tsk->published) {
2209 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
2210 list_for_each_entry(publ, &tsk->publications, pport_list) {
2211 if (publ->lower == publ->upper)
2212 ret += tipc_snprintf(buf + ret, len - ret,
2213 " {%u,%u}", publ->type,
2214 publ->lower);
2215 else
2216 ret += tipc_snprintf(buf + ret, len - ret,
2217 " {%u,%u,%u}", publ->type,
2218 publ->lower, publ->upper);
2219 }
2220 }
2221 ret += tipc_snprintf(buf + ret, len - ret, "\n");
2222 return ret;
2223}
2224
2225struct sk_buff *tipc_sk_socks_show(void)
2226{
2227 struct sk_buff *buf;
2228 struct tlv_desc *rep_tlv;
2229 char *pb;
2230 int pb_len;
2231 struct tipc_sock *tsk;
2232 int str_len = 0;
2233 u32 ref = 0;
2234
2235 buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2236 if (!buf)
2237 return NULL;
2238 rep_tlv = (struct tlv_desc *)buf->data;
2239 pb = TLV_DATA(rep_tlv);
2240 pb_len = ULTRA_STRING_MAX_LEN;
2241
2242 tsk = tipc_sk_get_next(&ref);
2243 for (; tsk; tsk = tipc_sk_get_next(&ref)) {
2244 lock_sock(&tsk->sk);
2245 str_len += tipc_sk_show(tsk, pb + str_len,
2246 pb_len - str_len, 0);
2247 release_sock(&tsk->sk);
2248 tipc_sk_put(tsk);
2249 }
2250 str_len += 1; /* for "\0" */
2251 skb_put(buf, TLV_SPACE(str_len));
2252 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2253
2254 return buf;
2255}
2256
2257/* tipc_sk_reinit: set non-zero address in all existing sockets 2283/* tipc_sk_reinit: set non-zero address in all existing sockets
2258 * when we go from standalone to network mode. 2284 * when we go from standalone to network mode.
2259 */ 2285 */
2260void tipc_sk_reinit(void) 2286void tipc_sk_reinit(struct net *net)
2261{ 2287{
2288 struct tipc_net *tn = net_generic(net, tipc_net_id);
2289 const struct bucket_table *tbl;
2290 struct rhash_head *pos;
2291 struct tipc_sock *tsk;
2262 struct tipc_msg *msg; 2292 struct tipc_msg *msg;
2263 u32 ref = 0; 2293 int i;
2264 struct tipc_sock *tsk = tipc_sk_get_next(&ref);
2265 2294
2266 for (; tsk; tsk = tipc_sk_get_next(&ref)) { 2295 rcu_read_lock();
2267 lock_sock(&tsk->sk); 2296 tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2268 msg = &tsk->phdr; 2297 for (i = 0; i < tbl->size; i++) {
2269 msg_set_prevnode(msg, tipc_own_addr); 2298 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2270 msg_set_orignode(msg, tipc_own_addr); 2299 spin_lock_bh(&tsk->sk.sk_lock.slock);
2271 release_sock(&tsk->sk); 2300 msg = &tsk->phdr;
2272 tipc_sk_put(tsk); 2301 msg_set_prevnode(msg, tn->own_addr);
2302 msg_set_orignode(msg, tn->own_addr);
2303 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2304 }
2273 } 2305 }
2306 rcu_read_unlock();
2274} 2307}
2275 2308
2276/** 2309static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2277 * struct reference - TIPC socket reference entry
2278 * @tsk: pointer to socket associated with reference entry
2279 * @ref: reference value for socket (combines instance & array index info)
2280 */
2281struct reference {
2282 struct tipc_sock *tsk;
2283 u32 ref;
2284};
2285
2286/**
2287 * struct tipc_ref_table - table of TIPC socket reference entries
2288 * @entries: pointer to array of reference entries
2289 * @capacity: array index of first unusable entry
2290 * @init_point: array index of first uninitialized entry
2291 * @first_free: array index of first unused socket reference entry
2292 * @last_free: array index of last unused socket reference entry
2293 * @index_mask: bitmask for array index portion of reference values
2294 * @start_mask: initial value for instance value portion of reference values
2295 */
2296struct ref_table {
2297 struct reference *entries;
2298 u32 capacity;
2299 u32 init_point;
2300 u32 first_free;
2301 u32 last_free;
2302 u32 index_mask;
2303 u32 start_mask;
2304};
2305
2306/* Socket reference table consists of 2**N entries.
2307 *
2308 * State Socket ptr Reference
2309 * ----- ---------- ---------
2310 * In use non-NULL XXXX|own index
2311 * (XXXX changes each time entry is acquired)
2312 * Free NULL YYYY|next free index
2313 * (YYYY is one more than last used XXXX)
2314 * Uninitialized NULL 0
2315 *
2316 * Entry 0 is not used; this allows index 0 to denote the end of the free list.
2317 *
2318 * Note that a reference value of 0 does not necessarily indicate that an
2319 * entry is uninitialized, since the last entry in the free list could also
2320 * have a reference value of 0 (although this is unlikely).
2321 */
2322
2323static struct ref_table tipc_ref_table;
2324
2325static DEFINE_RWLOCK(ref_table_lock);
2326
2327/**
2328 * tipc_ref_table_init - create reference table for sockets
2329 */
2330int tipc_sk_ref_table_init(u32 req_sz, u32 start)
2331{ 2310{
2332 struct reference *table; 2311 struct tipc_net *tn = net_generic(net, tipc_net_id);
2333 u32 actual_sz; 2312 struct tipc_sock *tsk;
2334
2335 /* account for unused entry, then round up size to a power of 2 */
2336
2337 req_sz++;
2338 for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) {
2339 /* do nothing */
2340 };
2341
2342 /* allocate table & mark all entries as uninitialized */
2343 table = vzalloc(actual_sz * sizeof(struct reference));
2344 if (table == NULL)
2345 return -ENOMEM;
2346
2347 tipc_ref_table.entries = table;
2348 tipc_ref_table.capacity = req_sz;
2349 tipc_ref_table.init_point = 1;
2350 tipc_ref_table.first_free = 0;
2351 tipc_ref_table.last_free = 0;
2352 tipc_ref_table.index_mask = actual_sz - 1;
2353 tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
2354 2313
2355 return 0; 2314 rcu_read_lock();
2356} 2315 tsk = rhashtable_lookup(&tn->sk_rht, &portid);
2316 if (tsk)
2317 sock_hold(&tsk->sk);
2318 rcu_read_unlock();
2357 2319
2358/** 2320 return tsk;
2359 * tipc_ref_table_stop - destroy reference table for sockets
2360 */
2361void tipc_sk_ref_table_stop(void)
2362{
2363 if (!tipc_ref_table.entries)
2364 return;
2365 vfree(tipc_ref_table.entries);
2366 tipc_ref_table.entries = NULL;
2367} 2321}
2368 2322
2369/* tipc_ref_acquire - create reference to a socket 2323static int tipc_sk_insert(struct tipc_sock *tsk)
2370 *
2371 * Register an socket pointer in the reference table.
2372 * Returns a unique reference value that is used from then on to retrieve the
2373 * socket pointer, or to determine if the socket has been deregistered.
2374 */
2375u32 tipc_sk_ref_acquire(struct tipc_sock *tsk)
2376{ 2324{
2377 u32 index; 2325 struct sock *sk = &tsk->sk;
2378 u32 index_mask; 2326 struct net *net = sock_net(sk);
2379 u32 next_plus_upper; 2327 struct tipc_net *tn = net_generic(net, tipc_net_id);
2380 u32 ref = 0; 2328 u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2381 struct reference *entry; 2329 u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2382 2330
2383 if (unlikely(!tsk)) { 2331 while (remaining--) {
2384 pr_err("Attempt to acquire ref. to non-existent obj\n"); 2332 portid++;
2385 return 0; 2333 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2386 } 2334 portid = TIPC_MIN_PORT;
2387 if (unlikely(!tipc_ref_table.entries)) { 2335 tsk->portid = portid;
2388 pr_err("Ref. table not found in acquisition attempt\n"); 2336 sock_hold(&tsk->sk);
2389 return 0; 2337 if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
2390 } 2338 return 0;
2391 2339 sock_put(&tsk->sk);
2392 /* Take a free entry, if available; otherwise initialize a new one */
2393 write_lock_bh(&ref_table_lock);
2394 index = tipc_ref_table.first_free;
2395 entry = &tipc_ref_table.entries[index];
2396
2397 if (likely(index)) {
2398 index = tipc_ref_table.first_free;
2399 entry = &tipc_ref_table.entries[index];
2400 index_mask = tipc_ref_table.index_mask;
2401 next_plus_upper = entry->ref;
2402 tipc_ref_table.first_free = next_plus_upper & index_mask;
2403 ref = (next_plus_upper & ~index_mask) + index;
2404 entry->tsk = tsk;
2405 } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
2406 index = tipc_ref_table.init_point++;
2407 entry = &tipc_ref_table.entries[index];
2408 ref = tipc_ref_table.start_mask + index;
2409 } 2340 }
2410 2341
2411 if (ref) { 2342 return -1;
2412 entry->ref = ref;
2413 entry->tsk = tsk;
2414 }
2415 write_unlock_bh(&ref_table_lock);
2416 return ref;
2417} 2343}
2418 2344
2419/* tipc_sk_ref_discard - invalidate reference to an socket 2345static void tipc_sk_remove(struct tipc_sock *tsk)
2420 *
2421 * Disallow future references to an socket and free up the entry for re-use.
2422 */
2423void tipc_sk_ref_discard(u32 ref)
2424{ 2346{
2425 struct reference *entry; 2347 struct sock *sk = &tsk->sk;
2426 u32 index; 2348 struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2427 u32 index_mask;
2428
2429 if (unlikely(!tipc_ref_table.entries)) {
2430 pr_err("Ref. table not found during discard attempt\n");
2431 return;
2432 }
2433
2434 index_mask = tipc_ref_table.index_mask;
2435 index = ref & index_mask;
2436 entry = &tipc_ref_table.entries[index];
2437
2438 write_lock_bh(&ref_table_lock);
2439 2349
2440 if (unlikely(!entry->tsk)) { 2350 if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
2441 pr_err("Attempt to discard ref. to non-existent socket\n"); 2351 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2442 goto exit; 2352 __sock_put(sk);
2443 } 2353 }
2444 if (unlikely(entry->ref != ref)) {
2445 pr_err("Attempt to discard non-existent reference\n");
2446 goto exit;
2447 }
2448
2449 /* Mark entry as unused; increment instance part of entry's
2450 * reference to invalidate any subsequent references
2451 */
2452
2453 entry->tsk = NULL;
2454 entry->ref = (ref & ~index_mask) + (index_mask + 1);
2455
2456 /* Append entry to free entry list */
2457 if (unlikely(tipc_ref_table.first_free == 0))
2458 tipc_ref_table.first_free = index;
2459 else
2460 tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
2461 tipc_ref_table.last_free = index;
2462exit:
2463 write_unlock_bh(&ref_table_lock);
2464} 2354}
2465 2355
2466/* tipc_sk_get - find referenced socket and return pointer to it 2356int tipc_sk_rht_init(struct net *net)
2467 */
2468struct tipc_sock *tipc_sk_get(u32 ref)
2469{ 2357{
2470 struct reference *entry; 2358 struct tipc_net *tn = net_generic(net, tipc_net_id);
2471 struct tipc_sock *tsk; 2359 struct rhashtable_params rht_params = {
2360 .nelem_hint = 192,
2361 .head_offset = offsetof(struct tipc_sock, node),
2362 .key_offset = offsetof(struct tipc_sock, portid),
2363 .key_len = sizeof(u32), /* portid */
2364 .hashfn = jhash,
2365 .max_shift = 20, /* 1M */
2366 .min_shift = 8, /* 256 */
2367 };
2472 2368
2473 if (unlikely(!tipc_ref_table.entries)) 2369 return rhashtable_init(&tn->sk_rht, &rht_params);
2474 return NULL;
2475 read_lock_bh(&ref_table_lock);
2476 entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
2477 tsk = entry->tsk;
2478 if (likely(tsk && (entry->ref == ref)))
2479 sock_hold(&tsk->sk);
2480 else
2481 tsk = NULL;
2482 read_unlock_bh(&ref_table_lock);
2483 return tsk;
2484} 2370}
2485 2371
2486/* tipc_sk_get_next - lock & return next socket after referenced one 2372void tipc_sk_rht_destroy(struct net *net)
2487*/
2488struct tipc_sock *tipc_sk_get_next(u32 *ref)
2489{ 2373{
2490 struct reference *entry; 2374 struct tipc_net *tn = net_generic(net, tipc_net_id);
2491 struct tipc_sock *tsk = NULL;
2492 uint index = *ref & tipc_ref_table.index_mask;
2493 2375
2494 read_lock_bh(&ref_table_lock); 2376 /* Wait for socket readers to complete */
2495 while (++index < tipc_ref_table.capacity) { 2377 synchronize_net();
2496 entry = &tipc_ref_table.entries[index];
2497 if (!entry->tsk)
2498 continue;
2499 tsk = entry->tsk;
2500 sock_hold(&tsk->sk);
2501 *ref = entry->ref;
2502 break;
2503 }
2504 read_unlock_bh(&ref_table_lock);
2505 return tsk;
2506}
2507 2378
2508static void tipc_sk_put(struct tipc_sock *tsk) 2379 rhashtable_destroy(&tn->sk_rht);
2509{
2510 sock_put(&tsk->sk);
2511} 2380}
2512 2381
2513/** 2382/**
@@ -2639,8 +2508,9 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2639 return put_user(sizeof(value), ol); 2508 return put_user(sizeof(value), ol);
2640} 2509}
2641 2510
2642static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) 2511static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2643{ 2512{
2513 struct sock *sk = sock->sk;
2644 struct tipc_sioc_ln_req lnr; 2514 struct tipc_sioc_ln_req lnr;
2645 void __user *argp = (void __user *)arg; 2515 void __user *argp = (void __user *)arg;
2646 2516
@@ -2648,7 +2518,8 @@ static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
2648 case SIOCGETLINKNAME: 2518 case SIOCGETLINKNAME:
2649 if (copy_from_user(&lnr, argp, sizeof(lnr))) 2519 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2650 return -EFAULT; 2520 return -EFAULT;
2651 if (!tipc_node_get_linkname(lnr.bearer_id & 0xffff, lnr.peer, 2521 if (!tipc_node_get_linkname(sock_net(sk),
2522 lnr.bearer_id & 0xffff, lnr.peer,
2652 lnr.linkname, TIPC_MAX_LINK_NAME)) { 2523 lnr.linkname, TIPC_MAX_LINK_NAME)) {
2653 if (copy_to_user(argp, &lnr, sizeof(lnr))) 2524 if (copy_to_user(argp, &lnr, sizeof(lnr)))
2654 return -EFAULT; 2525 return -EFAULT;
@@ -2820,18 +2691,20 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2820 int err; 2691 int err;
2821 void *hdr; 2692 void *hdr;
2822 struct nlattr *attrs; 2693 struct nlattr *attrs;
2694 struct net *net = sock_net(skb->sk);
2695 struct tipc_net *tn = net_generic(net, tipc_net_id);
2823 2696
2824 hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq, 2697 hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2825 &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET); 2698 &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2826 if (!hdr) 2699 if (!hdr)
2827 goto msg_cancel; 2700 goto msg_cancel;
2828 2701
2829 attrs = nla_nest_start(skb, TIPC_NLA_SOCK); 2702 attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2830 if (!attrs) 2703 if (!attrs)
2831 goto genlmsg_cancel; 2704 goto genlmsg_cancel;
2832 if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref)) 2705 if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2833 goto attr_msg_cancel; 2706 goto attr_msg_cancel;
2834 if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr)) 2707 if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2835 goto attr_msg_cancel; 2708 goto attr_msg_cancel;
2836 2709
2837 if (tsk->connected) { 2710 if (tsk->connected) {
@@ -2859,22 +2732,37 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2859{ 2732{
2860 int err; 2733 int err;
2861 struct tipc_sock *tsk; 2734 struct tipc_sock *tsk;
2862 u32 prev_ref = cb->args[0]; 2735 const struct bucket_table *tbl;
2863 u32 ref = prev_ref; 2736 struct rhash_head *pos;
2864 2737 struct net *net = sock_net(skb->sk);
2865 tsk = tipc_sk_get_next(&ref); 2738 struct tipc_net *tn = net_generic(net, tipc_net_id);
2866 for (; tsk; tsk = tipc_sk_get_next(&ref)) { 2739 u32 tbl_id = cb->args[0];
2867 lock_sock(&tsk->sk); 2740 u32 prev_portid = cb->args[1];
2868 err = __tipc_nl_add_sk(skb, cb, tsk);
2869 release_sock(&tsk->sk);
2870 tipc_sk_put(tsk);
2871 if (err)
2872 break;
2873 2741
2874 prev_ref = ref; 2742 rcu_read_lock();
2875 } 2743 tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2744 for (; tbl_id < tbl->size; tbl_id++) {
2745 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2746 spin_lock_bh(&tsk->sk.sk_lock.slock);
2747 if (prev_portid && prev_portid != tsk->portid) {
2748 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2749 continue;
2750 }
2876 2751
2877 cb->args[0] = prev_ref; 2752 err = __tipc_nl_add_sk(skb, cb, tsk);
2753 if (err) {
2754 prev_portid = tsk->portid;
2755 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2756 goto out;
2757 }
2758 prev_portid = 0;
2759 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2760 }
2761 }
2762out:
2763 rcu_read_unlock();
2764 cb->args[0] = tbl_id;
2765 cb->args[1] = prev_portid;
2878 2766
2879 return skb->len; 2767 return skb->len;
2880} 2768}
@@ -2888,7 +2776,7 @@ static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2888 struct nlattr *attrs; 2776 struct nlattr *attrs;
2889 2777
2890 hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq, 2778 hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2891 &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET); 2779 &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2892 if (!hdr) 2780 if (!hdr)
2893 goto msg_cancel; 2781 goto msg_cancel;
2894 2782
@@ -2962,12 +2850,13 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2962int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) 2850int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2963{ 2851{
2964 int err; 2852 int err;
2965 u32 tsk_ref = cb->args[0]; 2853 u32 tsk_portid = cb->args[0];
2966 u32 last_publ = cb->args[1]; 2854 u32 last_publ = cb->args[1];
2967 u32 done = cb->args[2]; 2855 u32 done = cb->args[2];
2856 struct net *net = sock_net(skb->sk);
2968 struct tipc_sock *tsk; 2857 struct tipc_sock *tsk;
2969 2858
2970 if (!tsk_ref) { 2859 if (!tsk_portid) {
2971 struct nlattr **attrs; 2860 struct nlattr **attrs;
2972 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; 2861 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2973 2862
@@ -2984,13 +2873,13 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2984 if (!sock[TIPC_NLA_SOCK_REF]) 2873 if (!sock[TIPC_NLA_SOCK_REF])
2985 return -EINVAL; 2874 return -EINVAL;
2986 2875
2987 tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]); 2876 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2988 } 2877 }
2989 2878
2990 if (done) 2879 if (done)
2991 return 0; 2880 return 0;
2992 2881
2993 tsk = tipc_sk_get(tsk_ref); 2882 tsk = tipc_sk_lookup(net, tsk_portid);
2994 if (!tsk) 2883 if (!tsk)
2995 return -EINVAL; 2884 return -EINVAL;
2996 2885
@@ -2999,9 +2888,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2999 if (!err) 2888 if (!err)
3000 done = 1; 2889 done = 1;
3001 release_sock(&tsk->sk); 2890 release_sock(&tsk->sk);
3002 tipc_sk_put(tsk); 2891 sock_put(&tsk->sk);
3003 2892
3004 cb->args[0] = tsk_ref; 2893 cb->args[0] = tsk_portid;
3005 cb->args[1] = last_publ; 2894 cb->args[1] = last_publ;
3006 cb->args[2] = done; 2895 cb->args[2] = done;
3007 2896