summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:31 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:01 -0400
commit2f487712b89376fce267223bbb0db93d393d4b09 (patch)
tree2f40d5c08f966de843218ff96d797fdc8ca77985 /net/tipc
parentb87a5ea31c935a7f7e11ca85df2ec7917921e96d (diff)
tipc: guarantee that group broadcast doesn't bypass group unicast
We need a mechanism guaranteeing that group unicasts sent out from a socket are not bypassed by later sent broadcasts from the same socket. We do this as follows: - Each time a unicast is sent, we set a the broadcast method for the socket to "replicast" and "mandatory". This forces the first subsequent broadcast message to follow the same network and data path as the preceding unicast to a destination, hence preventing it from overtaking the latter. - In order to make the 'same data path' statement above true, we let group unicasts pass through the multicast link input queue, instead of as previously through the unicast link input queue. - In the first broadcast following a unicast, we set a new header flag, requiring all recipients to immediately acknowledge its reception. - During the period before all the expected acknowledges are received, the socket refuses to accept any more broadcast attempts, i.e., by blocking or returning EAGAIN. This period should typically not be longer than a few microseconds. - When all acknowledges have been received, the sending socket will open up for subsequent broadcasts, this time giving the link layer freedom to itself select the best transmission method. - The forced and/or abrupt transmission method changes described above may lead to broadcasts arriving out of order to the recipients. We remedy this by introducing code that checks and if necessary re-orders such messages at the receiving end. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/group.c47
-rw-r--r--net/tipc/group.h4
-rw-r--r--net/tipc/link.c5
-rw-r--r--net/tipc/msg.h21
-rw-r--r--net/tipc/socket.c34
5 files changed, 94 insertions, 17 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 985e0ce32e8e..eab862e047dc 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
71 u16 advertised; 71 u16 advertised;
72 u16 window; 72 u16 window;
73 u16 bc_rcv_nxt; 73 u16 bc_rcv_nxt;
74 u16 bc_acked;
74 bool usr_pending; 75 bool usr_pending;
75}; 76};
76 77
@@ -87,6 +88,7 @@ struct tipc_group {
87 u32 portid; 88 u32 portid;
88 u16 member_cnt; 89 u16 member_cnt;
89 u16 bc_snd_nxt; 90 u16 bc_snd_nxt;
91 u16 bc_ackers;
90 bool loopback; 92 bool loopback;
91 bool events; 93 bool events;
92}; 94};
@@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
258 m->group = grp; 260 m->group = grp;
259 m->node = node; 261 m->node = node;
260 m->port = port; 262 m->port = port;
263 m->bc_acked = grp->bc_snd_nxt - 1;
261 grp->member_cnt++; 264 grp->member_cnt++;
262 tipc_group_add_to_tree(grp, m); 265 tipc_group_add_to_tree(grp, m);
263 tipc_nlist_add(&grp->dests, m->node); 266 tipc_nlist_add(&grp->dests, m->node);
@@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp,
275{ 278{
276 rb_erase(&m->tree_node, &grp->members); 279 rb_erase(&m->tree_node, &grp->members);
277 grp->member_cnt--; 280 grp->member_cnt--;
281
282 /* Check if we were waiting for replicast ack from this member */
283 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
284 grp->bc_ackers--;
285
278 list_del_init(&m->list); 286 list_del_init(&m->list);
279 list_del_init(&m->congested); 287 list_del_init(&m->congested);
280 288
@@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len)
325 list_add_tail(&m->congested, &grp->congested); 333 list_add_tail(&m->congested, &grp->congested);
326} 334}
327 335
328void tipc_group_update_bc_members(struct tipc_group *grp, int len) 336void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
329{ 337{
338 u16 prev = grp->bc_snd_nxt - 1;
330 struct tipc_member *m; 339 struct tipc_member *m;
331 struct rb_node *n; 340 struct rb_node *n;
332 341
333 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 342 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
334 m = container_of(n, struct tipc_member, tree_node); 343 m = container_of(n, struct tipc_member, tree_node);
335 if (tipc_group_is_enabled(m)) 344 if (tipc_group_is_enabled(m)) {
336 tipc_group_update_member(m, len); 345 tipc_group_update_member(m, len);
346 m->bc_acked = prev;
347 }
337 } 348 }
349
350 /* Mark number of acknowledges to expect, if any */
351 if (ack)
352 grp->bc_ackers = grp->member_cnt;
338 grp->bc_snd_nxt++; 353 grp->bc_snd_nxt++;
339} 354}
340 355
@@ -372,6 +387,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
372{ 387{
373 struct tipc_member *m = NULL; 388 struct tipc_member *m = NULL;
374 389
390 /* If prev bcast was replicast, reject until all receivers have acked */
391 if (grp->bc_ackers)
392 return true;
393
375 if (list_empty(&grp->congested)) 394 if (list_empty(&grp->congested))
376 return false; 395 return false;
377 396
@@ -391,7 +410,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
391 struct sk_buff *_skb, *tmp; 410 struct sk_buff *_skb, *tmp;
392 int mtyp = msg_type(hdr); 411 int mtyp = msg_type(hdr);
393 412
394 /* Bcast may be bypassed by unicast, - sort it in */ 413 /* Bcast may be bypassed by unicast or other bcast, - sort it in */
395 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 414 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
396 skb_queue_walk_safe(defq, _skb, tmp) { 415 skb_queue_walk_safe(defq, _skb, tmp) {
397 _hdr = buf_msg(_skb); 416 _hdr = buf_msg(_skb);
@@ -412,10 +431,10 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
412 struct sk_buff_head *xmitq) 431 struct sk_buff_head *xmitq)
413{ 432{
414 struct sk_buff *skb = __skb_dequeue(inputq); 433 struct sk_buff *skb = __skb_dequeue(inputq);
434 bool ack, deliver, update;
415 struct sk_buff_head *defq; 435 struct sk_buff_head *defq;
416 struct tipc_member *m; 436 struct tipc_member *m;
417 struct tipc_msg *hdr; 437 struct tipc_msg *hdr;
418 bool deliver, update;
419 u32 node, port; 438 u32 node, port;
420 int mtyp, blks; 439 int mtyp, blks;
421 440
@@ -451,6 +470,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
451 hdr = buf_msg(skb); 470 hdr = buf_msg(skb);
452 mtyp = msg_type(hdr); 471 mtyp = msg_type(hdr);
453 deliver = true; 472 deliver = true;
473 ack = false;
454 update = false; 474 update = false;
455 475
456 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 476 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -466,6 +486,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
466 /* Fall thru */ 486 /* Fall thru */
467 case TIPC_GRP_BCAST_MSG: 487 case TIPC_GRP_BCAST_MSG:
468 m->bc_rcv_nxt++; 488 m->bc_rcv_nxt++;
489 ack = msg_grp_bc_ack_req(hdr);
469 break; 490 break;
470 case TIPC_GRP_UCAST_MSG: 491 case TIPC_GRP_UCAST_MSG:
471 break; 492 break;
@@ -480,6 +501,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
480 else 501 else
481 kfree_skb(skb); 502 kfree_skb(skb);
482 503
504 if (ack)
505 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
506
483 if (!update) 507 if (!update)
484 continue; 508 continue;
485 509
@@ -540,6 +564,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
540 } else if (mtyp == GRP_ADV_MSG) { 564 } else if (mtyp == GRP_ADV_MSG) {
541 msg_set_adv_win(hdr, adv); 565 msg_set_adv_win(hdr, adv);
542 m->advertised += adv; 566 m->advertised += adv;
567 } else if (mtyp == GRP_ACK_MSG) {
568 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
543 } 569 }
544 __skb_queue_tail(xmitq, skb); 570 __skb_queue_tail(xmitq, skb);
545} 571}
@@ -593,7 +619,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
593 } 619 }
594 /* Otherwise deliver already received WITHDRAW event */ 620 /* Otherwise deliver already received WITHDRAW event */
595 __skb_queue_tail(inputq, m->event_msg); 621 __skb_queue_tail(inputq, m->event_msg);
596 *usr_wakeup = m->usr_pending; 622 *usr_wakeup = true;
597 tipc_group_delete_member(grp, m); 623 tipc_group_delete_member(grp, m);
598 list_del_init(&m->congested); 624 list_del_init(&m->congested);
599 return; 625 return;
@@ -605,6 +631,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
605 m->usr_pending = false; 631 m->usr_pending = false;
606 list_del_init(&m->congested); 632 list_del_init(&m->congested);
607 return; 633 return;
634 case GRP_ACK_MSG:
635 if (!m)
636 return;
637 m->bc_acked = msg_grp_bc_acked(hdr);
638 if (--grp->bc_ackers)
639 break;
640 *usr_wakeup = true;
641 m->usr_pending = false;
642 return;
608 default: 643 default:
609 pr_warn("Received unknown GROUP_PROTO message\n"); 644 pr_warn("Received unknown GROUP_PROTO message\n");
610 } 645 }
@@ -678,7 +713,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
678 713
679 TIPC_SKB_CB(skb)->orig_member = m->instance; 714 TIPC_SKB_CB(skb)->orig_member = m->instance;
680 715
681 *usr_wakeup = m->usr_pending; 716 *usr_wakeup = true;
682 m->usr_pending = false; 717 m->usr_pending = false;
683 718
684 /* Hold back event if more messages might be expected */ 719 /* Hold back event if more messages might be expected */
diff --git a/net/tipc/group.h b/net/tipc/group.h
index e432066a211e..d525e1cd7de5 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -61,7 +61,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
61 struct tipc_msg *hdr, 61 struct tipc_msg *hdr,
62 struct sk_buff_head *inputq, 62 struct sk_buff_head *inputq,
63 struct sk_buff_head *xmitq); 63 struct sk_buff_head *xmitq);
64void tipc_group_update_bc_members(struct tipc_group *grp, int len); 64void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
65bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 65bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
66 int len, struct tipc_member **m); 66 int len, struct tipc_member **m);
67bool tipc_group_bc_cong(struct tipc_group *grp, int len); 67bool tipc_group_bc_cong(struct tipc_group *grp, int len);
@@ -69,7 +69,5 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
69 u32 port, struct sk_buff_head *xmitq); 69 u32 port, struct sk_buff_head *xmitq);
70u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); 70u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
71void tipc_group_update_member(struct tipc_member *m, int len); 71void tipc_group_update_member(struct tipc_member *m, int len);
72struct tipc_member *tipc_group_find_sender(struct tipc_group *grp,
73 u32 node, u32 port);
74int tipc_group_size(struct tipc_group *grp); 72int tipc_group_size(struct tipc_group *grp);
75#endif 73#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index bd25bff63925..70a21499804d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,13 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1046 case TIPC_MEDIUM_IMPORTANCE: 1046 case TIPC_MEDIUM_IMPORTANCE:
1047 case TIPC_HIGH_IMPORTANCE: 1047 case TIPC_HIGH_IMPORTANCE:
1048 case TIPC_CRITICAL_IMPORTANCE: 1048 case TIPC_CRITICAL_IMPORTANCE:
1049 if (unlikely(msg_mcast(hdr))) { 1049 if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) {
1050 skb_queue_tail(l->bc_rcvlink->inputq, skb); 1050 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1051 return true; 1051 return true;
1052 } 1052 }
1053 case CONN_MANAGER:
1054 case GROUP_PROTOCOL: 1053 case GROUP_PROTOCOL:
1055 skb_queue_tail(inputq, skb); 1054 case CONN_MANAGER:
1056 return true; 1055 return true;
1057 case NAME_DISTRIBUTOR: 1056 case NAME_DISTRIBUTOR:
1058 l->bc_rcvlink->state = LINK_ESTABLISHED; 1057 l->bc_rcvlink->state = LINK_ESTABLISHED;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d6f98215267e..52c6a2e01995 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -547,6 +547,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
547#define GRP_JOIN_MSG 0 547#define GRP_JOIN_MSG 0
548#define GRP_LEAVE_MSG 1 548#define GRP_LEAVE_MSG 1
549#define GRP_ADV_MSG 2 549#define GRP_ADV_MSG 2
550#define GRP_ACK_MSG 3
550 551
551/* 552/*
552 * Word 1 553 * Word 1
@@ -839,6 +840,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
839 msg_set_bits(m, 9, 16, 0xffff, n); 840 msg_set_bits(m, 9, 16, 0xffff, n);
840} 841}
841 842
843static inline u16 msg_grp_bc_acked(struct tipc_msg *m)
844{
845 return msg_bits(m, 9, 16, 0xffff);
846}
847
848static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
849{
850 msg_set_bits(m, 9, 16, 0xffff, n);
851}
852
842/* Word 10 853/* Word 10
843 */ 854 */
844static inline u16 msg_grp_evt(struct tipc_msg *m) 855static inline u16 msg_grp_evt(struct tipc_msg *m)
@@ -851,6 +862,16 @@ static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
851 msg_set_bits(m, 10, 0, 0x3, n); 862 msg_set_bits(m, 10, 0, 0x3, n);
852} 863}
853 864
865static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m)
866{
867 return msg_bits(m, 10, 0, 0x1);
868}
869
870static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n)
871{
872 msg_set_bits(m, 10, 0, 0x1, n);
873}
874
854static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) 875static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
855{ 876{
856 return msg_bits(m, 10, 16, 0xffff); 877 return msg_bits(m, 10, 16, 0xffff);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3276b7a0d445..b1f1c3c2b1e2 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -831,6 +831,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
831 u32 dnode, u32 dport, int dlen) 831 u32 dnode, u32 dport, int dlen)
832{ 832{
833 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group); 833 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
834 struct tipc_mc_method *method = &tsk->mc_method;
834 int blks = tsk_blocks(GROUP_H_SIZE + dlen); 835 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
835 struct tipc_msg *hdr = &tsk->phdr; 836 struct tipc_msg *hdr = &tsk->phdr;
836 struct sk_buff_head pkts; 837 struct sk_buff_head pkts;
@@ -857,9 +858,12 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
857 tsk->cong_link_cnt++; 858 tsk->cong_link_cnt++;
858 } 859 }
859 860
860 /* Update send window and sequence number */ 861 /* Update send window */
861 tipc_group_update_member(mb, blks); 862 tipc_group_update_member(mb, blks);
862 863
864 /* A broadcast sent within next EXPIRE period must follow same path */
865 method->rcast = true;
866 method->mandatory = true;
863 return dlen; 867 return dlen;
864} 868}
865 869
@@ -1008,6 +1012,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1008 struct tipc_group *grp = tsk->group; 1012 struct tipc_group *grp = tsk->group;
1009 struct tipc_nlist *dsts = tipc_group_dests(grp); 1013 struct tipc_nlist *dsts = tipc_group_dests(grp);
1010 struct tipc_mc_method *method = &tsk->mc_method; 1014 struct tipc_mc_method *method = &tsk->mc_method;
1015 bool ack = method->mandatory && method->rcast;
1011 int blks = tsk_blocks(MCAST_H_SIZE + dlen); 1016 int blks = tsk_blocks(MCAST_H_SIZE + dlen);
1012 struct tipc_msg *hdr = &tsk->phdr; 1017 struct tipc_msg *hdr = &tsk->phdr;
1013 int mtu = tipc_bcast_get_mtu(net); 1018 int mtu = tipc_bcast_get_mtu(net);
@@ -1036,6 +1041,9 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1036 msg_set_destnode(hdr, 0); 1041 msg_set_destnode(hdr, 0);
1037 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); 1042 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
1038 1043
1044 /* Avoid getting stuck with repeated forced replicasts */
1045 msg_set_grp_bc_ack_req(hdr, ack);
1046
1039 /* Build message as chain of buffers */ 1047 /* Build message as chain of buffers */
1040 skb_queue_head_init(&pkts); 1048 skb_queue_head_init(&pkts);
1041 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); 1049 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
@@ -1043,13 +1051,17 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1043 return rc; 1051 return rc;
1044 1052
1045 /* Send message */ 1053 /* Send message */
1046 rc = tipc_mcast_xmit(net, &pkts, method, dsts, 1054 rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
1047 &tsk->cong_link_cnt);
1048 if (unlikely(rc)) 1055 if (unlikely(rc))
1049 return rc; 1056 return rc;
1050 1057
1051 /* Update broadcast sequence number and send windows */ 1058 /* Update broadcast sequence number and send windows */
1052 tipc_group_update_bc_members(tsk->group, blks); 1059 tipc_group_update_bc_members(tsk->group, blks, ack);
1060
1061 /* Broadcast link is now free to choose method for next broadcast */
1062 method->mandatory = false;
1063 method->expires = jiffies;
1064
1053 return dlen; 1065 return dlen;
1054} 1066}
1055 1067
@@ -1113,7 +1125,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1113 u32 portid, oport, onode; 1125 u32 portid, oport, onode;
1114 struct list_head dports; 1126 struct list_head dports;
1115 struct tipc_msg *msg; 1127 struct tipc_msg *msg;
1116 int hsz; 1128 int user, mtyp, hsz;
1117 1129
1118 __skb_queue_head_init(&tmpq); 1130 __skb_queue_head_init(&tmpq);
1119 INIT_LIST_HEAD(&dports); 1131 INIT_LIST_HEAD(&dports);
@@ -1121,6 +1133,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1121 skb = tipc_skb_peek(arrvq, &inputq->lock); 1133 skb = tipc_skb_peek(arrvq, &inputq->lock);
1122 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1134 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
1123 msg = buf_msg(skb); 1135 msg = buf_msg(skb);
1136 user = msg_user(msg);
1137 mtyp = msg_type(msg);
1138 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1139 spin_lock_bh(&inputq->lock);
1140 if (skb_peek(arrvq) == skb) {
1141 __skb_dequeue(arrvq);
1142 __skb_queue_tail(inputq, skb);
1143 }
1144 refcount_dec(&skb->users);
1145 spin_unlock_bh(&inputq->lock);
1146 continue;
1147 }
1124 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1148 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
1125 oport = msg_origport(msg); 1149 oport = msg_origport(msg);
1126 onode = msg_orignode(msg); 1150 onode = msg_orignode(msg);