aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c204
1 files changed, 174 insertions, 30 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index aa1babbea385..7d99029df342 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/bcast.c: TIPC broadcast code 2 * net/tipc/bcast.c: TIPC broadcast code
3 * 3 *
4 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB 4 * Copyright (c) 2004-2006, 2014-2016, Ericsson AB
5 * Copyright (c) 2004, Intel Corporation. 5 * Copyright (c) 2004, Intel Corporation.
6 * Copyright (c) 2005, 2010-2011, Wind River Systems 6 * Copyright (c) 2005, 2010-2011, Wind River Systems
7 * All rights reserved. 7 * All rights reserved.
@@ -39,9 +39,8 @@
39#include "socket.h" 39#include "socket.h"
40#include "msg.h" 40#include "msg.h"
41#include "bcast.h" 41#include "bcast.h"
42#include "name_distr.h"
43#include "link.h" 42#include "link.h"
44#include "node.h" 43#include "name_table.h"
45 44
46#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 45#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
47#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 46#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
@@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";
54 * @inputq: data input queue; will only carry SOCK_WAKEUP messages 53 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
55 * @dest: array keeping number of reachable destinations per bearer 54 * @dest: array keeping number of reachable destinations per bearer
56 * @primary_bearer: a bearer having links to all broadcast destinations, if any 55 * @primary_bearer: a bearer having links to all broadcast destinations, if any
56 * @bcast_support: indicates if primary bearer, if any, supports broadcast
57 * @rcast_support: indicates if all peer nodes support replicast
58 * @rc_ratio: dest count as percentage of cluster size where send method changes
59 * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast
57 */ 60 */
58struct tipc_bc_base { 61struct tipc_bc_base {
59 struct tipc_link *link; 62 struct tipc_link *link;
60 struct sk_buff_head inputq; 63 struct sk_buff_head inputq;
61 int dests[MAX_BEARERS]; 64 int dests[MAX_BEARERS];
62 int primary_bearer; 65 int primary_bearer;
66 bool bcast_support;
67 bool rcast_support;
68 int rc_ratio;
69 int bc_threshold;
63}; 70};
64 71
65static struct tipc_bc_base *tipc_bc_base(struct net *net) 72static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
69 76
70int tipc_bcast_get_mtu(struct net *net) 77int tipc_bcast_get_mtu(struct net *net)
71{ 78{
72 return tipc_link_mtu(tipc_bc_sndlink(net)); 79 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
80}
81
82void tipc_bcast_disable_rcast(struct net *net)
83{
84 tipc_bc_base(net)->rcast_support = false;
85}
86
87static void tipc_bcbase_calc_bc_threshold(struct net *net)
88{
89 struct tipc_bc_base *bb = tipc_bc_base(net);
90 int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
91
92 bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
73} 93}
74 94
75/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 95/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)
79{ 99{
80 struct tipc_bc_base *bb = tipc_bc_base(net); 100 struct tipc_bc_base *bb = tipc_bc_base(net);
81 int all_dests = tipc_link_bc_peers(bb->link); 101 int all_dests = tipc_link_bc_peers(bb->link);
82 int i, mtu; 102 int i, mtu, prim;
83 103
84 bb->primary_bearer = INVALID_BEARER_ID; 104 bb->primary_bearer = INVALID_BEARER_ID;
105 bb->bcast_support = true;
85 106
86 if (!all_dests) 107 if (!all_dests)
87 return; 108 return;
@@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)
93 mtu = tipc_bearer_mtu(net, i); 114 mtu = tipc_bearer_mtu(net, i);
94 if (mtu < tipc_link_mtu(bb->link)) 115 if (mtu < tipc_link_mtu(bb->link))
95 tipc_link_set_mtu(bb->link, mtu); 116 tipc_link_set_mtu(bb->link, mtu);
96 117 bb->bcast_support &= tipc_bearer_bcast_support(net, i);
97 if (bb->dests[i] < all_dests) 118 if (bb->dests[i] < all_dests)
98 continue; 119 continue;
99 120
@@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)
103 if ((i ^ tipc_own_addr(net)) & 1) 124 if ((i ^ tipc_own_addr(net)) & 1)
104 break; 125 break;
105 } 126 }
127 prim = bb->primary_bearer;
128 if (prim != INVALID_BEARER_ID)
129 bb->bcast_support = tipc_bearer_bcast_support(net, prim);
106} 130}
107 131
108void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) 132void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
@@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
170 __skb_queue_purge(&_xmitq); 194 __skb_queue_purge(&_xmitq);
171} 195}
172 196
173/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster 197static void tipc_bcast_select_xmit_method(struct net *net, int dests,
174 * and to identified node local sockets 198 struct tipc_mc_method *method)
199{
200 struct tipc_bc_base *bb = tipc_bc_base(net);
201 unsigned long exp = method->expires;
202
203 /* Broadcast supported by used bearer/bearers? */
204 if (!bb->bcast_support) {
205 method->rcast = true;
206 return;
207 }
208 /* Any destinations which don't support replicast ? */
209 if (!bb->rcast_support) {
210 method->rcast = false;
211 return;
212 }
213 /* Can current method be changed ? */
214 method->expires = jiffies + TIPC_METHOD_EXPIRE;
215 if (method->mandatory || time_before(jiffies, exp))
216 return;
217
218 /* Determine method to use now */
219 method->rcast = dests <= bb->bc_threshold;
220}
221
222/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
175 * @net: the applicable net namespace 223 * @net: the applicable net namespace
176 * @list: chain of buffers containing message 224 * @pkts: chain of buffers containing message
177 * Consumes the buffer chain, except when returning -ELINKCONG 225 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
178 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 226 * Consumes the buffer chain.
227 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
179 */ 228 */
180int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 229static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
230 u16 *cong_link_cnt)
181{ 231{
182 struct tipc_link *l = tipc_bc_sndlink(net); 232 struct tipc_link *l = tipc_bc_sndlink(net);
183 struct sk_buff_head xmitq, inputq, rcvq; 233 struct sk_buff_head xmitq;
184 int rc = 0; 234 int rc = 0;
185 235
186 __skb_queue_head_init(&rcvq);
187 __skb_queue_head_init(&xmitq); 236 __skb_queue_head_init(&xmitq);
188 skb_queue_head_init(&inputq);
189
190 /* Prepare message clone for local node */
191 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
192 return -EHOSTUNREACH;
193
194 tipc_bcast_lock(net); 237 tipc_bcast_lock(net);
195 if (tipc_link_bc_peers(l)) 238 if (tipc_link_bc_peers(l))
196 rc = tipc_link_xmit(l, list, &xmitq); 239 rc = tipc_link_xmit(l, pkts, &xmitq);
197 tipc_bcast_unlock(net); 240 tipc_bcast_unlock(net);
198 241 tipc_bcbase_xmit(net, &xmitq);
199 /* Don't send to local node if adding to link failed */ 242 __skb_queue_purge(pkts);
200 if (unlikely(rc)) { 243 if (rc == -ELINKCONG) {
201 __skb_queue_purge(&rcvq); 244 *cong_link_cnt = 1;
202 return rc; 245 rc = 0;
203 } 246 }
247 return rc;
248}
204 249
205 /* Broadcast to all nodes, inluding local node */ 250/* tipc_rcast_xmit - replicate and send a message to given destination nodes
206 tipc_bcbase_xmit(net, &xmitq); 251 * @net: the applicable net namespace
207 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 252 * @pkts: chain of buffers containing message
208 __skb_queue_purge(list); 253 * @dests: list of destination nodes
254 * @cong_link_cnt: returns number of congested links
255 * @cong_links: returns identities of congested links
256 * Returns 0 if success, otherwise errno
257 */
258static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
259 struct tipc_nlist *dests, u16 *cong_link_cnt)
260{
261 struct sk_buff_head _pkts;
262 struct u32_item *n, *tmp;
263 u32 dst, selector;
264
265 selector = msg_link_selector(buf_msg(skb_peek(pkts)));
266 __skb_queue_head_init(&_pkts);
267
268 list_for_each_entry_safe(n, tmp, &dests->list, list) {
269 dst = n->value;
270 if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
271 return -ENOMEM;
272
273 /* Any other return value than -ELINKCONG is ignored */
274 if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
275 (*cong_link_cnt)++;
276 }
209 return 0; 277 return 0;
210} 278}
211 279
280/* tipc_mcast_xmit - deliver message to indicated destination nodes
281 * and to identified node local sockets
282 * @net: the applicable net namespace
283 * @pkts: chain of buffers containing message
284 * @method: send method to be used
285 * @dests: destination nodes for message.
286 * @cong_link_cnt: returns number of encountered congested destination links
287 * Consumes buffer chain.
288 * Returns 0 if success, otherwise errno
289 */
290int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
291 struct tipc_mc_method *method, struct tipc_nlist *dests,
292 u16 *cong_link_cnt)
293{
294 struct sk_buff_head inputq, localq;
295 int rc = 0;
296
297 skb_queue_head_init(&inputq);
298 skb_queue_head_init(&localq);
299
300 /* Clone packets before they are consumed by next call */
301 if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
302 rc = -ENOMEM;
303 goto exit;
304 }
305 /* Send according to determined transmit method */
306 if (dests->remote) {
307 tipc_bcast_select_xmit_method(net, dests->remote, method);
308 if (method->rcast)
309 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
310 else
311 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
312 }
313
314 if (dests->local)
315 tipc_sk_mcast_rcv(net, &localq, &inputq);
316exit:
317 /* This queue should normally be empty by now */
318 __skb_queue_purge(pkts);
319 return rc;
320}
321
212/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link 322/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
213 * 323 *
214 * RCU is locked, no other locks set 324 * RCU is locked, no other locks set
@@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
313 tipc_bcast_lock(net); 423 tipc_bcast_lock(net);
314 tipc_link_add_bc_peer(snd_l, uc_l, xmitq); 424 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
315 tipc_bcbase_select_primary(net); 425 tipc_bcbase_select_primary(net);
426 tipc_bcbase_calc_bc_threshold(net);
316 tipc_bcast_unlock(net); 427 tipc_bcast_unlock(net);
317} 428}
318 429
@@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
331 tipc_bcast_lock(net); 442 tipc_bcast_lock(net);
332 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); 443 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
333 tipc_bcbase_select_primary(net); 444 tipc_bcbase_select_primary(net);
445 tipc_bcbase_calc_bc_threshold(net);
334 tipc_bcast_unlock(net); 446 tipc_bcast_unlock(net);
335 447
336 tipc_bcbase_xmit(net, &xmitq); 448 tipc_bcbase_xmit(net, &xmitq);
@@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)
413 goto enomem; 525 goto enomem;
414 bb->link = l; 526 bb->link = l;
415 tn->bcl = l; 527 tn->bcl = l;
528 bb->rc_ratio = 25;
529 bb->rcast_support = true;
416 return 0; 530 return 0;
417enomem: 531enomem:
418 kfree(bb); 532 kfree(bb);
@@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)
428 kfree(tn->bcbase); 542 kfree(tn->bcbase);
429 kfree(tn->bcl); 543 kfree(tn->bcl);
430} 544}
545
546void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
547{
548 memset(nl, 0, sizeof(*nl));
549 INIT_LIST_HEAD(&nl->list);
550 nl->self = self;
551}
552
553void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
554{
555 if (node == nl->self)
556 nl->local = true;
557 else if (u32_push(&nl->list, node))
558 nl->remote++;
559}
560
561void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
562{
563 if (node == nl->self)
564 nl->local = false;
565 else if (u32_del(&nl->list, node))
566 nl->remote--;
567}
568
569void tipc_nlist_purge(struct tipc_nlist *nl)
570{
571 u32_list_purge(&nl->list);
572 nl->remote = 0;
573 nl->local = 0;
574}