Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--	net/tipc/bcast.c	230
1 file changed, 166 insertions(+), 64 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index b8670bf262e2..96ceefeb9daf 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
  */
 static void bclink_retransmit_pkt(u32 after, u32 to)
 {
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 
-	buf = bcl->first_out;
-	while (buf && less_eq(buf_seqno(buf), after))
-		buf = buf->next;
-	tipc_link_retransmit(bcl, buf, mod(to - after));
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), after))
+			break;
+	}
+	tipc_link_retransmit(bcl, skb, mod(to - after));
 }
 
 /**
@@ -232,8 +233,11 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
  */
 void tipc_bclink_wakeup_users(void)
 {
-	while (skb_queue_len(&bclink->link.waiting_sks))
-		tipc_sk_rcv(skb_dequeue(&bclink->link.waiting_sks));
+	struct sk_buff *skb;
+
+	while ((skb = skb_dequeue(&bclink->link.waiting_sks)))
+		tipc_sk_rcv(skb);
+
 }
 
 /**
@@ -245,14 +249,14 @@ void tipc_bclink_wakeup_users(void)
  */
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
-	struct sk_buff *crs;
+	struct sk_buff *skb, *tmp;
 	struct sk_buff *next;
 	unsigned int released = 0;
 
 	tipc_bclink_lock();
 	/* Bail out if tx queue is empty (no clean up is required) */
-	crs = bcl->first_out;
-	if (!crs)
+	skb = skb_peek(&bcl->outqueue);
+	if (!skb)
 		goto exit;
 
 	/* Determine which messages need to be acknowledged */
@@ -271,43 +275,43 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	 * Bail out if specified sequence number does not correspond
 	 * to a message that has been sent and not yet acknowledged
 	 */
-		if (less(acked, buf_seqno(crs)) ||
+		if (less(acked, buf_seqno(skb)) ||
 		    less(bcl->fsm_msg_cnt, acked) ||
 		    less_eq(acked, n_ptr->bclink.acked))
 			goto exit;
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
-		crs = crs->next;
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), n_ptr->bclink.acked))
+			break;
+	}
 
 	/* Update packets that node is now acknowledging */
+	skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
+		if (more(buf_seqno(skb), acked))
+			break;
 
-	while (crs && less_eq(buf_seqno(crs), acked)) {
-		next = crs->next;
-
-		if (crs != bcl->next_out)
-			bcbuf_decr_acks(crs);
-		else {
-			bcbuf_set_acks(crs, 0);
+		next = tipc_skb_queue_next(&bcl->outqueue, skb);
+		if (skb != bcl->next_out) {
+			bcbuf_decr_acks(skb);
+		} else {
+			bcbuf_set_acks(skb, 0);
 			bcl->next_out = next;
 			bclink_set_last_sent();
 		}
 
-		if (bcbuf_acks(crs) == 0) {
-			bcl->first_out = next;
-			bcl->out_queue_size--;
-			kfree_skb(crs);
+		if (bcbuf_acks(skb) == 0) {
+			__skb_unlink(skb, &bcl->outqueue);
+			kfree_skb(skb);
 			released = 1;
 		}
-		crs = next;
 	}
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-
 	if (unlikely(bcl->next_out)) {
-		tipc_link_push_queue(bcl);
+		tipc_link_push_packets(bcl);
 		bclink_set_last_sent();
 	}
 	if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
@@ -327,19 +331,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	struct sk_buff *buf;
 
 	/* Ignore "stale" link state info */
-
 	if (less_eq(last_sent, n_ptr->bclink.last_in))
 		return;
 
 	/* Update link synchronization state; quit if in sync */
-
 	bclink_update_last_sent(n_ptr, last_sent);
 
 	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
 		return;
 
 	/* Update out-of-sync state; quit if loss is still unconfirmed */
-
 	if ((++n_ptr->bclink.oos_state) == 1) {
 		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
 			return;
@@ -347,15 +348,15 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	}
 
 	/* Don't NACK if one has been recently sent (or seen) */
-
 	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
 	/* Send NACK */
-
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
@@ -363,9 +364,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 		msg_set_mc_netid(msg, tipc_net_id);
 		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
 		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
-				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
-				 : n_ptr->bclink.last_sent);
+		msg_set_bcgap_to(msg, to);
 
 		tipc_bclink_lock();
 		tipc_bearer_send(MAX_BEARERS, buf, NULL);
@@ -402,20 +401,20 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
  *                    and to identified node local sockets
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff_head *list)
 {
 	int rc = 0;
 	int bc = 0;
-	struct sk_buff *clbuf;
+	struct sk_buff *skb;
 
 	/* Prepare clone of message for local node */
-	clbuf = tipc_msg_reassemble(buf);
-	if (unlikely(!clbuf)) {
-		kfree_skb_list(buf);
+	skb = tipc_msg_reassemble(list);
+	if (unlikely(!skb)) {
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -423,11 +422,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
 	if (likely(bclink)) {
 		tipc_bclink_lock();
 		if (likely(bclink->bcast_nodes.count)) {
-			rc = __tipc_link_xmit(bcl, buf);
+			rc = __tipc_link_xmit(bcl, list);
 			if (likely(!rc)) {
+				u32 len = skb_queue_len(&bcl->outqueue);
+
 				bclink_set_last_sent();
 				bcl->stats.queue_sz_counts++;
-				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+				bcl->stats.accu_queue_sz += len;
 			}
 			bc = 1;
 		}
@@ -435,13 +436,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
 	}
 
 	if (unlikely(!bc))
-		kfree_skb_list(buf);
+		__skb_queue_purge(list);
 
 	/* Deliver message clone */
 	if (likely(!rc))
-		tipc_sk_mcast_rcv(clbuf);
+		tipc_sk_mcast_rcv(skb);
 	else
-		kfree_skb(clbuf);
+		kfree_skb(skb);
 
 	return rc;
 }
@@ -462,7 +463,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
 	 * Unicast an ACK periodically, ensuring that
 	 * all nodes in the cluster don't ACK at the same time
 	 */
-
 	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
 		tipc_link_proto_xmit(node->active_links[node->addr & 1],
 				     STATE_MSG, 0, 0, 0, 0, 0);
@@ -484,7 +484,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
-
 	if (msg_mc_netid(msg) != tipc_net_id)
 		goto exit;
 
@@ -497,7 +496,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 		goto unlock;
 
 	/* Handle broadcast protocol message */
-
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
 		if (msg_type(msg) != STATE_MSG)
 			goto unlock;
@@ -518,14 +516,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	}
 
 	/* Handle in-sequence broadcast message */
-
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
 receive:
 		/* Deliver message to destination */
-
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock();
 			bclink_accept_pkt(node, seqno);
@@ -574,7 +570,6 @@ receive:
 	buf = NULL;
 
 	/* Determine new synchronization state */
-
 	tipc_node_lock(node);
 	if (unlikely(!tipc_node_is_up(node)))
 		goto unlock;
@@ -582,33 +577,26 @@ receive:
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (!node->bclink.deferred_head) {
+		if (skb_queue_empty(&node->bclink.deferred_queue)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(node->bclink.deferred_head);
+		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-
-		buf = node->bclink.deferred_head;
-		node->bclink.deferred_head = buf->next;
-		buf->next = NULL;
-		node->bclink.deferred_size--;
+		buf = __skb_dequeue(&node->bclink.deferred_queue);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
-
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
-					       &node->bclink.deferred_tail,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
 					       buf);
-		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
 	}
@@ -767,6 +755,118 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
 	tipc_bclink_unlock();
 }
 
+static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
+				      struct tipc_stats *stats)
+{
+	int i;
+	struct nlattr *nest;
+
+	struct nla_map {
+		__u32 key;
+		__u32 val;
+	};
+
+	struct nla_map map[] = {
+		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
+		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
+		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
+		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
+		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
+		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
+		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
+		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
+		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
+		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
+		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
+		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
+		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
+		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
+		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
+		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
+		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
+		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
+		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
+			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
+	};
+
+	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
+	if (!nest)
+		return -EMSGSIZE;
+
+	for (i = 0; i < ARRAY_SIZE(map); i++)
+		if (nla_put_u32(skb, map[i].key, map[i].val))
+			goto msg_full;
+
+	nla_nest_end(skb, nest);
+
+	return 0;
+msg_full:
+	nla_nest_cancel(skb, nest);
+
+	return -EMSGSIZE;
+}
+
+int tipc_nl_add_bc_link(struct tipc_nl_msg *msg)
+{
+	int err;
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+
+	if (!bcl)
+		return 0;
+
+	tipc_bclink_lock();
+
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+			  NLM_F_MULTI, TIPC_NL_LINK_GET);
+	if (!hdr)
+		return -EMSGSIZE;
+
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
+	if (!attrs)
+		goto msg_full;
+
+	/* The broadcast link is always up */
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
+		goto attr_msg_full;
+
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
+		goto attr_msg_full;
+	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no))
+		goto attr_msg_full;
+
+	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
+	if (!prop)
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
+		goto prop_msg_full;
+	nla_nest_end(msg->skb, prop);
+
+	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
+	if (err)
+		goto attr_msg_full;
+
+	tipc_bclink_unlock();
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
+
+	return 0;
+
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	tipc_bclink_unlock();
+	genlmsg_cancel(msg->skb, hdr);
+
+	return -EMSGSIZE;
+}
 
 int tipc_bclink_stats(char *buf, const u32 buf_size)
 {
@@ -851,7 +951,9 @@ int tipc_bclink_init(void)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->waiting_sks);
+	__skb_queue_head_init(&bcl->outqueue);
+	__skb_queue_head_init(&bcl->deferred_queue);
+	skb_queue_head_init(&bcl->waiting_sks);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
 	__skb_queue_head_init(&bclink->node.waiting_sks);
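
The conversions above all follow one pattern: open-coded buffer chains
(first_out, buf->next, hand-maintained size counters) become the kernel's
generic sk_buff lists (a struct sk_buff_head managed with skb_queue_walk(),
skb_dequeue() and __skb_unlink()). A minimal sketch of the queue-walk idiom,
assuming TIPC's in-tree helpers more() and buf_seqno(); first_unacked() is a
hypothetical name used for illustration only, not part of this patch:

	#include <linux/skbuff.h>
	/* more() and buf_seqno() come from TIPC's internal headers
	 * (net/tipc); this sketch assumes it is built in that context.
	 */

	/* Find the first packet in the tx queue with a sequence number
	 * beyond 'after', mirroring the walk in bclink_retransmit_pkt().
	 * Returns NULL if every queued packet has seqno <= after.
	 */
	static struct sk_buff *first_unacked(struct sk_buff_head *outqueue,
					     u32 after)
	{
		struct sk_buff *skb;

		skb_queue_walk(outqueue, skb) {
			if (more(buf_seqno(skb), after))
				return skb;
		}
		return NULL;
	}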