aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Makefile3
-rw-r--r--net/tipc/bcast.c18
-rw-r--r--net/tipc/core.h5
-rw-r--r--net/tipc/discover.c6
-rw-r--r--net/tipc/group.c871
-rw-r--r--net/tipc/group.h73
-rw-r--r--net/tipc/link.c34
-rw-r--r--net/tipc/monitor.c17
-rw-r--r--net/tipc/msg.c31
-rw-r--r--net/tipc/msg.h135
-rw-r--r--net/tipc/name_table.c176
-rw-r--r--net/tipc/name_table.h28
-rw-r--r--net/tipc/node.c52
-rw-r--r--net/tipc/node.h5
-rw-r--r--net/tipc/server.c121
-rw-r--r--net/tipc/server.h5
-rw-r--r--net/tipc/socket.c845
-rw-r--r--net/tipc/subscr.c6
-rw-r--r--net/tipc/udp_media.c4
19 files changed, 2084 insertions, 351 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 31b9f9c52974..37bb0bfbd936 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -1,3 +1,4 @@
1# SPDX-License-Identifier: GPL-2.0
1# 2#
2# Makefile for the Linux TIPC layer 3# Makefile for the Linux TIPC layer
3# 4#
@@ -8,7 +9,7 @@ tipc-y += addr.o bcast.o bearer.o \
8 core.o link.o discover.o msg.o \ 9 core.o link.o discover.o msg.o \
9 name_distr.o subscr.o monitor.o name_table.o net.o \ 10 name_distr.o subscr.o monitor.o name_table.o net.o \
10 netlink.o netlink_compat.o node.o socket.o eth_media.o \ 11 netlink.o netlink_compat.o node.o socket.o eth_media.o \
11 server.o socket.o 12 server.o socket.o group.o
12 13
13tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o 14tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o
14tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o 15tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index a140dd4a84af..329325bd553e 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -258,20 +258,20 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
258static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, 258static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
259 struct tipc_nlist *dests, u16 *cong_link_cnt) 259 struct tipc_nlist *dests, u16 *cong_link_cnt)
260{ 260{
261 struct tipc_dest *dst, *tmp;
261 struct sk_buff_head _pkts; 262 struct sk_buff_head _pkts;
262 struct u32_item *n, *tmp; 263 u32 dnode, selector;
263 u32 dst, selector;
264 264
265 selector = msg_link_selector(buf_msg(skb_peek(pkts))); 265 selector = msg_link_selector(buf_msg(skb_peek(pkts)));
266 skb_queue_head_init(&_pkts); 266 skb_queue_head_init(&_pkts);
267 267
268 list_for_each_entry_safe(n, tmp, &dests->list, list) { 268 list_for_each_entry_safe(dst, tmp, &dests->list, list) {
269 dst = n->value; 269 dnode = dst->node;
270 if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) 270 if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts))
271 return -ENOMEM; 271 return -ENOMEM;
272 272
273 /* Any other return value than -ELINKCONG is ignored */ 273 /* Any other return value than -ELINKCONG is ignored */
274 if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) 274 if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG)
275 (*cong_link_cnt)++; 275 (*cong_link_cnt)++;
276 } 276 }
277 return 0; 277 return 0;
@@ -554,7 +554,7 @@ void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
554{ 554{
555 if (node == nl->self) 555 if (node == nl->self)
556 nl->local = true; 556 nl->local = true;
557 else if (u32_push(&nl->list, node)) 557 else if (tipc_dest_push(&nl->list, node, 0))
558 nl->remote++; 558 nl->remote++;
559} 559}
560 560
@@ -562,13 +562,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
562{ 562{
563 if (node == nl->self) 563 if (node == nl->self)
564 nl->local = false; 564 nl->local = false;
565 else if (u32_del(&nl->list, node)) 565 else if (tipc_dest_del(&nl->list, node, 0))
566 nl->remote--; 566 nl->remote--;
567} 567}
568 568
569void tipc_nlist_purge(struct tipc_nlist *nl) 569void tipc_nlist_purge(struct tipc_nlist *nl)
570{ 570{
571 u32_list_purge(&nl->list); 571 tipc_dest_list_purge(&nl->list);
572 nl->remote = 0; 572 nl->remote = 0;
573 nl->local = 0; 573 nl->local = 0;
574} 574}
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 5cc5398be722..964342689f2c 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -132,6 +132,11 @@ static inline struct list_head *tipc_nodes(struct net *net)
132 return &tipc_net(net)->node_list; 132 return &tipc_net(net)->node_list;
133} 133}
134 134
135static inline struct tipc_server *tipc_topsrv(struct net *net)
136{
137 return tipc_net(net)->topsrv;
138}
139
135static inline unsigned int tipc_hashfn(u32 addr) 140static inline unsigned int tipc_hashfn(u32 addr)
136{ 141{
137 return addr & (NODE_HTABLE_SIZE - 1); 142 return addr & (NODE_HTABLE_SIZE - 1);
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 02462d67d191..92e4828c6b09 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -224,9 +224,9 @@ void tipc_disc_remove_dest(struct tipc_link_req *req)
224 * 224 *
225 * Called whenever a link setup request timer associated with a bearer expires. 225 * Called whenever a link setup request timer associated with a bearer expires.
226 */ 226 */
227static void disc_timeout(unsigned long data) 227static void disc_timeout(struct timer_list *t)
228{ 228{
229 struct tipc_link_req *req = (struct tipc_link_req *)data; 229 struct tipc_link_req *req = from_timer(req, t, timer);
230 struct sk_buff *skb; 230 struct sk_buff *skb;
231 int max_delay; 231 int max_delay;
232 232
@@ -292,7 +292,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b,
292 req->num_nodes = 0; 292 req->num_nodes = 0;
293 req->timer_intv = TIPC_LINK_REQ_INIT; 293 req->timer_intv = TIPC_LINK_REQ_INIT;
294 spin_lock_init(&req->lock); 294 spin_lock_init(&req->lock);
295 setup_timer(&req->timer, disc_timeout, (unsigned long)req); 295 timer_setup(&req->timer, disc_timeout, 0);
296 mod_timer(&req->timer, jiffies + req->timer_intv); 296 mod_timer(&req->timer, jiffies + req->timer_intv);
297 b->link_req = req; 297 b->link_req = req;
298 *skb = skb_clone(req->buf, GFP_ATOMIC); 298 *skb = skb_clone(req->buf, GFP_ATOMIC);
diff --git a/net/tipc/group.c b/net/tipc/group.c
new file mode 100644
index 000000000000..95fec2c057d6
--- /dev/null
+++ b/net/tipc/group.c
@@ -0,0 +1,871 @@
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
47#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
48#define ADV_IDLE ADV_UNIT
49#define ADV_ACTIVE (ADV_UNIT * 12)
50
51enum mbr_state {
52 MBR_QUARANTINED,
53 MBR_DISCOVERED,
54 MBR_JOINING,
55 MBR_PUBLISHED,
56 MBR_JOINED,
57 MBR_PENDING,
58 MBR_ACTIVE,
59 MBR_RECLAIMING,
60 MBR_REMITTED,
61 MBR_LEAVING
62};
63
64struct tipc_member {
65 struct rb_node tree_node;
66 struct list_head list;
67 struct list_head congested;
68 struct sk_buff *event_msg;
69 struct sk_buff_head deferredq;
70 struct tipc_group *group;
71 u32 node;
72 u32 port;
73 u32 instance;
74 enum mbr_state state;
75 u16 advertised;
76 u16 window;
77 u16 bc_rcv_nxt;
78 u16 bc_syncpt;
79 u16 bc_acked;
80 bool usr_pending;
81};
82
83struct tipc_group {
84 struct rb_root members;
85 struct list_head congested;
86 struct list_head pending;
87 struct list_head active;
88 struct list_head reclaiming;
89 struct tipc_nlist dests;
90 struct net *net;
91 int subid;
92 u32 type;
93 u32 instance;
94 u32 domain;
95 u32 scope;
96 u32 portid;
97 u16 member_cnt;
98 u16 active_cnt;
99 u16 max_active;
100 u16 bc_snd_nxt;
101 u16 bc_ackers;
102 bool loopback;
103 bool events;
104};
105
106static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
107 int mtyp, struct sk_buff_head *xmitq);
108
109static void tipc_group_decr_active(struct tipc_group *grp,
110 struct tipc_member *m)
111{
112 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING)
113 grp->active_cnt--;
114}
115
116static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
117{
118 int max_active, active_pool, idle_pool;
119 int mcnt = grp->member_cnt + 1;
120
121 /* Limit simultaneous reception from other members */
122 max_active = min(mcnt / 8, 64);
123 max_active = max(max_active, 16);
124 grp->max_active = max_active;
125
126 /* Reserve blocks for active and idle members */
127 active_pool = max_active * ADV_ACTIVE;
128 idle_pool = (mcnt - max_active) * ADV_IDLE;
129
130 /* Scale to bytes, considering worst-case truesize/msgsize ratio */
131 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
132}
133
134u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
135{
136 return grp->bc_snd_nxt;
137}
138
139static bool tipc_group_is_enabled(struct tipc_member *m)
140{
141 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
142}
143
144static bool tipc_group_is_receiver(struct tipc_member *m)
145{
146 return m && m->state >= MBR_JOINED;
147}
148
149u32 tipc_group_exclude(struct tipc_group *grp)
150{
151 if (!grp->loopback)
152 return grp->portid;
153 return 0;
154}
155
156int tipc_group_size(struct tipc_group *grp)
157{
158 return grp->member_cnt;
159}
160
161struct tipc_group *tipc_group_create(struct net *net, u32 portid,
162 struct tipc_group_req *mreq)
163{
164 struct tipc_group *grp;
165 u32 type = mreq->type;
166
167 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
168 if (!grp)
169 return NULL;
170 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
171 INIT_LIST_HEAD(&grp->congested);
172 INIT_LIST_HEAD(&grp->active);
173 INIT_LIST_HEAD(&grp->pending);
174 INIT_LIST_HEAD(&grp->reclaiming);
175 grp->members = RB_ROOT;
176 grp->net = net;
177 grp->portid = portid;
178 grp->domain = addr_domain(net, mreq->scope);
179 grp->type = type;
180 grp->instance = mreq->instance;
181 grp->scope = mreq->scope;
182 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
183 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
184 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
185 return grp;
186 kfree(grp);
187 return NULL;
188}
189
190void tipc_group_delete(struct net *net, struct tipc_group *grp)
191{
192 struct rb_root *tree = &grp->members;
193 struct tipc_member *m, *tmp;
194 struct sk_buff_head xmitq;
195
196 __skb_queue_head_init(&xmitq);
197
198 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
199 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
200 list_del(&m->list);
201 kfree(m);
202 }
203 tipc_node_distr_xmit(net, &xmitq);
204 tipc_nlist_purge(&grp->dests);
205 tipc_topsrv_kern_unsubscr(net, grp->subid);
206 kfree(grp);
207}
208
209struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
210 u32 node, u32 port)
211{
212 struct rb_node *n = grp->members.rb_node;
213 u64 nkey, key = (u64)node << 32 | port;
214 struct tipc_member *m;
215
216 while (n) {
217 m = container_of(n, struct tipc_member, tree_node);
218 nkey = (u64)m->node << 32 | m->port;
219 if (key < nkey)
220 n = n->rb_left;
221 else if (key > nkey)
222 n = n->rb_right;
223 else
224 return m;
225 }
226 return NULL;
227}
228
229static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
230 u32 node, u32 port)
231{
232 struct tipc_member *m;
233
234 m = tipc_group_find_member(grp, node, port);
235 if (m && tipc_group_is_enabled(m))
236 return m;
237 return NULL;
238}
239
240static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
241 u32 node)
242{
243 struct tipc_member *m;
244 struct rb_node *n;
245
246 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
247 m = container_of(n, struct tipc_member, tree_node);
248 if (m->node == node)
249 return m;
250 }
251 return NULL;
252}
253
254static void tipc_group_add_to_tree(struct tipc_group *grp,
255 struct tipc_member *m)
256{
257 u64 nkey, key = (u64)m->node << 32 | m->port;
258 struct rb_node **n, *parent = NULL;
259 struct tipc_member *tmp;
260
261 n = &grp->members.rb_node;
262 while (*n) {
263 tmp = container_of(*n, struct tipc_member, tree_node);
264 parent = *n;
265 tmp = container_of(parent, struct tipc_member, tree_node);
266 nkey = (u64)tmp->node << 32 | tmp->port;
267 if (key < nkey)
268 n = &(*n)->rb_left;
269 else if (key > nkey)
270 n = &(*n)->rb_right;
271 else
272 return;
273 }
274 rb_link_node(&m->tree_node, parent, n);
275 rb_insert_color(&m->tree_node, &grp->members);
276}
277
278static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
279 u32 node, u32 port,
280 int state)
281{
282 struct tipc_member *m;
283
284 m = kzalloc(sizeof(*m), GFP_ATOMIC);
285 if (!m)
286 return NULL;
287 INIT_LIST_HEAD(&m->list);
288 INIT_LIST_HEAD(&m->congested);
289 __skb_queue_head_init(&m->deferredq);
290 m->group = grp;
291 m->node = node;
292 m->port = port;
293 m->bc_acked = grp->bc_snd_nxt - 1;
294 grp->member_cnt++;
295 tipc_group_add_to_tree(grp, m);
296 tipc_nlist_add(&grp->dests, m->node);
297 m->state = state;
298 return m;
299}
300
301void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
302{
303 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
304}
305
306static void tipc_group_delete_member(struct tipc_group *grp,
307 struct tipc_member *m)
308{
309 rb_erase(&m->tree_node, &grp->members);
310 grp->member_cnt--;
311
312 /* Check if we were waiting for replicast ack from this member */
313 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
314 grp->bc_ackers--;
315
316 list_del_init(&m->list);
317 list_del_init(&m->congested);
318 tipc_group_decr_active(grp, m);
319
320 /* If last member on a node, remove node from dest list */
321 if (!tipc_group_find_node(grp, m->node))
322 tipc_nlist_del(&grp->dests, m->node);
323
324 kfree(m);
325}
326
327struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
328{
329 return &grp->dests;
330}
331
332void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
333 int *scope)
334{
335 seq->type = grp->type;
336 seq->lower = grp->instance;
337 seq->upper = grp->instance;
338 *scope = grp->scope;
339}
340
341void tipc_group_update_member(struct tipc_member *m, int len)
342{
343 struct tipc_group *grp = m->group;
344 struct tipc_member *_m, *tmp;
345
346 if (!tipc_group_is_enabled(m))
347 return;
348
349 m->window -= len;
350
351 if (m->window >= ADV_IDLE)
352 return;
353
354 if (!list_empty(&m->congested))
355 return;
356
357 /* Sort member into congested members' list */
358 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
359 if (m->window > _m->window)
360 continue;
361 list_add_tail(&m->congested, &_m->congested);
362 return;
363 }
364 list_add_tail(&m->congested, &grp->congested);
365}
366
367void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
368{
369 u16 prev = grp->bc_snd_nxt - 1;
370 struct tipc_member *m;
371 struct rb_node *n;
372
373 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
374 m = container_of(n, struct tipc_member, tree_node);
375 if (tipc_group_is_enabled(m)) {
376 tipc_group_update_member(m, len);
377 m->bc_acked = prev;
378 }
379 }
380
381 /* Mark number of acknowledges to expect, if any */
382 if (ack)
383 grp->bc_ackers = grp->member_cnt;
384 grp->bc_snd_nxt++;
385}
386
387bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
388 int len, struct tipc_member **mbr)
389{
390 struct sk_buff_head xmitq;
391 struct tipc_member *m;
392 int adv, state;
393
394 m = tipc_group_find_dest(grp, dnode, dport);
395 *mbr = m;
396 if (!m)
397 return false;
398 if (m->usr_pending)
399 return true;
400 if (m->window >= len)
401 return false;
402 m->usr_pending = true;
403
404 /* If not fully advertised, do it now to prevent mutual blocking */
405 adv = m->advertised;
406 state = m->state;
407 if (state < MBR_JOINED)
408 return true;
409 if (state == MBR_JOINED && adv == ADV_IDLE)
410 return true;
411 if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
412 return true;
413 if (state == MBR_PENDING && adv == ADV_IDLE)
414 return true;
415 skb_queue_head_init(&xmitq);
416 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
417 tipc_node_distr_xmit(grp->net, &xmitq);
418 return true;
419}
420
421bool tipc_group_bc_cong(struct tipc_group *grp, int len)
422{
423 struct tipc_member *m = NULL;
424
425 /* If prev bcast was replicast, reject until all receivers have acked */
426 if (grp->bc_ackers)
427 return true;
428
429 if (list_empty(&grp->congested))
430 return false;
431
432 m = list_first_entry(&grp->congested, struct tipc_member, congested);
433 if (m->window >= len)
434 return false;
435
436 return tipc_group_cong(grp, m->node, m->port, len, &m);
437}
438
439/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
440 */
441static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
442{
443 struct tipc_msg *_hdr, *hdr = buf_msg(skb);
444 u16 bc_seqno = msg_grp_bc_seqno(hdr);
445 struct sk_buff *_skb, *tmp;
446 int mtyp = msg_type(hdr);
447
448 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
449 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
450 skb_queue_walk_safe(defq, _skb, tmp) {
451 _hdr = buf_msg(_skb);
452 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
453 continue;
454 __skb_queue_before(defq, _skb, skb);
455 return;
456 }
457 /* Bcast was not bypassed, - add to tail */
458 }
459 /* Unicasts are never bypassed, - always add to tail */
460 __skb_queue_tail(defq, skb);
461}
462
463/* tipc_group_filter_msg() - determine if we should accept arriving message
464 */
465void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
466 struct sk_buff_head *xmitq)
467{
468 struct sk_buff *skb = __skb_dequeue(inputq);
469 bool ack, deliver, update, leave = false;
470 struct sk_buff_head *defq;
471 struct tipc_member *m;
472 struct tipc_msg *hdr;
473 u32 node, port;
474 int mtyp, blks;
475
476 if (!skb)
477 return;
478
479 hdr = buf_msg(skb);
480 node = msg_orignode(hdr);
481 port = msg_origport(hdr);
482
483 if (!msg_in_group(hdr))
484 goto drop;
485
486 m = tipc_group_find_member(grp, node, port);
487 if (!tipc_group_is_receiver(m))
488 goto drop;
489
490 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
491 goto drop;
492
493 TIPC_SKB_CB(skb)->orig_member = m->instance;
494 defq = &m->deferredq;
495 tipc_group_sort_msg(skb, defq);
496
497 while ((skb = skb_peek(defq))) {
498 hdr = buf_msg(skb);
499 mtyp = msg_type(hdr);
500 blks = msg_blocks(hdr);
501 deliver = true;
502 ack = false;
503 update = false;
504
505 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
506 break;
507
508 /* Decide what to do with message */
509 switch (mtyp) {
510 case TIPC_GRP_MCAST_MSG:
511 if (msg_nameinst(hdr) != grp->instance) {
512 update = true;
513 deliver = false;
514 }
515 /* Fall thru */
516 case TIPC_GRP_BCAST_MSG:
517 m->bc_rcv_nxt++;
518 ack = msg_grp_bc_ack_req(hdr);
519 break;
520 case TIPC_GRP_UCAST_MSG:
521 break;
522 case TIPC_GRP_MEMBER_EVT:
523 if (m->state == MBR_LEAVING)
524 leave = true;
525 if (!grp->events)
526 deliver = false;
527 break;
528 default:
529 break;
530 }
531
532 /* Execute decisions */
533 __skb_dequeue(defq);
534 if (deliver)
535 __skb_queue_tail(inputq, skb);
536 else
537 kfree_skb(skb);
538
539 if (ack)
540 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
541
542 if (leave) {
543 __skb_queue_purge(defq);
544 tipc_group_delete_member(grp, m);
545 break;
546 }
547 if (!update)
548 continue;
549
550 tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
551 }
552 return;
553drop:
554 kfree_skb(skb);
555}
556
557void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
558 u32 port, struct sk_buff_head *xmitq)
559{
560 struct list_head *active = &grp->active;
561 int max_active = grp->max_active;
562 int reclaim_limit = max_active * 3 / 4;
563 int active_cnt = grp->active_cnt;
564 struct tipc_member *m, *rm;
565
566 m = tipc_group_find_member(grp, node, port);
567 if (!m)
568 return;
569
570 m->advertised -= blks;
571
572 switch (m->state) {
573 case MBR_JOINED:
574 /* Reclaim advertised space from least active member */
575 if (!list_empty(active) && active_cnt >= reclaim_limit) {
576 rm = list_first_entry(active, struct tipc_member, list);
577 rm->state = MBR_RECLAIMING;
578 list_move_tail(&rm->list, &grp->reclaiming);
579 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
580 }
581 /* If max active, become pending and wait for reclaimed space */
582 if (active_cnt >= max_active) {
583 m->state = MBR_PENDING;
584 list_add_tail(&m->list, &grp->pending);
585 break;
586 }
587 /* Otherwise become active */
588 m->state = MBR_ACTIVE;
589 list_add_tail(&m->list, &grp->active);
590 grp->active_cnt++;
591 /* Fall through */
592 case MBR_ACTIVE:
593 if (!list_is_last(&m->list, &grp->active))
594 list_move_tail(&m->list, &grp->active);
595 if (m->advertised > (ADV_ACTIVE * 3 / 4))
596 break;
597 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
598 break;
599 case MBR_REMITTED:
600 if (m->advertised > ADV_IDLE)
601 break;
602 m->state = MBR_JOINED;
603 if (m->advertised < ADV_IDLE) {
604 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
605 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
606 }
607 break;
608 case MBR_RECLAIMING:
609 case MBR_DISCOVERED:
610 case MBR_JOINING:
611 case MBR_LEAVING:
612 default:
613 break;
614 }
615}
616
617static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
618 int mtyp, struct sk_buff_head *xmitq)
619{
620 struct tipc_msg *hdr;
621 struct sk_buff *skb;
622 int adv = 0;
623
624 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
625 m->node, tipc_own_addr(grp->net),
626 m->port, grp->portid, 0);
627 if (!skb)
628 return;
629
630 if (m->state == MBR_ACTIVE)
631 adv = ADV_ACTIVE - m->advertised;
632 else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
633 adv = ADV_IDLE - m->advertised;
634
635 hdr = buf_msg(skb);
636
637 if (mtyp == GRP_JOIN_MSG) {
638 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
639 msg_set_adv_win(hdr, adv);
640 m->advertised += adv;
641 } else if (mtyp == GRP_LEAVE_MSG) {
642 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
643 } else if (mtyp == GRP_ADV_MSG) {
644 msg_set_adv_win(hdr, adv);
645 m->advertised += adv;
646 } else if (mtyp == GRP_ACK_MSG) {
647 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
648 } else if (mtyp == GRP_REMIT_MSG) {
649 msg_set_grp_remitted(hdr, m->window);
650 }
651 __skb_queue_tail(xmitq, skb);
652}
653
654void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
655 struct tipc_msg *hdr, struct sk_buff_head *inputq,
656 struct sk_buff_head *xmitq)
657{
658 u32 node = msg_orignode(hdr);
659 u32 port = msg_origport(hdr);
660 struct tipc_member *m, *pm;
661 struct tipc_msg *ehdr;
662 u16 remitted, in_flight;
663
664 if (!grp)
665 return;
666
667 m = tipc_group_find_member(grp, node, port);
668
669 switch (msg_type(hdr)) {
670 case GRP_JOIN_MSG:
671 if (!m)
672 m = tipc_group_create_member(grp, node, port,
673 MBR_QUARANTINED);
674 if (!m)
675 return;
676 m->bc_syncpt = msg_grp_bc_syncpt(hdr);
677 m->bc_rcv_nxt = m->bc_syncpt;
678 m->window += msg_adv_win(hdr);
679
680 /* Wait until PUBLISH event is received */
681 if (m->state == MBR_DISCOVERED) {
682 m->state = MBR_JOINING;
683 } else if (m->state == MBR_PUBLISHED) {
684 m->state = MBR_JOINED;
685 *usr_wakeup = true;
686 m->usr_pending = false;
687 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
688 ehdr = buf_msg(m->event_msg);
689 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
690 __skb_queue_tail(inputq, m->event_msg);
691 }
692 if (m->window < ADV_IDLE)
693 tipc_group_update_member(m, 0);
694 else
695 list_del_init(&m->congested);
696 return;
697 case GRP_LEAVE_MSG:
698 if (!m)
699 return;
700 m->bc_syncpt = msg_grp_bc_syncpt(hdr);
701
702 /* Wait until WITHDRAW event is received */
703 if (m->state != MBR_LEAVING) {
704 tipc_group_decr_active(grp, m);
705 m->state = MBR_LEAVING;
706 return;
707 }
708 /* Otherwise deliver already received WITHDRAW event */
709 ehdr = buf_msg(m->event_msg);
710 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
711 __skb_queue_tail(inputq, m->event_msg);
712 *usr_wakeup = true;
713 list_del_init(&m->congested);
714 return;
715 case GRP_ADV_MSG:
716 if (!m)
717 return;
718 m->window += msg_adv_win(hdr);
719 *usr_wakeup = m->usr_pending;
720 m->usr_pending = false;
721 list_del_init(&m->congested);
722 return;
723 case GRP_ACK_MSG:
724 if (!m)
725 return;
726 m->bc_acked = msg_grp_bc_acked(hdr);
727 if (--grp->bc_ackers)
728 break;
729 *usr_wakeup = true;
730 m->usr_pending = false;
731 return;
732 case GRP_RECLAIM_MSG:
733 if (!m)
734 return;
735 *usr_wakeup = m->usr_pending;
736 m->usr_pending = false;
737 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
738 m->window = ADV_IDLE;
739 return;
740 case GRP_REMIT_MSG:
741 if (!m || m->state != MBR_RECLAIMING)
742 return;
743
744 list_del_init(&m->list);
745 grp->active_cnt--;
746 remitted = msg_grp_remitted(hdr);
747
748 /* Messages preceding the REMIT still in receive queue */
749 if (m->advertised > remitted) {
750 m->state = MBR_REMITTED;
751 in_flight = m->advertised - remitted;
752 }
753 /* All messages preceding the REMIT have been read */
754 if (m->advertised <= remitted) {
755 m->state = MBR_JOINED;
756 in_flight = 0;
757 }
758 /* ..and the REMIT overtaken by more messages => re-advertise */
759 if (m->advertised < remitted)
760 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
761
762 m->advertised = ADV_IDLE + in_flight;
763
764 /* Set oldest pending member to active and advertise */
765 if (list_empty(&grp->pending))
766 return;
767 pm = list_first_entry(&grp->pending, struct tipc_member, list);
768 pm->state = MBR_ACTIVE;
769 list_move_tail(&pm->list, &grp->active);
770 grp->active_cnt++;
771 if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
772 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
773 return;
774 default:
775 pr_warn("Received unknown GROUP_PROTO message\n");
776 }
777}
778
779/* tipc_group_member_evt() - receive and handle a member up/down event
780 */
781void tipc_group_member_evt(struct tipc_group *grp,
782 bool *usr_wakeup,
783 int *sk_rcvbuf,
784 struct sk_buff *skb,
785 struct sk_buff_head *inputq,
786 struct sk_buff_head *xmitq)
787{
788 struct tipc_msg *hdr = buf_msg(skb);
789 struct tipc_event *evt = (void *)msg_data(hdr);
790 u32 instance = evt->found_lower;
791 u32 node = evt->port.node;
792 u32 port = evt->port.ref;
793 int event = evt->event;
794 struct tipc_member *m;
795 struct net *net;
796 bool node_up;
797 u32 self;
798
799 if (!grp)
800 goto drop;
801
802 net = grp->net;
803 self = tipc_own_addr(net);
804 if (!grp->loopback && node == self && port == grp->portid)
805 goto drop;
806
807 /* Convert message before delivery to user */
808 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
809 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
810 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
811 msg_set_origport(hdr, port);
812 msg_set_orignode(hdr, node);
813 msg_set_nametype(hdr, grp->type);
814 msg_set_grp_evt(hdr, event);
815
816 m = tipc_group_find_member(grp, node, port);
817
818 if (event == TIPC_PUBLISHED) {
819 if (!m)
820 m = tipc_group_create_member(grp, node, port,
821 MBR_DISCOVERED);
822 if (!m)
823 goto drop;
824
825 /* Hold back event if JOIN message not yet received */
826 if (m->state == MBR_DISCOVERED) {
827 m->event_msg = skb;
828 m->state = MBR_PUBLISHED;
829 } else {
830 msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
831 __skb_queue_tail(inputq, skb);
832 m->state = MBR_JOINED;
833 *usr_wakeup = true;
834 m->usr_pending = false;
835 }
836 m->instance = instance;
837 TIPC_SKB_CB(skb)->orig_member = m->instance;
838 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
839 if (m->window < ADV_IDLE)
840 tipc_group_update_member(m, 0);
841 else
842 list_del_init(&m->congested);
843 } else if (event == TIPC_WITHDRAWN) {
844 if (!m)
845 goto drop;
846
847 TIPC_SKB_CB(skb)->orig_member = m->instance;
848
849 *usr_wakeup = true;
850 m->usr_pending = false;
851 node_up = tipc_node_is_up(net, node);
852
853 /* Hold back event if more messages might be expected */
854 if (m->state != MBR_LEAVING && node_up) {
855 m->event_msg = skb;
856 tipc_group_decr_active(grp, m);
857 m->state = MBR_LEAVING;
858 } else {
859 if (node_up)
860 msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
861 else
862 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
863 __skb_queue_tail(inputq, skb);
864 }
865 list_del_init(&m->congested);
866 }
867 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
868 return;
869drop:
870 kfree_skb(skb);
871}
diff --git a/net/tipc/group.h b/net/tipc/group.h
new file mode 100644
index 000000000000..d525e1cd7de5
--- /dev/null
+++ b/net/tipc/group.h
@@ -0,0 +1,73 @@
1/*
2 * net/tipc/group.h: Include file for TIPC group unicast/multicast functions
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#ifndef _TIPC_GROUP_H
37#define _TIPC_GROUP_H
38
39#include "core.h"
40
41struct tipc_group;
42struct tipc_member;
43struct tipc_msg;
44
45struct tipc_group *tipc_group_create(struct net *net, u32 portid,
46 struct tipc_group_req *mreq);
47void tipc_group_delete(struct net *net, struct tipc_group *grp);
48void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
49struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
50void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
51 int *scope);
52u32 tipc_group_exclude(struct tipc_group *grp);
53void tipc_group_filter_msg(struct tipc_group *grp,
54 struct sk_buff_head *inputq,
55 struct sk_buff_head *xmitq);
56void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
57 int *sk_rcvbuf, struct sk_buff *skb,
58 struct sk_buff_head *inputq,
59 struct sk_buff_head *xmitq);
60void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
61 struct tipc_msg *hdr,
62 struct sk_buff_head *inputq,
63 struct sk_buff_head *xmitq);
64void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
65bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
66 int len, struct tipc_member **m);
67bool tipc_group_bc_cong(struct tipc_group *grp, int len);
68void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
69 u32 port, struct sk_buff_head *xmitq);
70u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
71void tipc_group_update_member(struct tipc_member *m, int len);
72int tipc_group_size(struct tipc_group *grp);
73#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac0144f532aa..6bce0b1117bd 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -239,7 +239,8 @@ static int link_is_up(struct tipc_link *l)
239static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, 239static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
240 struct sk_buff_head *xmitq); 240 struct sk_buff_head *xmitq);
241static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 241static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
242 u16 rcvgap, int tolerance, int priority, 242 bool probe_reply, u16 rcvgap,
243 int tolerance, int priority,
243 struct sk_buff_head *xmitq); 244 struct sk_buff_head *xmitq);
244static void link_print(struct tipc_link *l, const char *str); 245static void link_print(struct tipc_link *l, const char *str);
245static int tipc_link_build_nack_msg(struct tipc_link *l, 246static int tipc_link_build_nack_msg(struct tipc_link *l,
@@ -773,7 +774,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
773 } 774 }
774 775
775 if (state || probe || setup) 776 if (state || probe || setup)
776 tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, xmitq); 777 tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, 0, xmitq);
777 778
778 return rc; 779 return rc;
779} 780}
@@ -1039,6 +1040,7 @@ int tipc_link_retrans(struct tipc_link *l, struct tipc_link *nacker,
1039static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, 1040static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1040 struct sk_buff_head *inputq) 1041 struct sk_buff_head *inputq)
1041{ 1042{
1043 struct sk_buff_head *mc_inputq = l->bc_rcvlink->inputq;
1042 struct tipc_msg *hdr = buf_msg(skb); 1044 struct tipc_msg *hdr = buf_msg(skb);
1043 1045
1044 switch (msg_user(hdr)) { 1046 switch (msg_user(hdr)) {
@@ -1046,13 +1048,16 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1046 case TIPC_MEDIUM_IMPORTANCE: 1048 case TIPC_MEDIUM_IMPORTANCE:
1047 case TIPC_HIGH_IMPORTANCE: 1049 case TIPC_HIGH_IMPORTANCE:
1048 case TIPC_CRITICAL_IMPORTANCE: 1050 case TIPC_CRITICAL_IMPORTANCE:
1049 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { 1051 if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) {
1050 skb_queue_tail(l->bc_rcvlink->inputq, skb); 1052 skb_queue_tail(mc_inputq, skb);
1051 return true; 1053 return true;
1052 } 1054 }
1053 case CONN_MANAGER: 1055 case CONN_MANAGER:
1054 skb_queue_tail(inputq, skb); 1056 skb_queue_tail(inputq, skb);
1055 return true; 1057 return true;
1058 case GROUP_PROTOCOL:
1059 skb_queue_tail(mc_inputq, skb);
1060 return true;
1056 case NAME_DISTRIBUTOR: 1061 case NAME_DISTRIBUTOR:
1057 l->bc_rcvlink->state = LINK_ESTABLISHED; 1062 l->bc_rcvlink->state = LINK_ESTABLISHED;
1058 skb_queue_tail(l->namedq, skb); 1063 skb_queue_tail(l->namedq, skb);
@@ -1170,7 +1175,7 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1170 /* Unicast ACK */ 1175 /* Unicast ACK */
1171 l->rcv_unacked = 0; 1176 l->rcv_unacked = 0;
1172 l->stats.sent_acks++; 1177 l->stats.sent_acks++;
1173 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1178 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq);
1174 return 0; 1179 return 0;
1175} 1180}
1176 1181
@@ -1184,7 +1189,7 @@ void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1184 if (l->state == LINK_ESTABLISHING) 1189 if (l->state == LINK_ESTABLISHING)
1185 mtyp = ACTIVATE_MSG; 1190 mtyp = ACTIVATE_MSG;
1186 1191
1187 tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq); 1192 tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, 0, xmitq);
1188 1193
1189 /* Inform peer that this endpoint is going down if applicable */ 1194 /* Inform peer that this endpoint is going down if applicable */
1190 skb = skb_peek_tail(xmitq); 1195 skb = skb_peek_tail(xmitq);
@@ -1211,7 +1216,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
1211 } 1216 }
1212 1217
1213 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1218 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1214 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1219 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq);
1215 return 0; 1220 return 0;
1216} 1221}
1217 1222
@@ -1285,7 +1290,8 @@ drop:
1285} 1290}
1286 1291
1287static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1292static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1288 u16 rcvgap, int tolerance, int priority, 1293 bool probe_reply, u16 rcvgap,
1294 int tolerance, int priority,
1289 struct sk_buff_head *xmitq) 1295 struct sk_buff_head *xmitq)
1290{ 1296{
1291 struct tipc_link *bcl = l->bc_rcvlink; 1297 struct tipc_link *bcl = l->bc_rcvlink;
@@ -1333,6 +1339,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1333 msg_set_seq_gap(hdr, rcvgap); 1339 msg_set_seq_gap(hdr, rcvgap);
1334 msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl)); 1340 msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
1335 msg_set_probe(hdr, probe); 1341 msg_set_probe(hdr, probe);
1342 msg_set_is_keepalive(hdr, probe || probe_reply);
1336 tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id); 1343 tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
1337 msg_set_size(hdr, INT_H_SIZE + dlen); 1344 msg_set_size(hdr, INT_H_SIZE + dlen);
1338 skb_trim(skb, INT_H_SIZE + dlen); 1345 skb_trim(skb, INT_H_SIZE + dlen);
@@ -1438,6 +1445,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1438 u16 rcv_nxt = l->rcv_nxt; 1445 u16 rcv_nxt = l->rcv_nxt;
1439 u16 dlen = msg_data_sz(hdr); 1446 u16 dlen = msg_data_sz(hdr);
1440 int mtyp = msg_type(hdr); 1447 int mtyp = msg_type(hdr);
1448 bool reply = msg_probe(hdr);
1441 void *data; 1449 void *data;
1442 char *if_name; 1450 char *if_name;
1443 int rc = 0; 1451 int rc = 0;
@@ -1524,9 +1532,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1524 /* Send NACK if peer has sent pkts we haven't received yet */ 1532 /* Send NACK if peer has sent pkts we haven't received yet */
1525 if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l)) 1533 if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l))
1526 rcvgap = peers_snd_nxt - l->rcv_nxt; 1534 rcvgap = peers_snd_nxt - l->rcv_nxt;
1527 if (rcvgap || (msg_probe(hdr))) 1535 if (rcvgap || reply)
1528 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 1536 tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
1529 0, 0, xmitq); 1537 rcvgap, 0, 0, xmitq);
1530 tipc_link_release_pkts(l, ack); 1538 tipc_link_release_pkts(l, ack);
1531 1539
1532 /* If NACK, retransmit will now start at right position */ 1540 /* If NACK, retransmit will now start at right position */
@@ -2118,14 +2126,14 @@ void tipc_link_set_tolerance(struct tipc_link *l, u32 tol,
2118 struct sk_buff_head *xmitq) 2126 struct sk_buff_head *xmitq)
2119{ 2127{
2120 l->tolerance = tol; 2128 l->tolerance = tol;
2121 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, tol, 0, xmitq); 2129 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, tol, 0, xmitq);
2122} 2130}
2123 2131
2124void tipc_link_set_prio(struct tipc_link *l, u32 prio, 2132void tipc_link_set_prio(struct tipc_link *l, u32 prio,
2125 struct sk_buff_head *xmitq) 2133 struct sk_buff_head *xmitq)
2126{ 2134{
2127 l->priority = prio; 2135 l->priority = prio;
2128 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, prio, xmitq); 2136 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, prio, xmitq);
2129} 2137}
2130 2138
2131void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit) 2139void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit)
diff --git a/net/tipc/monitor.c b/net/tipc/monitor.c
index 9e109bb1a207..8e884ed06d4b 100644
--- a/net/tipc/monitor.c
+++ b/net/tipc/monitor.c
@@ -530,8 +530,11 @@ void tipc_mon_prep(struct net *net, void *data, int *dlen,
530 u16 gen = mon->dom_gen; 530 u16 gen = mon->dom_gen;
531 u16 len; 531 u16 len;
532 532
533 if (!tipc_mon_is_active(net, mon)) 533 /* Send invalid record if not active */
534 if (!tipc_mon_is_active(net, mon)) {
535 dom->len = 0;
534 return; 536 return;
537 }
535 538
536 /* Send only a dummy record with ack if peer has acked our last sent */ 539 /* Send only a dummy record with ack if peer has acked our last sent */
537 if (likely(state->acked_gen == gen)) { 540 if (likely(state->acked_gen == gen)) {
@@ -559,6 +562,12 @@ void tipc_mon_get_state(struct net *net, u32 addr,
559 struct tipc_monitor *mon = tipc_monitor(net, bearer_id); 562 struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
560 struct tipc_peer *peer; 563 struct tipc_peer *peer;
561 564
565 if (!tipc_mon_is_active(net, mon)) {
566 state->probing = false;
567 state->monitoring = true;
568 return;
569 }
570
562 /* Used cached state if table has not changed */ 571 /* Used cached state if table has not changed */
563 if (!state->probing && 572 if (!state->probing &&
564 (state->list_gen == mon->list_gen) && 573 (state->list_gen == mon->list_gen) &&
@@ -578,9 +587,9 @@ void tipc_mon_get_state(struct net *net, u32 addr,
578 read_unlock_bh(&mon->lock); 587 read_unlock_bh(&mon->lock);
579} 588}
580 589
581static void mon_timeout(unsigned long m) 590static void mon_timeout(struct timer_list *t)
582{ 591{
583 struct tipc_monitor *mon = (void *)m; 592 struct tipc_monitor *mon = from_timer(mon, t, timer);
584 struct tipc_peer *self; 593 struct tipc_peer *self;
585 int best_member_cnt = dom_size(mon->peer_cnt) - 1; 594 int best_member_cnt = dom_size(mon->peer_cnt) - 1;
586 595
@@ -623,7 +632,7 @@ int tipc_mon_create(struct net *net, int bearer_id)
623 self->is_up = true; 632 self->is_up = true;
624 self->is_head = true; 633 self->is_head = true;
625 INIT_LIST_HEAD(&self->list); 634 INIT_LIST_HEAD(&self->list);
626 setup_timer(&mon->timer, mon_timeout, (unsigned long)mon); 635 timer_setup(&mon->timer, mon_timeout, 0);
627 mon->timer_intv = msecs_to_jiffies(MON_TIMEOUT + (tn->random & 0xffff)); 636 mon->timer_intv = msecs_to_jiffies(MON_TIMEOUT + (tn->random & 0xffff));
628 mod_timer(&mon->timer, jiffies + mon->timer_intv); 637 mod_timer(&mon->timer, jiffies + mon->timer_intv);
629 return 0; 638 return 0;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 17146c16ee2d..b0d07b35909d 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -174,7 +174,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
174 174
175 if (fragid == LAST_FRAGMENT) { 175 if (fragid == LAST_FRAGMENT) {
176 TIPC_SKB_CB(head)->validated = false; 176 TIPC_SKB_CB(head)->validated = false;
177 if (unlikely(!tipc_msg_validate(head))) 177 if (unlikely(!tipc_msg_validate(&head)))
178 goto err; 178 goto err;
179 *buf = head; 179 *buf = head;
180 TIPC_SKB_CB(head)->tail = NULL; 180 TIPC_SKB_CB(head)->tail = NULL;
@@ -201,11 +201,21 @@ err:
201 * TIPC will ignore the excess, under the assumption that it is optional info 201 * TIPC will ignore the excess, under the assumption that it is optional info
202 * introduced by a later release of the protocol. 202 * introduced by a later release of the protocol.
203 */ 203 */
204bool tipc_msg_validate(struct sk_buff *skb) 204bool tipc_msg_validate(struct sk_buff **_skb)
205{ 205{
206 struct tipc_msg *msg; 206 struct sk_buff *skb = *_skb;
207 struct tipc_msg *hdr;
207 int msz, hsz; 208 int msz, hsz;
208 209
210 /* Ensure that flow control ratio condition is satisfied */
211 if (unlikely(skb->truesize / buf_roundup_len(skb) > 4)) {
212 skb = skb_copy(skb, GFP_ATOMIC);
213 if (!skb)
214 return false;
215 kfree_skb(*_skb);
216 *_skb = skb;
217 }
218
209 if (unlikely(TIPC_SKB_CB(skb)->validated)) 219 if (unlikely(TIPC_SKB_CB(skb)->validated))
210 return true; 220 return true;
211 if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE))) 221 if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE)))
@@ -217,11 +227,11 @@ bool tipc_msg_validate(struct sk_buff *skb)
217 if (unlikely(!pskb_may_pull(skb, hsz))) 227 if (unlikely(!pskb_may_pull(skb, hsz)))
218 return false; 228 return false;
219 229
220 msg = buf_msg(skb); 230 hdr = buf_msg(skb);
221 if (unlikely(msg_version(msg) != TIPC_VERSION)) 231 if (unlikely(msg_version(hdr) != TIPC_VERSION))
222 return false; 232 return false;
223 233
224 msz = msg_size(msg); 234 msz = msg_size(hdr);
225 if (unlikely(msz < hsz)) 235 if (unlikely(msz < hsz))
226 return false; 236 return false;
227 if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE)) 237 if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE))
@@ -411,7 +421,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
411 skb_pull(*iskb, offset); 421 skb_pull(*iskb, offset);
412 imsz = msg_size(buf_msg(*iskb)); 422 imsz = msg_size(buf_msg(*iskb));
413 skb_trim(*iskb, imsz); 423 skb_trim(*iskb, imsz);
414 if (unlikely(!tipc_msg_validate(*iskb))) 424 if (unlikely(!tipc_msg_validate(iskb)))
415 goto none; 425 goto none;
416 *pos += align(imsz); 426 *pos += align(imsz);
417 return true; 427 return true;
@@ -666,3 +676,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
666 } 676 }
667 kfree_skb(skb); 677 kfree_skb(skb);
668} 678}
679
680void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
681 struct sk_buff_head *xmitq)
682{
683 if (tipc_msg_reverse(tipc_own_addr(net), &skb, err))
684 __skb_queue_tail(xmitq, skb);
685}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index c843fd2bc48d..3e4384c222f7 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/msg.h: Include file for TIPC message header routines 2 * net/tipc/msg.h: Include file for TIPC message header routines
3 * 3 *
4 * Copyright (c) 2000-2007, 2014-2015 Ericsson AB 4 * Copyright (c) 2000-2007, 2014-2017 Ericsson AB
5 * Copyright (c) 2005-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -61,10 +61,14 @@ struct plist;
61/* 61/*
62 * Payload message types 62 * Payload message types
63 */ 63 */
64#define TIPC_CONN_MSG 0 64#define TIPC_CONN_MSG 0
65#define TIPC_MCAST_MSG 1 65#define TIPC_MCAST_MSG 1
66#define TIPC_NAMED_MSG 2 66#define TIPC_NAMED_MSG 2
67#define TIPC_DIRECT_MSG 3 67#define TIPC_DIRECT_MSG 3
68#define TIPC_GRP_MEMBER_EVT 4
69#define TIPC_GRP_BCAST_MSG 5
70#define TIPC_GRP_MCAST_MSG 6
71#define TIPC_GRP_UCAST_MSG 7
68 72
69/* 73/*
70 * Internal message users 74 * Internal message users
@@ -73,11 +77,13 @@ struct plist;
73#define MSG_BUNDLER 6 77#define MSG_BUNDLER 6
74#define LINK_PROTOCOL 7 78#define LINK_PROTOCOL 7
75#define CONN_MANAGER 8 79#define CONN_MANAGER 8
80#define GROUP_PROTOCOL 9
76#define TUNNEL_PROTOCOL 10 81#define TUNNEL_PROTOCOL 10
77#define NAME_DISTRIBUTOR 11 82#define NAME_DISTRIBUTOR 11
78#define MSG_FRAGMENTER 12 83#define MSG_FRAGMENTER 12
79#define LINK_CONFIG 13 84#define LINK_CONFIG 13
80#define SOCK_WAKEUP 14 /* pseudo user */ 85#define SOCK_WAKEUP 14 /* pseudo user */
86#define TOP_SRV 15 /* pseudo user */
81 87
82/* 88/*
83 * Message header sizes 89 * Message header sizes
@@ -86,6 +92,7 @@ struct plist;
86#define BASIC_H_SIZE 32 /* Basic payload message */ 92#define BASIC_H_SIZE 32 /* Basic payload message */
87#define NAMED_H_SIZE 40 /* Named payload message */ 93#define NAMED_H_SIZE 40 /* Named payload message */
88#define MCAST_H_SIZE 44 /* Multicast payload message */ 94#define MCAST_H_SIZE 44 /* Multicast payload message */
95#define GROUP_H_SIZE 44 /* Group payload message */
89#define INT_H_SIZE 40 /* Internal messages */ 96#define INT_H_SIZE 40 /* Internal messages */
90#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ 97#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */
91#define MAX_H_SIZE 60 /* Largest possible TIPC header size */ 98#define MAX_H_SIZE 60 /* Largest possible TIPC header size */
@@ -96,6 +103,7 @@ struct plist;
96 103
97struct tipc_skb_cb { 104struct tipc_skb_cb {
98 u32 bytes_read; 105 u32 bytes_read;
106 u32 orig_member;
99 struct sk_buff *tail; 107 struct sk_buff *tail;
100 bool validated; 108 bool validated;
101 u16 chain_imp; 109 u16 chain_imp;
@@ -188,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m)
188 return msg_bits(m, 0, 0, 0x1ffff); 196 return msg_bits(m, 0, 0, 0x1ffff);
189} 197}
190 198
199static inline u32 msg_blocks(struct tipc_msg *m)
200{
201 return (msg_size(m) / 1024) + 1;
202}
203
191static inline u32 msg_data_sz(struct tipc_msg *m) 204static inline u32 msg_data_sz(struct tipc_msg *m)
192{ 205{
193 return msg_size(m) - msg_hdr_sz(m); 206 return msg_size(m) - msg_hdr_sz(m);
@@ -213,6 +226,16 @@ static inline void msg_set_dest_droppable(struct tipc_msg *m, u32 d)
213 msg_set_bits(m, 0, 19, 1, d); 226 msg_set_bits(m, 0, 19, 1, d);
214} 227}
215 228
229static inline int msg_is_keepalive(struct tipc_msg *m)
230{
231 return msg_bits(m, 0, 19, 1);
232}
233
234static inline void msg_set_is_keepalive(struct tipc_msg *m, u32 d)
235{
236 msg_set_bits(m, 0, 19, 1, d);
237}
238
216static inline int msg_src_droppable(struct tipc_msg *m) 239static inline int msg_src_droppable(struct tipc_msg *m)
217{ 240{
218 return msg_bits(m, 0, 18, 1); 241 return msg_bits(m, 0, 18, 1);
@@ -251,6 +274,18 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
251 msg_set_bits(m, 1, 29, 0x7, n); 274 msg_set_bits(m, 1, 29, 0x7, n);
252} 275}
253 276
277static inline int msg_in_group(struct tipc_msg *m)
278{
279 int mtyp = msg_type(m);
280
281 return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG;
282}
283
284static inline bool msg_is_grp_evt(struct tipc_msg *m)
285{
286 return msg_type(m) == TIPC_GRP_MEMBER_EVT;
287}
288
254static inline u32 msg_named(struct tipc_msg *m) 289static inline u32 msg_named(struct tipc_msg *m)
255{ 290{
256 return msg_type(m) == TIPC_NAMED_MSG; 291 return msg_type(m) == TIPC_NAMED_MSG;
@@ -258,7 +293,10 @@ static inline u32 msg_named(struct tipc_msg *m)
258 293
259static inline u32 msg_mcast(struct tipc_msg *m) 294static inline u32 msg_mcast(struct tipc_msg *m)
260{ 295{
261 return msg_type(m) == TIPC_MCAST_MSG; 296 int mtyp = msg_type(m);
297
298 return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG) ||
299 (mtyp == TIPC_GRP_MCAST_MSG));
262} 300}
263 301
264static inline u32 msg_connected(struct tipc_msg *m) 302static inline u32 msg_connected(struct tipc_msg *m)
@@ -514,6 +552,16 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
514#define DSC_RESP_MSG 1 552#define DSC_RESP_MSG 1
515 553
516/* 554/*
555 * Group protocol message types
556 */
557#define GRP_JOIN_MSG 0
558#define GRP_LEAVE_MSG 1
559#define GRP_ADV_MSG 2
560#define GRP_ACK_MSG 3
561#define GRP_RECLAIM_MSG 4
562#define GRP_REMIT_MSG 5
563
564/*
517 * Word 1 565 * Word 1
518 */ 566 */
519static inline u32 msg_seq_gap(struct tipc_msg *m) 567static inline u32 msg_seq_gap(struct tipc_msg *m)
@@ -764,12 +812,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
764 msg_set_bits(m, 9, 16, 0xffff, n); 812 msg_set_bits(m, 9, 16, 0xffff, n);
765} 813}
766 814
767static inline u32 msg_adv_win(struct tipc_msg *m) 815static inline u16 msg_adv_win(struct tipc_msg *m)
768{ 816{
769 return msg_bits(m, 9, 0, 0xffff); 817 return msg_bits(m, 9, 0, 0xffff);
770} 818}
771 819
772static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) 820static inline void msg_set_adv_win(struct tipc_msg *m, u16 n)
773{ 821{
774 msg_set_bits(m, 9, 0, 0xffff, n); 822 msg_set_bits(m, 9, 0, 0xffff, n);
775} 823}
@@ -794,6 +842,68 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
794 msg_set_bits(m, 9, 0, 0xffff, n); 842 msg_set_bits(m, 9, 0, 0xffff, n);
795} 843}
796 844
845static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m)
846{
847 return msg_bits(m, 9, 16, 0xffff);
848}
849
850static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
851{
852 msg_set_bits(m, 9, 16, 0xffff, n);
853}
854
855static inline u16 msg_grp_bc_acked(struct tipc_msg *m)
856{
857 return msg_bits(m, 9, 16, 0xffff);
858}
859
860static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
861{
862 msg_set_bits(m, 9, 16, 0xffff, n);
863}
864
865static inline u16 msg_grp_remitted(struct tipc_msg *m)
866{
867 return msg_bits(m, 9, 16, 0xffff);
868}
869
870static inline void msg_set_grp_remitted(struct tipc_msg *m, u16 n)
871{
872 msg_set_bits(m, 9, 16, 0xffff, n);
873}
874
875/* Word 10
876 */
877static inline u16 msg_grp_evt(struct tipc_msg *m)
878{
879 return msg_bits(m, 10, 0, 0x3);
880}
881
882static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
883{
884 msg_set_bits(m, 10, 0, 0x3, n);
885}
886
887static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m)
888{
889 return msg_bits(m, 10, 0, 0x1);
890}
891
892static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n)
893{
894 msg_set_bits(m, 10, 0, 0x1, n);
895}
896
897static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
898{
899 return msg_bits(m, 10, 16, 0xffff);
900}
901
902static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n)
903{
904 msg_set_bits(m, 10, 16, 0xffff, n);
905}
906
797static inline bool msg_peer_link_is_up(struct tipc_msg *m) 907static inline bool msg_peer_link_is_up(struct tipc_msg *m)
798{ 908{
799 if (likely(msg_user(m) != LINK_PROTOCOL)) 909 if (likely(msg_user(m) != LINK_PROTOCOL))
@@ -816,8 +926,10 @@ static inline bool msg_is_reset(struct tipc_msg *hdr)
816} 926}
817 927
818struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp); 928struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp);
819bool tipc_msg_validate(struct sk_buff *skb); 929bool tipc_msg_validate(struct sk_buff **_skb);
820bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); 930bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
931void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
932 struct sk_buff_head *xmitq);
821void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, 933void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
822 u32 hsize, u32 destnode); 934 u32 hsize, u32 destnode);
823struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, 935struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
@@ -842,6 +954,11 @@ static inline u16 buf_seqno(struct sk_buff *skb)
842 return msg_seqno(buf_msg(skb)); 954 return msg_seqno(buf_msg(skb));
843} 955}
844 956
957static inline int buf_roundup_len(struct sk_buff *skb)
958{
959 return (skb->len / 1024 + 1) * 1024;
960}
961
845/* tipc_skb_peek(): peek and reserve first buffer in list 962/* tipc_skb_peek(): peek and reserve first buffer in list
846 * @list: list to be peeked in 963 * @list: list to be peeked in
847 * Returns pointer to first buffer in list, if any 964 * Returns pointer to first buffer in list, if any
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index bd0aac87b41a..b3829bcf63c7 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -43,6 +43,7 @@
43#include "bcast.h" 43#include "bcast.h"
44#include "addr.h" 44#include "addr.h"
45#include "node.h" 45#include "node.h"
46#include "group.h"
46#include <net/genetlink.h> 47#include <net/genetlink.h>
47 48
48#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ 49#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
@@ -596,18 +597,47 @@ not_found:
596 return ref; 597 return ref;
597} 598}
598 599
599/** 600bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
600 * tipc_nametbl_mc_translate - find multicast destinations 601 struct list_head *dsts, int *dstcnt, u32 exclude,
601 * 602 bool all)
602 * Creates list of all local ports that overlap the given multicast address; 603{
603 * also determines if any off-node ports overlap. 604 u32 self = tipc_own_addr(net);
604 * 605 struct publication *publ;
605 * Note: Publications with a scope narrower than 'limit' are ignored. 606 struct name_info *info;
606 * (i.e. local node-scope publications mustn't receive messages arriving 607 struct name_seq *seq;
607 * from another node, even if the multcast link brought it here) 608 struct sub_seq *sseq;
608 * 609
609 * Returns non-zero if any off-node ports overlap 610 if (!tipc_in_scope(domain, self))
610 */ 611 return false;
612
613 *dstcnt = 0;
614 rcu_read_lock();
615 seq = nametbl_find_seq(net, type);
616 if (unlikely(!seq))
617 goto exit;
618 spin_lock_bh(&seq->lock);
619 sseq = nameseq_find_subseq(seq, instance);
620 if (likely(sseq)) {
621 info = sseq->info;
622 list_for_each_entry(publ, &info->zone_list, zone_list) {
623 if (!tipc_in_scope(domain, publ->node))
624 continue;
625 if (publ->ref == exclude && publ->node == self)
626 continue;
627 tipc_dest_push(dsts, publ->node, publ->ref);
628 (*dstcnt)++;
629 if (all)
630 continue;
631 list_move_tail(&publ->zone_list, &info->zone_list);
632 break;
633 }
634 }
635 spin_unlock_bh(&seq->lock);
636exit:
637 rcu_read_unlock();
638 return !list_empty(dsts);
639}
640
611int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 641int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
612 u32 limit, struct list_head *dports) 642 u32 limit, struct list_head *dports)
613{ 643{
@@ -634,7 +664,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
634 info = sseq->info; 664 info = sseq->info;
635 list_for_each_entry(publ, &info->node_list, node_list) { 665 list_for_each_entry(publ, &info->node_list, node_list) {
636 if (publ->scope <= limit) 666 if (publ->scope <= limit)
637 u32_push(dports, publ->ref); 667 tipc_dest_push(dports, 0, publ->ref);
638 } 668 }
639 669
640 if (info->cluster_list_size != info->node_list_size) 670 if (info->cluster_list_size != info->node_list_size)
@@ -667,7 +697,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
667 spin_lock_bh(&seq->lock); 697 spin_lock_bh(&seq->lock);
668 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 698 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
669 stop = seq->sseqs + seq->first_free; 699 stop = seq->sseqs + seq->first_free;
670 for (; sseq->lower <= upper && sseq != stop; sseq++) { 700 for (; sseq != stop && sseq->lower <= upper; sseq++) {
671 info = sseq->info; 701 info = sseq->info;
672 list_for_each_entry(publ, &info->zone_list, zone_list) { 702 list_for_each_entry(publ, &info->zone_list, zone_list) {
673 if (tipc_in_scope(domain, publ->node)) 703 if (tipc_in_scope(domain, publ->node))
@@ -679,6 +709,37 @@ exit:
679 rcu_read_unlock(); 709 rcu_read_unlock();
680} 710}
681 711
712/* tipc_nametbl_build_group - build list of communication group members
713 */
714void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
715 u32 type, u32 domain)
716{
717 struct sub_seq *sseq, *stop;
718 struct name_info *info;
719 struct publication *p;
720 struct name_seq *seq;
721
722 rcu_read_lock();
723 seq = nametbl_find_seq(net, type);
724 if (!seq)
725 goto exit;
726
727 spin_lock_bh(&seq->lock);
728 sseq = seq->sseqs;
729 stop = seq->sseqs + seq->first_free;
730 for (; sseq != stop; sseq++) {
731 info = sseq->info;
732 list_for_each_entry(p, &info->zone_list, zone_list) {
733 if (!tipc_in_scope(domain, p->node))
734 continue;
735 tipc_group_add_member(grp, p->node, p->ref);
736 }
737 }
738 spin_unlock_bh(&seq->lock);
739exit:
740 rcu_read_unlock();
741}
742
682/* 743/*
683 * tipc_nametbl_publish - add name publication to network name tables 744 * tipc_nametbl_publish - add name publication to network name tables
684 */ 745 */
@@ -1057,78 +1118,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1057 return skb->len; 1118 return skb->len;
1058} 1119}
1059 1120
1060bool u32_find(struct list_head *l, u32 value) 1121struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port)
1061{ 1122{
1062 struct u32_item *item; 1123 u64 value = (u64)node << 32 | port;
1124 struct tipc_dest *dst;
1063 1125
1064 list_for_each_entry(item, l, list) { 1126 list_for_each_entry(dst, l, list) {
1065 if (item->value == value) 1127 if (dst->value != value)
1066 return true; 1128 continue;
1129 return dst;
1067 } 1130 }
1068 return false; 1131 return NULL;
1069} 1132}
1070 1133
1071bool u32_push(struct list_head *l, u32 value) 1134bool tipc_dest_push(struct list_head *l, u32 node, u32 port)
1072{ 1135{
1073 struct u32_item *item; 1136 u64 value = (u64)node << 32 | port;
1137 struct tipc_dest *dst;
1074 1138
1075 list_for_each_entry(item, l, list) { 1139 if (tipc_dest_find(l, node, port))
1076 if (item->value == value)
1077 return false;
1078 }
1079 item = kmalloc(sizeof(*item), GFP_ATOMIC);
1080 if (unlikely(!item))
1081 return false; 1140 return false;
1082 1141
1083 item->value = value; 1142 dst = kmalloc(sizeof(*dst), GFP_ATOMIC);
1084 list_add(&item->list, l); 1143 if (unlikely(!dst))
1144 return false;
1145 dst->value = value;
1146 list_add(&dst->list, l);
1085 return true; 1147 return true;
1086} 1148}
1087 1149
1088u32 u32_pop(struct list_head *l) 1150bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port)
1089{ 1151{
1090 struct u32_item *item; 1152 struct tipc_dest *dst;
1091 u32 value = 0;
1092 1153
1093 if (list_empty(l)) 1154 if (list_empty(l))
1094 return 0; 1155 return false;
1095 item = list_first_entry(l, typeof(*item), list); 1156 dst = list_first_entry(l, typeof(*dst), list);
1096 value = item->value; 1157 if (port)
1097 list_del(&item->list); 1158 *port = dst->port;
1098 kfree(item); 1159 if (node)
1099 return value; 1160 *node = dst->node;
1161 list_del(&dst->list);
1162 kfree(dst);
1163 return true;
1100} 1164}
1101 1165
1102bool u32_del(struct list_head *l, u32 value) 1166bool tipc_dest_del(struct list_head *l, u32 node, u32 port)
1103{ 1167{
1104 struct u32_item *item, *tmp; 1168 struct tipc_dest *dst;
1105 1169
1106 list_for_each_entry_safe(item, tmp, l, list) { 1170 dst = tipc_dest_find(l, node, port);
1107 if (item->value != value) 1171 if (!dst)
1108 continue; 1172 return false;
1109 list_del(&item->list); 1173 list_del(&dst->list);
1110 kfree(item); 1174 kfree(dst);
1111 return true; 1175 return true;
1112 }
1113 return false;
1114} 1176}
1115 1177
1116void u32_list_purge(struct list_head *l) 1178void tipc_dest_list_purge(struct list_head *l)
1117{ 1179{
1118 struct u32_item *item, *tmp; 1180 struct tipc_dest *dst, *tmp;
1119 1181
1120 list_for_each_entry_safe(item, tmp, l, list) { 1182 list_for_each_entry_safe(dst, tmp, l, list) {
1121 list_del(&item->list); 1183 list_del(&dst->list);
1122 kfree(item); 1184 kfree(dst);
1123 } 1185 }
1124} 1186}
1125 1187
1126int u32_list_len(struct list_head *l) 1188int tipc_dest_list_len(struct list_head *l)
1127{ 1189{
1128 struct u32_item *item; 1190 struct tipc_dest *dst;
1129 int i = 0; 1191 int i = 0;
1130 1192
1131 list_for_each_entry(item, l, list) { 1193 list_for_each_entry(dst, l, list) {
1132 i++; 1194 i++;
1133 } 1195 }
1134 return i; 1196 return i;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 6ebdeb1d84a5..71926e429446 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -40,6 +40,7 @@
40struct tipc_subscription; 40struct tipc_subscription;
41struct tipc_plist; 41struct tipc_plist;
42struct tipc_nlist; 42struct tipc_nlist;
43struct tipc_group;
43 44
44/* 45/*
45 * TIPC name types reserved for internal TIPC use (both current and planned) 46 * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -101,9 +102,14 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
101u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
102int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 103int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
103 u32 limit, struct list_head *dports); 104 u32 limit, struct list_head *dports);
105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
106 u32 type, u32 domain);
104void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
105 u32 upper, u32 domain, 108 u32 upper, u32 domain,
106 struct tipc_nlist *nodes); 109 struct tipc_nlist *nodes);
110bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
111 struct list_head *dsts, int *dstcnt, u32 exclude,
112 bool all);
107struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, 113struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
108 u32 upper, u32 scope, u32 port_ref, 114 u32 upper, u32 scope, u32 port_ref,
109 u32 key); 115 u32 key);
@@ -120,16 +126,22 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
120int tipc_nametbl_init(struct net *net); 126int tipc_nametbl_init(struct net *net);
121void tipc_nametbl_stop(struct net *net); 127void tipc_nametbl_stop(struct net *net);
122 128
123struct u32_item { 129struct tipc_dest {
124 struct list_head list; 130 struct list_head list;
125 u32 value; 131 union {
132 struct {
133 u32 port;
134 u32 node;
135 };
136 u64 value;
137 };
126}; 138};
127 139
128bool u32_push(struct list_head *l, u32 value); 140struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port);
129u32 u32_pop(struct list_head *l); 141bool tipc_dest_push(struct list_head *l, u32 node, u32 port);
130bool u32_find(struct list_head *l, u32 value); 142bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port);
131bool u32_del(struct list_head *l, u32 value); 143bool tipc_dest_del(struct list_head *l, u32 node, u32 port);
132void u32_list_purge(struct list_head *l); 144void tipc_dest_list_purge(struct list_head *l);
133int u32_list_len(struct list_head *l); 145int tipc_dest_list_len(struct list_head *l);
134 146
135#endif 147#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 198dbc7adbe1..507017fe0f1b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -153,11 +153,11 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
153 bool delete); 153 bool delete);
154static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); 154static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
155static void tipc_node_delete(struct tipc_node *node); 155static void tipc_node_delete(struct tipc_node *node);
156static void tipc_node_timeout(unsigned long data); 156static void tipc_node_timeout(struct timer_list *t);
157static void tipc_node_fsm_evt(struct tipc_node *n, int evt); 157static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
158static struct tipc_node *tipc_node_find(struct net *net, u32 addr); 158static struct tipc_node *tipc_node_find(struct net *net, u32 addr);
159static void tipc_node_put(struct tipc_node *node); 159static void tipc_node_put(struct tipc_node *node);
160static bool tipc_node_is_up(struct tipc_node *n); 160static bool node_is_up(struct tipc_node *n);
161 161
162struct tipc_sock_conn { 162struct tipc_sock_conn {
163 u32 port; 163 u32 port;
@@ -361,7 +361,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
361 goto exit; 361 goto exit;
362 } 362 }
363 tipc_node_get(n); 363 tipc_node_get(n);
364 setup_timer(&n->timer, tipc_node_timeout, (unsigned long)n); 364 timer_setup(&n->timer, tipc_node_timeout, 0);
365 n->keepalive_intv = U32_MAX; 365 n->keepalive_intv = U32_MAX;
366 hlist_add_head_rcu(&n->hash, &tn->node_htable[tipc_hashfn(addr)]); 366 hlist_add_head_rcu(&n->hash, &tn->node_htable[tipc_hashfn(addr)]);
367 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 367 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
@@ -500,9 +500,9 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
500 500
501/* tipc_node_timeout - handle expiration of node timer 501/* tipc_node_timeout - handle expiration of node timer
502 */ 502 */
503static void tipc_node_timeout(unsigned long data) 503static void tipc_node_timeout(struct timer_list *t)
504{ 504{
505 struct tipc_node *n = (struct tipc_node *)data; 505 struct tipc_node *n = from_timer(n, t, timer);
506 struct tipc_link_entry *le; 506 struct tipc_link_entry *le;
507 struct sk_buff_head xmitq; 507 struct sk_buff_head xmitq;
508 int bearer_id; 508 int bearer_id;
@@ -657,7 +657,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
657 *slot1 = i; 657 *slot1 = i;
658 } 658 }
659 659
660 if (!tipc_node_is_up(n)) { 660 if (!node_is_up(n)) {
661 if (tipc_link_peer_is_down(l)) 661 if (tipc_link_peer_is_down(l))
662 tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); 662 tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
663 tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); 663 tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT);
@@ -717,11 +717,27 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
717 tipc_sk_rcv(n->net, &le->inputq); 717 tipc_sk_rcv(n->net, &le->inputq);
718} 718}
719 719
720static bool tipc_node_is_up(struct tipc_node *n) 720static bool node_is_up(struct tipc_node *n)
721{ 721{
722 return n->active_links[0] != INVALID_BEARER_ID; 722 return n->active_links[0] != INVALID_BEARER_ID;
723} 723}
724 724
725bool tipc_node_is_up(struct net *net, u32 addr)
726{
727 struct tipc_node *n;
728 bool retval = false;
729
730 if (in_own_node(net, addr))
731 return true;
732
733 n = tipc_node_find(net, addr);
734 if (!n)
735 return false;
736 retval = node_is_up(n);
737 tipc_node_put(n);
738 return retval;
739}
740
725void tipc_node_check_dest(struct net *net, u32 onode, 741void tipc_node_check_dest(struct net *net, u32 onode,
726 struct tipc_bearer *b, 742 struct tipc_bearer *b,
727 u16 capabilities, u32 signature, 743 u16 capabilities, u32 signature,
@@ -1149,7 +1165,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
1149 1165
1150 if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr)) 1166 if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
1151 goto attr_msg_full; 1167 goto attr_msg_full;
1152 if (tipc_node_is_up(node)) 1168 if (node_is_up(node))
1153 if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP)) 1169 if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
1154 goto attr_msg_full; 1170 goto attr_msg_full;
1155 1171
@@ -1238,6 +1254,22 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1238 return 0; 1254 return 0;
1239} 1255}
1240 1256
1257/* tipc_node_distr_xmit(): send single buffer msgs to individual destinations
1258 * Note: this is only for SYSTEM_IMPORTANCE messages, which cannot be rejected
1259 */
1260int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq)
1261{
1262 struct sk_buff *skb;
1263 u32 selector, dnode;
1264
1265 while ((skb = __skb_dequeue(xmitq))) {
1266 selector = msg_origport(buf_msg(skb));
1267 dnode = msg_destnode(buf_msg(skb));
1268 tipc_node_xmit_skb(net, skb, dnode, selector);
1269 }
1270 return 0;
1271}
1272
1241void tipc_node_broadcast(struct net *net, struct sk_buff *skb) 1273void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1242{ 1274{
1243 struct sk_buff *txskb; 1275 struct sk_buff *txskb;
@@ -1249,7 +1281,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1249 dst = n->addr; 1281 dst = n->addr;
1250 if (in_own_node(net, dst)) 1282 if (in_own_node(net, dst))
1251 continue; 1283 continue;
1252 if (!tipc_node_is_up(n)) 1284 if (!node_is_up(n))
1253 continue; 1285 continue;
1254 txskb = pskb_copy(skb, GFP_ATOMIC); 1286 txskb = pskb_copy(skb, GFP_ATOMIC);
1255 if (!txskb) 1287 if (!txskb)
@@ -1507,7 +1539,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1507 __skb_queue_head_init(&xmitq); 1539 __skb_queue_head_init(&xmitq);
1508 1540
1509 /* Ensure message is well-formed before touching the header */ 1541 /* Ensure message is well-formed before touching the header */
1510 if (unlikely(!tipc_msg_validate(skb))) 1542 if (unlikely(!tipc_msg_validate(&skb)))
1511 goto discard; 1543 goto discard;
1512 hdr = buf_msg(skb); 1544 hdr = buf_msg(skb);
1513 usr = msg_user(hdr); 1545 usr = msg_user(hdr);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 898c22916984..acd58d23a70e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -48,7 +48,8 @@ enum {
48 TIPC_BCAST_SYNCH = (1 << 1), 48 TIPC_BCAST_SYNCH = (1 << 1),
49 TIPC_BCAST_STATE_NACK = (1 << 2), 49 TIPC_BCAST_STATE_NACK = (1 << 2),
50 TIPC_BLOCK_FLOWCTL = (1 << 3), 50 TIPC_BLOCK_FLOWCTL = (1 << 3),
51 TIPC_BCAST_RCAST = (1 << 4) 51 TIPC_BCAST_RCAST = (1 << 4),
52 TIPC_MCAST_GROUPS = (1 << 5)
52}; 53};
53 54
54#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ 55#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
@@ -68,6 +69,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
68 char *linkname, size_t len); 69 char *linkname, size_t len);
69int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, 70int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
70 int selector); 71 int selector);
72int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *list);
71int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, 73int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
72 u32 selector); 74 u32 selector);
73void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr); 75void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
@@ -76,6 +78,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
76int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); 78int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
77void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); 79void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
78int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel); 80int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
81bool tipc_node_is_up(struct net *net, u32 addr);
79u16 tipc_node_get_capabilities(struct net *net, u32 addr); 82u16 tipc_node_get_capabilities(struct net *net, u32 addr);
80int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); 83int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
81int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb); 84int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 3cd6402e812c..acaef80fb88c 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -36,6 +36,8 @@
36#include "server.h" 36#include "server.h"
37#include "core.h" 37#include "core.h"
38#include "socket.h" 38#include "socket.h"
39#include "addr.h"
40#include "msg.h"
39#include <net/sock.h> 41#include <net/sock.h>
40#include <linux/module.h> 42#include <linux/module.h>
41 43
@@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref)
105 kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); 107 kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
106 sock_release(sock); 108 sock_release(sock);
107 con->sock = NULL; 109 con->sock = NULL;
108
109 spin_lock_bh(&s->idr_lock);
110 idr_remove(&s->conn_idr, con->conid);
111 s->idr_in_use--;
112 spin_unlock_bh(&s->idr_lock);
113 } 110 }
114 111 spin_lock_bh(&s->idr_lock);
112 idr_remove(&s->conn_idr, con->conid);
113 s->idr_in_use--;
114 spin_unlock_bh(&s->idr_lock);
115 tipc_clean_outqueues(con); 115 tipc_clean_outqueues(con);
116 kfree(con); 116 kfree(con);
117} 117}
@@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con)
197 struct tipc_server *s = con->server; 197 struct tipc_server *s = con->server;
198 198
199 if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { 199 if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
200 tipc_unregister_callbacks(con); 200 if (con->sock)
201 tipc_unregister_callbacks(con);
201 202
202 if (con->conid) 203 if (con->conid)
203 s->tipc_conn_release(con->conid, con->usr_data); 204 s->tipc_conn_release(con->conid, con->usr_data);
@@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con)
207 * are harmless for us here as we have already deleted this 208 * are harmless for us here as we have already deleted this
208 * connection from server connection list. 209 * connection from server connection list.
209 */ 210 */
210 kernel_sock_shutdown(con->sock, SHUT_RDWR); 211 if (con->sock)
211 212 kernel_sock_shutdown(con->sock, SHUT_RDWR);
212 conn_put(con); 213 conn_put(con);
213 } 214 }
214} 215}
@@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
487 } 488 }
488} 489}
489 490
491bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
492 u32 lower, u32 upper, int *conid)
493{
494 struct tipc_subscriber *scbr;
495 struct tipc_subscr sub;
496 struct tipc_server *s;
497 struct tipc_conn *con;
498
499 sub.seq.type = type;
500 sub.seq.lower = lower;
501 sub.seq.upper = upper;
502 sub.timeout = TIPC_WAIT_FOREVER;
503 sub.filter = TIPC_SUB_PORTS;
504 *(u32 *)&sub.usr_handle = port;
505
506 con = tipc_alloc_conn(tipc_topsrv(net));
507 if (IS_ERR(con))
508 return false;
509
510 *conid = con->conid;
511 s = con->server;
512 scbr = s->tipc_conn_new(*conid);
513 if (!scbr) {
514 tipc_close_conn(con);
515 return false;
516 }
517
518 con->usr_data = scbr;
519 con->sock = NULL;
520 s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
521 return true;
522}
523
524void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
525{
526 struct tipc_conn *con;
527
528 con = tipc_conn_lookup(tipc_topsrv(net), conid);
529 if (!con)
530 return;
531 tipc_close_conn(con);
532 conn_put(con);
533}
534
535static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
536{
537 u32 port = *(u32 *)&evt->s.usr_handle;
538 u32 self = tipc_own_addr(net);
539 struct sk_buff_head evtq;
540 struct sk_buff *skb;
541
542 skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
543 self, self, port, port, 0);
544 if (!skb)
545 return;
546 msg_set_dest_droppable(buf_msg(skb), true);
547 memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
548 skb_queue_head_init(&evtq);
549 __skb_queue_tail(&evtq, skb);
550 tipc_sk_rcv(net, &evtq);
551}
552
490static void tipc_send_to_sock(struct tipc_conn *con) 553static void tipc_send_to_sock(struct tipc_conn *con)
491{ 554{
492 int count = 0;
493 struct tipc_server *s = con->server; 555 struct tipc_server *s = con->server;
494 struct outqueue_entry *e; 556 struct outqueue_entry *e;
557 struct tipc_event *evt;
495 struct msghdr msg; 558 struct msghdr msg;
559 int count = 0;
496 int ret; 560 int ret;
497 561
498 spin_lock_bh(&con->outqueue_lock); 562 spin_lock_bh(&con->outqueue_lock);
499 while (test_bit(CF_CONNECTED, &con->flags)) { 563 while (test_bit(CF_CONNECTED, &con->flags)) {
500 e = list_entry(con->outqueue.next, struct outqueue_entry, 564 e = list_entry(con->outqueue.next, struct outqueue_entry, list);
501 list);
502 if ((struct list_head *) e == &con->outqueue) 565 if ((struct list_head *) e == &con->outqueue)
503 break; 566 break;
504 spin_unlock_bh(&con->outqueue_lock);
505 567
506 memset(&msg, 0, sizeof(msg)); 568 spin_unlock_bh(&con->outqueue_lock);
507 msg.msg_flags = MSG_DONTWAIT;
508 569
509 if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { 570 if (con->sock) {
510 msg.msg_name = &e->dest; 571 memset(&msg, 0, sizeof(msg));
511 msg.msg_namelen = sizeof(struct sockaddr_tipc); 572 msg.msg_flags = MSG_DONTWAIT;
512 } 573 if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
513 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, 574 msg.msg_name = &e->dest;
514 e->iov.iov_len); 575 msg.msg_namelen = sizeof(struct sockaddr_tipc);
515 if (ret == -EWOULDBLOCK || ret == 0) { 576 }
516 cond_resched(); 577 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
517 goto out; 578 e->iov.iov_len);
518 } else if (ret < 0) { 579 if (ret == -EWOULDBLOCK || ret == 0) {
519 goto send_err; 580 cond_resched();
581 goto out;
582 } else if (ret < 0) {
583 goto send_err;
584 }
585 } else {
586 evt = e->iov.iov_base;
587 tipc_send_kern_top_evt(s->net, evt);
520 } 588 }
521
522 /* Don't starve users filling buffers */ 589 /* Don't starve users filling buffers */
523 if (++count >= MAX_SEND_MSG_COUNT) { 590 if (++count >= MAX_SEND_MSG_COUNT) {
524 cond_resched(); 591 cond_resched();
diff --git a/net/tipc/server.h b/net/tipc/server.h
index 34f8055afa3b..2113c9192633 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -83,13 +83,16 @@ struct tipc_server {
83int tipc_conn_sendmsg(struct tipc_server *s, int conid, 83int tipc_conn_sendmsg(struct tipc_server *s, int conid,
84 struct sockaddr_tipc *addr, void *data, size_t len); 84 struct sockaddr_tipc *addr, void *data, size_t len);
85 85
86bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
87 u32 lower, u32 upper, int *conid);
88void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
89
86/** 90/**
87 * tipc_conn_terminate - terminate connection with server 91 * tipc_conn_terminate - terminate connection with server
88 * 92 *
89 * Note: Must call it in process context since it might sleep 93 * Note: Must call it in process context since it might sleep
90 */ 94 */
91void tipc_conn_terminate(struct tipc_server *s, int conid); 95void tipc_conn_terminate(struct tipc_server *s, int conid);
92
93int tipc_server_start(struct tipc_server *s); 96int tipc_server_start(struct tipc_server *s);
94 97
95void tipc_server_stop(struct tipc_server *s); 98void tipc_server_stop(struct tipc_server *s);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d50edd6e0019..5d18c0caa92b 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, 2012-2016, Ericsson AB 4 * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -45,9 +45,10 @@
45#include "socket.h" 45#include "socket.h"
46#include "bcast.h" 46#include "bcast.h"
47#include "netlink.h" 47#include "netlink.h"
48#include "group.h"
48 49
49#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 50#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
50#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ 51#define CONN_PROBING_INTV msecs_to_jiffies(3600000) /* [ms] => 1 h */
51#define TIPC_FWD_MSG 1 52#define TIPC_FWD_MSG 1
52#define TIPC_MAX_PORT 0xffffffff 53#define TIPC_MAX_PORT 0xffffffff
53#define TIPC_MIN_PORT 1 54#define TIPC_MIN_PORT 1
@@ -61,6 +62,11 @@ enum {
61 TIPC_CONNECTING = TCP_SYN_SENT, 62 TIPC_CONNECTING = TCP_SYN_SENT,
62}; 63};
63 64
65struct sockaddr_pair {
66 struct sockaddr_tipc sock;
67 struct sockaddr_tipc member;
68};
69
64/** 70/**
65 * struct tipc_sock - TIPC socket structure 71 * struct tipc_sock - TIPC socket structure
66 * @sk: socket - interacts with 'port' and with user via the socket API 72 * @sk: socket - interacts with 'port' and with user via the socket API
@@ -78,7 +84,7 @@ enum {
78 * @conn_timeout: the time we can wait for an unresponded setup request 84 * @conn_timeout: the time we can wait for an unresponded setup request
79 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 85 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
80 * @cong_link_cnt: number of congested links 86 * @cong_link_cnt: number of congested links
81 * @sent_unacked: # messages sent by socket, and not yet acked by peer 87 * @snt_unacked: # messages sent by socket, and not yet acked by peer
82 * @rcv_unacked: # messages read by user, but not yet acked back to peer 88 * @rcv_unacked: # messages read by user, but not yet acked back to peer
83 * @peer: 'connected' peer for dgram/rdm 89 * @peer: 'connected' peer for dgram/rdm
84 * @node: hash table node 90 * @node: hash table node
@@ -109,20 +115,22 @@ struct tipc_sock {
109 struct rhash_head node; 115 struct rhash_head node;
110 struct tipc_mc_method mc_method; 116 struct tipc_mc_method mc_method;
111 struct rcu_head rcu; 117 struct rcu_head rcu;
118 struct tipc_group *group;
112}; 119};
113 120
114static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); 121static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
115static void tipc_data_ready(struct sock *sk); 122static void tipc_data_ready(struct sock *sk);
116static void tipc_write_space(struct sock *sk); 123static void tipc_write_space(struct sock *sk);
117static void tipc_sock_destruct(struct sock *sk); 124static void tipc_sock_destruct(struct sock *sk);
118static int tipc_release(struct socket *sock); 125static int tipc_release(struct socket *sock);
119static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags, 126static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
120 bool kern); 127 bool kern);
121static void tipc_sk_timeout(unsigned long data); 128static void tipc_sk_timeout(struct timer_list *t);
122static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 129static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
123 struct tipc_name_seq const *seq); 130 struct tipc_name_seq const *seq);
124static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 131static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
125 struct tipc_name_seq const *seq); 132 struct tipc_name_seq const *seq);
133static int tipc_sk_leave(struct tipc_sock *tsk);
126static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 134static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
127static int tipc_sk_insert(struct tipc_sock *tsk); 135static int tipc_sk_insert(struct tipc_sock *tsk);
128static void tipc_sk_remove(struct tipc_sock *tsk); 136static void tipc_sk_remove(struct tipc_sock *tsk);
@@ -193,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
193 return tsk->snt_unacked > tsk->snd_win; 201 return tsk->snt_unacked > tsk->snd_win;
194} 202}
195 203
204static u16 tsk_blocks(int len)
205{
206 return ((len / FLOWCTL_BLK_SZ) + 1);
207}
208
196/* tsk_blocks(): translate a buffer size in bytes to number of 209/* tsk_blocks(): translate a buffer size in bytes to number of
197 * advertisable blocks, taking into account the ratio truesize(len)/len 210 * advertisable blocks, taking into account the ratio truesize(len)/len
198 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ 211 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
@@ -451,9 +464,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
451 NAMED_H_SIZE, 0); 464 NAMED_H_SIZE, 0);
452 465
453 msg_set_origport(msg, tsk->portid); 466 msg_set_origport(msg, tsk->portid);
454 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); 467 timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
455 sk->sk_shutdown = 0; 468 sk->sk_shutdown = 0;
456 sk->sk_backlog_rcv = tipc_backlog_rcv; 469 sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
457 sk->sk_rcvbuf = sysctl_tipc_rmem[1]; 470 sk->sk_rcvbuf = sysctl_tipc_rmem[1];
458 sk->sk_data_ready = tipc_data_ready; 471 sk->sk_data_ready = tipc_data_ready;
459 sk->sk_write_space = tipc_write_space; 472 sk->sk_write_space = tipc_write_space;
@@ -559,13 +572,14 @@ static int tipc_release(struct socket *sock)
559 572
560 __tipc_shutdown(sock, TIPC_ERR_NO_PORT); 573 __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
561 sk->sk_shutdown = SHUTDOWN_MASK; 574 sk->sk_shutdown = SHUTDOWN_MASK;
575 tipc_sk_leave(tsk);
562 tipc_sk_withdraw(tsk, 0, NULL); 576 tipc_sk_withdraw(tsk, 0, NULL);
563 sk_stop_timer(sk, &sk->sk_timer); 577 sk_stop_timer(sk, &sk->sk_timer);
564 tipc_sk_remove(tsk); 578 tipc_sk_remove(tsk);
565 579
566 /* Reject any messages that accumulated in backlog queue */ 580 /* Reject any messages that accumulated in backlog queue */
567 release_sock(sk); 581 release_sock(sk);
568 u32_list_purge(&tsk->cong_links); 582 tipc_dest_list_purge(&tsk->cong_links);
569 tsk->cong_link_cnt = 0; 583 tsk->cong_link_cnt = 0;
570 call_rcu(&tsk->rcu, tipc_sk_callback); 584 call_rcu(&tsk->rcu, tipc_sk_callback);
571 sock->sk = NULL; 585 sock->sk = NULL;
@@ -601,7 +615,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
601 res = tipc_sk_withdraw(tsk, 0, NULL); 615 res = tipc_sk_withdraw(tsk, 0, NULL);
602 goto exit; 616 goto exit;
603 } 617 }
604 618 if (tsk->group) {
619 res = -EACCES;
620 goto exit;
621 }
605 if (uaddr_len < sizeof(struct sockaddr_tipc)) { 622 if (uaddr_len < sizeof(struct sockaddr_tipc)) {
606 res = -EINVAL; 623 res = -EINVAL;
607 goto exit; 624 goto exit;
@@ -698,38 +715,41 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
698{ 715{
699 struct sock *sk = sock->sk; 716 struct sock *sk = sock->sk;
700 struct tipc_sock *tsk = tipc_sk(sk); 717 struct tipc_sock *tsk = tipc_sk(sk);
701 u32 mask = 0; 718 struct tipc_group *grp = tsk->group;
719 u32 revents = 0;
702 720
703 sock_poll_wait(file, sk_sleep(sk), wait); 721 sock_poll_wait(file, sk_sleep(sk), wait);
704 722
705 if (sk->sk_shutdown & RCV_SHUTDOWN) 723 if (sk->sk_shutdown & RCV_SHUTDOWN)
706 mask |= POLLRDHUP | POLLIN | POLLRDNORM; 724 revents |= POLLRDHUP | POLLIN | POLLRDNORM;
707 if (sk->sk_shutdown == SHUTDOWN_MASK) 725 if (sk->sk_shutdown == SHUTDOWN_MASK)
708 mask |= POLLHUP; 726 revents |= POLLHUP;
709 727
710 switch (sk->sk_state) { 728 switch (sk->sk_state) {
711 case TIPC_ESTABLISHED: 729 case TIPC_ESTABLISHED:
712 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) 730 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
713 mask |= POLLOUT; 731 revents |= POLLOUT;
714 /* fall thru' */ 732 /* fall thru' */
715 case TIPC_LISTEN: 733 case TIPC_LISTEN:
716 case TIPC_CONNECTING: 734 case TIPC_CONNECTING:
717 if (!skb_queue_empty(&sk->sk_receive_queue)) 735 if (!skb_queue_empty(&sk->sk_receive_queue))
718 mask |= (POLLIN | POLLRDNORM); 736 revents |= POLLIN | POLLRDNORM;
719 break; 737 break;
720 case TIPC_OPEN: 738 case TIPC_OPEN:
721 if (!tsk->cong_link_cnt) 739 if (!grp || tipc_group_size(grp))
722 mask |= POLLOUT; 740 if (!tsk->cong_link_cnt)
723 if (tipc_sk_type_connectionless(sk) && 741 revents |= POLLOUT;
724 (!skb_queue_empty(&sk->sk_receive_queue))) 742 if (!tipc_sk_type_connectionless(sk))
725 mask |= (POLLIN | POLLRDNORM); 743 break;
744 if (skb_queue_empty(&sk->sk_receive_queue))
745 break;
746 revents |= POLLIN | POLLRDNORM;
726 break; 747 break;
727 case TIPC_DISCONNECTING: 748 case TIPC_DISCONNECTING:
728 mask = (POLLIN | POLLRDNORM | POLLHUP); 749 revents = POLLIN | POLLRDNORM | POLLHUP;
729 break; 750 break;
730 } 751 }
731 752 return revents;
732 return mask;
733} 753}
734 754
735/** 755/**
@@ -757,6 +777,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
757 struct tipc_nlist dsts; 777 struct tipc_nlist dsts;
758 int rc; 778 int rc;
759 779
780 if (tsk->group)
781 return -EACCES;
782
760 /* Block or return if any destination link is congested */ 783 /* Block or return if any destination link is congested */
761 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); 784 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
762 if (unlikely(rc)) 785 if (unlikely(rc))
@@ -794,6 +817,296 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
794} 817}
795 818
796/** 819/**
820 * tipc_send_group_msg - send a message to a member in the group
821 * @net: network namespace
822 * @m: message to send
823 * @mb: group member
824 * @dnode: destination node
825 * @dport: destination port
826 * @dlen: total length of message data
827 */
828static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
829 struct msghdr *m, struct tipc_member *mb,
830 u32 dnode, u32 dport, int dlen)
831{
832 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
833 struct tipc_mc_method *method = &tsk->mc_method;
834 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
835 struct tipc_msg *hdr = &tsk->phdr;
836 struct sk_buff_head pkts;
837 int mtu, rc;
838
839 /* Complete message header */
840 msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
841 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
842 msg_set_destport(hdr, dport);
843 msg_set_destnode(hdr, dnode);
844 msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
845
846 /* Build message as chain of buffers */
847 skb_queue_head_init(&pkts);
848 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
849 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
850 if (unlikely(rc != dlen))
851 return rc;
852
853 /* Send message */
854 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
855 if (unlikely(rc == -ELINKCONG)) {
856 tipc_dest_push(&tsk->cong_links, dnode, 0);
857 tsk->cong_link_cnt++;
858 }
859
860 /* Update send window */
861 tipc_group_update_member(mb, blks);
862
863 /* A broadcast sent within next EXPIRE period must follow same path */
864 method->rcast = true;
865 method->mandatory = true;
866 return dlen;
867}
868
869/**
870 * tipc_send_group_unicast - send message to a member in the group
871 * @sock: socket structure
872 * @m: message to send
873 * @dlen: total length of message data
874 * @timeout: timeout to wait for wakeup
875 *
876 * Called from function tipc_sendmsg(), which has done all sanity checks
877 * Returns the number of bytes sent on success, or errno
878 */
879static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
880 int dlen, long timeout)
881{
882 struct sock *sk = sock->sk;
883 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
884 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
885 struct tipc_sock *tsk = tipc_sk(sk);
886 struct tipc_group *grp = tsk->group;
887 struct net *net = sock_net(sk);
888 struct tipc_member *mb = NULL;
889 u32 node, port;
890 int rc;
891
892 node = dest->addr.id.node;
893 port = dest->addr.id.ref;
894 if (!port && !node)
895 return -EHOSTUNREACH;
896
897 /* Block or return if destination link or member is congested */
898 rc = tipc_wait_for_cond(sock, &timeout,
899 !tipc_dest_find(&tsk->cong_links, node, 0) &&
900 !tipc_group_cong(grp, node, port, blks, &mb));
901 if (unlikely(rc))
902 return rc;
903
904 if (unlikely(!mb))
905 return -EHOSTUNREACH;
906
907 rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
908
909 return rc ? rc : dlen;
910}
911
912/**
913 * tipc_send_group_anycast - send message to any member with given identity
914 * @sock: socket structure
915 * @m: message to send
916 * @dlen: total length of message data
917 * @timeout: timeout to wait for wakeup
918 *
919 * Called from function tipc_sendmsg(), which has done all sanity checks
920 * Returns the number of bytes sent on success, or errno
921 */
922static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
923 int dlen, long timeout)
924{
925 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
926 struct sock *sk = sock->sk;
927 struct tipc_sock *tsk = tipc_sk(sk);
928 struct list_head *cong_links = &tsk->cong_links;
929 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
930 struct tipc_group *grp = tsk->group;
931 struct tipc_member *first = NULL;
932 struct tipc_member *mbr = NULL;
933 struct net *net = sock_net(sk);
934 u32 node, port, exclude;
935 u32 type, inst, domain;
936 struct list_head dsts;
937 int lookups = 0;
938 int dstcnt, rc;
939 bool cong;
940
941 INIT_LIST_HEAD(&dsts);
942
943 type = dest->addr.name.name.type;
944 inst = dest->addr.name.name.instance;
945 domain = addr_domain(net, dest->scope);
946 exclude = tipc_group_exclude(grp);
947
948 while (++lookups < 4) {
949 first = NULL;
950
951 /* Look for a non-congested destination member, if any */
952 while (1) {
953 if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts,
954 &dstcnt, exclude, false))
955 return -EHOSTUNREACH;
956 tipc_dest_pop(&dsts, &node, &port);
957 cong = tipc_group_cong(grp, node, port, blks, &mbr);
958 if (!cong)
959 break;
960 if (mbr == first)
961 break;
962 if (!first)
963 first = mbr;
964 }
965
966 /* Start over if destination was not in member list */
967 if (unlikely(!mbr))
968 continue;
969
970 if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
971 break;
972
973 /* Block or return if destination link or member is congested */
974 rc = tipc_wait_for_cond(sock, &timeout,
975 !tipc_dest_find(cong_links, node, 0) &&
976 !tipc_group_cong(grp, node, port,
977 blks, &mbr));
978 if (unlikely(rc))
979 return rc;
980
981 /* Send, unless destination disappeared while waiting */
982 if (likely(mbr))
983 break;
984 }
985
986 if (unlikely(lookups >= 4))
987 return -EHOSTUNREACH;
988
989 rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);
990
991 return rc ? rc : dlen;
992}
993
994/**
995 * tipc_send_group_bcast - send message to all members in communication group
996 * @sk: socket structure
997 * @m: message to send
998 * @dlen: total length of message data
999 * @timeout: timeout to wait for wakeup
1000 *
1001 * Called from function tipc_sendmsg(), which has done all sanity checks
1002 * Returns the number of bytes sent on success, or errno
1003 */
1004static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1005 int dlen, long timeout)
1006{
1007 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1008 struct sock *sk = sock->sk;
1009 struct net *net = sock_net(sk);
1010 struct tipc_sock *tsk = tipc_sk(sk);
1011 struct tipc_group *grp = tsk->group;
1012 struct tipc_nlist *dsts = tipc_group_dests(grp);
1013 struct tipc_mc_method *method = &tsk->mc_method;
1014 bool ack = method->mandatory && method->rcast;
1015 int blks = tsk_blocks(MCAST_H_SIZE + dlen);
1016 struct tipc_msg *hdr = &tsk->phdr;
1017 int mtu = tipc_bcast_get_mtu(net);
1018 struct sk_buff_head pkts;
1019 int rc = -EHOSTUNREACH;
1020
1021 if (!dsts->local && !dsts->remote)
1022 return -EHOSTUNREACH;
1023
1024 /* Block or return if any destination link or member is congested */
1025 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
1026 !tipc_group_bc_cong(grp, blks));
1027 if (unlikely(rc))
1028 return rc;
1029
1030 /* Complete message header */
1031 if (dest) {
1032 msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
1033 msg_set_nameinst(hdr, dest->addr.name.name.instance);
1034 } else {
1035 msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
1036 msg_set_nameinst(hdr, 0);
1037 }
1038 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
1039 msg_set_destport(hdr, 0);
1040 msg_set_destnode(hdr, 0);
1041 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
1042
1043 /* Avoid getting stuck with repeated forced replicasts */
1044 msg_set_grp_bc_ack_req(hdr, ack);
1045
1046 /* Build message as chain of buffers */
1047 skb_queue_head_init(&pkts);
1048 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
1049 if (unlikely(rc != dlen))
1050 return rc;
1051
1052 /* Send message */
1053 rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
1054 if (unlikely(rc))
1055 return rc;
1056
1057 /* Update broadcast sequence number and send windows */
1058 tipc_group_update_bc_members(tsk->group, blks, ack);
1059
1060 /* Broadcast link is now free to choose method for next broadcast */
1061 method->mandatory = false;
1062 method->expires = jiffies;
1063
1064 return dlen;
1065}
1066
1067/**
1068 * tipc_send_group_mcast - send message to all members with given identity
1069 * @sock: socket structure
1070 * @m: message to send
1071 * @dlen: total length of message data
1072 * @timeout: timeout to wait for wakeup
1073 *
1074 * Called from function tipc_sendmsg(), which has done all sanity checks
1075 * Returns the number of bytes sent on success, or errno
1076 */
1077static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1078 int dlen, long timeout)
1079{
1080 struct sock *sk = sock->sk;
1081 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1082 struct tipc_name_seq *seq = &dest->addr.nameseq;
1083 struct tipc_sock *tsk = tipc_sk(sk);
1084 struct tipc_group *grp = tsk->group;
1085 struct net *net = sock_net(sk);
1086 u32 domain, exclude, dstcnt;
1087 struct list_head dsts;
1088
1089 INIT_LIST_HEAD(&dsts);
1090
1091 if (seq->lower != seq->upper)
1092 return -ENOTSUPP;
1093
1094 domain = addr_domain(net, dest->scope);
1095 exclude = tipc_group_exclude(grp);
1096 if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
1097 &dsts, &dstcnt, exclude, true))
1098 return -EHOSTUNREACH;
1099
1100 if (dstcnt == 1) {
1101 tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
1102 return tipc_send_group_unicast(sock, m, dlen, timeout);
1103 }
1104
1105 tipc_dest_list_purge(&dsts);
1106 return tipc_send_group_bcast(sock, m, dlen, timeout);
1107}
1108
1109/**
797 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets 1110 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
798 * @arrvq: queue with arriving messages, to be cloned after destination lookup 1111 * @arrvq: queue with arriving messages, to be cloned after destination lookup
799 * @inputq: queue with cloned messages, delivered to socket after dest lookup 1112 * @inputq: queue with cloned messages, delivered to socket after dest lookup
@@ -803,13 +1116,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
803void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 1116void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
804 struct sk_buff_head *inputq) 1117 struct sk_buff_head *inputq)
805{ 1118{
806 struct tipc_msg *msg;
807 struct list_head dports;
808 u32 portid;
809 u32 scope = TIPC_CLUSTER_SCOPE; 1119 u32 scope = TIPC_CLUSTER_SCOPE;
810 struct sk_buff_head tmpq; 1120 u32 self = tipc_own_addr(net);
811 uint hsz;
812 struct sk_buff *skb, *_skb; 1121 struct sk_buff *skb, *_skb;
1122 u32 lower = 0, upper = ~0;
1123 struct sk_buff_head tmpq;
1124 u32 portid, oport, onode;
1125 struct list_head dports;
1126 struct tipc_msg *msg;
1127 int user, mtyp, hsz;
813 1128
814 __skb_queue_head_init(&tmpq); 1129 __skb_queue_head_init(&tmpq);
815 INIT_LIST_HEAD(&dports); 1130 INIT_LIST_HEAD(&dports);
@@ -817,17 +1132,32 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
817 skb = tipc_skb_peek(arrvq, &inputq->lock); 1132 skb = tipc_skb_peek(arrvq, &inputq->lock);
818 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1133 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
819 msg = buf_msg(skb); 1134 msg = buf_msg(skb);
1135 user = msg_user(msg);
1136 mtyp = msg_type(msg);
1137 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1138 spin_lock_bh(&inputq->lock);
1139 if (skb_peek(arrvq) == skb) {
1140 __skb_dequeue(arrvq);
1141 __skb_queue_tail(inputq, skb);
1142 }
1143 refcount_dec(&skb->users);
1144 spin_unlock_bh(&inputq->lock);
1145 continue;
1146 }
820 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1147 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
821 1148 oport = msg_origport(msg);
822 if (in_own_node(net, msg_orignode(msg))) 1149 onode = msg_orignode(msg);
1150 if (onode == self)
823 scope = TIPC_NODE_SCOPE; 1151 scope = TIPC_NODE_SCOPE;
824 1152
825 /* Create destination port list and message clones: */ 1153 /* Create destination port list and message clones: */
826 tipc_nametbl_mc_translate(net, 1154 if (!msg_in_group(msg)) {
827 msg_nametype(msg), msg_namelower(msg), 1155 lower = msg_namelower(msg);
828 msg_nameupper(msg), scope, &dports); 1156 upper = msg_nameupper(msg);
829 portid = u32_pop(&dports); 1157 }
830 for (; portid; portid = u32_pop(&dports)) { 1158 tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
1159 scope, &dports);
1160 while (tipc_dest_pop(&dports, NULL, &portid)) {
831 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 1161 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
832 if (_skb) { 1162 if (_skb) {
833 msg_set_destport(buf_msg(_skb), portid); 1163 msg_set_destport(buf_msg(_skb), portid);
@@ -850,16 +1180,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
850} 1180}
851 1181
852/** 1182/**
853 * tipc_sk_proto_rcv - receive a connection mng protocol message 1183 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
854 * @tsk: receiving socket 1184 * @tsk: receiving socket
855 * @skb: pointer to message buffer. 1185 * @skb: pointer to message buffer.
856 */ 1186 */
857static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, 1187static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
858 struct sk_buff_head *xmitq) 1188 struct sk_buff_head *xmitq)
859{ 1189{
860 struct sock *sk = &tsk->sk;
861 u32 onode = tsk_own_node(tsk);
862 struct tipc_msg *hdr = buf_msg(skb); 1190 struct tipc_msg *hdr = buf_msg(skb);
1191 u32 onode = tsk_own_node(tsk);
1192 struct sock *sk = &tsk->sk;
863 int mtyp = msg_type(hdr); 1193 int mtyp = msg_type(hdr);
864 bool conn_cong; 1194 bool conn_cong;
865 1195
@@ -931,6 +1261,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
931 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1261 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
932 struct list_head *clinks = &tsk->cong_links; 1262 struct list_head *clinks = &tsk->cong_links;
933 bool syn = !tipc_sk_type_connectionless(sk); 1263 bool syn = !tipc_sk_type_connectionless(sk);
1264 struct tipc_group *grp = tsk->group;
934 struct tipc_msg *hdr = &tsk->phdr; 1265 struct tipc_msg *hdr = &tsk->phdr;
935 struct tipc_name_seq *seq; 1266 struct tipc_name_seq *seq;
936 struct sk_buff_head pkts; 1267 struct sk_buff_head pkts;
@@ -941,18 +1272,31 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
941 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) 1272 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
942 return -EMSGSIZE; 1273 return -EMSGSIZE;
943 1274
1275 if (likely(dest)) {
1276 if (unlikely(m->msg_namelen < sizeof(*dest)))
1277 return -EINVAL;
1278 if (unlikely(dest->family != AF_TIPC))
1279 return -EINVAL;
1280 }
1281
1282 if (grp) {
1283 if (!dest)
1284 return tipc_send_group_bcast(sock, m, dlen, timeout);
1285 if (dest->addrtype == TIPC_ADDR_NAME)
1286 return tipc_send_group_anycast(sock, m, dlen, timeout);
1287 if (dest->addrtype == TIPC_ADDR_ID)
1288 return tipc_send_group_unicast(sock, m, dlen, timeout);
1289 if (dest->addrtype == TIPC_ADDR_MCAST)
1290 return tipc_send_group_mcast(sock, m, dlen, timeout);
1291 return -EINVAL;
1292 }
1293
944 if (unlikely(!dest)) { 1294 if (unlikely(!dest)) {
945 dest = &tsk->peer; 1295 dest = &tsk->peer;
946 if (!syn || dest->family != AF_TIPC) 1296 if (!syn || dest->family != AF_TIPC)
947 return -EDESTADDRREQ; 1297 return -EDESTADDRREQ;
948 } 1298 }
949 1299
950 if (unlikely(m->msg_namelen < sizeof(*dest)))
951 return -EINVAL;
952
953 if (unlikely(dest->family != AF_TIPC))
954 return -EINVAL;
955
956 if (unlikely(syn)) { 1300 if (unlikely(syn)) {
957 if (sk->sk_state == TIPC_LISTEN) 1301 if (sk->sk_state == TIPC_LISTEN)
958 return -EPIPE; 1302 return -EPIPE;
@@ -985,7 +1329,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
985 msg_set_destport(hdr, dport); 1329 msg_set_destport(hdr, dport);
986 if (unlikely(!dport && !dnode)) 1330 if (unlikely(!dport && !dnode))
987 return -EHOSTUNREACH; 1331 return -EHOSTUNREACH;
988
989 } else if (dest->addrtype == TIPC_ADDR_ID) { 1332 } else if (dest->addrtype == TIPC_ADDR_ID) {
990 dnode = dest->addr.id.node; 1333 dnode = dest->addr.id.node;
991 msg_set_type(hdr, TIPC_DIRECT_MSG); 1334 msg_set_type(hdr, TIPC_DIRECT_MSG);
@@ -996,7 +1339,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
996 } 1339 }
997 1340
998 /* Block or return if destination link is congested */ 1341 /* Block or return if destination link is congested */
999 rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); 1342 rc = tipc_wait_for_cond(sock, &timeout,
1343 !tipc_dest_find(clinks, dnode, 0));
1000 if (unlikely(rc)) 1344 if (unlikely(rc))
1001 return rc; 1345 return rc;
1002 1346
@@ -1008,7 +1352,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
1008 1352
1009 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); 1353 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1010 if (unlikely(rc == -ELINKCONG)) { 1354 if (unlikely(rc == -ELINKCONG)) {
1011 u32_push(clinks, dnode); 1355 tipc_dest_push(clinks, dnode, 0);
1012 tsk->cong_link_cnt++; 1356 tsk->cong_link_cnt++;
1013 rc = 0; 1357 rc = 0;
1014 } 1358 }
@@ -1128,7 +1472,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1128 msg_set_lookup_scope(msg, 0); 1472 msg_set_lookup_scope(msg, 0);
1129 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1473 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1130 1474
1131 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL); 1475 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1132 tipc_set_sk_state(sk, TIPC_ESTABLISHED); 1476 tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1133 tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); 1477 tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1134 tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); 1478 tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
@@ -1142,26 +1486,38 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1142} 1486}
1143 1487
1144/** 1488/**
1145 * set_orig_addr - capture sender's address for received message 1489 * tipc_sk_set_orig_addr - capture sender's address for received message
1146 * @m: descriptor for message info 1490 * @m: descriptor for message info
1147 * @msg: received message header 1491 * @hdr: received message header
1148 * 1492 *
1149 * Note: Address is not captured if not requested by receiver. 1493 * Note: Address is not captured if not requested by receiver.
1150 */ 1494 */
1151static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) 1495static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1152{ 1496{
1153 DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name); 1497 DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1498 struct tipc_msg *hdr = buf_msg(skb);
1154 1499
1155 if (addr) { 1500 if (!srcaddr)
1156 addr->family = AF_TIPC; 1501 return;
1157 addr->addrtype = TIPC_ADDR_ID; 1502
1158 memset(&addr->addr, 0, sizeof(addr->addr)); 1503 srcaddr->sock.family = AF_TIPC;
1159 addr->addr.id.ref = msg_origport(msg); 1504 srcaddr->sock.addrtype = TIPC_ADDR_ID;
1160 addr->addr.id.node = msg_orignode(msg); 1505 srcaddr->sock.addr.id.ref = msg_origport(hdr);
1161 addr->addr.name.domain = 0; /* could leave uninitialized */ 1506 srcaddr->sock.addr.id.node = msg_orignode(hdr);
1162 addr->scope = 0; /* could leave uninitialized */ 1507 srcaddr->sock.addr.name.domain = 0;
1163 m->msg_namelen = sizeof(struct sockaddr_tipc); 1508 srcaddr->sock.scope = 0;
1164 } 1509 m->msg_namelen = sizeof(struct sockaddr_tipc);
1510
1511 if (!msg_in_group(hdr))
1512 return;
1513
1514 /* Group message users may also want to know sending member's id */
1515 srcaddr->member.family = AF_TIPC;
1516 srcaddr->member.addrtype = TIPC_ADDR_NAME;
1517 srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1518 srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1519 srcaddr->member.addr.name.domain = 0;
1520 m->msg_namelen = sizeof(*srcaddr);
1165} 1521}
1166 1522
1167/** 1523/**
@@ -1318,11 +1674,13 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1318 size_t buflen, int flags) 1674 size_t buflen, int flags)
1319{ 1675{
1320 struct sock *sk = sock->sk; 1676 struct sock *sk = sock->sk;
1321 struct tipc_sock *tsk = tipc_sk(sk);
1322 struct sk_buff *skb;
1323 struct tipc_msg *hdr;
1324 bool connected = !tipc_sk_type_connectionless(sk); 1677 bool connected = !tipc_sk_type_connectionless(sk);
1678 struct tipc_sock *tsk = tipc_sk(sk);
1325 int rc, err, hlen, dlen, copy; 1679 int rc, err, hlen, dlen, copy;
1680 struct sk_buff_head xmitq;
1681 struct tipc_msg *hdr;
1682 struct sk_buff *skb;
1683 bool grp_evt;
1326 long timeout; 1684 long timeout;
1327 1685
1328 /* Catch invalid receive requests */ 1686 /* Catch invalid receive requests */
@@ -1336,8 +1694,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1336 } 1694 }
1337 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); 1695 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1338 1696
1697 /* Step rcv queue to first msg with data or error; wait if necessary */
1339 do { 1698 do {
1340 /* Look at first msg in receive queue; wait if necessary */
1341 rc = tipc_wait_for_rcvmsg(sock, &timeout); 1699 rc = tipc_wait_for_rcvmsg(sock, &timeout);
1342 if (unlikely(rc)) 1700 if (unlikely(rc))
1343 goto exit; 1701 goto exit;
@@ -1346,13 +1704,14 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1346 dlen = msg_data_sz(hdr); 1704 dlen = msg_data_sz(hdr);
1347 hlen = msg_hdr_sz(hdr); 1705 hlen = msg_hdr_sz(hdr);
1348 err = msg_errcode(hdr); 1706 err = msg_errcode(hdr);
1707 grp_evt = msg_is_grp_evt(hdr);
1349 if (likely(dlen || err)) 1708 if (likely(dlen || err))
1350 break; 1709 break;
1351 tsk_advance_rx_queue(sk); 1710 tsk_advance_rx_queue(sk);
1352 } while (1); 1711 } while (1);
1353 1712
1354 /* Collect msg meta data, including error code and rejected data */ 1713 /* Collect msg meta data, including error code and rejected data */
1355 set_orig_addr(m, hdr); 1714 tipc_sk_set_orig_addr(m, skb);
1356 rc = tipc_sk_anc_data_recv(m, hdr, tsk); 1715 rc = tipc_sk_anc_data_recv(m, hdr, tsk);
1357 if (unlikely(rc)) 1716 if (unlikely(rc))
1358 goto exit; 1717 goto exit;
@@ -1372,15 +1731,33 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1372 if (unlikely(rc)) 1731 if (unlikely(rc))
1373 goto exit; 1732 goto exit;
1374 1733
1734 /* Mark message as group event if applicable */
1735 if (unlikely(grp_evt)) {
1736 if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1737 m->msg_flags |= MSG_EOR;
1738 m->msg_flags |= MSG_OOB;
1739 copy = 0;
1740 }
1741
1375 /* Caption of data or error code/rejected data was successful */ 1742 /* Caption of data or error code/rejected data was successful */
1376 if (unlikely(flags & MSG_PEEK)) 1743 if (unlikely(flags & MSG_PEEK))
1377 goto exit; 1744 goto exit;
1378 1745
1746 /* Send group flow control advertisement when applicable */
1747 if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1748 skb_queue_head_init(&xmitq);
1749 tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1750 msg_orignode(hdr), msg_origport(hdr),
1751 &xmitq);
1752 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1753 }
1754
1379 tsk_advance_rx_queue(sk); 1755 tsk_advance_rx_queue(sk);
1756
1380 if (likely(!connected)) 1757 if (likely(!connected))
1381 goto exit; 1758 goto exit;
1382 1759
1383 /* Send connection flow control ack when applicable */ 1760 /* Send connection flow control advertisement when applicable */
1384 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); 1761 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1385 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) 1762 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1386 tipc_sk_send_ack(tsk); 1763 tipc_sk_send_ack(tsk);
@@ -1446,7 +1823,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
1446 1823
1447 /* Collect msg meta data, incl. error code and rejected data */ 1824 /* Collect msg meta data, incl. error code and rejected data */
1448 if (!copied) { 1825 if (!copied) {
1449 set_orig_addr(m, hdr); 1826 tipc_sk_set_orig_addr(m, skb);
1450 rc = tipc_sk_anc_data_recv(m, hdr, tsk); 1827 rc = tipc_sk_anc_data_recv(m, hdr, tsk);
1451 if (rc) 1828 if (rc)
1452 break; 1829 break;
@@ -1532,14 +1909,51 @@ static void tipc_sock_destruct(struct sock *sk)
1532 __skb_queue_purge(&sk->sk_receive_queue); 1909 __skb_queue_purge(&sk->sk_receive_queue);
1533} 1910}
1534 1911
1912static void tipc_sk_proto_rcv(struct sock *sk,
1913 struct sk_buff_head *inputq,
1914 struct sk_buff_head *xmitq)
1915{
1916 struct sk_buff *skb = __skb_dequeue(inputq);
1917 struct tipc_sock *tsk = tipc_sk(sk);
1918 struct tipc_msg *hdr = buf_msg(skb);
1919 struct tipc_group *grp = tsk->group;
1920 bool wakeup = false;
1921
1922 switch (msg_user(hdr)) {
1923 case CONN_MANAGER:
1924 tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
1925 return;
1926 case SOCK_WAKEUP:
1927 tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
1928 tsk->cong_link_cnt--;
1929 wakeup = true;
1930 break;
1931 case GROUP_PROTOCOL:
1932 tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
1933 break;
1934 case TOP_SRV:
1935 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1936 skb, inputq, xmitq);
1937 skb = NULL;
1938 break;
1939 default:
1940 break;
1941 }
1942
1943 if (wakeup)
1944 sk->sk_write_space(sk);
1945
1946 kfree_skb(skb);
1947}
1948
1535/** 1949/**
1536 * filter_connect - Handle all incoming messages for a connection-based socket 1950 * tipc_filter_connect - Handle incoming message for a connection-based socket
1537 * @tsk: TIPC socket 1951 * @tsk: TIPC socket
1538 * @skb: pointer to message buffer. Set to NULL if buffer is consumed 1952 * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1539 * 1953 *
1540 * Returns true if everything ok, false otherwise 1954 * Returns true if everything ok, false otherwise
1541 */ 1955 */
1542static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) 1956static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1543{ 1957{
1544 struct sock *sk = &tsk->sk; 1958 struct sock *sk = &tsk->sk;
1545 struct net *net = sock_net(sk); 1959 struct net *net = sock_net(sk);
@@ -1643,6 +2057,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1643 struct tipc_sock *tsk = tipc_sk(sk); 2057 struct tipc_sock *tsk = tipc_sk(sk);
1644 struct tipc_msg *hdr = buf_msg(skb); 2058 struct tipc_msg *hdr = buf_msg(skb);
1645 2059
2060 if (unlikely(msg_in_group(hdr)))
2061 return sk->sk_rcvbuf;
2062
1646 if (unlikely(!msg_connected(hdr))) 2063 if (unlikely(!msg_connected(hdr)))
1647 return sk->sk_rcvbuf << msg_importance(hdr); 2064 return sk->sk_rcvbuf << msg_importance(hdr);
1648 2065
@@ -1653,7 +2070,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1653} 2070}
1654 2071
1655/** 2072/**
1656 * filter_rcv - validate incoming message 2073 * tipc_sk_filter_rcv - validate incoming message
1657 * @sk: socket 2074 * @sk: socket
1658 * @skb: pointer to message. 2075 * @skb: pointer to message.
1659 * 2076 *
@@ -1662,99 +2079,71 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1662 * 2079 *
1663 * Called with socket lock already taken 2080 * Called with socket lock already taken
1664 * 2081 *
1665 * Returns true if message was added to socket receive queue, otherwise false
1666 */ 2082 */
1667static bool filter_rcv(struct sock *sk, struct sk_buff *skb, 2083static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
1668 struct sk_buff_head *xmitq) 2084 struct sk_buff_head *xmitq)
1669{ 2085{
2086 bool sk_conn = !tipc_sk_type_connectionless(sk);
1670 struct tipc_sock *tsk = tipc_sk(sk); 2087 struct tipc_sock *tsk = tipc_sk(sk);
2088 struct tipc_group *grp = tsk->group;
1671 struct tipc_msg *hdr = buf_msg(skb); 2089 struct tipc_msg *hdr = buf_msg(skb);
1672 unsigned int limit = rcvbuf_limit(sk, skb); 2090 struct net *net = sock_net(sk);
1673 int err = TIPC_OK; 2091 struct sk_buff_head inputq;
1674 int usr = msg_user(hdr); 2092 int limit, err = TIPC_OK;
1675 u32 onode;
1676 2093
1677 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 2094 TIPC_SKB_CB(skb)->bytes_read = 0;
1678 tipc_sk_proto_rcv(tsk, skb, xmitq); 2095 __skb_queue_head_init(&inputq);
1679 return false; 2096 __skb_queue_tail(&inputq, skb);
1680 }
1681 2097
1682 if (unlikely(usr == SOCK_WAKEUP)) { 2098 if (unlikely(!msg_isdata(hdr)))
1683 onode = msg_orignode(hdr); 2099 tipc_sk_proto_rcv(sk, &inputq, xmitq);
1684 kfree_skb(skb);
1685 u32_del(&tsk->cong_links, onode);
1686 tsk->cong_link_cnt--;
1687 sk->sk_write_space(sk);
1688 return false;
1689 }
1690 2100
1691 /* Drop if illegal message type */ 2101 if (unlikely(grp))
1692 if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { 2102 tipc_group_filter_msg(grp, &inputq, xmitq);
1693 kfree_skb(skb);
1694 return false;
1695 }
1696 2103
1697 /* Reject if wrong message type for current socket state */ 2104 /* Validate and add to receive buffer if there is space */
1698 if (tipc_sk_type_connectionless(sk)) { 2105 while ((skb = __skb_dequeue(&inputq))) {
1699 if (msg_connected(hdr)) { 2106 hdr = buf_msg(skb);
2107 limit = rcvbuf_limit(sk, skb);
2108 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
2109 (!sk_conn && msg_connected(hdr)) ||
2110 (!grp && msg_in_group(hdr)))
1700 err = TIPC_ERR_NO_PORT; 2111 err = TIPC_ERR_NO_PORT;
1701 goto reject; 2112 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
1702 } 2113 err = TIPC_ERR_OVERLOAD;
1703 } else if (unlikely(!filter_connect(tsk, skb))) {
1704 err = TIPC_ERR_NO_PORT;
1705 goto reject;
1706 }
1707 2114
1708 /* Reject message if there isn't room to queue it */ 2115 if (unlikely(err)) {
1709 if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { 2116 tipc_skb_reject(net, err, skb, xmitq);
1710 err = TIPC_ERR_OVERLOAD; 2117 err = TIPC_OK;
1711 goto reject; 2118 continue;
2119 }
2120 __skb_queue_tail(&sk->sk_receive_queue, skb);
2121 skb_set_owner_r(skb, sk);
2122 sk->sk_data_ready(sk);
1712 } 2123 }
1713
1714 /* Enqueue message */
1715 TIPC_SKB_CB(skb)->bytes_read = 0;
1716 __skb_queue_tail(&sk->sk_receive_queue, skb);
1717 skb_set_owner_r(skb, sk);
1718
1719 sk->sk_data_ready(sk);
1720 return true;
1721
1722reject:
1723 if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1724 __skb_queue_tail(xmitq, skb);
1725 return false;
1726} 2124}
1727 2125
1728/** 2126/**
1729 * tipc_backlog_rcv - handle incoming message from backlog queue 2127 * tipc_sk_backlog_rcv - handle incoming message from backlog queue
1730 * @sk: socket 2128 * @sk: socket
1731 * @skb: message 2129 * @skb: message
1732 * 2130 *
1733 * Caller must hold socket lock 2131 * Caller must hold socket lock
1734 *
1735 * Returns 0
1736 */ 2132 */
1737static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) 2133static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1738{ 2134{
1739 unsigned int truesize = skb->truesize; 2135 unsigned int before = sk_rmem_alloc_get(sk);
1740 struct sk_buff_head xmitq; 2136 struct sk_buff_head xmitq;
1741 u32 dnode, selector; 2137 unsigned int added;
1742 2138
1743 __skb_queue_head_init(&xmitq); 2139 __skb_queue_head_init(&xmitq);
1744 2140
1745 if (likely(filter_rcv(sk, skb, &xmitq))) { 2141 tipc_sk_filter_rcv(sk, skb, &xmitq);
1746 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); 2142 added = sk_rmem_alloc_get(sk) - before;
1747 return 0; 2143 atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
1748 }
1749 2144
1750 if (skb_queue_empty(&xmitq)) 2145 /* Send pending response/rejected messages, if any */
1751 return 0; 2146 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1752
1753 /* Send response/rejected message */
1754 skb = __skb_dequeue(&xmitq);
1755 dnode = msg_destnode(buf_msg(skb));
1756 selector = msg_origport(buf_msg(skb));
1757 tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1758 return 0; 2147 return 0;
1759} 2148}
1760 2149
@@ -1786,7 +2175,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1786 2175
1787 /* Add message directly to receive queue if possible */ 2176 /* Add message directly to receive queue if possible */
1788 if (!sock_owned_by_user(sk)) { 2177 if (!sock_owned_by_user(sk)) {
1789 filter_rcv(sk, skb, xmitq); 2178 tipc_sk_filter_rcv(sk, skb, xmitq);
1790 continue; 2179 continue;
1791 } 2180 }
1792 2181
@@ -1833,14 +2222,10 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1833 spin_unlock_bh(&sk->sk_lock.slock); 2222 spin_unlock_bh(&sk->sk_lock.slock);
1834 } 2223 }
1835 /* Send pending response/rejected messages, if any */ 2224 /* Send pending response/rejected messages, if any */
1836 while ((skb = __skb_dequeue(&xmitq))) { 2225 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1837 dnode = msg_destnode(buf_msg(skb));
1838 tipc_node_xmit_skb(net, skb, dnode, dport);
1839 }
1840 sock_put(sk); 2226 sock_put(sk);
1841 continue; 2227 continue;
1842 } 2228 }
1843
1844 /* No destination socket => dequeue skb if still there */ 2229 /* No destination socket => dequeue skb if still there */
1845 skb = tipc_skb_dequeue(inputq, dport); 2230 skb = tipc_skb_dequeue(inputq, dport);
1846 if (!skb) 2231 if (!skb)
@@ -1903,28 +2288,32 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1903 int previous; 2288 int previous;
1904 int res = 0; 2289 int res = 0;
1905 2290
2291 if (destlen != sizeof(struct sockaddr_tipc))
2292 return -EINVAL;
2293
1906 lock_sock(sk); 2294 lock_sock(sk);
1907 2295
1908 /* DGRAM/RDM connect(), just save the destaddr */ 2296 if (tsk->group) {
1909 if (tipc_sk_type_connectionless(sk)) { 2297 res = -EINVAL;
1910 if (dst->family == AF_UNSPEC) {
1911 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1912 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1913 res = -EINVAL;
1914 } else {
1915 memcpy(&tsk->peer, dest, destlen);
1916 }
1917 goto exit; 2298 goto exit;
1918 } 2299 }
1919 2300
1920 /* 2301 if (dst->family == AF_UNSPEC) {
1921 * Reject connection attempt using multicast address 2302 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1922 * 2303 if (!tipc_sk_type_connectionless(sk))
1923 * Note: send_msg() validates the rest of the address fields, 2304 res = -EINVAL;
1924 * so there's no need to do it here 2305 goto exit;
1925 */ 2306 } else if (dst->family != AF_TIPC) {
1926 if (dst->addrtype == TIPC_ADDR_MCAST) {
1927 res = -EINVAL; 2307 res = -EINVAL;
2308 }
2309 if (dst->addrtype != TIPC_ADDR_ID && dst->addrtype != TIPC_ADDR_NAME)
2310 res = -EINVAL;
2311 if (res)
2312 goto exit;
2313
2314 /* DGRAM/RDM connect(), just save the destaddr */
2315 if (tipc_sk_type_connectionless(sk)) {
2316 memcpy(&tsk->peer, dest, destlen);
1928 goto exit; 2317 goto exit;
1929 } 2318 }
1930 2319
@@ -2141,46 +2530,43 @@ static int tipc_shutdown(struct socket *sock, int how)
2141 return res; 2530 return res;
2142} 2531}
2143 2532
2144static void tipc_sk_timeout(unsigned long data) 2533static void tipc_sk_timeout(struct timer_list *t)
2145{ 2534{
2146 struct tipc_sock *tsk = (struct tipc_sock *)data; 2535 struct sock *sk = from_timer(sk, t, sk_timer);
2147 struct sock *sk = &tsk->sk; 2536 struct tipc_sock *tsk = tipc_sk(sk);
2148 struct sk_buff *skb = NULL; 2537 u32 peer_port = tsk_peer_port(tsk);
2149 u32 peer_port, peer_node; 2538 u32 peer_node = tsk_peer_node(tsk);
2150 u32 own_node = tsk_own_node(tsk); 2539 u32 own_node = tsk_own_node(tsk);
2540 u32 own_port = tsk->portid;
2541 struct net *net = sock_net(sk);
2542 struct sk_buff *skb = NULL;
2151 2543
2152 bh_lock_sock(sk); 2544 bh_lock_sock(sk);
2153 if (!tipc_sk_connected(sk)) { 2545 if (!tipc_sk_connected(sk))
2154 bh_unlock_sock(sk); 2546 goto exit;
2547
2548 /* Try again later if socket is busy */
2549 if (sock_owned_by_user(sk)) {
2550 sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2155 goto exit; 2551 goto exit;
2156 } 2552 }
2157 peer_port = tsk_peer_port(tsk);
2158 peer_node = tsk_peer_node(tsk);
2159 2553
2160 if (tsk->probe_unacked) { 2554 if (tsk->probe_unacked) {
2161 if (!sock_owned_by_user(sk)) { 2555 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2162 tipc_set_sk_state(sk, TIPC_DISCONNECTING); 2556 tipc_node_remove_conn(net, peer_node, peer_port);
2163 tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk), 2557 sk->sk_state_change(sk);
2164 tsk_peer_port(tsk));
2165 sk->sk_state_change(sk);
2166 } else {
2167 /* Try again later */
2168 sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2169 }
2170
2171 bh_unlock_sock(sk);
2172 goto exit; 2558 goto exit;
2173 } 2559 }
2174 2560 /* Send new probe */
2175 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, 2561 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2176 INT_H_SIZE, 0, peer_node, own_node, 2562 peer_node, own_node, peer_port, own_port,
2177 peer_port, tsk->portid, TIPC_OK); 2563 TIPC_OK);
2178 tsk->probe_unacked = true; 2564 tsk->probe_unacked = true;
2179 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL); 2565 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2566exit:
2180 bh_unlock_sock(sk); 2567 bh_unlock_sock(sk);
2181 if (skb) 2568 if (skb)
2182 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); 2569 tipc_node_xmit_skb(net, skb, peer_node, own_port);
2183exit:
2184 sock_put(sk); 2570 sock_put(sk);
2185} 2571}
2186 2572
@@ -2345,6 +2731,58 @@ void tipc_sk_rht_destroy(struct net *net)
2345 rhashtable_destroy(&tn->sk_rht); 2731 rhashtable_destroy(&tn->sk_rht);
2346} 2732}
2347 2733
2734static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2735{
2736 struct net *net = sock_net(&tsk->sk);
2737 u32 domain = addr_domain(net, mreq->scope);
2738 struct tipc_group *grp = tsk->group;
2739 struct tipc_msg *hdr = &tsk->phdr;
2740 struct tipc_name_seq seq;
2741 int rc;
2742
2743 if (mreq->type < TIPC_RESERVED_TYPES)
2744 return -EACCES;
2745 if (grp)
2746 return -EACCES;
2747 grp = tipc_group_create(net, tsk->portid, mreq);
2748 if (!grp)
2749 return -ENOMEM;
2750 tsk->group = grp;
2751 msg_set_lookup_scope(hdr, mreq->scope);
2752 msg_set_nametype(hdr, mreq->type);
2753 msg_set_dest_droppable(hdr, true);
2754 seq.type = mreq->type;
2755 seq.lower = mreq->instance;
2756 seq.upper = seq.lower;
2757 tipc_nametbl_build_group(net, grp, mreq->type, domain);
2758 rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2759 if (rc) {
2760 tipc_group_delete(net, grp);
2761 tsk->group = NULL;
2762 }
2763
2764 /* Eliminate any risk that a broadcast overtakes the sent JOIN */
2765 tsk->mc_method.rcast = true;
2766 tsk->mc_method.mandatory = true;
2767 return rc;
2768}
2769
2770static int tipc_sk_leave(struct tipc_sock *tsk)
2771{
2772 struct net *net = sock_net(&tsk->sk);
2773 struct tipc_group *grp = tsk->group;
2774 struct tipc_name_seq seq;
2775 int scope;
2776
2777 if (!grp)
2778 return -EINVAL;
2779 tipc_group_self(grp, &seq, &scope);
2780 tipc_group_delete(net, grp);
2781 tsk->group = NULL;
2782 tipc_sk_withdraw(tsk, scope, &seq);
2783 return 0;
2784}
2785
2348/** 2786/**
2349 * tipc_setsockopt - set socket option 2787 * tipc_setsockopt - set socket option
2350 * @sock: socket structure 2788 * @sock: socket structure
@@ -2363,6 +2801,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2363{ 2801{
2364 struct sock *sk = sock->sk; 2802 struct sock *sk = sock->sk;
2365 struct tipc_sock *tsk = tipc_sk(sk); 2803 struct tipc_sock *tsk = tipc_sk(sk);
2804 struct tipc_group_req mreq;
2366 u32 value = 0; 2805 u32 value = 0;
2367 int res = 0; 2806 int res = 0;
2368 2807
@@ -2378,9 +2817,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2378 case TIPC_CONN_TIMEOUT: 2817 case TIPC_CONN_TIMEOUT:
2379 if (ol < sizeof(value)) 2818 if (ol < sizeof(value))
2380 return -EINVAL; 2819 return -EINVAL;
2381 res = get_user(value, (u32 __user *)ov); 2820 if (get_user(value, (u32 __user *)ov))
2382 if (res) 2821 return -EFAULT;
2383 return res; 2822 break;
2823 case TIPC_GROUP_JOIN:
2824 if (ol < sizeof(mreq))
2825 return -EINVAL;
2826 if (copy_from_user(&mreq, ov, sizeof(mreq)))
2827 return -EFAULT;
2384 break; 2828 break;
2385 default: 2829 default:
2386 if (ov || ol) 2830 if (ov || ol)
@@ -2413,6 +2857,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2413 tsk->mc_method.rcast = true; 2857 tsk->mc_method.rcast = true;
2414 tsk->mc_method.mandatory = true; 2858 tsk->mc_method.mandatory = true;
2415 break; 2859 break;
2860 case TIPC_GROUP_JOIN:
2861 res = tipc_sk_join(tsk, &mreq);
2862 break;
2863 case TIPC_GROUP_LEAVE:
2864 res = tipc_sk_leave(tsk);
2865 break;
2416 default: 2866 default:
2417 res = -EINVAL; 2867 res = -EINVAL;
2418 } 2868 }
@@ -2440,7 +2890,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2440{ 2890{
2441 struct sock *sk = sock->sk; 2891 struct sock *sk = sock->sk;
2442 struct tipc_sock *tsk = tipc_sk(sk); 2892 struct tipc_sock *tsk = tipc_sk(sk);
2443 int len; 2893 struct tipc_name_seq seq;
2894 int len, scope;
2444 u32 value; 2895 u32 value;
2445 int res; 2896 int res;
2446 2897
@@ -2474,6 +2925,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2474 case TIPC_SOCK_RECVQ_DEPTH: 2925 case TIPC_SOCK_RECVQ_DEPTH:
2475 value = skb_queue_len(&sk->sk_receive_queue); 2926 value = skb_queue_len(&sk->sk_receive_queue);
2476 break; 2927 break;
2928 case TIPC_GROUP_JOIN:
2929 seq.type = 0;
2930 if (tsk->group)
2931 tipc_group_self(tsk->group, &seq, &scope);
2932 value = seq.type;
2933 break;
2477 default: 2934 default:
2478 res = -EINVAL; 2935 res = -EINVAL;
2479 } 2936 }
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index be3d9e3183dc..251065dfd8df 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -133,9 +133,9 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
133 node); 133 node);
134} 134}
135 135
136static void tipc_subscrp_timeout(unsigned long data) 136static void tipc_subscrp_timeout(struct timer_list *t)
137{ 137{
138 struct tipc_subscription *sub = (struct tipc_subscription *)data; 138 struct tipc_subscription *sub = from_timer(sub, t, timer);
139 struct tipc_subscriber *subscriber = sub->subscriber; 139 struct tipc_subscriber *subscriber = sub->subscriber;
140 140
141 spin_lock_bh(&subscriber->lock); 141 spin_lock_bh(&subscriber->lock);
@@ -303,7 +303,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
303 tipc_subscrb_get(subscriber); 303 tipc_subscrb_get(subscriber);
304 spin_unlock_bh(&subscriber->lock); 304 spin_unlock_bh(&subscriber->lock);
305 305
306 setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); 306 timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
307 timeout = htohl(sub->evt.s.timeout, swap); 307 timeout = htohl(sub->evt.s.timeout, swap);
308 308
309 if (timeout != TIPC_WAIT_FOREVER) 309 if (timeout != TIPC_WAIT_FOREVER)
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index ecca64fc6a6f..3deabcab4882 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -371,10 +371,6 @@ static int tipc_udp_recv(struct sock *sk, struct sk_buff *skb)
371 goto rcu_out; 371 goto rcu_out;
372 } 372 }
373 373
374 tipc_rcv(sock_net(sk), skb, b);
375 rcu_read_unlock();
376 return 0;
377
378rcu_out: 374rcu_out:
379 rcu_read_unlock(); 375 rcu_read_unlock();
380out: 376out: