summaryrefslogtreecommitdiffstats
path: root/net/tipc/group.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:23 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:00 -0400
commit75da2163dbb6af9f2dce1d80056d11d290dd19a5 (patch)
tree3c38c9e2a9085c1422826e861e5252fdb42b7d40 /net/tipc/group.c
parenta80ae5306a7346d4e52f59462878beb8362f4bbd (diff)
tipc: introduce communication groups
As a preparation for introducing flow control for multicast and datagram messaging we need a more strictly defined framework than we have now. A socket must be able keep track of exactly how many and which other sockets it is allowed to communicate with at any moment, and keep the necessary state for those. We therefore introduce a new concept we have named Communication Group. Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN. The call takes four parameters: 'type' serves as group identifier, 'instance' serves as an logical member identifier, and 'scope' indicates the visibility of the group (node/cluster/zone). Finally, 'flags' makes it possible to set certain properties for the member. For now, there is only one flag, indicating if the creator of the socket wants to receive a copy of broadcast or multicast messages it is sending via the socket, and if wants to be eligible as destination for its own anycasts. A group is closed, i.e., sockets which have not joined a group will not be able to send messages to or receive messages from members of the group, and vice versa. Any member of a group can send multicast ('group broadcast') messages to all group members, optionally including itself, using the primitive send(). The messages are received via the recvmsg() primitive. A socket can only be member of one group at a time. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--net/tipc/group.c404
1 files changed, 404 insertions, 0 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
new file mode 100644
index 000000000000..3f0e1ce1e3b9
--- /dev/null
+++ b/net/tipc/group.c
@@ -0,0 +1,404 @@
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
47#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
48#define ADV_IDLE ADV_UNIT
49
50enum mbr_state {
51 MBR_QUARANTINED,
52 MBR_DISCOVERED,
53 MBR_JOINING,
54 MBR_PUBLISHED,
55 MBR_JOINED,
56 MBR_LEAVING
57};
58
59struct tipc_member {
60 struct rb_node tree_node;
61 struct list_head list;
62 u32 node;
63 u32 port;
64 enum mbr_state state;
65 u16 bc_rcv_nxt;
66};
67
68struct tipc_group {
69 struct rb_root members;
70 struct tipc_nlist dests;
71 struct net *net;
72 int subid;
73 u32 type;
74 u32 instance;
75 u32 domain;
76 u32 scope;
77 u32 portid;
78 u16 member_cnt;
79 u16 bc_snd_nxt;
80 bool loopback;
81};
82
83static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
84 int mtyp, struct sk_buff_head *xmitq);
85
86u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
87{
88 return grp->bc_snd_nxt;
89}
90
91static bool tipc_group_is_receiver(struct tipc_member *m)
92{
93 return m && m->state >= MBR_JOINED;
94}
95
96int tipc_group_size(struct tipc_group *grp)
97{
98 return grp->member_cnt;
99}
100
101struct tipc_group *tipc_group_create(struct net *net, u32 portid,
102 struct tipc_group_req *mreq)
103{
104 struct tipc_group *grp;
105 u32 type = mreq->type;
106
107 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
108 if (!grp)
109 return NULL;
110 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
111 grp->members = RB_ROOT;
112 grp->net = net;
113 grp->portid = portid;
114 grp->domain = addr_domain(net, mreq->scope);
115 grp->type = type;
116 grp->instance = mreq->instance;
117 grp->scope = mreq->scope;
118 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
119 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
120 return grp;
121 kfree(grp);
122 return NULL;
123}
124
125void tipc_group_delete(struct net *net, struct tipc_group *grp)
126{
127 struct rb_root *tree = &grp->members;
128 struct tipc_member *m, *tmp;
129 struct sk_buff_head xmitq;
130
131 __skb_queue_head_init(&xmitq);
132
133 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
134 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
135 list_del(&m->list);
136 kfree(m);
137 }
138 tipc_node_distr_xmit(net, &xmitq);
139 tipc_nlist_purge(&grp->dests);
140 tipc_topsrv_kern_unsubscr(net, grp->subid);
141 kfree(grp);
142}
143
144struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
145 u32 node, u32 port)
146{
147 struct rb_node *n = grp->members.rb_node;
148 u64 nkey, key = (u64)node << 32 | port;
149 struct tipc_member *m;
150
151 while (n) {
152 m = container_of(n, struct tipc_member, tree_node);
153 nkey = (u64)m->node << 32 | m->port;
154 if (key < nkey)
155 n = n->rb_left;
156 else if (key > nkey)
157 n = n->rb_right;
158 else
159 return m;
160 }
161 return NULL;
162}
163
164static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
165 u32 node)
166{
167 struct tipc_member *m;
168 struct rb_node *n;
169
170 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
171 m = container_of(n, struct tipc_member, tree_node);
172 if (m->node == node)
173 return m;
174 }
175 return NULL;
176}
177
178static void tipc_group_add_to_tree(struct tipc_group *grp,
179 struct tipc_member *m)
180{
181 u64 nkey, key = (u64)m->node << 32 | m->port;
182 struct rb_node **n, *parent = NULL;
183 struct tipc_member *tmp;
184
185 n = &grp->members.rb_node;
186 while (*n) {
187 tmp = container_of(*n, struct tipc_member, tree_node);
188 parent = *n;
189 tmp = container_of(parent, struct tipc_member, tree_node);
190 nkey = (u64)tmp->node << 32 | tmp->port;
191 if (key < nkey)
192 n = &(*n)->rb_left;
193 else if (key > nkey)
194 n = &(*n)->rb_right;
195 else
196 return;
197 }
198 rb_link_node(&m->tree_node, parent, n);
199 rb_insert_color(&m->tree_node, &grp->members);
200}
201
202static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
203 u32 node, u32 port,
204 int state)
205{
206 struct tipc_member *m;
207
208 m = kzalloc(sizeof(*m), GFP_ATOMIC);
209 if (!m)
210 return NULL;
211 INIT_LIST_HEAD(&m->list);
212 m->node = node;
213 m->port = port;
214 grp->member_cnt++;
215 tipc_group_add_to_tree(grp, m);
216 tipc_nlist_add(&grp->dests, m->node);
217 m->state = state;
218 return m;
219}
220
221void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
222{
223 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
224}
225
226static void tipc_group_delete_member(struct tipc_group *grp,
227 struct tipc_member *m)
228{
229 rb_erase(&m->tree_node, &grp->members);
230 grp->member_cnt--;
231 list_del_init(&m->list);
232
233 /* If last member on a node, remove node from dest list */
234 if (!tipc_group_find_node(grp, m->node))
235 tipc_nlist_del(&grp->dests, m->node);
236
237 kfree(m);
238}
239
240struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
241{
242 return &grp->dests;
243}
244
245void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
246 int *scope)
247{
248 seq->type = grp->type;
249 seq->lower = grp->instance;
250 seq->upper = grp->instance;
251 *scope = grp->scope;
252}
253
254void tipc_group_update_bc_members(struct tipc_group *grp)
255{
256 grp->bc_snd_nxt++;
257}
258
259/* tipc_group_filter_msg() - determine if we should accept arriving message
260 */
261void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
262 struct sk_buff_head *xmitq)
263{
264 struct sk_buff *skb = __skb_dequeue(inputq);
265 struct tipc_member *m;
266 struct tipc_msg *hdr;
267 u32 node, port;
268 int mtyp;
269
270 if (!skb)
271 return;
272
273 hdr = buf_msg(skb);
274 mtyp = msg_type(hdr);
275 node = msg_orignode(hdr);
276 port = msg_origport(hdr);
277
278 if (!msg_in_group(hdr))
279 goto drop;
280
281 m = tipc_group_find_member(grp, node, port);
282 if (!tipc_group_is_receiver(m))
283 goto drop;
284
285 __skb_queue_tail(inputq, skb);
286
287 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
288 return;
289drop:
290 kfree_skb(skb);
291}
292
293static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
294 int mtyp, struct sk_buff_head *xmitq)
295{
296 struct tipc_msg *hdr;
297 struct sk_buff *skb;
298
299 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
300 m->node, tipc_own_addr(grp->net),
301 m->port, grp->portid, 0);
302 if (!skb)
303 return;
304
305 hdr = buf_msg(skb);
306 if (mtyp == GRP_JOIN_MSG)
307 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
308 __skb_queue_tail(xmitq, skb);
309}
310
311void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
312 struct sk_buff_head *xmitq)
313{
314 u32 node = msg_orignode(hdr);
315 u32 port = msg_origport(hdr);
316 struct tipc_member *m;
317
318 if (!grp)
319 return;
320
321 m = tipc_group_find_member(grp, node, port);
322
323 switch (msg_type(hdr)) {
324 case GRP_JOIN_MSG:
325 if (!m)
326 m = tipc_group_create_member(grp, node, port,
327 MBR_QUARANTINED);
328 if (!m)
329 return;
330 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
331
332 /* Wait until PUBLISH event is received */
333 if (m->state == MBR_DISCOVERED)
334 m->state = MBR_JOINING;
335 else if (m->state == MBR_PUBLISHED)
336 m->state = MBR_JOINED;
337 return;
338 case GRP_LEAVE_MSG:
339 if (!m)
340 return;
341
342 /* Wait until WITHDRAW event is received */
343 if (m->state != MBR_LEAVING) {
344 m->state = MBR_LEAVING;
345 return;
346 }
347 /* Otherwise deliver already received WITHDRAW event */
348 tipc_group_delete_member(grp, m);
349 return;
350 default:
351 pr_warn("Received unknown GROUP_PROTO message\n");
352 }
353}
354
355/* tipc_group_member_evt() - receive and handle a member up/down event
356 */
357void tipc_group_member_evt(struct tipc_group *grp,
358 struct sk_buff *skb,
359 struct sk_buff_head *xmitq)
360{
361 struct tipc_msg *hdr = buf_msg(skb);
362 struct tipc_event *evt = (void *)msg_data(hdr);
363 u32 node = evt->port.node;
364 u32 port = evt->port.ref;
365 struct tipc_member *m;
366 struct net *net;
367 u32 self;
368
369 if (!grp)
370 goto drop;
371
372 net = grp->net;
373 self = tipc_own_addr(net);
374 if (!grp->loopback && node == self && port == grp->portid)
375 goto drop;
376
377 m = tipc_group_find_member(grp, node, port);
378
379 if (evt->event == TIPC_PUBLISHED) {
380 if (!m)
381 m = tipc_group_create_member(grp, node, port,
382 MBR_DISCOVERED);
383 if (!m)
384 goto drop;
385
386 /* Wait if JOIN message not yet received */
387 if (m->state == MBR_DISCOVERED)
388 m->state = MBR_PUBLISHED;
389 else
390 m->state = MBR_JOINED;
391 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
392 } else if (evt->event == TIPC_WITHDRAWN) {
393 if (!m)
394 goto drop;
395
396 /* Keep back event if more messages might be expected */
397 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
398 m->state = MBR_LEAVING;
399 else
400 tipc_group_delete_member(grp, m);
401 }
402drop:
403 kfree_skb(skb);
404}