aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:23 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:00 -0400
commit75da2163dbb6af9f2dce1d80056d11d290dd19a5 (patch)
tree3c38c9e2a9085c1422826e861e5252fdb42b7d40 /net/tipc
parenta80ae5306a7346d4e52f59462878beb8362f4bbd (diff)
tipc: introduce communication groups
As a preparation for introducing flow control for multicast and datagram messaging we need a more strictly defined framework than we have now. A socket must be able keep track of exactly how many and which other sockets it is allowed to communicate with at any moment, and keep the necessary state for those. We therefore introduce a new concept we have named Communication Group. Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN. The call takes four parameters: 'type' serves as group identifier, 'instance' serves as an logical member identifier, and 'scope' indicates the visibility of the group (node/cluster/zone). Finally, 'flags' makes it possible to set certain properties for the member. For now, there is only one flag, indicating if the creator of the socket wants to receive a copy of broadcast or multicast messages it is sending via the socket, and if wants to be eligible as destination for its own anycasts. A group is closed, i.e., sockets which have not joined a group will not be able to send messages to or receive messages from members of the group, and vice versa. Any member of a group can send multicast ('group broadcast') messages to all group members, optionally including itself, using the primitive send(). The messages are received via the recvmsg() primitive. A socket can only be member of one group at a time. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Makefile2
-rw-r--r--net/tipc/group.c404
-rw-r--r--net/tipc/group.h64
-rw-r--r--net/tipc/link.c3
-rw-r--r--net/tipc/msg.h50
-rw-r--r--net/tipc/name_table.c44
-rw-r--r--net/tipc/name_table.h3
-rw-r--r--net/tipc/node.h3
-rw-r--r--net/tipc/socket.c209
9 files changed, 734 insertions, 48 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 31b9f9c52974..a3af73ec0b78 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o \
8 core.o link.o discover.o msg.o \ 8 core.o link.o discover.o msg.o \
9 name_distr.o subscr.o monitor.o name_table.o net.o \ 9 name_distr.o subscr.o monitor.o name_table.o net.o \
10 netlink.o netlink_compat.o node.o socket.o eth_media.o \ 10 netlink.o netlink_compat.o node.o socket.o eth_media.o \
11 server.o socket.o 11 server.o socket.o group.o
12 12
13tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o 13tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o
14tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o 14tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
diff --git a/net/tipc/group.c b/net/tipc/group.c
new file mode 100644
index 000000000000..3f0e1ce1e3b9
--- /dev/null
+++ b/net/tipc/group.c
@@ -0,0 +1,404 @@
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
47#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
48#define ADV_IDLE ADV_UNIT
49
50enum mbr_state {
51 MBR_QUARANTINED,
52 MBR_DISCOVERED,
53 MBR_JOINING,
54 MBR_PUBLISHED,
55 MBR_JOINED,
56 MBR_LEAVING
57};
58
59struct tipc_member {
60 struct rb_node tree_node;
61 struct list_head list;
62 u32 node;
63 u32 port;
64 enum mbr_state state;
65 u16 bc_rcv_nxt;
66};
67
68struct tipc_group {
69 struct rb_root members;
70 struct tipc_nlist dests;
71 struct net *net;
72 int subid;
73 u32 type;
74 u32 instance;
75 u32 domain;
76 u32 scope;
77 u32 portid;
78 u16 member_cnt;
79 u16 bc_snd_nxt;
80 bool loopback;
81};
82
83static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
84 int mtyp, struct sk_buff_head *xmitq);
85
86u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
87{
88 return grp->bc_snd_nxt;
89}
90
91static bool tipc_group_is_receiver(struct tipc_member *m)
92{
93 return m && m->state >= MBR_JOINED;
94}
95
96int tipc_group_size(struct tipc_group *grp)
97{
98 return grp->member_cnt;
99}
100
101struct tipc_group *tipc_group_create(struct net *net, u32 portid,
102 struct tipc_group_req *mreq)
103{
104 struct tipc_group *grp;
105 u32 type = mreq->type;
106
107 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
108 if (!grp)
109 return NULL;
110 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
111 grp->members = RB_ROOT;
112 grp->net = net;
113 grp->portid = portid;
114 grp->domain = addr_domain(net, mreq->scope);
115 grp->type = type;
116 grp->instance = mreq->instance;
117 grp->scope = mreq->scope;
118 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
119 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
120 return grp;
121 kfree(grp);
122 return NULL;
123}
124
125void tipc_group_delete(struct net *net, struct tipc_group *grp)
126{
127 struct rb_root *tree = &grp->members;
128 struct tipc_member *m, *tmp;
129 struct sk_buff_head xmitq;
130
131 __skb_queue_head_init(&xmitq);
132
133 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
134 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
135 list_del(&m->list);
136 kfree(m);
137 }
138 tipc_node_distr_xmit(net, &xmitq);
139 tipc_nlist_purge(&grp->dests);
140 tipc_topsrv_kern_unsubscr(net, grp->subid);
141 kfree(grp);
142}
143
144struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
145 u32 node, u32 port)
146{
147 struct rb_node *n = grp->members.rb_node;
148 u64 nkey, key = (u64)node << 32 | port;
149 struct tipc_member *m;
150
151 while (n) {
152 m = container_of(n, struct tipc_member, tree_node);
153 nkey = (u64)m->node << 32 | m->port;
154 if (key < nkey)
155 n = n->rb_left;
156 else if (key > nkey)
157 n = n->rb_right;
158 else
159 return m;
160 }
161 return NULL;
162}
163
164static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
165 u32 node)
166{
167 struct tipc_member *m;
168 struct rb_node *n;
169
170 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
171 m = container_of(n, struct tipc_member, tree_node);
172 if (m->node == node)
173 return m;
174 }
175 return NULL;
176}
177
178static void tipc_group_add_to_tree(struct tipc_group *grp,
179 struct tipc_member *m)
180{
181 u64 nkey, key = (u64)m->node << 32 | m->port;
182 struct rb_node **n, *parent = NULL;
183 struct tipc_member *tmp;
184
185 n = &grp->members.rb_node;
186 while (*n) {
187 tmp = container_of(*n, struct tipc_member, tree_node);
188 parent = *n;
189 tmp = container_of(parent, struct tipc_member, tree_node);
190 nkey = (u64)tmp->node << 32 | tmp->port;
191 if (key < nkey)
192 n = &(*n)->rb_left;
193 else if (key > nkey)
194 n = &(*n)->rb_right;
195 else
196 return;
197 }
198 rb_link_node(&m->tree_node, parent, n);
199 rb_insert_color(&m->tree_node, &grp->members);
200}
201
202static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
203 u32 node, u32 port,
204 int state)
205{
206 struct tipc_member *m;
207
208 m = kzalloc(sizeof(*m), GFP_ATOMIC);
209 if (!m)
210 return NULL;
211 INIT_LIST_HEAD(&m->list);
212 m->node = node;
213 m->port = port;
214 grp->member_cnt++;
215 tipc_group_add_to_tree(grp, m);
216 tipc_nlist_add(&grp->dests, m->node);
217 m->state = state;
218 return m;
219}
220
221void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
222{
223 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
224}
225
226static void tipc_group_delete_member(struct tipc_group *grp,
227 struct tipc_member *m)
228{
229 rb_erase(&m->tree_node, &grp->members);
230 grp->member_cnt--;
231 list_del_init(&m->list);
232
233 /* If last member on a node, remove node from dest list */
234 if (!tipc_group_find_node(grp, m->node))
235 tipc_nlist_del(&grp->dests, m->node);
236
237 kfree(m);
238}
239
240struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
241{
242 return &grp->dests;
243}
244
245void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
246 int *scope)
247{
248 seq->type = grp->type;
249 seq->lower = grp->instance;
250 seq->upper = grp->instance;
251 *scope = grp->scope;
252}
253
254void tipc_group_update_bc_members(struct tipc_group *grp)
255{
256 grp->bc_snd_nxt++;
257}
258
259/* tipc_group_filter_msg() - determine if we should accept arriving message
260 */
261void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
262 struct sk_buff_head *xmitq)
263{
264 struct sk_buff *skb = __skb_dequeue(inputq);
265 struct tipc_member *m;
266 struct tipc_msg *hdr;
267 u32 node, port;
268 int mtyp;
269
270 if (!skb)
271 return;
272
273 hdr = buf_msg(skb);
274 mtyp = msg_type(hdr);
275 node = msg_orignode(hdr);
276 port = msg_origport(hdr);
277
278 if (!msg_in_group(hdr))
279 goto drop;
280
281 m = tipc_group_find_member(grp, node, port);
282 if (!tipc_group_is_receiver(m))
283 goto drop;
284
285 __skb_queue_tail(inputq, skb);
286
287 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
288 return;
289drop:
290 kfree_skb(skb);
291}
292
293static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
294 int mtyp, struct sk_buff_head *xmitq)
295{
296 struct tipc_msg *hdr;
297 struct sk_buff *skb;
298
299 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
300 m->node, tipc_own_addr(grp->net),
301 m->port, grp->portid, 0);
302 if (!skb)
303 return;
304
305 hdr = buf_msg(skb);
306 if (mtyp == GRP_JOIN_MSG)
307 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
308 __skb_queue_tail(xmitq, skb);
309}
310
311void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
312 struct sk_buff_head *xmitq)
313{
314 u32 node = msg_orignode(hdr);
315 u32 port = msg_origport(hdr);
316 struct tipc_member *m;
317
318 if (!grp)
319 return;
320
321 m = tipc_group_find_member(grp, node, port);
322
323 switch (msg_type(hdr)) {
324 case GRP_JOIN_MSG:
325 if (!m)
326 m = tipc_group_create_member(grp, node, port,
327 MBR_QUARANTINED);
328 if (!m)
329 return;
330 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
331
332 /* Wait until PUBLISH event is received */
333 if (m->state == MBR_DISCOVERED)
334 m->state = MBR_JOINING;
335 else if (m->state == MBR_PUBLISHED)
336 m->state = MBR_JOINED;
337 return;
338 case GRP_LEAVE_MSG:
339 if (!m)
340 return;
341
342 /* Wait until WITHDRAW event is received */
343 if (m->state != MBR_LEAVING) {
344 m->state = MBR_LEAVING;
345 return;
346 }
347 /* Otherwise deliver already received WITHDRAW event */
348 tipc_group_delete_member(grp, m);
349 return;
350 default:
351 pr_warn("Received unknown GROUP_PROTO message\n");
352 }
353}
354
355/* tipc_group_member_evt() - receive and handle a member up/down event
356 */
357void tipc_group_member_evt(struct tipc_group *grp,
358 struct sk_buff *skb,
359 struct sk_buff_head *xmitq)
360{
361 struct tipc_msg *hdr = buf_msg(skb);
362 struct tipc_event *evt = (void *)msg_data(hdr);
363 u32 node = evt->port.node;
364 u32 port = evt->port.ref;
365 struct tipc_member *m;
366 struct net *net;
367 u32 self;
368
369 if (!grp)
370 goto drop;
371
372 net = grp->net;
373 self = tipc_own_addr(net);
374 if (!grp->loopback && node == self && port == grp->portid)
375 goto drop;
376
377 m = tipc_group_find_member(grp, node, port);
378
379 if (evt->event == TIPC_PUBLISHED) {
380 if (!m)
381 m = tipc_group_create_member(grp, node, port,
382 MBR_DISCOVERED);
383 if (!m)
384 goto drop;
385
386 /* Wait if JOIN message not yet received */
387 if (m->state == MBR_DISCOVERED)
388 m->state = MBR_PUBLISHED;
389 else
390 m->state = MBR_JOINED;
391 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
392 } else if (evt->event == TIPC_WITHDRAWN) {
393 if (!m)
394 goto drop;
395
396 /* Keep back event if more messages might be expected */
397 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
398 m->state = MBR_LEAVING;
399 else
400 tipc_group_delete_member(grp, m);
401 }
402drop:
403 kfree_skb(skb);
404}
diff --git a/net/tipc/group.h b/net/tipc/group.h
new file mode 100644
index 000000000000..9bdf4479fc03
--- /dev/null
+++ b/net/tipc/group.h
@@ -0,0 +1,64 @@
1/*
2 * net/tipc/group.h: Include file for TIPC group unicast/multicast functions
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#ifndef _TIPC_GROUP_H
37#define _TIPC_GROUP_H
38
39#include "core.h"
40
41struct tipc_group;
42struct tipc_member;
43struct tipc_msg;
44
45struct tipc_group *tipc_group_create(struct net *net, u32 portid,
46 struct tipc_group_req *mreq);
47void tipc_group_delete(struct net *net, struct tipc_group *grp);
48void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
49struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
50void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
51 int *scope);
52void tipc_group_filter_msg(struct tipc_group *grp,
53 struct sk_buff_head *inputq,
54 struct sk_buff_head *xmitq);
55void tipc_group_member_evt(struct tipc_group *grp,
56 struct sk_buff *skb,
57 struct sk_buff_head *xmitq);
58void tipc_group_proto_rcv(struct tipc_group *grp,
59 struct tipc_msg *hdr,
60 struct sk_buff_head *xmitq);
61void tipc_group_update_bc_members(struct tipc_group *grp);
62u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
63int tipc_group_size(struct tipc_group *grp);
64#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac0144f532aa..bd25bff63925 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1046 case TIPC_MEDIUM_IMPORTANCE: 1046 case TIPC_MEDIUM_IMPORTANCE:
1047 case TIPC_HIGH_IMPORTANCE: 1047 case TIPC_HIGH_IMPORTANCE:
1048 case TIPC_CRITICAL_IMPORTANCE: 1048 case TIPC_CRITICAL_IMPORTANCE:
1049 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { 1049 if (unlikely(msg_mcast(hdr))) {
1050 skb_queue_tail(l->bc_rcvlink->inputq, skb); 1050 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1051 return true; 1051 return true;
1052 } 1052 }
1053 case CONN_MANAGER: 1053 case CONN_MANAGER:
1054 case GROUP_PROTOCOL:
1054 skb_queue_tail(inputq, skb); 1055 skb_queue_tail(inputq, skb);
1055 return true; 1056 return true;
1056 case NAME_DISTRIBUTOR: 1057 case NAME_DISTRIBUTOR:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index be3e38aa9dd2..dad400935405 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/msg.h: Include file for TIPC message header routines 2 * net/tipc/msg.h: Include file for TIPC message header routines
3 * 3 *
4 * Copyright (c) 2000-2007, 2014-2015 Ericsson AB 4 * Copyright (c) 2000-2007, 2014-2017 Ericsson AB
5 * Copyright (c) 2005-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -61,10 +61,11 @@ struct plist;
61/* 61/*
62 * Payload message types 62 * Payload message types
63 */ 63 */
64#define TIPC_CONN_MSG 0 64#define TIPC_CONN_MSG 0
65#define TIPC_MCAST_MSG 1 65#define TIPC_MCAST_MSG 1
66#define TIPC_NAMED_MSG 2 66#define TIPC_NAMED_MSG 2
67#define TIPC_DIRECT_MSG 3 67#define TIPC_DIRECT_MSG 3
68#define TIPC_GRP_BCAST_MSG 4
68 69
69/* 70/*
70 * Internal message users 71 * Internal message users
@@ -73,6 +74,7 @@ struct plist;
73#define MSG_BUNDLER 6 74#define MSG_BUNDLER 6
74#define LINK_PROTOCOL 7 75#define LINK_PROTOCOL 7
75#define CONN_MANAGER 8 76#define CONN_MANAGER 8
77#define GROUP_PROTOCOL 9
76#define TUNNEL_PROTOCOL 10 78#define TUNNEL_PROTOCOL 10
77#define NAME_DISTRIBUTOR 11 79#define NAME_DISTRIBUTOR 11
78#define MSG_FRAGMENTER 12 80#define MSG_FRAGMENTER 12
@@ -87,6 +89,7 @@ struct plist;
87#define BASIC_H_SIZE 32 /* Basic payload message */ 89#define BASIC_H_SIZE 32 /* Basic payload message */
88#define NAMED_H_SIZE 40 /* Named payload message */ 90#define NAMED_H_SIZE 40 /* Named payload message */
89#define MCAST_H_SIZE 44 /* Multicast payload message */ 91#define MCAST_H_SIZE 44 /* Multicast payload message */
92#define GROUP_H_SIZE 44 /* Group payload message */
90#define INT_H_SIZE 40 /* Internal messages */ 93#define INT_H_SIZE 40 /* Internal messages */
91#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ 94#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */
92#define MAX_H_SIZE 60 /* Largest possible TIPC header size */ 95#define MAX_H_SIZE 60 /* Largest possible TIPC header size */
@@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
252 msg_set_bits(m, 1, 29, 0x7, n); 255 msg_set_bits(m, 1, 29, 0x7, n);
253} 256}
254 257
258static inline int msg_in_group(struct tipc_msg *m)
259{
260 return (msg_type(m) == TIPC_GRP_BCAST_MSG);
261}
262
255static inline u32 msg_named(struct tipc_msg *m) 263static inline u32 msg_named(struct tipc_msg *m)
256{ 264{
257 return msg_type(m) == TIPC_NAMED_MSG; 265 return msg_type(m) == TIPC_NAMED_MSG;
@@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m)
259 267
260static inline u32 msg_mcast(struct tipc_msg *m) 268static inline u32 msg_mcast(struct tipc_msg *m)
261{ 269{
262 return msg_type(m) == TIPC_MCAST_MSG; 270 int mtyp = msg_type(m);
271
272 return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG));
263} 273}
264 274
265static inline u32 msg_connected(struct tipc_msg *m) 275static inline u32 msg_connected(struct tipc_msg *m)
@@ -515,6 +525,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
515#define DSC_RESP_MSG 1 525#define DSC_RESP_MSG 1
516 526
517/* 527/*
528 * Group protocol message types
529 */
530#define GRP_JOIN_MSG 0
531#define GRP_LEAVE_MSG 1
532
533/*
518 * Word 1 534 * Word 1
519 */ 535 */
520static inline u32 msg_seq_gap(struct tipc_msg *m) 536static inline u32 msg_seq_gap(struct tipc_msg *m)
@@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
795 msg_set_bits(m, 9, 0, 0xffff, n); 811 msg_set_bits(m, 9, 0, 0xffff, n);
796} 812}
797 813
814static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m)
815{
816 return msg_bits(m, 9, 16, 0xffff);
817}
818
819static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
820{
821 msg_set_bits(m, 9, 16, 0xffff, n);
822}
823
824/* Word 10
825 */
826static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
827{
828 return msg_bits(m, 10, 16, 0xffff);
829}
830
831static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n)
832{
833 msg_set_bits(m, 10, 16, 0xffff, n);
834}
835
798static inline bool msg_peer_link_is_up(struct tipc_msg *m) 836static inline bool msg_peer_link_is_up(struct tipc_msg *m)
799{ 837{
800 if (likely(msg_user(m) != LINK_PROTOCOL)) 838 if (likely(msg_user(m) != LINK_PROTOCOL))
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 76bd2777baaf..114d72bab827 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -43,6 +43,7 @@
43#include "bcast.h" 43#include "bcast.h"
44#include "addr.h" 44#include "addr.h"
45#include "node.h" 45#include "node.h"
46#include "group.h"
46#include <net/genetlink.h> 47#include <net/genetlink.h>
47 48
48#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ 49#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
@@ -596,18 +597,6 @@ not_found:
596 return ref; 597 return ref;
597} 598}
598 599
599/**
600 * tipc_nametbl_mc_translate - find multicast destinations
601 *
602 * Creates list of all local ports that overlap the given multicast address;
603 * also determines if any off-node ports overlap.
604 *
605 * Note: Publications with a scope narrower than 'limit' are ignored.
606 * (i.e. local node-scope publications mustn't receive messages arriving
607 * from another node, even if the multcast link brought it here)
608 *
609 * Returns non-zero if any off-node ports overlap
610 */
611int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 600int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
612 u32 limit, struct list_head *dports) 601 u32 limit, struct list_head *dports)
613{ 602{
@@ -679,6 +668,37 @@ exit:
679 rcu_read_unlock(); 668 rcu_read_unlock();
680} 669}
681 670
671/* tipc_nametbl_build_group - build list of communication group members
672 */
673void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
674 u32 type, u32 domain)
675{
676 struct sub_seq *sseq, *stop;
677 struct name_info *info;
678 struct publication *p;
679 struct name_seq *seq;
680
681 rcu_read_lock();
682 seq = nametbl_find_seq(net, type);
683 if (!seq)
684 goto exit;
685
686 spin_lock_bh(&seq->lock);
687 sseq = seq->sseqs;
688 stop = seq->sseqs + seq->first_free;
689 for (; sseq != stop; sseq++) {
690 info = sseq->info;
691 list_for_each_entry(p, &info->zone_list, zone_list) {
692 if (!tipc_in_scope(domain, p->node))
693 continue;
694 tipc_group_add_member(grp, p->node, p->ref);
695 }
696 }
697 spin_unlock_bh(&seq->lock);
698exit:
699 rcu_read_unlock();
700}
701
682/* 702/*
683 * tipc_nametbl_publish - add name publication to network name tables 703 * tipc_nametbl_publish - add name publication to network name tables
684 */ 704 */
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index d121175a92b5..97646b17a4a2 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -40,6 +40,7 @@
40struct tipc_subscription; 40struct tipc_subscription;
41struct tipc_plist; 41struct tipc_plist;
42struct tipc_nlist; 42struct tipc_nlist;
43struct tipc_group;
43 44
44/* 45/*
45 * TIPC name types reserved for internal TIPC use (both current and planned) 46 * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
101u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
102int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 103int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
103 u32 limit, struct list_head *dports); 104 u32 limit, struct list_head *dports);
105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
106 u32 type, u32 domain);
104void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
105 u32 upper, u32 domain, 108 u32 upper, u32 domain,
106 struct tipc_nlist *nodes); 109 struct tipc_nlist *nodes);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index df2f2197c4ad..acd58d23a70e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -48,7 +48,8 @@ enum {
48 TIPC_BCAST_SYNCH = (1 << 1), 48 TIPC_BCAST_SYNCH = (1 << 1),
49 TIPC_BCAST_STATE_NACK = (1 << 2), 49 TIPC_BCAST_STATE_NACK = (1 << 2),
50 TIPC_BLOCK_FLOWCTL = (1 << 3), 50 TIPC_BLOCK_FLOWCTL = (1 << 3),
51 TIPC_BCAST_RCAST = (1 << 4) 51 TIPC_BCAST_RCAST = (1 << 4),
52 TIPC_MCAST_GROUPS = (1 << 5)
52}; 53};
53 54
54#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ 55#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index daf7c4df4531..64bbf9d03629 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, 2012-2016, Ericsson AB 4 * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -45,6 +45,7 @@
45#include "socket.h" 45#include "socket.h"
46#include "bcast.h" 46#include "bcast.h"
47#include "netlink.h" 47#include "netlink.h"
48#include "group.h"
48 49
49#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 50#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
50#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ 51#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */
@@ -78,7 +79,7 @@ enum {
78 * @conn_timeout: the time we can wait for an unresponded setup request 79 * @conn_timeout: the time we can wait for an unresponded setup request
79 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 80 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
80 * @cong_link_cnt: number of congested links 81 * @cong_link_cnt: number of congested links
81 * @sent_unacked: # messages sent by socket, and not yet acked by peer 82 * @snt_unacked: # messages sent by socket, and not yet acked by peer
82 * @rcv_unacked: # messages read by user, but not yet acked back to peer 83 * @rcv_unacked: # messages read by user, but not yet acked back to peer
83 * @peer: 'connected' peer for dgram/rdm 84 * @peer: 'connected' peer for dgram/rdm
84 * @node: hash table node 85 * @node: hash table node
@@ -109,6 +110,7 @@ struct tipc_sock {
109 struct rhash_head node; 110 struct rhash_head node;
110 struct tipc_mc_method mc_method; 111 struct tipc_mc_method mc_method;
111 struct rcu_head rcu; 112 struct rcu_head rcu;
113 struct tipc_group *group;
112}; 114};
113 115
114static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); 116static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
123 struct tipc_name_seq const *seq); 125 struct tipc_name_seq const *seq);
124static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 126static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
125 struct tipc_name_seq const *seq); 127 struct tipc_name_seq const *seq);
128static int tipc_sk_leave(struct tipc_sock *tsk);
126static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 129static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
127static int tipc_sk_insert(struct tipc_sock *tsk); 130static int tipc_sk_insert(struct tipc_sock *tsk);
128static void tipc_sk_remove(struct tipc_sock *tsk); 131static void tipc_sk_remove(struct tipc_sock *tsk);
@@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock)
559 562
560 __tipc_shutdown(sock, TIPC_ERR_NO_PORT); 563 __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
561 sk->sk_shutdown = SHUTDOWN_MASK; 564 sk->sk_shutdown = SHUTDOWN_MASK;
565 tipc_sk_leave(tsk);
562 tipc_sk_withdraw(tsk, 0, NULL); 566 tipc_sk_withdraw(tsk, 0, NULL);
563 sk_stop_timer(sk, &sk->sk_timer); 567 sk_stop_timer(sk, &sk->sk_timer);
564 tipc_sk_remove(tsk); 568 tipc_sk_remove(tsk);
@@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
601 res = tipc_sk_withdraw(tsk, 0, NULL); 605 res = tipc_sk_withdraw(tsk, 0, NULL);
602 goto exit; 606 goto exit;
603 } 607 }
604 608 if (tsk->group) {
609 res = -EACCES;
610 goto exit;
611 }
605 if (uaddr_len < sizeof(struct sockaddr_tipc)) { 612 if (uaddr_len < sizeof(struct sockaddr_tipc)) {
606 res = -EINVAL; 613 res = -EINVAL;
607 goto exit; 614 goto exit;
@@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
698{ 705{
699 struct sock *sk = sock->sk; 706 struct sock *sk = sock->sk;
700 struct tipc_sock *tsk = tipc_sk(sk); 707 struct tipc_sock *tsk = tipc_sk(sk);
708 struct tipc_group *grp = tsk->group;
701 u32 mask = 0; 709 u32 mask = 0;
702 710
703 sock_poll_wait(file, sk_sleep(sk), wait); 711 sock_poll_wait(file, sk_sleep(sk), wait);
@@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
718 mask |= (POLLIN | POLLRDNORM); 726 mask |= (POLLIN | POLLRDNORM);
719 break; 727 break;
720 case TIPC_OPEN: 728 case TIPC_OPEN:
721 if (!tsk->cong_link_cnt) 729 if (!grp || tipc_group_size(grp))
722 mask |= POLLOUT; 730 if (!tsk->cong_link_cnt)
731 mask |= POLLOUT;
723 if (tipc_sk_type_connectionless(sk) && 732 if (tipc_sk_type_connectionless(sk) &&
724 (!skb_queue_empty(&sk->sk_receive_queue))) 733 (!skb_queue_empty(&sk->sk_receive_queue)))
725 mask |= (POLLIN | POLLRDNORM); 734 mask |= (POLLIN | POLLRDNORM);
@@ -757,6 +766,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
757 struct tipc_nlist dsts; 766 struct tipc_nlist dsts;
758 int rc; 767 int rc;
759 768
769 if (tsk->group)
770 return -EACCES;
771
760 /* Block or return if any destination link is congested */ 772 /* Block or return if any destination link is congested */
761 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); 773 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
762 if (unlikely(rc)) 774 if (unlikely(rc))
@@ -794,6 +806,64 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
794} 806}
795 807
796/** 808/**
809 * tipc_send_group_bcast - send message to all members in communication group
810 * @sk: socket structure
811 * @m: message to send
812 * @dlen: total length of message data
813 * @timeout: timeout to wait for wakeup
814 *
815 * Called from function tipc_sendmsg(), which has done all sanity checks
816 * Returns the number of bytes sent on success, or errno
817 */
818static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
819 int dlen, long timeout)
820{
821 struct sock *sk = sock->sk;
822 struct net *net = sock_net(sk);
823 struct tipc_sock *tsk = tipc_sk(sk);
824 struct tipc_group *grp = tsk->group;
825 struct tipc_nlist *dsts = tipc_group_dests(grp);
826 struct tipc_mc_method *method = &tsk->mc_method;
827 struct tipc_msg *hdr = &tsk->phdr;
828 int mtu = tipc_bcast_get_mtu(net);
829 struct sk_buff_head pkts;
830 int rc = -EHOSTUNREACH;
831
832 if (!dsts->local && !dsts->remote)
833 return -EHOSTUNREACH;
834
835 /* Block or return if any destination link is congested */
836 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
837 if (unlikely(rc))
838 return rc;
839
840 /* Complete message header */
841 msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
842 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
843 msg_set_destport(hdr, 0);
844 msg_set_destnode(hdr, 0);
845 msg_set_nameinst(hdr, 0);
846 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
847
848 /* Build message as chain of buffers */
849 skb_queue_head_init(&pkts);
850 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
851 if (unlikely(rc != dlen))
852 return rc;
853
854 /* Send message */
855 rc = tipc_mcast_xmit(net, &pkts, method, dsts,
856 &tsk->cong_link_cnt);
857 if (unlikely(rc))
858 return rc;
859
860 /* Update broadcast sequence number */
861 tipc_group_update_bc_members(tsk->group);
862
863 return dlen;
864}
865
866/**
797 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets 867 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
798 * @arrvq: queue with arriving messages, to be cloned after destination lookup 868 * @arrvq: queue with arriving messages, to be cloned after destination lookup
799 * @inputq: queue with cloned messages, delivered to socket after dest lookup 869 * @inputq: queue with cloned messages, delivered to socket after dest lookup
@@ -803,13 +873,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
803void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 873void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
804 struct sk_buff_head *inputq) 874 struct sk_buff_head *inputq)
805{ 875{
806 struct tipc_msg *msg;
807 struct list_head dports;
808 u32 portid;
809 u32 scope = TIPC_CLUSTER_SCOPE; 876 u32 scope = TIPC_CLUSTER_SCOPE;
810 struct sk_buff_head tmpq; 877 u32 self = tipc_own_addr(net);
811 uint hsz;
812 struct sk_buff *skb, *_skb; 878 struct sk_buff *skb, *_skb;
879 u32 lower = 0, upper = ~0;
880 struct sk_buff_head tmpq;
881 u32 portid, oport, onode;
882 struct list_head dports;
883 struct tipc_msg *msg;
884 int hsz;
813 885
814 __skb_queue_head_init(&tmpq); 886 __skb_queue_head_init(&tmpq);
815 INIT_LIST_HEAD(&dports); 887 INIT_LIST_HEAD(&dports);
@@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
818 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 890 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
819 msg = buf_msg(skb); 891 msg = buf_msg(skb);
820 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 892 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
821 893 oport = msg_origport(msg);
822 if (in_own_node(net, msg_orignode(msg))) 894 onode = msg_orignode(msg);
895 if (onode == self)
823 scope = TIPC_NODE_SCOPE; 896 scope = TIPC_NODE_SCOPE;
824 897
825 /* Create destination port list and message clones: */ 898 /* Create destination port list and message clones: */
826 tipc_nametbl_mc_translate(net, 899 if (!msg_in_group(msg)) {
827 msg_nametype(msg), msg_namelower(msg), 900 lower = msg_namelower(msg);
828 msg_nameupper(msg), scope, &dports); 901 upper = msg_nameupper(msg);
902 }
903 tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
904 scope, &dports);
829 while (tipc_dest_pop(&dports, NULL, &portid)) { 905 while (tipc_dest_pop(&dports, NULL, &portid)) {
830 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 906 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
831 if (_skb) { 907 if (_skb) {
@@ -895,10 +971,6 @@ exit:
895 kfree_skb(skb); 971 kfree_skb(skb);
896} 972}
897 973
898static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
899{
900}
901
902/** 974/**
903 * tipc_sendmsg - send message in connectionless manner 975 * tipc_sendmsg - send message in connectionless manner
904 * @sock: socket structure 976 * @sock: socket structure
@@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
934 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1006 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
935 struct list_head *clinks = &tsk->cong_links; 1007 struct list_head *clinks = &tsk->cong_links;
936 bool syn = !tipc_sk_type_connectionless(sk); 1008 bool syn = !tipc_sk_type_connectionless(sk);
1009 struct tipc_group *grp = tsk->group;
937 struct tipc_msg *hdr = &tsk->phdr; 1010 struct tipc_msg *hdr = &tsk->phdr;
938 struct tipc_name_seq *seq; 1011 struct tipc_name_seq *seq;
939 struct sk_buff_head pkts; 1012 struct sk_buff_head pkts;
@@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
944 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) 1017 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
945 return -EMSGSIZE; 1018 return -EMSGSIZE;
946 1019
1020 if (unlikely(grp))
1021 return tipc_send_group_bcast(sock, m, dlen, timeout);
1022
947 if (unlikely(!dest)) { 1023 if (unlikely(!dest)) {
948 dest = &tsk->peer; 1024 dest = &tsk->peer;
949 if (!syn || dest->family != AF_TIPC) 1025 if (!syn || dest->family != AF_TIPC)
@@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1543 struct sk_buff *skb = __skb_dequeue(inputq); 1619 struct sk_buff *skb = __skb_dequeue(inputq);
1544 struct tipc_sock *tsk = tipc_sk(sk); 1620 struct tipc_sock *tsk = tipc_sk(sk);
1545 struct tipc_msg *hdr = buf_msg(skb); 1621 struct tipc_msg *hdr = buf_msg(skb);
1622 struct tipc_group *grp = tsk->group;
1546 1623
1547 switch (msg_user(hdr)) { 1624 switch (msg_user(hdr)) {
1548 case CONN_MANAGER: 1625 case CONN_MANAGER:
@@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1553 tsk->cong_link_cnt--; 1630 tsk->cong_link_cnt--;
1554 sk->sk_write_space(sk); 1631 sk->sk_write_space(sk);
1555 break; 1632 break;
1633 case GROUP_PROTOCOL:
1634 tipc_group_proto_rcv(grp, hdr, xmitq);
1635 break;
1556 case TOP_SRV: 1636 case TOP_SRV:
1557 tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); 1637 tipc_group_member_evt(tsk->group, skb, xmitq);
1638 skb = NULL;
1558 break; 1639 break;
1559 default: 1640 default:
1560 break; 1641 break;
@@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
1699{ 1780{
1700 bool sk_conn = !tipc_sk_type_connectionless(sk); 1781 bool sk_conn = !tipc_sk_type_connectionless(sk);
1701 struct tipc_sock *tsk = tipc_sk(sk); 1782 struct tipc_sock *tsk = tipc_sk(sk);
1783 struct tipc_group *grp = tsk->group;
1702 struct tipc_msg *hdr = buf_msg(skb); 1784 struct tipc_msg *hdr = buf_msg(skb);
1703 struct net *net = sock_net(sk); 1785 struct net *net = sock_net(sk);
1704 struct sk_buff_head inputq; 1786 struct sk_buff_head inputq;
@@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
1710 1792
1711 if (unlikely(!msg_isdata(hdr))) 1793 if (unlikely(!msg_isdata(hdr)))
1712 tipc_sk_proto_rcv(sk, &inputq, xmitq); 1794 tipc_sk_proto_rcv(sk, &inputq, xmitq);
1713 else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) 1795 else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG))
1714 return kfree_skb(skb); 1796 return kfree_skb(skb);
1715 1797
1798 if (unlikely(grp))
1799 tipc_group_filter_msg(grp, &inputq, xmitq);
1800
1716 /* Validate and add to receive buffer if there is space */ 1801 /* Validate and add to receive buffer if there is space */
1717 while ((skb = __skb_dequeue(&inputq))) { 1802 while ((skb = __skb_dequeue(&inputq))) {
1718 hdr = buf_msg(skb); 1803 hdr = buf_msg(skb);
1719 limit = rcvbuf_limit(sk, skb); 1804 limit = rcvbuf_limit(sk, skb);
1720 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || 1805 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
1721 (!sk_conn && msg_connected(hdr))) 1806 (!sk_conn && msg_connected(hdr)) ||
1807 (!grp && msg_in_group(hdr)))
1722 err = TIPC_ERR_NO_PORT; 1808 err = TIPC_ERR_NO_PORT;
1723 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) 1809 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
1724 err = TIPC_ERR_OVERLOAD; 1810 err = TIPC_ERR_OVERLOAD;
@@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1837 sock_put(sk); 1923 sock_put(sk);
1838 continue; 1924 continue;
1839 } 1925 }
1840
1841 /* No destination socket => dequeue skb if still there */ 1926 /* No destination socket => dequeue skb if still there */
1842 skb = tipc_skb_dequeue(inputq, dport); 1927 skb = tipc_skb_dequeue(inputq, dport);
1843 if (!skb) 1928 if (!skb)
@@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1905 1990
1906 lock_sock(sk); 1991 lock_sock(sk);
1907 1992
1993 if (tsk->group) {
1994 res = -EINVAL;
1995 goto exit;
1996 }
1997
1908 if (dst->family == AF_UNSPEC) { 1998 if (dst->family == AF_UNSPEC) {
1909 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); 1999 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1910 if (!tipc_sk_type_connectionless(sk)) 2000 if (!tipc_sk_type_connectionless(sk))
@@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net)
2341 rhashtable_destroy(&tn->sk_rht); 2431 rhashtable_destroy(&tn->sk_rht);
2342} 2432}
2343 2433
2434static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2435{
2436 struct net *net = sock_net(&tsk->sk);
2437 u32 domain = addr_domain(net, mreq->scope);
2438 struct tipc_group *grp = tsk->group;
2439 struct tipc_msg *hdr = &tsk->phdr;
2440 struct tipc_name_seq seq;
2441 int rc;
2442
2443 if (mreq->type < TIPC_RESERVED_TYPES)
2444 return -EACCES;
2445 if (grp)
2446 return -EACCES;
2447 grp = tipc_group_create(net, tsk->portid, mreq);
2448 if (!grp)
2449 return -ENOMEM;
2450 tsk->group = grp;
2451 msg_set_lookup_scope(hdr, mreq->scope);
2452 msg_set_nametype(hdr, mreq->type);
2453 msg_set_dest_droppable(hdr, true);
2454 seq.type = mreq->type;
2455 seq.lower = mreq->instance;
2456 seq.upper = seq.lower;
2457 tipc_nametbl_build_group(net, grp, mreq->type, domain);
2458 rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2459 if (rc)
2460 tipc_group_delete(net, grp);
2461 return rc;
2462}
2463
2464static int tipc_sk_leave(struct tipc_sock *tsk)
2465{
2466 struct net *net = sock_net(&tsk->sk);
2467 struct tipc_group *grp = tsk->group;
2468 struct tipc_name_seq seq;
2469 int scope;
2470
2471 if (!grp)
2472 return -EINVAL;
2473 tipc_group_self(grp, &seq, &scope);
2474 tipc_group_delete(net, grp);
2475 tsk->group = NULL;
2476 tipc_sk_withdraw(tsk, scope, &seq);
2477 return 0;
2478}
2479
2344/** 2480/**
2345 * tipc_setsockopt - set socket option 2481 * tipc_setsockopt - set socket option
2346 * @sock: socket structure 2482 * @sock: socket structure
@@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2359{ 2495{
2360 struct sock *sk = sock->sk; 2496 struct sock *sk = sock->sk;
2361 struct tipc_sock *tsk = tipc_sk(sk); 2497 struct tipc_sock *tsk = tipc_sk(sk);
2498 struct tipc_group_req mreq;
2362 u32 value = 0; 2499 u32 value = 0;
2363 int res = 0; 2500 int res = 0;
2364 2501
@@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2374 case TIPC_CONN_TIMEOUT: 2511 case TIPC_CONN_TIMEOUT:
2375 if (ol < sizeof(value)) 2512 if (ol < sizeof(value))
2376 return -EINVAL; 2513 return -EINVAL;
2377 res = get_user(value, (u32 __user *)ov); 2514 if (get_user(value, (u32 __user *)ov))
2378 if (res) 2515 return -EFAULT;
2379 return res; 2516 break;
2517 case TIPC_GROUP_JOIN:
2518 if (ol < sizeof(mreq))
2519 return -EINVAL;
2520 if (copy_from_user(&mreq, ov, sizeof(mreq)))
2521 return -EFAULT;
2380 break; 2522 break;
2381 default: 2523 default:
2382 if (ov || ol) 2524 if (ov || ol)
@@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2409 tsk->mc_method.rcast = true; 2551 tsk->mc_method.rcast = true;
2410 tsk->mc_method.mandatory = true; 2552 tsk->mc_method.mandatory = true;
2411 break; 2553 break;
2554 case TIPC_GROUP_JOIN:
2555 res = tipc_sk_join(tsk, &mreq);
2556 break;
2557 case TIPC_GROUP_LEAVE:
2558 res = tipc_sk_leave(tsk);
2559 break;
2412 default: 2560 default:
2413 res = -EINVAL; 2561 res = -EINVAL;
2414 } 2562 }
@@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2436{ 2584{
2437 struct sock *sk = sock->sk; 2585 struct sock *sk = sock->sk;
2438 struct tipc_sock *tsk = tipc_sk(sk); 2586 struct tipc_sock *tsk = tipc_sk(sk);
2439 int len; 2587 struct tipc_name_seq seq;
2588 int len, scope;
2440 u32 value; 2589 u32 value;
2441 int res; 2590 int res;
2442 2591
@@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2470 case TIPC_SOCK_RECVQ_DEPTH: 2619 case TIPC_SOCK_RECVQ_DEPTH:
2471 value = skb_queue_len(&sk->sk_receive_queue); 2620 value = skb_queue_len(&sk->sk_receive_queue);
2472 break; 2621 break;
2622 case TIPC_GROUP_JOIN:
2623 seq.type = 0;
2624 if (tsk->group)
2625 tipc_group_self(tsk->group, &seq, &scope);
2626 value = seq.type;
2627 break;
2473 default: 2628 default:
2474 res = -EINVAL; 2629 res = -EINVAL;
2475 } 2630 }