summaryrefslogtreecommitdiffstats
path: root/net/tipc/group.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:31 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:01 -0400
commit2f487712b89376fce267223bbb0db93d393d4b09 (patch)
tree2f40d5c08f966de843218ff96d797fdc8ca77985 /net/tipc/group.c
parentb87a5ea31c935a7f7e11ca85df2ec7917921e96d (diff)
tipc: guarantee that group broadcast doesn't bypass group unicast
We need a mechanism guaranteeing that group unicasts sent out from a socket are not bypassed by later sent broadcasts from the same socket. We do this as follows: - Each time a unicast is sent, we set a the broadcast method for the socket to "replicast" and "mandatory". This forces the first subsequent broadcast message to follow the same network and data path as the preceding unicast to a destination, hence preventing it from overtaking the latter. - In order to make the 'same data path' statement above true, we let group unicasts pass through the multicast link input queue, instead of as previously through the unicast link input queue. - In the first broadcast following a unicast, we set a new header flag, requiring all recipients to immediately acknowledge its reception. - During the period before all the expected acknowledges are received, the socket refuses to accept any more broadcast attempts, i.e., by blocking or returning EAGAIN. This period should typically not be longer than a few microseconds. - When all acknowledges have been received, the sending socket will open up for subsequent broadcasts, this time giving the link layer freedom to itself select the best transmission method. - The forced and/or abrupt transmission method changes described above may lead to broadcasts arriving out of order to the recipients. We remedy this by introducing code that checks and if necessary re-orders such messages at the receiving end. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/group.c')
-rw-r--r--net/tipc/group.c47
1 files changed, 41 insertions, 6 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 985e0ce32e8e..eab862e047dc 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
 	u16 advertised;
 	u16 window;
 	u16 bc_rcv_nxt;
+	u16 bc_acked;
 	bool usr_pending;
 };
 
@@ -87,6 +88,7 @@ struct tipc_group {
 	u32 portid;
 	u16 member_cnt;
 	u16 bc_snd_nxt;
+	u16 bc_ackers;
 	bool loopback;
 	bool events;
 };
@@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
 	m->group = grp;
 	m->node = node;
 	m->port = port;
+	m->bc_acked = grp->bc_snd_nxt - 1;
 	grp->member_cnt++;
 	tipc_group_add_to_tree(grp, m);
 	tipc_nlist_add(&grp->dests, m->node);
@@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp,
 {
 	rb_erase(&m->tree_node, &grp->members);
 	grp->member_cnt--;
+
+	/* Check if we were waiting for replicast ack from this member */
+	if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
+		grp->bc_ackers--;
+
 	list_del_init(&m->list);
 	list_del_init(&m->congested);
 
@@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len)
 		list_add_tail(&m->congested, &grp->congested);
 }
 
-void tipc_group_update_bc_members(struct tipc_group *grp, int len)
+void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
 {
+	u16 prev = grp->bc_snd_nxt - 1;
 	struct tipc_member *m;
 	struct rb_node *n;
 
 	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
 		m = container_of(n, struct tipc_member, tree_node);
-		if (tipc_group_is_enabled(m))
+		if (tipc_group_is_enabled(m)) {
 			tipc_group_update_member(m, len);
+			m->bc_acked = prev;
+		}
 	}
+
+	/* Mark number of acknowledges to expect, if any */
+	if (ack)
+		grp->bc_ackers = grp->member_cnt;
 	grp->bc_snd_nxt++;
 }
 
@@ -372,6 +387,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
 {
 	struct tipc_member *m = NULL;
 
+	/* If prev bcast was replicast, reject until all receivers have acked */
+	if (grp->bc_ackers)
+		return true;
+
 	if (list_empty(&grp->congested))
 		return false;
 
@@ -391,7 +410,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
 	struct sk_buff *_skb, *tmp;
 	int mtyp = msg_type(hdr);
 
-	/* Bcast may be bypassed by unicast, - sort it in */
+	/* Bcast may be bypassed by unicast or other bcast, - sort it in */
 	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
 		skb_queue_walk_safe(defq, _skb, tmp) {
 			_hdr = buf_msg(_skb);
@@ -412,10 +431,10 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq)
 {
 	struct sk_buff *skb = __skb_dequeue(inputq);
+	bool ack, deliver, update;
 	struct sk_buff_head *defq;
 	struct tipc_member *m;
 	struct tipc_msg *hdr;
-	bool deliver, update;
 	u32 node, port;
 	int mtyp, blks;
 
@@ -451,6 +470,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 		hdr = buf_msg(skb);
 		mtyp = msg_type(hdr);
 		deliver = true;
+		ack = false;
 		update = false;
 
 		if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -466,6 +486,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 			/* Fall thru */
 		case TIPC_GRP_BCAST_MSG:
 			m->bc_rcv_nxt++;
+			ack = msg_grp_bc_ack_req(hdr);
 			break;
 		case TIPC_GRP_UCAST_MSG:
 			break;
@@ -480,6 +501,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 		else
 			kfree_skb(skb);
 
+		if (ack)
+			tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
+
 		if (!update)
 			continue;
 
@@ -540,6 +564,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 	} else if (mtyp == GRP_ADV_MSG) {
 		msg_set_adv_win(hdr, adv);
 		m->advertised += adv;
+	} else if (mtyp == GRP_ACK_MSG) {
+		msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
 	}
 	__skb_queue_tail(xmitq, skb);
 }
@@ -593,7 +619,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 		}
 		/* Otherwise deliver already received WITHDRAW event */
 		__skb_queue_tail(inputq, m->event_msg);
-		*usr_wakeup = m->usr_pending;
+		*usr_wakeup = true;
 		tipc_group_delete_member(grp, m);
 		list_del_init(&m->congested);
 		return;
@@ -605,6 +631,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 		m->usr_pending = false;
 		list_del_init(&m->congested);
 		return;
+	case GRP_ACK_MSG:
+		if (!m)
+			return;
+		m->bc_acked = msg_grp_bc_acked(hdr);
+		if (--grp->bc_ackers)
+			break;
+		*usr_wakeup = true;
+		m->usr_pending = false;
+		return;
 	default:
 		pr_warn("Received unknown GROUP_PROTO message\n");
 	}
@@ -678,7 +713,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
 
 		TIPC_SKB_CB(skb)->orig_member = m->instance;
 
-		*usr_wakeup = m->usr_pending;
+		*usr_wakeup = true;
 		m->usr_pending = false;
 
 		/* Hold back event if more messages might be expected */