Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--	net/tipc/bcast.c	336
1 file changed, 161 insertions, 175 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 8eb87b11d100..e00441a2092f 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
 	return bcl->fsm_msg_cnt;
 }
 
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
-	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
-		mod(n_ptr->bclink.last_in);
-	if (unlikely(buf != NULL))
-		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
-}
-
-/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
- *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- * distribute NACKs, but tries to use the same spacing (divide by 16).
- */
-
-static int bclink_ack_allowed(u32 n)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 {
-	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+						seqno : node->bclink.last_sent;
 }
 
 
-/**
+/*
  * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
  * Called with bc_lock locked
@@ -281,7 +256,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 		if (bcbuf_acks(crs) == 0) {
 			bcl->first_out = next;
 			bcl->out_queue_size--;
-			buf_discard(crs);
+			kfree_skb(crs);
 			released = 1;
 		}
 		crs = next;
@@ -300,140 +275,94 @@ exit:
 	spin_unlock_bh(&bc_lock);
 }
 
-/**
- * bclink_send_ack - unicast an ACK msg
+/*
+ * tipc_bclink_update_link_state - update broadcast link state
  *
  * tipc_net_lock and node lock set
  */
 
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 {
-	struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+	struct sk_buff *buf;
 
-	if (l_ptr != NULL)
-		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+	/* Ignore "stale" link state info */
 
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+	if (less_eq(last_sent, n_ptr->bclink.last_in))
+		return;
 
-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
+	/* Update link synchronization state; quit if in sync */
+
+	bclink_update_last_sent(n_ptr, last_sent);
+
+	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+		return;
+
+	/* Update out-of-sync state; quit if loss is still unconfirmed */
+
+	if ((++n_ptr->bclink.oos_state) == 1) {
+		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+			return;
+		n_ptr->bclink.oos_state++;
+	}
 
-	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
+	/* Don't NACK if one has been recently sent (or seen) */
+
+	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
+	/* Send NACK */
+
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
-		msg = buf_msg(buf);
+		struct tipc_msg *msg = buf_msg(buf);
+
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
-		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
-		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
-		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
-		msg_set_bcast_tag(msg, tipc_own_tag);
+		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+				 : n_ptr->bclink.last_sent);
 
+		spin_lock_bh(&bc_lock);
 		tipc_bearer_send(&bcbearer->bearer, buf, NULL);
 		bcl->stats.sent_nacks++;
-		buf_discard(buf);
-
-		/*
-		 * Ensure we doesn't send another NACK msg to the node
-		 * until 16 more deferred messages arrive from it
-		 * (i.e. helps prevent all nodes from NACK'ing at same time)
-		 */
+		spin_unlock_bh(&bc_lock);
+		kfree_skb(buf);
 
-		n_ptr->bclink.nack_sync = tipc_own_tag;
+		n_ptr->bclink.oos_state++;
 	}
 }
 
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
+/*
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
- * tipc_net_lock and node lock set
- */
-
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
-	if (!n_ptr->bclink.supported ||
-	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
-		return;
-
-	bclink_set_gap(n_ptr);
-	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
-		n_ptr->bclink.gap_to = last_sent;
-	bclink_send_nack(n_ptr);
-}
-
-/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
  *
  * Only tipc_net_lock set.
  */
 
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
 {
-	struct tipc_node *n_ptr = tipc_node_find(dest);
-	u32 my_after, my_to;
+	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
 
-	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+	if (unlikely(!n_ptr))
 		return;
+
 	tipc_node_lock(n_ptr);
-	/*
-	 * Modify gap to suppress unnecessary NACKs from this node
-	 */
-	my_after = n_ptr->bclink.gap_after;
-	my_to = n_ptr->bclink.gap_to;
-
-	if (less_eq(gap_after, my_after)) {
-		if (less(my_after, gap_to) && less(gap_to, my_to))
-			n_ptr->bclink.gap_after = gap_to;
-		else if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
-	} else if (less_eq(gap_after, my_to)) {
-		if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = gap_after;
-	} else {
-		/*
-		 * Expand gap if missing bufs not in deferred queue:
-		 */
-		struct sk_buff *buf = n_ptr->bclink.deferred_head;
-		u32 prev = n_ptr->bclink.gap_to;
 
-		for (; buf; buf = buf->next) {
-			u32 seqno = buf_seqno(buf);
+	if (n_ptr->bclink.supported &&
+	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+		n_ptr->bclink.oos_state = 2;
 
-			if (mod(seqno - prev) != 1) {
-				buf = NULL;
-				break;
-			}
-			if (seqno == gap_after)
-				break;
-			prev = seqno;
-		}
-		if (buf == NULL)
-			n_ptr->bclink.gap_to = gap_after;
-	}
-	/*
-	 * Some nodes may send a complementary NACK now:
-	 */
-	if (bclink_ack_allowed(sender_tag + 1)) {
-		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
-			bclink_send_nack(n_ptr);
-			bclink_set_gap(n_ptr);
-		}
-	}
 	tipc_node_unlock(n_ptr);
 }
 
-/**
+/*
  * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
  */
 
@@ -445,7 +374,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
 
 	if (!bclink->bcast_nodes.count) {
 		res = msg_data_sz(buf_msg(buf));
-		buf_discard(buf);
+		kfree_skb(buf);
 		goto exit;
 	}
 
@@ -460,7 +389,33 @@ exit:
 	return res;
 }
 
-/**
+/*
+ * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
+ *
+ * Called with both sending node's lock and bc_lock taken.
+ */
+
+static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
+{
+	bclink_update_last_sent(node, seqno);
+	node->bclink.last_in = seqno;
+	node->bclink.oos_state = 0;
+	bcl->stats.recv_info++;
+
+	/*
+	 * Unicast an ACK periodically, ensuring that
+	 * all nodes in the cluster don't ACK at the same time
+	 */
+
+	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+		tipc_link_send_proto_msg(
+			node->active_links[node->addr & 1],
+			STATE_MSG, 0, 0, 0, 0, 0);
+		bcl->stats.sent_acks++;
+	}
+}
+
+/*
  * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
  *
  * tipc_net_lock is read_locked, no other locks set
@@ -472,7 +427,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 	struct tipc_node *node;
 	u32 next_in;
 	u32 seqno;
-	struct sk_buff *deferred;
+	int deferred;
 
 	/* Screen out unwanted broadcast messages */
 
@@ -487,6 +442,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 	if (unlikely(!node->bclink.supported))
 		goto unlock;
 
+	/* Handle broadcast protocol message */
+
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
 		if (msg_type(msg) != STATE_MSG)
 			goto unlock;
@@ -501,89 +458,118 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 			spin_unlock_bh(&bc_lock);
 		} else {
 			tipc_node_unlock(node);
-			tipc_bclink_peek_nack(msg_destnode(msg),
-					      msg_bcast_tag(msg),
-					      msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
+			bclink_peek_nack(msg);
 		}
 		goto exit;
 	}
 
 	/* Handle in-sequence broadcast message */
 
-receive:
-	next_in = mod(node->bclink.last_in + 1);
 	seqno = msg_seqno(msg);
+	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
-		bcl->stats.recv_info++;
-		node->bclink.last_in++;
-		bclink_set_gap(node);
-		if (unlikely(bclink_ack_allowed(seqno))) {
-			bclink_send_ack(node);
-			bcl->stats.sent_acks++;
-		}
+receive:
+		/* Deliver message to destination */
+
 		if (likely(msg_isdata(msg))) {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			if (likely(msg_mcast(msg)))
 				tipc_port_recv_mcast(buf, NULL);
 			else
-				buf_discard(buf);
+				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_bundles++;
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			tipc_link_recv_bundle(buf);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
+			int ret = tipc_link_recv_fragment(&node->bclink.defragm,
+							  &buf, &msg);
+			if (ret < 0)
+				goto unlock;
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_fragments++;
-			if (tipc_link_recv_fragment(&node->bclink.defragm,
-						    &buf, &msg))
+			if (ret > 0)
 				bcl->stats.recv_fragmented++;
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			tipc_net_route_msg(buf);
 		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			tipc_named_recv(buf);
 		} else {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
-			buf_discard(buf);
+			kfree_skb(buf);
 		}
 		buf = NULL;
+
+		/* Determine new synchronization state */
+
 		tipc_node_lock(node);
-		deferred = node->bclink.deferred_head;
-		if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
-			buf = deferred;
-			msg = buf_msg(buf);
-			node->bclink.deferred_head = deferred->next;
-			goto receive;
-		}
-	} else if (less(next_in, seqno)) {
-		u32 gap_after = node->bclink.gap_after;
-		u32 gap_to = node->bclink.gap_to;
-
-		if (tipc_link_defer_pkt(&node->bclink.deferred_head,
-					&node->bclink.deferred_tail,
-					buf)) {
-			node->bclink.nack_sync++;
-			bcl->stats.deferred_recv++;
-			if (seqno == mod(gap_after + 1))
-				node->bclink.gap_after = seqno;
-			else if (less(gap_after, seqno) && less(seqno, gap_to))
-				node->bclink.gap_to = seqno;
+		if (unlikely(!tipc_node_is_up(node)))
+			goto unlock;
+
+		if (node->bclink.last_in == node->bclink.last_sent)
+			goto unlock;
+
+		if (!node->bclink.deferred_head) {
+			node->bclink.oos_state = 1;
+			goto unlock;
 		}
+
+		msg = buf_msg(node->bclink.deferred_head);
+		seqno = msg_seqno(msg);
+		next_in = mod(next_in + 1);
+		if (seqno != next_in)
+			goto unlock;
+
+		/* Take in-sequence message from deferred queue & deliver it */
+
+		buf = node->bclink.deferred_head;
+		node->bclink.deferred_head = buf->next;
+		node->bclink.deferred_size--;
+		goto receive;
+	}
+
+	/* Handle out-of-sequence broadcast message */
+
+	if (less(next_in, seqno)) {
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
+					       &node->bclink.deferred_tail,
+					       buf);
+		node->bclink.deferred_size += deferred;
+		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-		if (bclink_ack_allowed(node->bclink.nack_sync)) {
-			if (gap_to != gap_after)
-				bclink_send_nack(node);
-			bclink_set_gap(node);
-		}
-	} else {
+	} else
+		deferred = 0;
+
+	spin_lock_bh(&bc_lock);
+
+	if (deferred)
+		bcl->stats.deferred_recv++;
+	else
 		bcl->stats.duplicates++;
-	}
+
+	spin_unlock_bh(&bc_lock);
+
 unlock:
 	tipc_node_unlock(node);
 exit:
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)