summaryrefslogtreecommitdiffstats
path: root/net/tipc/group.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:26 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:00 -0400
commitb7d42635517fde2b095deddd0fba37be2302a285 (patch)
tree531940dc481a0e08ca7a2bbd0a328452344b82e1 /net/tipc/group.c
parentae236fb208a6fbbd2e7a6913385e8fb78ac807f8 (diff)
tipc: introduce flow control for group broadcast messages
We introduce an end-to-end flow control mechanism for group broadcast messages. This ensures that no messages are ever lost because of destination receive buffer overflow, with minimal impact on performance. For now, the algorithm is based on the assumption that there is only one active transmitter at any moment in time.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--  net/tipc/group.c  148
1 file changed, 144 insertions(+), 4 deletions(-)
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 1bfa9348b26d..b8ed70abba01 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -46,6 +46,7 @@
46 46
47#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 47#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
48#define ADV_IDLE ADV_UNIT 48#define ADV_IDLE ADV_UNIT
49#define ADV_ACTIVE (ADV_UNIT * 12)
49 50
50enum mbr_state { 51enum mbr_state {
51 MBR_QUARANTINED, 52 MBR_QUARANTINED,
@@ -59,16 +60,22 @@ enum mbr_state {
59struct tipc_member { 60struct tipc_member {
60 struct rb_node tree_node; 61 struct rb_node tree_node;
61 struct list_head list; 62 struct list_head list;
63 struct list_head congested;
62 struct sk_buff *event_msg; 64 struct sk_buff *event_msg;
65 struct tipc_group *group;
63 u32 node; 66 u32 node;
64 u32 port; 67 u32 port;
65 u32 instance; 68 u32 instance;
66 enum mbr_state state; 69 enum mbr_state state;
70 u16 advertised;
71 u16 window;
67 u16 bc_rcv_nxt; 72 u16 bc_rcv_nxt;
73 bool usr_pending;
68}; 74};
69 75
70struct tipc_group { 76struct tipc_group {
71 struct rb_root members; 77 struct rb_root members;
78 struct list_head congested;
72 struct tipc_nlist dests; 79 struct tipc_nlist dests;
73 struct net *net; 80 struct net *net;
74 int subid; 81 int subid;
@@ -86,11 +93,24 @@ struct tipc_group {
86static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 93static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
87 int mtyp, struct sk_buff_head *xmitq); 94 int mtyp, struct sk_buff_head *xmitq);
88 95
96static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
97{
98 int mcnt = grp->member_cnt + 1;
99
100 /* Scale to bytes, considering worst-case truesize/msgsize ratio */
101 return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
102}
103
89u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 104u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
90{ 105{
91 return grp->bc_snd_nxt; 106 return grp->bc_snd_nxt;
92} 107}
93 108
109static bool tipc_group_is_enabled(struct tipc_member *m)
110{
111 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
112}
113
94static bool tipc_group_is_receiver(struct tipc_member *m) 114static bool tipc_group_is_receiver(struct tipc_member *m)
95{ 115{
96 return m && m->state >= MBR_JOINED; 116 return m && m->state >= MBR_JOINED;
@@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
111 if (!grp) 131 if (!grp)
112 return NULL; 132 return NULL;
113 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 133 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
134 INIT_LIST_HEAD(&grp->congested);
114 grp->members = RB_ROOT; 135 grp->members = RB_ROOT;
115 grp->net = net; 136 grp->net = net;
116 grp->portid = portid; 137 grp->portid = portid;
@@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
213 if (!m) 234 if (!m)
214 return NULL; 235 return NULL;
215 INIT_LIST_HEAD(&m->list); 236 INIT_LIST_HEAD(&m->list);
237 INIT_LIST_HEAD(&m->congested);
238 m->group = grp;
216 m->node = node; 239 m->node = node;
217 m->port = port; 240 m->port = port;
218 grp->member_cnt++; 241 grp->member_cnt++;
@@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
233 rb_erase(&m->tree_node, &grp->members); 256 rb_erase(&m->tree_node, &grp->members);
234 grp->member_cnt--; 257 grp->member_cnt--;
235 list_del_init(&m->list); 258 list_del_init(&m->list);
259 list_del_init(&m->congested);
236 260
237 /* If last member on a node, remove node from dest list */ 261 /* If last member on a node, remove node from dest list */
238 if (!tipc_group_find_node(grp, m->node)) 262 if (!tipc_group_find_node(grp, m->node))
@@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
255 *scope = grp->scope; 279 *scope = grp->scope;
256} 280}
257 281
258void tipc_group_update_bc_members(struct tipc_group *grp) 282void tipc_group_update_member(struct tipc_member *m, int len)
283{
284 struct tipc_group *grp = m->group;
285 struct tipc_member *_m, *tmp;
286
287 if (!tipc_group_is_enabled(m))
288 return;
289
290 m->window -= len;
291
292 if (m->window >= ADV_IDLE)
293 return;
294
295 if (!list_empty(&m->congested))
296 return;
297
298 /* Sort member into congested members' list */
299 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
300 if (m->window > _m->window)
301 continue;
302 list_add_tail(&m->congested, &_m->congested);
303 return;
304 }
305 list_add_tail(&m->congested, &grp->congested);
306}
307
308void tipc_group_update_bc_members(struct tipc_group *grp, int len)
259{ 309{
310 struct tipc_member *m;
311 struct rb_node *n;
312
313 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
314 m = container_of(n, struct tipc_member, tree_node);
315 if (tipc_group_is_enabled(m))
316 tipc_group_update_member(m, len);
317 }
260 grp->bc_snd_nxt++; 318 grp->bc_snd_nxt++;
261} 319}
262 320
321bool tipc_group_bc_cong(struct tipc_group *grp, int len)
322{
323 struct tipc_member *m;
324
325 if (list_empty(&grp->congested))
326 return false;
327
328 m = list_first_entry(&grp->congested, struct tipc_member, congested);
329 if (m->window >= len)
330 return false;
331
332 return true;
333}
334
263/* tipc_group_filter_msg() - determine if we should accept arriving message 335/* tipc_group_filter_msg() - determine if we should accept arriving message
264 */ 336 */
265void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 337void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
@@ -302,11 +374,36 @@ drop:
302 kfree_skb(skb); 374 kfree_skb(skb);
303} 375}
304 376
377void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
378 u32 port, struct sk_buff_head *xmitq)
379{
380 struct tipc_member *m;
381
382 m = tipc_group_find_member(grp, node, port);
383 if (!m)
384 return;
385
386 m->advertised -= blks;
387
388 switch (m->state) {
389 case MBR_JOINED:
390 if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
391 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
392 break;
393 case MBR_DISCOVERED:
394 case MBR_JOINING:
395 case MBR_LEAVING:
396 default:
397 break;
398 }
399}
400
305static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 401static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
306 int mtyp, struct sk_buff_head *xmitq) 402 int mtyp, struct sk_buff_head *xmitq)
307{ 403{
308 struct tipc_msg *hdr; 404 struct tipc_msg *hdr;
309 struct sk_buff *skb; 405 struct sk_buff *skb;
406 int adv = 0;
310 407
311 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 408 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
312 m->node, tipc_own_addr(grp->net), 409 m->node, tipc_own_addr(grp->net),
@@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
314 if (!skb) 411 if (!skb)
315 return; 412 return;
316 413
414 if (m->state == MBR_JOINED)
415 adv = ADV_ACTIVE - m->advertised;
416
317 hdr = buf_msg(skb); 417 hdr = buf_msg(skb);
318 if (mtyp == GRP_JOIN_MSG) 418
419 if (mtyp == GRP_JOIN_MSG) {
319 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 420 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
421 msg_set_adv_win(hdr, adv);
422 m->advertised += adv;
423 } else if (mtyp == GRP_ADV_MSG) {
424 msg_set_adv_win(hdr, adv);
425 m->advertised += adv;
426 }
320 __skb_queue_tail(xmitq, skb); 427 __skb_queue_tail(xmitq, skb);
321} 428}
322 429
323void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, 430void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
324 struct sk_buff_head *inputq, 431 struct tipc_msg *hdr, struct sk_buff_head *inputq,
325 struct sk_buff_head *xmitq) 432 struct sk_buff_head *xmitq)
326{ 433{
327 u32 node = msg_orignode(hdr); 434 u32 node = msg_orignode(hdr);
@@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
341 if (!m) 448 if (!m)
342 return; 449 return;
343 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); 450 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
451 m->window += msg_adv_win(hdr);
344 452
345 /* Wait until PUBLISH event is received */ 453 /* Wait until PUBLISH event is received */
346 if (m->state == MBR_DISCOVERED) { 454 if (m->state == MBR_DISCOVERED) {
347 m->state = MBR_JOINING; 455 m->state = MBR_JOINING;
348 } else if (m->state == MBR_PUBLISHED) { 456 } else if (m->state == MBR_PUBLISHED) {
349 m->state = MBR_JOINED; 457 m->state = MBR_JOINED;
458 *usr_wakeup = true;
459 m->usr_pending = false;
460 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
350 __skb_queue_tail(inputq, m->event_msg); 461 __skb_queue_tail(inputq, m->event_msg);
351 } 462 }
463 if (m->window < ADV_IDLE)
464 tipc_group_update_member(m, 0);
465 else
466 list_del_init(&m->congested);
352 return; 467 return;
353 case GRP_LEAVE_MSG: 468 case GRP_LEAVE_MSG:
354 if (!m) 469 if (!m)
@@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
361 } 476 }
362 /* Otherwise deliver already received WITHDRAW event */ 477 /* Otherwise deliver already received WITHDRAW event */
363 __skb_queue_tail(inputq, m->event_msg); 478 __skb_queue_tail(inputq, m->event_msg);
479 *usr_wakeup = m->usr_pending;
364 tipc_group_delete_member(grp, m); 480 tipc_group_delete_member(grp, m);
481 list_del_init(&m->congested);
482 return;
483 case GRP_ADV_MSG:
484 if (!m)
485 return;
486 m->window += msg_adv_win(hdr);
487 *usr_wakeup = m->usr_pending;
488 m->usr_pending = false;
489 list_del_init(&m->congested);
365 return; 490 return;
366 default: 491 default:
367 pr_warn("Received unknown GROUP_PROTO message\n"); 492 pr_warn("Received unknown GROUP_PROTO message\n");
368 } 493 }
369} 494}
370 495
496/* tipc_group_member_evt() - receive and handle a member up/down event
497 */
371void tipc_group_member_evt(struct tipc_group *grp, 498void tipc_group_member_evt(struct tipc_group *grp,
499 bool *usr_wakeup,
500 int *sk_rcvbuf,
372 struct sk_buff *skb, 501 struct sk_buff *skb,
373 struct sk_buff_head *inputq, 502 struct sk_buff_head *inputq,
374 struct sk_buff_head *xmitq) 503 struct sk_buff_head *xmitq)
@@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp,
416 } else { 545 } else {
417 __skb_queue_tail(inputq, skb); 546 __skb_queue_tail(inputq, skb);
418 m->state = MBR_JOINED; 547 m->state = MBR_JOINED;
548 *usr_wakeup = true;
549 m->usr_pending = false;
419 } 550 }
420 m->instance = instance; 551 m->instance = instance;
421 TIPC_SKB_CB(skb)->orig_member = m->instance; 552 TIPC_SKB_CB(skb)->orig_member = m->instance;
422 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 553 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
554 if (m->window < ADV_IDLE)
555 tipc_group_update_member(m, 0);
556 else
557 list_del_init(&m->congested);
423 } else if (event == TIPC_WITHDRAWN) { 558 } else if (event == TIPC_WITHDRAWN) {
424 if (!m) 559 if (!m)
425 goto drop; 560 goto drop;
426 561
427 TIPC_SKB_CB(skb)->orig_member = m->instance; 562 TIPC_SKB_CB(skb)->orig_member = m->instance;
428 563
564 *usr_wakeup = m->usr_pending;
565 m->usr_pending = false;
566
429 /* Hold back event if more messages might be expected */ 567 /* Hold back event if more messages might be expected */
430 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { 568 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
431 m->event_msg = skb; 569 m->event_msg = skb;
@@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp,
434 __skb_queue_tail(inputq, skb); 572 __skb_queue_tail(inputq, skb);
435 tipc_group_delete_member(grp, m); 573 tipc_group_delete_member(grp, m);
436 } 574 }
575 list_del_init(&m->congested);
437 } 576 }
577 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
438 return; 578 return;
439drop: 579drop:
440 kfree_skb(skb); 580 kfree_skb(skb);