aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/group.c47
-rw-r--r--net/tipc/group.h4
-rw-r--r--net/tipc/link.c5
-rw-r--r--net/tipc/msg.h21
-rw-r--r--net/tipc/socket.c34
5 files changed, 94 insertions, 17 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 985e0ce32e8e..eab862e047dc 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
71 u16 advertised; 71 u16 advertised;
72 u16 window; 72 u16 window;
73 u16 bc_rcv_nxt; 73 u16 bc_rcv_nxt;
74 u16 bc_acked;
74 bool usr_pending; 75 bool usr_pending;
75}; 76};
76 77
@@ -87,6 +88,7 @@ struct tipc_group {
87 u32 portid; 88 u32 portid;
88 u16 member_cnt; 89 u16 member_cnt;
89 u16 bc_snd_nxt; 90 u16 bc_snd_nxt;
91 u16 bc_ackers;
90 bool loopback; 92 bool loopback;
91 bool events; 93 bool events;
92}; 94};
@@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
258 m->group = grp; 260 m->group = grp;
259 m->node = node; 261 m->node = node;
260 m->port = port; 262 m->port = port;
263 m->bc_acked = grp->bc_snd_nxt - 1;
261 grp->member_cnt++; 264 grp->member_cnt++;
262 tipc_group_add_to_tree(grp, m); 265 tipc_group_add_to_tree(grp, m);
263 tipc_nlist_add(&grp->dests, m->node); 266 tipc_nlist_add(&grp->dests, m->node);
@@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp,
275{ 278{
276 rb_erase(&m->tree_node, &grp->members); 279 rb_erase(&m->tree_node, &grp->members);
277 grp->member_cnt--; 280 grp->member_cnt--;
281
282 /* Check if we were waiting for replicast ack from this member */
283 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
284 grp->bc_ackers--;
285
278 list_del_init(&m->list); 286 list_del_init(&m->list);
279 list_del_init(&m->congested); 287 list_del_init(&m->congested);
280 288
@@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len)
325 list_add_tail(&m->congested, &grp->congested); 333 list_add_tail(&m->congested, &grp->congested);
326} 334}
327 335
328void tipc_group_update_bc_members(struct tipc_group *grp, int len) 336void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
329{ 337{
338 u16 prev = grp->bc_snd_nxt - 1;
330 struct tipc_member *m; 339 struct tipc_member *m;
331 struct rb_node *n; 340 struct rb_node *n;
332 341
333 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 342 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
334 m = container_of(n, struct tipc_member, tree_node); 343 m = container_of(n, struct tipc_member, tree_node);
335 if (tipc_group_is_enabled(m)) 344 if (tipc_group_is_enabled(m)) {
336 tipc_group_update_member(m, len); 345 tipc_group_update_member(m, len);
346 m->bc_acked = prev;
347 }
337 } 348 }
349
350 /* Mark number of acknowledges to expect, if any */
351 if (ack)
352 grp->bc_ackers = grp->member_cnt;
338 grp->bc_snd_nxt++; 353 grp->bc_snd_nxt++;
339} 354}
340 355
@@ -372,6 +387,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
372{ 387{
373 struct tipc_member *m = NULL; 388 struct tipc_member *m = NULL;
374 389
390 /* If prev bcast was replicast, reject until all receivers have acked */
391 if (grp->bc_ackers)
392 return true;
393
375 if (list_empty(&grp->congested)) 394 if (list_empty(&grp->congested))
376 return false; 395 return false;
377 396
@@ -391,7 +410,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
391 struct sk_buff *_skb, *tmp; 410 struct sk_buff *_skb, *tmp;
392 int mtyp = msg_type(hdr); 411 int mtyp = msg_type(hdr);
393 412
394 /* Bcast may be bypassed by unicast, - sort it in */ 413 /* Bcast may be bypassed by unicast or other bcast, - sort it in */
395 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 414 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
396 skb_queue_walk_safe(defq, _skb, tmp) { 415 skb_queue_walk_safe(defq, _skb, tmp) {
397 _hdr = buf_msg(_skb); 416 _hdr = buf_msg(_skb);
@@ -412,10 +431,10 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
412 struct sk_buff_head *xmitq) 431 struct sk_buff_head *xmitq)
413{ 432{
414 struct sk_buff *skb = __skb_dequeue(inputq); 433 struct sk_buff *skb = __skb_dequeue(inputq);
434 bool ack, deliver, update;
415 struct sk_buff_head *defq; 435 struct sk_buff_head *defq;
416 struct tipc_member *m; 436 struct tipc_member *m;
417 struct tipc_msg *hdr; 437 struct tipc_msg *hdr;
418 bool deliver, update;
419 u32 node, port; 438 u32 node, port;
420 int mtyp, blks; 439 int mtyp, blks;
421 440
@@ -451,6 +470,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
451 hdr = buf_msg(skb); 470 hdr = buf_msg(skb);
452 mtyp = msg_type(hdr); 471 mtyp = msg_type(hdr);
453 deliver = true; 472 deliver = true;
473 ack = false;
454 update = false; 474 update = false;
455 475
456 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 476 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -466,6 +486,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
466 /* Fall thru */ 486 /* Fall thru */
467 case TIPC_GRP_BCAST_MSG: 487 case TIPC_GRP_BCAST_MSG:
468 m->bc_rcv_nxt++; 488 m->bc_rcv_nxt++;
489 ack = msg_grp_bc_ack_req(hdr);
469 break; 490 break;
470 case TIPC_GRP_UCAST_MSG: 491 case TIPC_GRP_UCAST_MSG:
471 break; 492 break;
@@ -480,6 +501,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
480 else 501 else
481 kfree_skb(skb); 502 kfree_skb(skb);
482 503
504 if (ack)
505 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
506
483 if (!update) 507 if (!update)
484 continue; 508 continue;
485 509
@@ -540,6 +564,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
540 } else if (mtyp == GRP_ADV_MSG) { 564 } else if (mtyp == GRP_ADV_MSG) {
541 msg_set_adv_win(hdr, adv); 565 msg_set_adv_win(hdr, adv);
542 m->advertised += adv; 566 m->advertised += adv;
567 } else if (mtyp == GRP_ACK_MSG) {
568 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
543 } 569 }
544 __skb_queue_tail(xmitq, skb); 570 __skb_queue_tail(xmitq, skb);
545} 571}
@@ -593,7 +619,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
593 } 619 }
594 /* Otherwise deliver already received WITHDRAW event */ 620 /* Otherwise deliver already received WITHDRAW event */
595 __skb_queue_tail(inputq, m->event_msg); 621 __skb_queue_tail(inputq, m->event_msg);
596 *usr_wakeup = m->usr_pending; 622 *usr_wakeup = true;
597 tipc_group_delete_member(grp, m); 623 tipc_group_delete_member(grp, m);
598 list_del_init(&m->congested); 624 list_del_init(&m->congested);
599 return; 625 return;
@@ -605,6 +631,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
605 m->usr_pending = false; 631 m->usr_pending = false;
606 list_del_init(&m->congested); 632 list_del_init(&m->congested);
607 return; 633 return;
634 case GRP_ACK_MSG:
635 if (!m)
636 return;
637 m->bc_acked = msg_grp_bc_acked(hdr);
638 if (--grp->bc_ackers)
639 break;
640 *usr_wakeup = true;
641 m->usr_pending = false;
642 return;
608 default: 643 default:
609 pr_warn("Received unknown GROUP_PROTO message\n"); 644 pr_warn("Received unknown GROUP_PROTO message\n");
610 } 645 }
@@ -678,7 +713,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
678 713
679 TIPC_SKB_CB(skb)->orig_member = m->instance; 714 TIPC_SKB_CB(skb)->orig_member = m->instance;
680 715
681 *usr_wakeup = m->usr_pending; 716 *usr_wakeup = true;
682 m->usr_pending = false; 717 m->usr_pending = false;
683 718
684 /* Hold back event if more messages might be expected */ 719 /* Hold back event if more messages might be expected */
diff --git a/net/tipc/group.h b/net/tipc/group.h
index e432066a211e..d525e1cd7de5 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -61,7 +61,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
61 struct tipc_msg *hdr, 61 struct tipc_msg *hdr,
62 struct sk_buff_head *inputq, 62 struct sk_buff_head *inputq,
63 struct sk_buff_head *xmitq); 63 struct sk_buff_head *xmitq);
64void tipc_group_update_bc_members(struct tipc_group *grp, int len); 64void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
65bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 65bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
66 int len, struct tipc_member **m); 66 int len, struct tipc_member **m);
67bool tipc_group_bc_cong(struct tipc_group *grp, int len); 67bool tipc_group_bc_cong(struct tipc_group *grp, int len);
@@ -69,7 +69,5 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
69 u32 port, struct sk_buff_head *xmitq); 69 u32 port, struct sk_buff_head *xmitq);
70u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); 70u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
71void tipc_group_update_member(struct tipc_member *m, int len); 71void tipc_group_update_member(struct tipc_member *m, int len);
72struct tipc_member *tipc_group_find_sender(struct tipc_group *grp,
73 u32 node, u32 port);
74int tipc_group_size(struct tipc_group *grp); 72int tipc_group_size(struct tipc_group *grp);
75#endif 73#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index bd25bff63925..70a21499804d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,13 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1046 case TIPC_MEDIUM_IMPORTANCE: 1046 case TIPC_MEDIUM_IMPORTANCE:
1047 case TIPC_HIGH_IMPORTANCE: 1047 case TIPC_HIGH_IMPORTANCE:
1048 case TIPC_CRITICAL_IMPORTANCE: 1048 case TIPC_CRITICAL_IMPORTANCE:
1049 if (unlikely(msg_mcast(hdr))) { 1049 if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) {
1050 skb_queue_tail(l->bc_rcvlink->inputq, skb); 1050 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1051 return true; 1051 return true;
1052 } 1052 }
1053 case CONN_MANAGER:
1054 case GROUP_PROTOCOL: 1053 case GROUP_PROTOCOL:
1055 skb_queue_tail(inputq, skb); 1054 case CONN_MANAGER:
1056 return true; 1055 return true;
1057 case NAME_DISTRIBUTOR: 1056 case NAME_DISTRIBUTOR:
1058 l->bc_rcvlink->state = LINK_ESTABLISHED; 1057 l->bc_rcvlink->state = LINK_ESTABLISHED;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d6f98215267e..52c6a2e01995 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -547,6 +547,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
547#define GRP_JOIN_MSG 0 547#define GRP_JOIN_MSG 0
548#define GRP_LEAVE_MSG 1 548#define GRP_LEAVE_MSG 1
549#define GRP_ADV_MSG 2 549#define GRP_ADV_MSG 2
550#define GRP_ACK_MSG 3
550 551
551/* 552/*
552 * Word 1 553 * Word 1
@@ -839,6 +840,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
839 msg_set_bits(m, 9, 16, 0xffff, n); 840 msg_set_bits(m, 9, 16, 0xffff, n);
840} 841}
841 842
843static inline u16 msg_grp_bc_acked(struct tipc_msg *m)
844{
845 return msg_bits(m, 9, 16, 0xffff);
846}
847
848static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
849{
850 msg_set_bits(m, 9, 16, 0xffff, n);
851}
852
842/* Word 10 853/* Word 10
843 */ 854 */
844static inline u16 msg_grp_evt(struct tipc_msg *m) 855static inline u16 msg_grp_evt(struct tipc_msg *m)
@@ -851,6 +862,16 @@ static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
851 msg_set_bits(m, 10, 0, 0x3, n); 862 msg_set_bits(m, 10, 0, 0x3, n);
852} 863}
853 864
865static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m)
866{
867 return msg_bits(m, 10, 0, 0x1);
868}
869
870static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n)
871{
872 msg_set_bits(m, 10, 0, 0x1, n);
873}
874
854static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) 875static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
855{ 876{
856 return msg_bits(m, 10, 16, 0xffff); 877 return msg_bits(m, 10, 16, 0xffff);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3276b7a0d445..b1f1c3c2b1e2 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -831,6 +831,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
831 u32 dnode, u32 dport, int dlen) 831 u32 dnode, u32 dport, int dlen)
832{ 832{
833 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group); 833 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
834 struct tipc_mc_method *method = &tsk->mc_method;
834 int blks = tsk_blocks(GROUP_H_SIZE + dlen); 835 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
835 struct tipc_msg *hdr = &tsk->phdr; 836 struct tipc_msg *hdr = &tsk->phdr;
836 struct sk_buff_head pkts; 837 struct sk_buff_head pkts;
@@ -857,9 +858,12 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
857 tsk->cong_link_cnt++; 858 tsk->cong_link_cnt++;
858 } 859 }
859 860
860 /* Update send window and sequence number */ 861 /* Update send window */
861 tipc_group_update_member(mb, blks); 862 tipc_group_update_member(mb, blks);
862 863
864 /* A broadcast sent within next EXPIRE period must follow same path */
865 method->rcast = true;
866 method->mandatory = true;
863 return dlen; 867 return dlen;
864} 868}
865 869
@@ -1008,6 +1012,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1008 struct tipc_group *grp = tsk->group; 1012 struct tipc_group *grp = tsk->group;
1009 struct tipc_nlist *dsts = tipc_group_dests(grp); 1013 struct tipc_nlist *dsts = tipc_group_dests(grp);
1010 struct tipc_mc_method *method = &tsk->mc_method; 1014 struct tipc_mc_method *method = &tsk->mc_method;
1015 bool ack = method->mandatory && method->rcast;
1011 int blks = tsk_blocks(MCAST_H_SIZE + dlen); 1016 int blks = tsk_blocks(MCAST_H_SIZE + dlen);
1012 struct tipc_msg *hdr = &tsk->phdr; 1017 struct tipc_msg *hdr = &tsk->phdr;
1013 int mtu = tipc_bcast_get_mtu(net); 1018 int mtu = tipc_bcast_get_mtu(net);
@@ -1036,6 +1041,9 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1036 msg_set_destnode(hdr, 0); 1041 msg_set_destnode(hdr, 0);
1037 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); 1042 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
1038 1043
1044 /* Avoid getting stuck with repeated forced replicasts */
1045 msg_set_grp_bc_ack_req(hdr, ack);
1046
1039 /* Build message as chain of buffers */ 1047 /* Build message as chain of buffers */
1040 skb_queue_head_init(&pkts); 1048 skb_queue_head_init(&pkts);
1041 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); 1049 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
@@ -1043,13 +1051,17 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1043 return rc; 1051 return rc;
1044 1052
1045 /* Send message */ 1053 /* Send message */
1046 rc = tipc_mcast_xmit(net, &pkts, method, dsts, 1054 rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
1047 &tsk->cong_link_cnt);
1048 if (unlikely(rc)) 1055 if (unlikely(rc))
1049 return rc; 1056 return rc;
1050 1057
1051 /* Update broadcast sequence number and send windows */ 1058 /* Update broadcast sequence number and send windows */
1052 tipc_group_update_bc_members(tsk->group, blks); 1059 tipc_group_update_bc_members(tsk->group, blks, ack);
1060
1061 /* Broadcast link is now free to choose method for next broadcast */
1062 method->mandatory = false;
1063 method->expires = jiffies;
1064
1053 return dlen; 1065 return dlen;
1054} 1066}
1055 1067
@@ -1113,7 +1125,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1113 u32 portid, oport, onode; 1125 u32 portid, oport, onode;
1114 struct list_head dports; 1126 struct list_head dports;
1115 struct tipc_msg *msg; 1127 struct tipc_msg *msg;
1116 int hsz; 1128 int user, mtyp, hsz;
1117 1129
1118 __skb_queue_head_init(&tmpq); 1130 __skb_queue_head_init(&tmpq);
1119 INIT_LIST_HEAD(&dports); 1131 INIT_LIST_HEAD(&dports);
@@ -1121,6 +1133,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1121 skb = tipc_skb_peek(arrvq, &inputq->lock); 1133 skb = tipc_skb_peek(arrvq, &inputq->lock);
1122 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1134 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
1123 msg = buf_msg(skb); 1135 msg = buf_msg(skb);
1136 user = msg_user(msg);
1137 mtyp = msg_type(msg);
1138 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1139 spin_lock_bh(&inputq->lock);
1140 if (skb_peek(arrvq) == skb) {
1141 __skb_dequeue(arrvq);
1142 __skb_queue_tail(inputq, skb);
1143 }
1144 refcount_dec(&skb->users);
1145 spin_unlock_bh(&inputq->lock);
1146 continue;
1147 }
1124 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1148 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
1125 oport = msg_origport(msg); 1149 oport = msg_origport(msg);
1126 onode = msg_orignode(msg); 1150 onode = msg_orignode(msg);