aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c98
1 files changed, 51 insertions, 47 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b7bd2174330..08d64e7527cb 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,6 +112,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
112 return tipc_net(net)->bcbase; 112 return tipc_net(net)->bcbase;
113} 113}
114 114
115static struct tipc_link *tipc_bc_sndlink(struct net *net)
116{
117 return tipc_net(net)->bcl;
118}
119
115/** 120/**
116 * tipc_nmap_equal - test for equality of node maps 121 * tipc_nmap_equal - test for equality of node maps
117 */ 122 */
@@ -121,6 +126,7 @@ static int tipc_nmap_equal(struct tipc_node_map *nm_a,
121 return !memcmp(nm_a, nm_b, sizeof(*nm_a)); 126 return !memcmp(nm_a, nm_b, sizeof(*nm_a));
122} 127}
123 128
129static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
124static void tipc_nmap_diff(struct tipc_node_map *nm_a, 130static void tipc_nmap_diff(struct tipc_node_map *nm_a,
125 struct tipc_node_map *nm_b, 131 struct tipc_node_map *nm_b,
126 struct tipc_node_map *nm_diff); 132 struct tipc_node_map *nm_diff);
@@ -148,14 +154,14 @@ uint tipc_bcast_get_mtu(void)
148 return MAX_PKT_DEFAULT_MCAST; 154 return MAX_PKT_DEFAULT_MCAST;
149} 155}
150 156
151static u32 bcbuf_acks(struct sk_buff *buf) 157static u16 bcbuf_acks(struct sk_buff *skb)
152{ 158{
153 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 159 return TIPC_SKB_CB(skb)->ackers;
154} 160}
155 161
156static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 162static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
157{ 163{
158 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 164 TIPC_SKB_CB(buf)->ackers = ackers;
159} 165}
160 166
161static void bcbuf_decr_acks(struct sk_buff *buf) 167static void bcbuf_decr_acks(struct sk_buff *buf)
@@ -166,9 +172,10 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
166void tipc_bclink_add_node(struct net *net, u32 addr) 172void tipc_bclink_add_node(struct net *net, u32 addr)
167{ 173{
168 struct tipc_net *tn = net_generic(net, tipc_net_id); 174 struct tipc_net *tn = net_generic(net, tipc_net_id);
169 175 struct tipc_link *l = tipc_bc_sndlink(net);
170 tipc_bclink_lock(net); 176 tipc_bclink_lock(net);
171 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr); 177 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
178 tipc_link_add_bc_peer(l);
172 tipc_bclink_unlock(net); 179 tipc_bclink_unlock(net);
173} 180}
174 181
@@ -178,6 +185,7 @@ void tipc_bclink_remove_node(struct net *net, u32 addr)
178 185
179 tipc_bclink_lock(net); 186 tipc_bclink_lock(net);
180 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr); 187 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
188 tn->bcl->ackers--;
181 189
182 /* Last node? => reset backlog queue */ 190 /* Last node? => reset backlog queue */
183 if (!tn->bcbase->bcast_nodes.count) 191 if (!tn->bcbase->bcast_nodes.count)
@@ -295,7 +303,6 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
295 303
296 if (unlikely(!n_ptr->bclink.recv_permitted)) 304 if (unlikely(!n_ptr->bclink.recv_permitted))
297 return; 305 return;
298
299 tipc_bclink_lock(net); 306 tipc_bclink_lock(net);
300 307
301 /* Bail out if tx queue is empty (no clean up is required) */ 308 /* Bail out if tx queue is empty (no clean up is required) */
@@ -324,13 +331,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
324 less_eq(acked, n_ptr->bclink.acked)) 331 less_eq(acked, n_ptr->bclink.acked))
325 goto exit; 332 goto exit;
326 } 333 }
327
328 /* Skip over packets that node has previously acknowledged */ 334 /* Skip over packets that node has previously acknowledged */
329 skb_queue_walk(&tn->bcl->transmq, skb) { 335 skb_queue_walk(&tn->bcl->transmq, skb) {
330 if (more(buf_seqno(skb), n_ptr->bclink.acked)) 336 if (more(buf_seqno(skb), n_ptr->bclink.acked))
331 break; 337 break;
332 } 338 }
333
334 /* Update packets that node is now acknowledging */ 339 /* Update packets that node is now acknowledging */
335 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { 340 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
336 if (more(buf_seqno(skb), acked)) 341 if (more(buf_seqno(skb), acked))
@@ -367,6 +372,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
367 struct sk_buff *buf; 372 struct sk_buff *buf;
368 struct net *net = n_ptr->net; 373 struct net *net = n_ptr->net;
369 struct tipc_net *tn = net_generic(net, tipc_net_id); 374 struct tipc_net *tn = net_generic(net, tipc_net_id);
375 struct tipc_link *bcl = tn->bcl;
370 376
371 /* Ignore "stale" link state info */ 377 /* Ignore "stale" link state info */
372 if (less_eq(last_sent, n_ptr->bclink.last_in)) 378 if (less_eq(last_sent, n_ptr->bclink.last_in))
@@ -375,6 +381,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
375 /* Update link synchronization state; quit if in sync */ 381 /* Update link synchronization state; quit if in sync */
376 bclink_update_last_sent(n_ptr, last_sent); 382 bclink_update_last_sent(n_ptr, last_sent);
377 383
384 /* This is a good location for statistical profiling */
385 bcl->stats.queue_sz_counts++;
386 bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
387
378 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 388 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
379 return; 389 return;
380 390
@@ -468,52 +478,35 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
468 */ 478 */
469int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 479int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
470{ 480{
471 struct tipc_net *tn = net_generic(net, tipc_net_id); 481 struct tipc_link *l = tipc_bc_sndlink(net);
472 struct tipc_link *bcl = tn->bcl; 482 struct sk_buff_head xmitq, inputq, rcvq;
473 struct tipc_bc_base *bclink = tn->bcbase;
474 int rc = 0; 483 int rc = 0;
475 int bc = 0;
476 struct sk_buff *skb;
477 struct sk_buff_head arrvq;
478 struct sk_buff_head inputq;
479 484
480 /* Prepare clone of message for local node */ 485 __skb_queue_head_init(&rcvq);
481 skb = tipc_msg_reassemble(list); 486 __skb_queue_head_init(&xmitq);
482 if (unlikely(!skb)) 487 skb_queue_head_init(&inputq);
483 return -EHOSTUNREACH;
484 488
485 /* Broadcast to all nodes */ 489 /* Prepare message clone for local node */
486 if (likely(bclink)) { 490 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
487 tipc_bclink_lock(net); 491 return -EHOSTUNREACH;
488 if (likely(bclink->bcast_nodes.count)) {
489 rc = __tipc_link_xmit(net, bcl, list);
490 if (likely(!rc)) {
491 u32 len = skb_queue_len(&bcl->transmq);
492
493 bclink_set_last_sent(net);
494 bcl->stats.queue_sz_counts++;
495 bcl->stats.accu_queue_sz += len;
496 }
497 bc = 1;
498 }
499 tipc_bclink_unlock(net);
500 }
501 492
502 if (unlikely(!bc)) 493 tipc_bcast_lock(net);
503 __skb_queue_purge(list); 494 if (tipc_link_bc_peers(l))
495 rc = tipc_link_xmit(l, list, &xmitq);
496 bclink_set_last_sent(net);
497 tipc_bcast_unlock(net);
504 498
499 /* Don't send to local node if adding to link failed */
505 if (unlikely(rc)) { 500 if (unlikely(rc)) {
506 kfree_skb(skb); 501 __skb_queue_purge(&rcvq);
507 return rc; 502 return rc;
508 } 503 }
509 /* Deliver message clone */ 504 /* Broadcast to all nodes, inluding local node */
510 __skb_queue_head_init(&arrvq); 505 tipc_bcbearer_xmit(net, &xmitq);
511 skb_queue_head_init(&inputq); 506 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
512 __skb_queue_tail(&arrvq, skb); 507 __skb_queue_purge(list);
513 tipc_sk_mcast_rcv(net, &arrvq, &inputq); 508 return 0;
514 return rc;
515} 509}
516
517/** 510/**
518 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 511 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
519 * 512 *
@@ -564,7 +557,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
564 node = tipc_node_find(net, msg_prevnode(msg)); 557 node = tipc_node_find(net, msg_prevnode(msg));
565 if (unlikely(!node)) 558 if (unlikely(!node))
566 goto exit; 559 goto exit;
567
568 tipc_node_lock(node); 560 tipc_node_lock(node);
569 if (unlikely(!node->bclink.recv_permitted)) 561 if (unlikely(!node->bclink.recv_permitted))
570 goto unlock; 562 goto unlock;
@@ -589,7 +581,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
589 tipc_node_put(node); 581 tipc_node_put(node);
590 goto exit; 582 goto exit;
591 } 583 }
592
593 /* Handle in-sequence broadcast message */ 584 /* Handle in-sequence broadcast message */
594 seqno = msg_seqno(msg); 585 seqno = msg_seqno(msg);
595 next_in = mod(node->bclink.last_in + 1); 586 next_in = mod(node->bclink.last_in + 1);
@@ -778,6 +769,19 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
778 return 0; 769 return 0;
779} 770}
780 771
772static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
773{
774 struct sk_buff *skb, *tmp;
775
776 skb_queue_walk_safe(xmitq, skb, tmp) {
777 __skb_dequeue(xmitq);
778 tipc_bcbearer_send(net, skb, NULL, NULL);
779
780 /* Until we remove cloning in tipc_l2_send_msg(): */
781 kfree_skb(skb);
782 }
783}
784
781/** 785/**
782 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 786 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
783 */ 787 */