aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Makefile2
-rw-r--r--net/tipc/group.c404
-rw-r--r--net/tipc/group.h64
-rw-r--r--net/tipc/link.c3
-rw-r--r--net/tipc/msg.h50
-rw-r--r--net/tipc/name_table.c44
-rw-r--r--net/tipc/name_table.h3
-rw-r--r--net/tipc/node.h3
-rw-r--r--net/tipc/socket.c209
9 files changed, 734 insertions, 48 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 31b9f9c52974..a3af73ec0b78 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o \
8 core.o link.o discover.o msg.o \ 8 core.o link.o discover.o msg.o \
9 name_distr.o subscr.o monitor.o name_table.o net.o \ 9 name_distr.o subscr.o monitor.o name_table.o net.o \
10 netlink.o netlink_compat.o node.o socket.o eth_media.o \ 10 netlink.o netlink_compat.o node.o socket.o eth_media.o \
11 server.o socket.o 11 server.o socket.o group.o
12 12
13tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o 13tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o
14tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o 14tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
diff --git a/net/tipc/group.c b/net/tipc/group.c
new file mode 100644
index 000000000000..3f0e1ce1e3b9
--- /dev/null
+++ b/net/tipc/group.c
@@ -0,0 +1,404 @@
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
47#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
48#define ADV_IDLE ADV_UNIT
49
50enum mbr_state {
51 MBR_QUARANTINED,
52 MBR_DISCOVERED,
53 MBR_JOINING,
54 MBR_PUBLISHED,
55 MBR_JOINED,
56 MBR_LEAVING
57};
58
59struct tipc_member {
60 struct rb_node tree_node;
61 struct list_head list;
62 u32 node;
63 u32 port;
64 enum mbr_state state;
65 u16 bc_rcv_nxt;
66};
67
68struct tipc_group {
69 struct rb_root members;
70 struct tipc_nlist dests;
71 struct net *net;
72 int subid;
73 u32 type;
74 u32 instance;
75 u32 domain;
76 u32 scope;
77 u32 portid;
78 u16 member_cnt;
79 u16 bc_snd_nxt;
80 bool loopback;
81};
82
83static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
84 int mtyp, struct sk_buff_head *xmitq);
85
86u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
87{
88 return grp->bc_snd_nxt;
89}
90
91static bool tipc_group_is_receiver(struct tipc_member *m)
92{
93 return m && m->state >= MBR_JOINED;
94}
95
96int tipc_group_size(struct tipc_group *grp)
97{
98 return grp->member_cnt;
99}
100
101struct tipc_group *tipc_group_create(struct net *net, u32 portid,
102 struct tipc_group_req *mreq)
103{
104 struct tipc_group *grp;
105 u32 type = mreq->type;
106
107 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
108 if (!grp)
109 return NULL;
110 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
111 grp->members = RB_ROOT;
112 grp->net = net;
113 grp->portid = portid;
114 grp->domain = addr_domain(net, mreq->scope);
115 grp->type = type;
116 grp->instance = mreq->instance;
117 grp->scope = mreq->scope;
118 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
119 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
120 return grp;
121 kfree(grp);
122 return NULL;
123}
124
125void tipc_group_delete(struct net *net, struct tipc_group *grp)
126{
127 struct rb_root *tree = &grp->members;
128 struct tipc_member *m, *tmp;
129 struct sk_buff_head xmitq;
130
131 __skb_queue_head_init(&xmitq);
132
133 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
134 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
135 list_del(&m->list);
136 kfree(m);
137 }
138 tipc_node_distr_xmit(net, &xmitq);
139 tipc_nlist_purge(&grp->dests);
140 tipc_topsrv_kern_unsubscr(net, grp->subid);
141 kfree(grp);
142}
143
144struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
145 u32 node, u32 port)
146{
147 struct rb_node *n = grp->members.rb_node;
148 u64 nkey, key = (u64)node << 32 | port;
149 struct tipc_member *m;
150
151 while (n) {
152 m = container_of(n, struct tipc_member, tree_node);
153 nkey = (u64)m->node << 32 | m->port;
154 if (key < nkey)
155 n = n->rb_left;
156 else if (key > nkey)
157 n = n->rb_right;
158 else
159 return m;
160 }
161 return NULL;
162}
163
164static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
165 u32 node)
166{
167 struct tipc_member *m;
168 struct rb_node *n;
169
170 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
171 m = container_of(n, struct tipc_member, tree_node);
172 if (m->node == node)
173 return m;
174 }
175 return NULL;
176}
177
178static void tipc_group_add_to_tree(struct tipc_group *grp,
179 struct tipc_member *m)
180{
181 u64 nkey, key = (u64)m->node << 32 | m->port;
182 struct rb_node **n, *parent = NULL;
183 struct tipc_member *tmp;
184
185 n = &grp->members.rb_node;
186 while (*n) {
187 tmp = container_of(*n, struct tipc_member, tree_node);
188 parent = *n;
189 tmp = container_of(parent, struct tipc_member, tree_node);
190 nkey = (u64)tmp->node << 32 | tmp->port;
191 if (key < nkey)
192 n = &(*n)->rb_left;
193 else if (key > nkey)
194 n = &(*n)->rb_right;
195 else
196 return;
197 }
198 rb_link_node(&m->tree_node, parent, n);
199 rb_insert_color(&m->tree_node, &grp->members);
200}
201
202static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
203 u32 node, u32 port,
204 int state)
205{
206 struct tipc_member *m;
207
208 m = kzalloc(sizeof(*m), GFP_ATOMIC);
209 if (!m)
210 return NULL;
211 INIT_LIST_HEAD(&m->list);
212 m->node = node;
213 m->port = port;
214 grp->member_cnt++;
215 tipc_group_add_to_tree(grp, m);
216 tipc_nlist_add(&grp->dests, m->node);
217 m->state = state;
218 return m;
219}
220
221void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
222{
223 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
224}
225
226static void tipc_group_delete_member(struct tipc_group *grp,
227 struct tipc_member *m)
228{
229 rb_erase(&m->tree_node, &grp->members);
230 grp->member_cnt--;
231 list_del_init(&m->list);
232
233 /* If last member on a node, remove node from dest list */
234 if (!tipc_group_find_node(grp, m->node))
235 tipc_nlist_del(&grp->dests, m->node);
236
237 kfree(m);
238}
239
240struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
241{
242 return &grp->dests;
243}
244
245void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
246 int *scope)
247{
248 seq->type = grp->type;
249 seq->lower = grp->instance;
250 seq->upper = grp->instance;
251 *scope = grp->scope;
252}
253
254void tipc_group_update_bc_members(struct tipc_group *grp)
255{
256 grp->bc_snd_nxt++;
257}
258
259/* tipc_group_filter_msg() - determine if we should accept arriving message
260 */
261void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
262 struct sk_buff_head *xmitq)
263{
264 struct sk_buff *skb = __skb_dequeue(inputq);
265 struct tipc_member *m;
266 struct tipc_msg *hdr;
267 u32 node, port;
268 int mtyp;
269
270 if (!skb)
271 return;
272
273 hdr = buf_msg(skb);
274 mtyp = msg_type(hdr);
275 node = msg_orignode(hdr);
276 port = msg_origport(hdr);
277
278 if (!msg_in_group(hdr))
279 goto drop;
280
281 m = tipc_group_find_member(grp, node, port);
282 if (!tipc_group_is_receiver(m))
283 goto drop;
284
285 __skb_queue_tail(inputq, skb);
286
287 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
288 return;
289drop:
290 kfree_skb(skb);
291}
292
293static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
294 int mtyp, struct sk_buff_head *xmitq)
295{
296 struct tipc_msg *hdr;
297 struct sk_buff *skb;
298
299 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
300 m->node, tipc_own_addr(grp->net),
301 m->port, grp->portid, 0);
302 if (!skb)
303 return;
304
305 hdr = buf_msg(skb);
306 if (mtyp == GRP_JOIN_MSG)
307 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
308 __skb_queue_tail(xmitq, skb);
309}
310
311void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
312 struct sk_buff_head *xmitq)
313{
314 u32 node = msg_orignode(hdr);
315 u32 port = msg_origport(hdr);
316 struct tipc_member *m;
317
318 if (!grp)
319 return;
320
321 m = tipc_group_find_member(grp, node, port);
322
323 switch (msg_type(hdr)) {
324 case GRP_JOIN_MSG:
325 if (!m)
326 m = tipc_group_create_member(grp, node, port,
327 MBR_QUARANTINED);
328 if (!m)
329 return;
330 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
331
332 /* Wait until PUBLISH event is received */
333 if (m->state == MBR_DISCOVERED)
334 m->state = MBR_JOINING;
335 else if (m->state == MBR_PUBLISHED)
336 m->state = MBR_JOINED;
337 return;
338 case GRP_LEAVE_MSG:
339 if (!m)
340 return;
341
342 /* Wait until WITHDRAW event is received */
343 if (m->state != MBR_LEAVING) {
344 m->state = MBR_LEAVING;
345 return;
346 }
347 /* Otherwise deliver already received WITHDRAW event */
348 tipc_group_delete_member(grp, m);
349 return;
350 default:
351 pr_warn("Received unknown GROUP_PROTO message\n");
352 }
353}
354
355/* tipc_group_member_evt() - receive and handle a member up/down event
356 */
357void tipc_group_member_evt(struct tipc_group *grp,
358 struct sk_buff *skb,
359 struct sk_buff_head *xmitq)
360{
361 struct tipc_msg *hdr = buf_msg(skb);
362 struct tipc_event *evt = (void *)msg_data(hdr);
363 u32 node = evt->port.node;
364 u32 port = evt->port.ref;
365 struct tipc_member *m;
366 struct net *net;
367 u32 self;
368
369 if (!grp)
370 goto drop;
371
372 net = grp->net;
373 self = tipc_own_addr(net);
374 if (!grp->loopback && node == self && port == grp->portid)
375 goto drop;
376
377 m = tipc_group_find_member(grp, node, port);
378
379 if (evt->event == TIPC_PUBLISHED) {
380 if (!m)
381 m = tipc_group_create_member(grp, node, port,
382 MBR_DISCOVERED);
383 if (!m)
384 goto drop;
385
386 /* Wait if JOIN message not yet received */
387 if (m->state == MBR_DISCOVERED)
388 m->state = MBR_PUBLISHED;
389 else
390 m->state = MBR_JOINED;
391 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
392 } else if (evt->event == TIPC_WITHDRAWN) {
393 if (!m)
394 goto drop;
395
396 /* Keep back event if more messages might be expected */
397 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
398 m->state = MBR_LEAVING;
399 else
400 tipc_group_delete_member(grp, m);
401 }
402drop:
403 kfree_skb(skb);
404}
diff --git a/net/tipc/group.h b/net/tipc/group.h
new file mode 100644
index 000000000000..9bdf4479fc03
--- /dev/null
+++ b/net/tipc/group.h
@@ -0,0 +1,64 @@
1/*
2 * net/tipc/group.h: Include file for TIPC group unicast/multicast functions
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#ifndef _TIPC_GROUP_H
37#define _TIPC_GROUP_H
38
39#include "core.h"
40
41struct tipc_group;
42struct tipc_member;
43struct tipc_msg;
44
45struct tipc_group *tipc_group_create(struct net *net, u32 portid,
46 struct tipc_group_req *mreq);
47void tipc_group_delete(struct net *net, struct tipc_group *grp);
48void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
49struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
50void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
51 int *scope);
52void tipc_group_filter_msg(struct tipc_group *grp,
53 struct sk_buff_head *inputq,
54 struct sk_buff_head *xmitq);
55void tipc_group_member_evt(struct tipc_group *grp,
56 struct sk_buff *skb,
57 struct sk_buff_head *xmitq);
58void tipc_group_proto_rcv(struct tipc_group *grp,
59 struct tipc_msg *hdr,
60 struct sk_buff_head *xmitq);
61void tipc_group_update_bc_members(struct tipc_group *grp);
62u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
63int tipc_group_size(struct tipc_group *grp);
64#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac0144f532aa..bd25bff63925 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1046 case TIPC_MEDIUM_IMPORTANCE: 1046 case TIPC_MEDIUM_IMPORTANCE:
1047 case TIPC_HIGH_IMPORTANCE: 1047 case TIPC_HIGH_IMPORTANCE:
1048 case TIPC_CRITICAL_IMPORTANCE: 1048 case TIPC_CRITICAL_IMPORTANCE:
1049 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { 1049 if (unlikely(msg_mcast(hdr))) {
1050 skb_queue_tail(l->bc_rcvlink->inputq, skb); 1050 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1051 return true; 1051 return true;
1052 } 1052 }
1053 case CONN_MANAGER: 1053 case CONN_MANAGER:
1054 case GROUP_PROTOCOL:
1054 skb_queue_tail(inputq, skb); 1055 skb_queue_tail(inputq, skb);
1055 return true; 1056 return true;
1056 case NAME_DISTRIBUTOR: 1057 case NAME_DISTRIBUTOR:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index be3e38aa9dd2..dad400935405 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/msg.h: Include file for TIPC message header routines 2 * net/tipc/msg.h: Include file for TIPC message header routines
3 * 3 *
4 * Copyright (c) 2000-2007, 2014-2015 Ericsson AB 4 * Copyright (c) 2000-2007, 2014-2017 Ericsson AB
5 * Copyright (c) 2005-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -61,10 +61,11 @@ struct plist;
61/* 61/*
62 * Payload message types 62 * Payload message types
63 */ 63 */
64#define TIPC_CONN_MSG 0 64#define TIPC_CONN_MSG 0
65#define TIPC_MCAST_MSG 1 65#define TIPC_MCAST_MSG 1
66#define TIPC_NAMED_MSG 2 66#define TIPC_NAMED_MSG 2
67#define TIPC_DIRECT_MSG 3 67#define TIPC_DIRECT_MSG 3
68#define TIPC_GRP_BCAST_MSG 4
68 69
69/* 70/*
70 * Internal message users 71 * Internal message users
@@ -73,6 +74,7 @@ struct plist;
73#define MSG_BUNDLER 6 74#define MSG_BUNDLER 6
74#define LINK_PROTOCOL 7 75#define LINK_PROTOCOL 7
75#define CONN_MANAGER 8 76#define CONN_MANAGER 8
77#define GROUP_PROTOCOL 9
76#define TUNNEL_PROTOCOL 10 78#define TUNNEL_PROTOCOL 10
77#define NAME_DISTRIBUTOR 11 79#define NAME_DISTRIBUTOR 11
78#define MSG_FRAGMENTER 12 80#define MSG_FRAGMENTER 12
@@ -87,6 +89,7 @@ struct plist;
87#define BASIC_H_SIZE 32 /* Basic payload message */ 89#define BASIC_H_SIZE 32 /* Basic payload message */
88#define NAMED_H_SIZE 40 /* Named payload message */ 90#define NAMED_H_SIZE 40 /* Named payload message */
89#define MCAST_H_SIZE 44 /* Multicast payload message */ 91#define MCAST_H_SIZE 44 /* Multicast payload message */
92#define GROUP_H_SIZE 44 /* Group payload message */
90#define INT_H_SIZE 40 /* Internal messages */ 93#define INT_H_SIZE 40 /* Internal messages */
91#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ 94#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */
92#define MAX_H_SIZE 60 /* Largest possible TIPC header size */ 95#define MAX_H_SIZE 60 /* Largest possible TIPC header size */
@@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
252 msg_set_bits(m, 1, 29, 0x7, n); 255 msg_set_bits(m, 1, 29, 0x7, n);
253} 256}
254 257
258static inline int msg_in_group(struct tipc_msg *m)
259{
260 return (msg_type(m) == TIPC_GRP_BCAST_MSG);
261}
262
255static inline u32 msg_named(struct tipc_msg *m) 263static inline u32 msg_named(struct tipc_msg *m)
256{ 264{
257 return msg_type(m) == TIPC_NAMED_MSG; 265 return msg_type(m) == TIPC_NAMED_MSG;
@@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m)
259 267
260static inline u32 msg_mcast(struct tipc_msg *m) 268static inline u32 msg_mcast(struct tipc_msg *m)
261{ 269{
262 return msg_type(m) == TIPC_MCAST_MSG; 270 int mtyp = msg_type(m);
271
272 return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG));
263} 273}
264 274
265static inline u32 msg_connected(struct tipc_msg *m) 275static inline u32 msg_connected(struct tipc_msg *m)
@@ -515,6 +525,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
515#define DSC_RESP_MSG 1 525#define DSC_RESP_MSG 1
516 526
517/* 527/*
528 * Group protocol message types
529 */
530#define GRP_JOIN_MSG 0
531#define GRP_LEAVE_MSG 1
532
533/*
518 * Word 1 534 * Word 1
519 */ 535 */
520static inline u32 msg_seq_gap(struct tipc_msg *m) 536static inline u32 msg_seq_gap(struct tipc_msg *m)
@@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
795 msg_set_bits(m, 9, 0, 0xffff, n); 811 msg_set_bits(m, 9, 0, 0xffff, n);
796} 812}
797 813
814static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m)
815{
816 return msg_bits(m, 9, 16, 0xffff);
817}
818
819static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
820{
821 msg_set_bits(m, 9, 16, 0xffff, n);
822}
823
824/* Word 10
825 */
826static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
827{
828 return msg_bits(m, 10, 16, 0xffff);
829}
830
831static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n)
832{
833 msg_set_bits(m, 10, 16, 0xffff, n);
834}
835
798static inline bool msg_peer_link_is_up(struct tipc_msg *m) 836static inline bool msg_peer_link_is_up(struct tipc_msg *m)
799{ 837{
800 if (likely(msg_user(m) != LINK_PROTOCOL)) 838 if (likely(msg_user(m) != LINK_PROTOCOL))
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 76bd2777baaf..114d72bab827 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -43,6 +43,7 @@
43#include "bcast.h" 43#include "bcast.h"
44#include "addr.h" 44#include "addr.h"
45#include "node.h" 45#include "node.h"
46#include "group.h"
46#include <net/genetlink.h> 47#include <net/genetlink.h>
47 48
48#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ 49#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
@@ -596,18 +597,6 @@ not_found:
596 return ref; 597 return ref;
597} 598}
598 599
599/**
600 * tipc_nametbl_mc_translate - find multicast destinations
601 *
602 * Creates list of all local ports that overlap the given multicast address;
603 * also determines if any off-node ports overlap.
604 *
605 * Note: Publications with a scope narrower than 'limit' are ignored.
606 * (i.e. local node-scope publications mustn't receive messages arriving
607 * from another node, even if the multcast link brought it here)
608 *
609 * Returns non-zero if any off-node ports overlap
610 */
611int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 600int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
612 u32 limit, struct list_head *dports) 601 u32 limit, struct list_head *dports)
613{ 602{
@@ -679,6 +668,37 @@ exit:
679 rcu_read_unlock(); 668 rcu_read_unlock();
680} 669}
681 670
671/* tipc_nametbl_build_group - build list of communication group members
672 */
673void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
674 u32 type, u32 domain)
675{
676 struct sub_seq *sseq, *stop;
677 struct name_info *info;
678 struct publication *p;
679 struct name_seq *seq;
680
681 rcu_read_lock();
682 seq = nametbl_find_seq(net, type);
683 if (!seq)
684 goto exit;
685
686 spin_lock_bh(&seq->lock);
687 sseq = seq->sseqs;
688 stop = seq->sseqs + seq->first_free;
689 for (; sseq != stop; sseq++) {
690 info = sseq->info;
691 list_for_each_entry(p, &info->zone_list, zone_list) {
692 if (!tipc_in_scope(domain, p->node))
693 continue;
694 tipc_group_add_member(grp, p->node, p->ref);
695 }
696 }
697 spin_unlock_bh(&seq->lock);
698exit:
699 rcu_read_unlock();
700}
701
682/* 702/*
683 * tipc_nametbl_publish - add name publication to network name tables 703 * tipc_nametbl_publish - add name publication to network name tables
684 */ 704 */
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index d121175a92b5..97646b17a4a2 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -40,6 +40,7 @@
40struct tipc_subscription; 40struct tipc_subscription;
41struct tipc_plist; 41struct tipc_plist;
42struct tipc_nlist; 42struct tipc_nlist;
43struct tipc_group;
43 44
44/* 45/*
45 * TIPC name types reserved for internal TIPC use (both current and planned) 46 * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
101u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
102int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 103int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
103 u32 limit, struct list_head *dports); 104 u32 limit, struct list_head *dports);
105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
106 u32 type, u32 domain);
104void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
105 u32 upper, u32 domain, 108 u32 upper, u32 domain,
106 struct tipc_nlist *nodes); 109 struct tipc_nlist *nodes);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index df2f2197c4ad..acd58d23a70e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -48,7 +48,8 @@ enum {
48 TIPC_BCAST_SYNCH = (1 << 1), 48 TIPC_BCAST_SYNCH = (1 << 1),
49 TIPC_BCAST_STATE_NACK = (1 << 2), 49 TIPC_BCAST_STATE_NACK = (1 << 2),
50 TIPC_BLOCK_FLOWCTL = (1 << 3), 50 TIPC_BLOCK_FLOWCTL = (1 << 3),
51 TIPC_BCAST_RCAST = (1 << 4) 51 TIPC_BCAST_RCAST = (1 << 4),
52 TIPC_MCAST_GROUPS = (1 << 5)
52}; 53};
53 54
54#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ 55#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index daf7c4df4531..64bbf9d03629 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, 2012-2016, Ericsson AB 4 * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -45,6 +45,7 @@
45#include "socket.h" 45#include "socket.h"
46#include "bcast.h" 46#include "bcast.h"
47#include "netlink.h" 47#include "netlink.h"
48#include "group.h"
48 49
49#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 50#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
50#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ 51#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */
@@ -78,7 +79,7 @@ enum {
78 * @conn_timeout: the time we can wait for an unresponded setup request 79 * @conn_timeout: the time we can wait for an unresponded setup request
79 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 80 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
80 * @cong_link_cnt: number of congested links 81 * @cong_link_cnt: number of congested links
81 * @sent_unacked: # messages sent by socket, and not yet acked by peer 82 * @snt_unacked: # messages sent by socket, and not yet acked by peer
82 * @rcv_unacked: # messages read by user, but not yet acked back to peer 83 * @rcv_unacked: # messages read by user, but not yet acked back to peer
83 * @peer: 'connected' peer for dgram/rdm 84 * @peer: 'connected' peer for dgram/rdm
84 * @node: hash table node 85 * @node: hash table node
@@ -109,6 +110,7 @@ struct tipc_sock {
109 struct rhash_head node; 110 struct rhash_head node;
110 struct tipc_mc_method mc_method; 111 struct tipc_mc_method mc_method;
111 struct rcu_head rcu; 112 struct rcu_head rcu;
113 struct tipc_group *group;
112}; 114};
113 115
114static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); 116static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
123 struct tipc_name_seq const *seq); 125 struct tipc_name_seq const *seq);
124static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 126static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
125 struct tipc_name_seq const *seq); 127 struct tipc_name_seq const *seq);
128static int tipc_sk_leave(struct tipc_sock *tsk);
126static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 129static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
127static int tipc_sk_insert(struct tipc_sock *tsk); 130static int tipc_sk_insert(struct tipc_sock *tsk);
128static void tipc_sk_remove(struct tipc_sock *tsk); 131static void tipc_sk_remove(struct tipc_sock *tsk);
@@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock)
559 562
560 __tipc_shutdown(sock, TIPC_ERR_NO_PORT); 563 __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
561 sk->sk_shutdown = SHUTDOWN_MASK; 564 sk->sk_shutdown = SHUTDOWN_MASK;
565 tipc_sk_leave(tsk);
562 tipc_sk_withdraw(tsk, 0, NULL); 566 tipc_sk_withdraw(tsk, 0, NULL);
563 sk_stop_timer(sk, &sk->sk_timer); 567 sk_stop_timer(sk, &sk->sk_timer);
564 tipc_sk_remove(tsk); 568 tipc_sk_remove(tsk);
@@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
601 res = tipc_sk_withdraw(tsk, 0, NULL); 605 res = tipc_sk_withdraw(tsk, 0, NULL);
602 goto exit; 606 goto exit;
603 } 607 }
604 608 if (tsk->group) {
609 res = -EACCES;
610 goto exit;
611 }
605 if (uaddr_len < sizeof(struct sockaddr_tipc)) { 612 if (uaddr_len < sizeof(struct sockaddr_tipc)) {
606 res = -EINVAL; 613 res = -EINVAL;
607 goto exit; 614 goto exit;
@@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
698{ 705{
699 struct sock *sk = sock->sk; 706 struct sock *sk = sock->sk;
700 struct tipc_sock *tsk = tipc_sk(sk); 707 struct tipc_sock *tsk = tipc_sk(sk);
708 struct tipc_group *grp = tsk->group;
701 u32 mask = 0; 709 u32 mask = 0;
702 710
703 sock_poll_wait(file, sk_sleep(sk), wait); 711 sock_poll_wait(file, sk_sleep(sk), wait);
@@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
718 mask |= (POLLIN | POLLRDNORM); 726 mask |= (POLLIN | POLLRDNORM);
719 break; 727 break;
720 case TIPC_OPEN: 728 case TIPC_OPEN:
721 if (!tsk->cong_link_cnt) 729 if (!grp || tipc_group_size(grp))
722 mask |= POLLOUT; 730 if (!tsk->cong_link_cnt)
731 mask |= POLLOUT;
723 if (tipc_sk_type_connectionless(sk) && 732 if (tipc_sk_type_connectionless(sk) &&
724 (!skb_queue_empty(&sk->sk_receive_queue))) 733 (!skb_queue_empty(&sk->sk_receive_queue)))
725 mask |= (POLLIN | POLLRDNORM); 734 mask |= (POLLIN | POLLRDNORM);
@@ -757,6 +766,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
757 struct tipc_nlist dsts; 766 struct tipc_nlist dsts;
758 int rc; 767 int rc;
759 768
769 if (tsk->group)
770 return -EACCES;
771
760 /* Block or return if any destination link is congested */ 772 /* Block or return if any destination link is congested */
761 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); 773 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
762 if (unlikely(rc)) 774 if (unlikely(rc))
@@ -794,6 +806,64 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
794} 806}
795 807
796/** 808/**
809 * tipc_send_group_bcast - send message to all members in communication group
810 * @sk: socket structure
811 * @m: message to send
812 * @dlen: total length of message data
813 * @timeout: timeout to wait for wakeup
814 *
815 * Called from function tipc_sendmsg(), which has done all sanity checks
816 * Returns the number of bytes sent on success, or errno
817 */
818static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
819 int dlen, long timeout)
820{
821 struct sock *sk = sock->sk;
822 struct net *net = sock_net(sk);
823 struct tipc_sock *tsk = tipc_sk(sk);
824 struct tipc_group *grp = tsk->group;
825 struct tipc_nlist *dsts = tipc_group_dests(grp);
826 struct tipc_mc_method *method = &tsk->mc_method;
827 struct tipc_msg *hdr = &tsk->phdr;
828 int mtu = tipc_bcast_get_mtu(net);
829 struct sk_buff_head pkts;
830 int rc = -EHOSTUNREACH;
831
832 if (!dsts->local && !dsts->remote)
833 return -EHOSTUNREACH;
834
835 /* Block or return if any destination link is congested */
836 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
837 if (unlikely(rc))
838 return rc;
839
840 /* Complete message header */
841 msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
842 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
843 msg_set_destport(hdr, 0);
844 msg_set_destnode(hdr, 0);
845 msg_set_nameinst(hdr, 0);
846 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
847
848 /* Build message as chain of buffers */
849 skb_queue_head_init(&pkts);
850 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
851 if (unlikely(rc != dlen))
852 return rc;
853
854 /* Send message */
855 rc = tipc_mcast_xmit(net, &pkts, method, dsts,
856 &tsk->cong_link_cnt);
857 if (unlikely(rc))
858 return rc;
859
860 /* Update broadcast sequence number */
861 tipc_group_update_bc_members(tsk->group);
862
863 return dlen;
864}
865
866/**
797 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets 867 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
798 * @arrvq: queue with arriving messages, to be cloned after destination lookup 868 * @arrvq: queue with arriving messages, to be cloned after destination lookup
799 * @inputq: queue with cloned messages, delivered to socket after dest lookup 869 * @inputq: queue with cloned messages, delivered to socket after dest lookup
@@ -803,13 +873,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
803void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 873void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
804 struct sk_buff_head *inputq) 874 struct sk_buff_head *inputq)
805{ 875{
806 struct tipc_msg *msg;
807 struct list_head dports;
808 u32 portid;
809 u32 scope = TIPC_CLUSTER_SCOPE; 876 u32 scope = TIPC_CLUSTER_SCOPE;
810 struct sk_buff_head tmpq; 877 u32 self = tipc_own_addr(net);
811 uint hsz;
812 struct sk_buff *skb, *_skb; 878 struct sk_buff *skb, *_skb;
879 u32 lower = 0, upper = ~0;
880 struct sk_buff_head tmpq;
881 u32 portid, oport, onode;
882 struct list_head dports;
883 struct tipc_msg *msg;
884 int hsz;
813 885
814 __skb_queue_head_init(&tmpq); 886 __skb_queue_head_init(&tmpq);
815 INIT_LIST_HEAD(&dports); 887 INIT_LIST_HEAD(&dports);
@@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
818 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 890 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
819 msg = buf_msg(skb); 891 msg = buf_msg(skb);
820 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 892 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
821 893 oport = msg_origport(msg);
822 if (in_own_node(net, msg_orignode(msg))) 894 onode = msg_orignode(msg);
895 if (onode == self)
823 scope = TIPC_NODE_SCOPE; 896 scope = TIPC_NODE_SCOPE;
824 897
825 /* Create destination port list and message clones: */ 898 /* Create destination port list and message clones: */
826 tipc_nametbl_mc_translate(net, 899 if (!msg_in_group(msg)) {
827 msg_nametype(msg), msg_namelower(msg), 900 lower = msg_namelower(msg);
828 msg_nameupper(msg), scope, &dports); 901 upper = msg_nameupper(msg);
902 }
903 tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
904 scope, &dports);
829 while (tipc_dest_pop(&dports, NULL, &portid)) { 905 while (tipc_dest_pop(&dports, NULL, &portid)) {
830 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 906 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
831 if (_skb) { 907 if (_skb) {
@@ -895,10 +971,6 @@ exit:
895 kfree_skb(skb); 971 kfree_skb(skb);
896} 972}
897 973
898static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
899{
900}
901
902/** 974/**
903 * tipc_sendmsg - send message in connectionless manner 975 * tipc_sendmsg - send message in connectionless manner
904 * @sock: socket structure 976 * @sock: socket structure
@@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
934 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1006 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
935 struct list_head *clinks = &tsk->cong_links; 1007 struct list_head *clinks = &tsk->cong_links;
936 bool syn = !tipc_sk_type_connectionless(sk); 1008 bool syn = !tipc_sk_type_connectionless(sk);
1009 struct tipc_group *grp = tsk->group;
937 struct tipc_msg *hdr = &tsk->phdr; 1010 struct tipc_msg *hdr = &tsk->phdr;
938 struct tipc_name_seq *seq; 1011 struct tipc_name_seq *seq;
939 struct sk_buff_head pkts; 1012 struct sk_buff_head pkts;
@@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
944 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) 1017 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
945 return -EMSGSIZE; 1018 return -EMSGSIZE;
946 1019
1020 if (unlikely(grp))
1021 return tipc_send_group_bcast(sock, m, dlen, timeout);
1022
947 if (unlikely(!dest)) { 1023 if (unlikely(!dest)) {
948 dest = &tsk->peer; 1024 dest = &tsk->peer;
949 if (!syn || dest->family != AF_TIPC) 1025 if (!syn || dest->family != AF_TIPC)
@@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1543 struct sk_buff *skb = __skb_dequeue(inputq); 1619 struct sk_buff *skb = __skb_dequeue(inputq);
1544 struct tipc_sock *tsk = tipc_sk(sk); 1620 struct tipc_sock *tsk = tipc_sk(sk);
1545 struct tipc_msg *hdr = buf_msg(skb); 1621 struct tipc_msg *hdr = buf_msg(skb);
1622 struct tipc_group *grp = tsk->group;
1546 1623
1547 switch (msg_user(hdr)) { 1624 switch (msg_user(hdr)) {
1548 case CONN_MANAGER: 1625 case CONN_MANAGER:
@@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1553 tsk->cong_link_cnt--; 1630 tsk->cong_link_cnt--;
1554 sk->sk_write_space(sk); 1631 sk->sk_write_space(sk);
1555 break; 1632 break;
1633 case GROUP_PROTOCOL:
1634 tipc_group_proto_rcv(grp, hdr, xmitq);
1635 break;
1556 case TOP_SRV: 1636 case TOP_SRV:
1557 tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); 1637 tipc_group_member_evt(tsk->group, skb, xmitq);
1638 skb = NULL;
1558 break; 1639 break;
1559 default: 1640 default:
1560 break; 1641 break;
@@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
1699{ 1780{
1700 bool sk_conn = !tipc_sk_type_connectionless(sk); 1781 bool sk_conn = !tipc_sk_type_connectionless(sk);
1701 struct tipc_sock *tsk = tipc_sk(sk); 1782 struct tipc_sock *tsk = tipc_sk(sk);
1783 struct tipc_group *grp = tsk->group;
1702 struct tipc_msg *hdr = buf_msg(skb); 1784 struct tipc_msg *hdr = buf_msg(skb);
1703 struct net *net = sock_net(sk); 1785 struct net *net = sock_net(sk);
1704 struct sk_buff_head inputq; 1786 struct sk_buff_head inputq;
@@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
1710 1792
1711 if (unlikely(!msg_isdata(hdr))) 1793 if (unlikely(!msg_isdata(hdr)))
1712 tipc_sk_proto_rcv(sk, &inputq, xmitq); 1794 tipc_sk_proto_rcv(sk, &inputq, xmitq);
1713 else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) 1795 else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG))
1714 return kfree_skb(skb); 1796 return kfree_skb(skb);
1715 1797
1798 if (unlikely(grp))
1799 tipc_group_filter_msg(grp, &inputq, xmitq);
1800
1716 /* Validate and add to receive buffer if there is space */ 1801 /* Validate and add to receive buffer if there is space */
1717 while ((skb = __skb_dequeue(&inputq))) { 1802 while ((skb = __skb_dequeue(&inputq))) {
1718 hdr = buf_msg(skb); 1803 hdr = buf_msg(skb);
1719 limit = rcvbuf_limit(sk, skb); 1804 limit = rcvbuf_limit(sk, skb);
1720 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || 1805 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
1721 (!sk_conn && msg_connected(hdr))) 1806 (!sk_conn && msg_connected(hdr)) ||
1807 (!grp && msg_in_group(hdr)))
1722 err = TIPC_ERR_NO_PORT; 1808 err = TIPC_ERR_NO_PORT;
1723 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) 1809 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
1724 err = TIPC_ERR_OVERLOAD; 1810 err = TIPC_ERR_OVERLOAD;
@@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1837 sock_put(sk); 1923 sock_put(sk);
1838 continue; 1924 continue;
1839 } 1925 }
1840
1841 /* No destination socket => dequeue skb if still there */ 1926 /* No destination socket => dequeue skb if still there */
1842 skb = tipc_skb_dequeue(inputq, dport); 1927 skb = tipc_skb_dequeue(inputq, dport);
1843 if (!skb) 1928 if (!skb)
@@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1905 1990
1906 lock_sock(sk); 1991 lock_sock(sk);
1907 1992
1993 if (tsk->group) {
1994 res = -EINVAL;
1995 goto exit;
1996 }
1997
1908 if (dst->family == AF_UNSPEC) { 1998 if (dst->family == AF_UNSPEC) {
1909 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); 1999 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1910 if (!tipc_sk_type_connectionless(sk)) 2000 if (!tipc_sk_type_connectionless(sk))
@@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net)
2341 rhashtable_destroy(&tn->sk_rht); 2431 rhashtable_destroy(&tn->sk_rht);
2342} 2432}
2343 2433
2434static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2435{
2436 struct net *net = sock_net(&tsk->sk);
2437 u32 domain = addr_domain(net, mreq->scope);
2438 struct tipc_group *grp = tsk->group;
2439 struct tipc_msg *hdr = &tsk->phdr;
2440 struct tipc_name_seq seq;
2441 int rc;
2442
2443 if (mreq->type < TIPC_RESERVED_TYPES)
2444 return -EACCES;
2445 if (grp)
2446 return -EACCES;
2447 grp = tipc_group_create(net, tsk->portid, mreq);
2448 if (!grp)
2449 return -ENOMEM;
2450 tsk->group = grp;
2451 msg_set_lookup_scope(hdr, mreq->scope);
2452 msg_set_nametype(hdr, mreq->type);
2453 msg_set_dest_droppable(hdr, true);
2454 seq.type = mreq->type;
2455 seq.lower = mreq->instance;
2456 seq.upper = seq.lower;
2457 tipc_nametbl_build_group(net, grp, mreq->type, domain);
2458 rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2459 if (rc)
2460 tipc_group_delete(net, grp);
2461 return rc;
2462}
2463
2464static int tipc_sk_leave(struct tipc_sock *tsk)
2465{
2466 struct net *net = sock_net(&tsk->sk);
2467 struct tipc_group *grp = tsk->group;
2468 struct tipc_name_seq seq;
2469 int scope;
2470
2471 if (!grp)
2472 return -EINVAL;
2473 tipc_group_self(grp, &seq, &scope);
2474 tipc_group_delete(net, grp);
2475 tsk->group = NULL;
2476 tipc_sk_withdraw(tsk, scope, &seq);
2477 return 0;
2478}
2479
2344/** 2480/**
2345 * tipc_setsockopt - set socket option 2481 * tipc_setsockopt - set socket option
2346 * @sock: socket structure 2482 * @sock: socket structure
@@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2359{ 2495{
2360 struct sock *sk = sock->sk; 2496 struct sock *sk = sock->sk;
2361 struct tipc_sock *tsk = tipc_sk(sk); 2497 struct tipc_sock *tsk = tipc_sk(sk);
2498 struct tipc_group_req mreq;
2362 u32 value = 0; 2499 u32 value = 0;
2363 int res = 0; 2500 int res = 0;
2364 2501
@@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2374 case TIPC_CONN_TIMEOUT: 2511 case TIPC_CONN_TIMEOUT:
2375 if (ol < sizeof(value)) 2512 if (ol < sizeof(value))
2376 return -EINVAL; 2513 return -EINVAL;
2377 res = get_user(value, (u32 __user *)ov); 2514 if (get_user(value, (u32 __user *)ov))
2378 if (res) 2515 return -EFAULT;
2379 return res; 2516 break;
2517 case TIPC_GROUP_JOIN:
2518 if (ol < sizeof(mreq))
2519 return -EINVAL;
2520 if (copy_from_user(&mreq, ov, sizeof(mreq)))
2521 return -EFAULT;
2380 break; 2522 break;
2381 default: 2523 default:
2382 if (ov || ol) 2524 if (ov || ol)
@@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2409 tsk->mc_method.rcast = true; 2551 tsk->mc_method.rcast = true;
2410 tsk->mc_method.mandatory = true; 2552 tsk->mc_method.mandatory = true;
2411 break; 2553 break;
2554 case TIPC_GROUP_JOIN:
2555 res = tipc_sk_join(tsk, &mreq);
2556 break;
2557 case TIPC_GROUP_LEAVE:
2558 res = tipc_sk_leave(tsk);
2559 break;
2412 default: 2560 default:
2413 res = -EINVAL; 2561 res = -EINVAL;
2414 } 2562 }
@@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2436{ 2584{
2437 struct sock *sk = sock->sk; 2585 struct sock *sk = sock->sk;
2438 struct tipc_sock *tsk = tipc_sk(sk); 2586 struct tipc_sock *tsk = tipc_sk(sk);
2439 int len; 2587 struct tipc_name_seq seq;
2588 int len, scope;
2440 u32 value; 2589 u32 value;
2441 int res; 2590 int res;
2442 2591
@@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2470 case TIPC_SOCK_RECVQ_DEPTH: 2619 case TIPC_SOCK_RECVQ_DEPTH:
2471 value = skb_queue_len(&sk->sk_receive_queue); 2620 value = skb_queue_len(&sk->sk_receive_queue);
2472 break; 2621 break;
2622 case TIPC_GROUP_JOIN:
2623 seq.type = 0;
2624 if (tsk->group)
2625 tipc_group_self(tsk->group, &seq, &scope);
2626 value = seq.type;
2627 break;
2473 default: 2628 default:
2474 res = -EINVAL; 2629 res = -EINVAL;
2475 } 2630 }