summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-10-22 08:51:42 -0400
committerDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:39 -0400
commitb06b281e79375fcbd9ffaec7c5fdc350b888d089 (patch)
tree74227c840529e15377aad08e5dc4ddab21fba2cd /net/tipc
parent5266698661401afc5e4a1a521cf9ba10724d10dd (diff)
tipc: simplify bearer level broadcast
Until now, we have been keeping track of the exact set of broadcast destinations though the help structure tipc_node_map. This leads us to have to maintain a whole infrastructure for supporting this, including a pseudo-bearer and a number of functions to manipulate both the bearers and the node map correctly. Apart from the complexity, this approach is also limiting, as struct tipc_node_map only can support cluster local broadcast if we want to avoid it becoming excessively large. We want to eliminate this limitation, in order to enable introduction of scoped multicast in the future. A closer analysis reveals that it is unnecessary maintaining this "full set" overview; it is sufficient to keep a counter per bearer, indicating how many nodes can be reached via this bearer at the moment. The protocol is now robust enough to handle transitional discrepancies between the nominal number of reachable destinations, as expected by the broadcast protocol itself, and the number which is actually reachable at the moment. The initial broadcast synchronization, in conjunction with the retransmission mechanism, ensures that all packets will eventually be acknowledged by the correct set of destinations. This commit introduces these changes. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c143
-rw-r--r--net/tipc/bcast.h8
-rw-r--r--net/tipc/bearer.c35
-rw-r--r--net/tipc/bearer.h3
-rw-r--r--net/tipc/node.c7
5 files changed, 151 insertions, 45 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index ea28c2919b38..74ee09ac430d 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -90,10 +90,12 @@ struct tipc_bcbearer {
90 90
91/** 91/**
92 * struct tipc_bc_base - link used for broadcast messages 92 * struct tipc_bc_base - link used for broadcast messages
93 * @link: (non-standard) broadcast link structure 93 * @link: broadcast send link structure
94 * @node: (non-standard) node structure representing b'cast link's peer node 94 * @node: (non-standard) node structure representing b'cast link's peer node
95 * @bcast_nodes: map of broadcast-capable nodes 95 * @bcast_nodes: map of broadcast-capable nodes
96 * @retransmit_to: node that most recently requested a retransmit 96 * @retransmit_to: node that most recently requested a retransmit
97 * @dest_nnt: array indicating number of reachable destinations per bearer
98 * @bearers: array of bearers, sorted by number of reachable destinations
97 * 99 *
98 * Handles sequence numbering, fragmentation, bundling, etc. 100 * Handles sequence numbering, fragmentation, bundling, etc.
99 */ 101 */
@@ -103,6 +105,8 @@ struct tipc_bc_base {
103 struct sk_buff_head arrvq; 105 struct sk_buff_head arrvq;
104 struct sk_buff_head inputq; 106 struct sk_buff_head inputq;
105 struct sk_buff_head namedq; 107 struct sk_buff_head namedq;
108 int dests[MAX_BEARERS];
109 int primary_bearer;
106 struct tipc_node_map bcast_nodes; 110 struct tipc_node_map bcast_nodes;
107 struct tipc_node *retransmit_to; 111 struct tipc_node *retransmit_to;
108}; 112};
@@ -164,6 +168,52 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
164 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 168 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
165} 169}
166 170
171/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
172 * if any, and make it primary bearer
173 */
174static void tipc_bcbase_select_primary(struct net *net)
175{
176 struct tipc_bc_base *bb = tipc_bc_base(net);
177 int all_dests = tipc_link_bc_peers(bb->link);
178 int i;
179
180 bb->primary_bearer = INVALID_BEARER_ID;
181
182 if (!all_dests)
183 return;
184
185 for (i = 0; i < MAX_BEARERS; i++) {
186 if (bb->dests[i] < all_dests)
187 continue;
188
189 bb->primary_bearer = i;
190
191 /* Reduce risk that all nodes select same primary */
192 if ((i ^ tipc_own_addr(net)) & 1)
193 break;
194 }
195}
196
197void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
198{
199 struct tipc_bc_base *bb = tipc_bc_base(net);
200
201 tipc_bcast_lock(net);
202 bb->dests[bearer_id]++;
203 tipc_bcbase_select_primary(net);
204 tipc_bcast_unlock(net);
205}
206
207void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
208{
209 struct tipc_bc_base *bb = tipc_bc_base(net);
210
211 tipc_bcast_lock(net);
212 bb->dests[bearer_id]--;
213 tipc_bcbase_select_primary(net);
214 tipc_bcast_unlock(net);
215}
216
167static void bclink_set_last_sent(struct net *net) 217static void bclink_set_last_sent(struct net *net)
168{ 218{
169 struct tipc_net *tn = net_generic(net, tipc_net_id); 219 struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -439,6 +489,51 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
439 tipc_node_put(n_ptr); 489 tipc_node_put(n_ptr);
440} 490}
441 491
492/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
493 *
494 * Note that number of reachable destinations, as indicated in the dests[]
495 * array, may transitionally differ from the number of destinations indicated
496 * in each sent buffer. We can sustain this. Excess destination nodes will
497 * drop and never acknowledge the unexpected packets, and missing destinations
498 * will either require retransmission (if they are just about to be added to
499 * the bearer), or be removed from the buffer's 'ackers' counter (if they
500 * just went down)
501 */
502static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
503{
504 int bearer_id;
505 struct tipc_bc_base *bb = tipc_bc_base(net);
506 struct sk_buff *skb, *_skb;
507 struct sk_buff_head _xmitq;
508
509 if (skb_queue_empty(xmitq))
510 return;
511
512 /* The typical case: at least one bearer has links to all nodes */
513 bearer_id = bb->primary_bearer;
514 if (bearer_id >= 0) {
515 tipc_bearer_bc_xmit(net, bearer_id, xmitq);
516 return;
517 }
518
519 /* We have to transmit across all bearers */
520 skb_queue_head_init(&_xmitq);
521 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
522 if (!bb->dests[bearer_id])
523 continue;
524
525 skb_queue_walk(xmitq, skb) {
526 _skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
527 if (!_skb)
528 break;
529 __skb_queue_tail(&_xmitq, _skb);
530 }
531 tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
532 }
533 __skb_queue_purge(xmitq);
534 __skb_queue_purge(&_xmitq);
535}
536
442/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster 537/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
443 * and to identified node local sockets 538 * and to identified node local sockets
444 * @net: the applicable net namespace 539 * @net: the applicable net namespace
@@ -463,7 +558,6 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
463 tipc_bcast_lock(net); 558 tipc_bcast_lock(net);
464 if (tipc_link_bc_peers(l)) 559 if (tipc_link_bc_peers(l))
465 rc = tipc_link_xmit(l, list, &xmitq); 560 rc = tipc_link_xmit(l, list, &xmitq);
466 bclink_set_last_sent(net);
467 tipc_bcast_unlock(net); 561 tipc_bcast_unlock(net);
468 562
469 /* Don't send to local node if adding to link failed */ 563 /* Don't send to local node if adding to link failed */
@@ -473,7 +567,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
473 } 567 }
474 568
475 /* Broadcast to all nodes, inluding local node */ 569 /* Broadcast to all nodes, inluding local node */
476 tipc_bcbearer_xmit(net, &xmitq); 570 tipc_bcbase_xmit(net, &xmitq);
477 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 571 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
478 __skb_queue_purge(list); 572 __skb_queue_purge(list);
479 return 0; 573 return 0;
@@ -504,8 +598,7 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
504 rc = tipc_link_rcv(l, skb, NULL); 598 rc = tipc_link_rcv(l, skb, NULL);
505 tipc_bcast_unlock(net); 599 tipc_bcast_unlock(net);
506 600
507 if (!skb_queue_empty(&xmitq)) 601 tipc_bcbase_xmit(net, &xmitq);
508 tipc_bcbearer_xmit(net, &xmitq);
509 602
510 /* Any socket wakeup messages ? */ 603 /* Any socket wakeup messages ? */
511 if (!skb_queue_empty(inputq)) 604 if (!skb_queue_empty(inputq))
@@ -529,7 +622,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
529 tipc_link_bc_ack_rcv(l, acked, &xmitq); 622 tipc_link_bc_ack_rcv(l, acked, &xmitq);
530 tipc_bcast_unlock(net); 623 tipc_bcast_unlock(net);
531 624
532 tipc_bcbearer_xmit(net, &xmitq); 625 tipc_bcbase_xmit(net, &xmitq);
533 626
534 /* Any socket wakeup messages ? */ 627 /* Any socket wakeup messages ? */
535 if (!skb_queue_empty(inputq)) 628 if (!skb_queue_empty(inputq))
@@ -557,7 +650,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
557 } 650 }
558 tipc_bcast_unlock(net); 651 tipc_bcast_unlock(net);
559 652
560 tipc_bcbearer_xmit(net, &xmitq); 653 tipc_bcbase_xmit(net, &xmitq);
561 654
562 /* Any socket wakeup messages ? */ 655 /* Any socket wakeup messages ? */
563 if (!skb_queue_empty(inputq)) 656 if (!skb_queue_empty(inputq))
@@ -568,38 +661,35 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
568 * 661 *
569 * RCU is locked, node lock is set 662 * RCU is locked, node lock is set
570 */ 663 */
571void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l, 664void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
572 struct sk_buff_head *xmitq) 665 struct sk_buff_head *xmitq)
573{ 666{
574 struct tipc_net *tn = net_generic(net, tipc_net_id);
575 struct tipc_link *snd_l = tipc_bc_sndlink(net); 667 struct tipc_link *snd_l = tipc_bc_sndlink(net);
576 668
577 tipc_bclink_lock(net); 669 tipc_bcast_lock(net);
578 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
579 tipc_link_add_bc_peer(snd_l, uc_l, xmitq); 670 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
580 tipc_bclink_unlock(net); 671 tipc_bcbase_select_primary(net);
672 tipc_bcast_unlock(net);
581} 673}
582 674
583/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer 675/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
584 * 676 *
585 * RCU is locked, node lock is set 677 * RCU is locked, node lock is set
586 */ 678 */
587void tipc_bcast_remove_peer(struct net *net, u32 addr, 679void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
588 struct tipc_link *rcv_l)
589{ 680{
590 struct tipc_net *tn = net_generic(net, tipc_net_id);
591 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
592 struct tipc_link *snd_l = tipc_bc_sndlink(net); 681 struct tipc_link *snd_l = tipc_bc_sndlink(net);
682 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
593 struct sk_buff_head xmitq; 683 struct sk_buff_head xmitq;
594 684
595 __skb_queue_head_init(&xmitq); 685 __skb_queue_head_init(&xmitq);
596 686
597 tipc_bclink_lock(net); 687 tipc_bcast_lock(net);
598 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
599 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); 688 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
600 tipc_bclink_unlock(net); 689 tipc_bcbase_select_primary(net);
690 tipc_bcast_unlock(net);
601 691
602 tipc_bcbearer_xmit(net, &xmitq); 692 tipc_bcbase_xmit(net, &xmitq);
603 693
604 /* Any socket wakeup messages ? */ 694 /* Any socket wakeup messages ? */
605 if (!skb_queue_empty(inputq)) 695 if (!skb_queue_empty(inputq))
@@ -869,19 +959,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
869 return 0; 959 return 0;
870} 960}
871 961
872static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
873{
874 struct sk_buff *skb, *tmp;
875
876 skb_queue_walk_safe(xmitq, skb, tmp) {
877 __skb_dequeue(xmitq);
878 tipc_bcbearer_send(net, skb, NULL, NULL);
879
880 /* Until we remove cloning in tipc_l2_send_msg(): */
881 kfree_skb(skb);
882 }
883}
884
885/** 962/**
886 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 963 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
887 */ 964 */
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 568a57cd89e6..76b747a73b0b 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -47,11 +47,11 @@ struct tipc_node_map;
47int tipc_bcast_init(struct net *net); 47int tipc_bcast_init(struct net *net);
48void tipc_bcast_reinit(struct net *net); 48void tipc_bcast_reinit(struct net *net);
49void tipc_bcast_stop(struct net *net); 49void tipc_bcast_stop(struct net *net);
50void tipc_bcast_add_peer(struct net *net, u32 addr, 50void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
51 struct tipc_link *l,
52 struct sk_buff_head *xmitq); 51 struct sk_buff_head *xmitq);
53void tipc_bcast_remove_peer(struct net *net, u32 addr, 52void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
54 struct tipc_link *rcv_bcl); 53void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
54void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
55struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); 55struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
56void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); 56void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
57void tipc_bclink_rcv(struct net *net, struct sk_buff *buf); 57void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 82b278668ab7..62f47ecc6b84 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)
193 193
194 rcu_read_lock(); 194 rcu_read_lock();
195 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); 195 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
196 if (b_ptr) { 196 if (b_ptr)
197 tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
198 tipc_disc_add_dest(b_ptr->link_req); 197 tipc_disc_add_dest(b_ptr->link_req);
199 }
200 rcu_read_unlock(); 198 rcu_read_unlock();
201} 199}
202 200
@@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)
207 205
208 rcu_read_lock(); 206 rcu_read_lock();
209 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); 207 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
210 if (b_ptr) { 208 if (b_ptr)
211 tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
212 tipc_disc_remove_dest(b_ptr->link_req); 209 tipc_disc_remove_dest(b_ptr->link_req);
213 }
214 rcu_read_unlock(); 210 rcu_read_unlock();
215} 211}
216 212
@@ -494,6 +490,33 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
494 rcu_read_unlock(); 490 rcu_read_unlock();
495} 491}
496 492
493/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
494 */
495void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
496 struct sk_buff_head *xmitq)
497{
498 struct tipc_net *tn = tipc_net(net);
499 int net_id = tn->net_id;
500 struct tipc_bearer *b;
501 struct sk_buff *skb, *tmp;
502 struct tipc_msg *hdr;
503
504 rcu_read_lock();
505 b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
506 if (likely(b)) {
507 skb_queue_walk_safe(xmitq, skb, tmp) {
508 hdr = buf_msg(skb);
509 msg_set_non_seq(hdr, 1);
510 msg_set_mc_netid(hdr, net_id);
511 __skb_dequeue(xmitq);
512 b->media->send_msg(net, skb, b, &b->bcast_addr);
513 /* Until we remove cloning in tipc_l2_send_msg(): */
514 kfree_skb(skb);
515 }
516 }
517 rcu_read_unlock();
518}
519
497/** 520/**
498 * tipc_l2_rcv_msg - handle incoming TIPC message from an interface 521 * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
499 * @buf: the received packet 522 * @buf: the received packet
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 6426f242f626..9fc1e074f7c0 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -163,6 +163,7 @@ struct tipc_bearer {
163 u32 identity; 163 u32 identity;
164 struct tipc_link_req *link_req; 164 struct tipc_link_req *link_req;
165 char net_plane; 165 char net_plane;
166 int node_cnt;
166 struct tipc_node_map nodes; 167 struct tipc_node_map nodes;
167}; 168};
168 169
@@ -220,5 +221,7 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
220void tipc_bearer_xmit(struct net *net, u32 bearer_id, 221void tipc_bearer_xmit(struct net *net, u32 bearer_id,
221 struct sk_buff_head *xmitq, 222 struct sk_buff_head *xmitq,
222 struct tipc_media_addr *dst); 223 struct tipc_media_addr *dst);
224void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
225 struct sk_buff_head *xmitq);
223 226
224#endif /* _TIPC_BEARER_H */ 227#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/node.c b/net/tipc/node.c
index cd924552244b..b27439097d6c 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -346,6 +346,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
346 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; 346 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
347 347
348 tipc_bearer_add_dest(n->net, bearer_id, n->addr); 348 tipc_bearer_add_dest(n->net, bearer_id, n->addr);
349 tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
349 350
350 pr_debug("Established link <%s> on network plane %c\n", 351 pr_debug("Established link <%s> on network plane %c\n",
351 nl->name, nl->net_plane); 352 nl->name, nl->net_plane);
@@ -356,7 +357,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
356 *slot1 = bearer_id; 357 *slot1 = bearer_id;
357 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT); 358 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
358 n->action_flags |= TIPC_NOTIFY_NODE_UP; 359 n->action_flags |= TIPC_NOTIFY_NODE_UP;
359 tipc_bcast_add_peer(n->net, n->addr, nl, xmitq); 360 tipc_bcast_add_peer(n->net, nl, xmitq);
360 return; 361 return;
361 } 362 }
362 363
@@ -443,8 +444,10 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
443 tipc_link_build_reset_msg(l, xmitq); 444 tipc_link_build_reset_msg(l, xmitq);
444 *maddr = &n->links[*bearer_id].maddr; 445 *maddr = &n->links[*bearer_id].maddr;
445 node_lost_contact(n, &le->inputq); 446 node_lost_contact(n, &le->inputq);
447 tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
446 return; 448 return;
447 } 449 }
450 tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
448 451
449 /* There is still a working link => initiate failover */ 452 /* There is still a working link => initiate failover */
450 tnl = node_active_link(n, 0); 453 tnl = node_active_link(n, 0);
@@ -860,7 +863,7 @@ static void node_lost_contact(struct tipc_node *n,
860 tipc_addr_string_fill(addr_string, n->addr)); 863 tipc_addr_string_fill(addr_string, n->addr));
861 864
862 /* Clean up broadcast state */ 865 /* Clean up broadcast state */
863 tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link); 866 tipc_bcast_remove_peer(n->net, n->bc_entry.link);
864 867
865 /* Abort any ongoing link failover */ 868 /* Abort any ongoing link failover */
866 for (i = 0; i < MAX_BEARERS; i++) { 869 for (i = 0; i < MAX_BEARERS; i++) {