author     David S. Miller <davem@davemloft.net>    2012-02-07 12:31:01 -0500
committer  David S. Miller <davem@davemloft.net>    2012-02-07 12:31:01 -0500
commit     17b8a74f00474fb4fe6154aa426a80bcf1220997 (patch)
tree       59340858fe44bb4697c771af7976ad798fe0e1c1
parent     0e15df490eef6f5080b84533dcd3068a78122768 (diff)
parent     dff10e9e637663c8c5dd0bae1a8f0e899cbb4a36 (diff)
Merge branch 'tipc_net-next' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/linux
-rw-r--r--  net/tipc/bcast.c        324
-rw-r--r--  net/tipc/bcast.h          2
-rw-r--r--  net/tipc/link.c         148
-rw-r--r--  net/tipc/name_distr.c     4
-rw-r--r--  net/tipc/node.c          14
-rw-r--r--  net/tipc/node.h          18
-rw-r--r--  net/tipc/port.c           5
7 files changed, 255 insertions, 260 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 8eb87b11d100..41ecf313073c 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
157 return bcl->fsm_msg_cnt; 157 return bcl->fsm_msg_cnt;
158} 158}
159 159
160/** 160static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
161 * bclink_set_gap - set gap according to contents of current deferred pkt queue
162 *
163 * Called with 'node' locked, bc_lock unlocked
164 */
165
166static void bclink_set_gap(struct tipc_node *n_ptr)
167{
168 struct sk_buff *buf = n_ptr->bclink.deferred_head;
169
170 n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
171 mod(n_ptr->bclink.last_in);
172 if (unlikely(buf != NULL))
173 n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
174}
175
176/**
177 * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
178 *
179 * This mechanism endeavours to prevent all nodes in network from trying
180 * to ACK or NACK at the same time.
181 *
182 * Note: TIPC uses a different trigger to distribute ACKs than it does to
183 * distribute NACKs, but tries to use the same spacing (divide by 16).
184 */
185
186static int bclink_ack_allowed(u32 n)
187{ 161{
188 return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag; 162 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
163 seqno : node->bclink.last_sent;
189} 164}
190 165
191 166
192/** 167/*
193 * tipc_bclink_retransmit_to - get most recent node to request retransmission 168 * tipc_bclink_retransmit_to - get most recent node to request retransmission
194 * 169 *
195 * Called with bc_lock locked 170 * Called with bc_lock locked
@@ -300,140 +275,94 @@ exit:
300 spin_unlock_bh(&bc_lock); 275 spin_unlock_bh(&bc_lock);
301} 276}
302 277
303/** 278/*
304 * bclink_send_ack - unicast an ACK msg 279 * tipc_bclink_update_link_state - update broadcast link state
305 * 280 *
306 * tipc_net_lock and node lock set 281 * tipc_net_lock and node lock set
307 */ 282 */
308 283
309static void bclink_send_ack(struct tipc_node *n_ptr) 284void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
310{ 285{
311 struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 286 struct sk_buff *buf;
312 287
313 if (l_ptr != NULL) 288 /* Ignore "stale" link state info */
314 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
315}
316 289
317/** 290 if (less_eq(last_sent, n_ptr->bclink.last_in))
318 * bclink_send_nack- broadcast a NACK msg 291 return;
319 *
320 * tipc_net_lock and node lock set
321 */
322 292
323static void bclink_send_nack(struct tipc_node *n_ptr) 293 /* Update link synchronization state; quit if in sync */
324{ 294
325 struct sk_buff *buf; 295 bclink_update_last_sent(n_ptr, last_sent);
326 struct tipc_msg *msg; 296
297 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
298 return;
299
300 /* Update out-of-sync state; quit if loss is still unconfirmed */
301
302 if ((++n_ptr->bclink.oos_state) == 1) {
303 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
304 return;
305 n_ptr->bclink.oos_state++;
306 }
327 307
328 if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) 308 /* Don't NACK if one has been recently sent (or seen) */
309
310 if (n_ptr->bclink.oos_state & 0x1)
329 return; 311 return;
330 312
313 /* Send NACK */
314
331 buf = tipc_buf_acquire(INT_H_SIZE); 315 buf = tipc_buf_acquire(INT_H_SIZE);
332 if (buf) { 316 if (buf) {
333 msg = buf_msg(buf); 317 struct tipc_msg *msg = buf_msg(buf);
318
334 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, 319 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
335 INT_H_SIZE, n_ptr->addr); 320 INT_H_SIZE, n_ptr->addr);
336 msg_set_non_seq(msg, 1); 321 msg_set_non_seq(msg, 1);
337 msg_set_mc_netid(msg, tipc_net_id); 322 msg_set_mc_netid(msg, tipc_net_id);
338 msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); 323 msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
339 msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); 324 msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
340 msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); 325 msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
341 msg_set_bcast_tag(msg, tipc_own_tag); 326 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
327 : n_ptr->bclink.last_sent);
342 328
329 spin_lock_bh(&bc_lock);
343 tipc_bearer_send(&bcbearer->bearer, buf, NULL); 330 tipc_bearer_send(&bcbearer->bearer, buf, NULL);
344 bcl->stats.sent_nacks++; 331 bcl->stats.sent_nacks++;
332 spin_unlock_bh(&bc_lock);
345 buf_discard(buf); 333 buf_discard(buf);
346 334
347 /* 335 n_ptr->bclink.oos_state++;
348 * Ensure we doesn't send another NACK msg to the node
349 * until 16 more deferred messages arrive from it
350 * (i.e. helps prevent all nodes from NACK'ing at same time)
351 */
352
353 n_ptr->bclink.nack_sync = tipc_own_tag;
354 } 336 }
355} 337}
356 338
357/** 339/*
358 * tipc_bclink_check_gap - send a NACK if a sequence gap exists 340 * bclink_peek_nack - monitor retransmission requests sent by other nodes
359 * 341 *
360 * tipc_net_lock and node lock set 342 * Delay any upcoming NACK by this node if another node has already
361 */ 343 * requested the first message this node is going to ask for.
362
363void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
364{
365 if (!n_ptr->bclink.supported ||
366 less_eq(last_sent, mod(n_ptr->bclink.last_in)))
367 return;
368
369 bclink_set_gap(n_ptr);
370 if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
371 n_ptr->bclink.gap_to = last_sent;
372 bclink_send_nack(n_ptr);
373}
374
375/**
376 * tipc_bclink_peek_nack - process a NACK msg meant for another node
377 * 344 *
378 * Only tipc_net_lock set. 345 * Only tipc_net_lock set.
379 */ 346 */
380 347
381static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) 348static void bclink_peek_nack(struct tipc_msg *msg)
382{ 349{
383 struct tipc_node *n_ptr = tipc_node_find(dest); 350 struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
384 u32 my_after, my_to;
385 351
386 if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) 352 if (unlikely(!n_ptr))
387 return; 353 return;
354
388 tipc_node_lock(n_ptr); 355 tipc_node_lock(n_ptr);
389 /*
390 * Modify gap to suppress unnecessary NACKs from this node
391 */
392 my_after = n_ptr->bclink.gap_after;
393 my_to = n_ptr->bclink.gap_to;
394
395 if (less_eq(gap_after, my_after)) {
396 if (less(my_after, gap_to) && less(gap_to, my_to))
397 n_ptr->bclink.gap_after = gap_to;
398 else if (less_eq(my_to, gap_to))
399 n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
400 } else if (less_eq(gap_after, my_to)) {
401 if (less_eq(my_to, gap_to))
402 n_ptr->bclink.gap_to = gap_after;
403 } else {
404 /*
405 * Expand gap if missing bufs not in deferred queue:
406 */
407 struct sk_buff *buf = n_ptr->bclink.deferred_head;
408 u32 prev = n_ptr->bclink.gap_to;
409 356
410 for (; buf; buf = buf->next) { 357 if (n_ptr->bclink.supported &&
411 u32 seqno = buf_seqno(buf); 358 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
359 (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
360 n_ptr->bclink.oos_state = 2;
412 361
413 if (mod(seqno - prev) != 1) {
414 buf = NULL;
415 break;
416 }
417 if (seqno == gap_after)
418 break;
419 prev = seqno;
420 }
421 if (buf == NULL)
422 n_ptr->bclink.gap_to = gap_after;
423 }
424 /*
425 * Some nodes may send a complementary NACK now:
426 */
427 if (bclink_ack_allowed(sender_tag + 1)) {
428 if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
429 bclink_send_nack(n_ptr);
430 bclink_set_gap(n_ptr);
431 }
432 }
433 tipc_node_unlock(n_ptr); 362 tipc_node_unlock(n_ptr);
434} 363}
435 364
436/** 365/*
437 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster 366 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
438 */ 367 */
439 368
@@ -460,7 +389,33 @@ exit:
460 return res; 389 return res;
461} 390}
462 391
463/** 392/*
393 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
394 *
395 * Called with both sending node's lock and bc_lock taken.
396 */
397
398static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
399{
400 bclink_update_last_sent(node, seqno);
401 node->bclink.last_in = seqno;
402 node->bclink.oos_state = 0;
403 bcl->stats.recv_info++;
404
405 /*
406 * Unicast an ACK periodically, ensuring that
407 * all nodes in the cluster don't ACK at the same time
408 */
409
410 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
411 tipc_link_send_proto_msg(
412 node->active_links[node->addr & 1],
413 STATE_MSG, 0, 0, 0, 0, 0);
414 bcl->stats.sent_acks++;
415 }
416}
417
418/*
464 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards 419 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
465 * 420 *
466 * tipc_net_lock is read_locked, no other locks set 421 * tipc_net_lock is read_locked, no other locks set
@@ -472,7 +427,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
472 struct tipc_node *node; 427 struct tipc_node *node;
473 u32 next_in; 428 u32 next_in;
474 u32 seqno; 429 u32 seqno;
475 struct sk_buff *deferred; 430 int deferred;
476 431
477 /* Screen out unwanted broadcast messages */ 432 /* Screen out unwanted broadcast messages */
478 433
@@ -487,6 +442,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
487 if (unlikely(!node->bclink.supported)) 442 if (unlikely(!node->bclink.supported))
488 goto unlock; 443 goto unlock;
489 444
445 /* Handle broadcast protocol message */
446
490 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 447 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
491 if (msg_type(msg) != STATE_MSG) 448 if (msg_type(msg) != STATE_MSG)
492 goto unlock; 449 goto unlock;
@@ -501,85 +458,114 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
501 spin_unlock_bh(&bc_lock); 458 spin_unlock_bh(&bc_lock);
502 } else { 459 } else {
503 tipc_node_unlock(node); 460 tipc_node_unlock(node);
504 tipc_bclink_peek_nack(msg_destnode(msg), 461 bclink_peek_nack(msg);
505 msg_bcast_tag(msg),
506 msg_bcgap_after(msg),
507 msg_bcgap_to(msg));
508 } 462 }
509 goto exit; 463 goto exit;
510 } 464 }
511 465
512 /* Handle in-sequence broadcast message */ 466 /* Handle in-sequence broadcast message */
513 467
514receive:
515 next_in = mod(node->bclink.last_in + 1);
516 seqno = msg_seqno(msg); 468 seqno = msg_seqno(msg);
469 next_in = mod(node->bclink.last_in + 1);
517 470
518 if (likely(seqno == next_in)) { 471 if (likely(seqno == next_in)) {
519 bcl->stats.recv_info++; 472receive:
520 node->bclink.last_in++; 473 /* Deliver message to destination */
521 bclink_set_gap(node); 474
522 if (unlikely(bclink_ack_allowed(seqno))) {
523 bclink_send_ack(node);
524 bcl->stats.sent_acks++;
525 }
526 if (likely(msg_isdata(msg))) { 475 if (likely(msg_isdata(msg))) {
476 spin_lock_bh(&bc_lock);
477 bclink_accept_pkt(node, seqno);
478 spin_unlock_bh(&bc_lock);
527 tipc_node_unlock(node); 479 tipc_node_unlock(node);
528 if (likely(msg_mcast(msg))) 480 if (likely(msg_mcast(msg)))
529 tipc_port_recv_mcast(buf, NULL); 481 tipc_port_recv_mcast(buf, NULL);
530 else 482 else
531 buf_discard(buf); 483 buf_discard(buf);
532 } else if (msg_user(msg) == MSG_BUNDLER) { 484 } else if (msg_user(msg) == MSG_BUNDLER) {
485 spin_lock_bh(&bc_lock);
486 bclink_accept_pkt(node, seqno);
533 bcl->stats.recv_bundles++; 487 bcl->stats.recv_bundles++;
534 bcl->stats.recv_bundled += msg_msgcnt(msg); 488 bcl->stats.recv_bundled += msg_msgcnt(msg);
489 spin_unlock_bh(&bc_lock);
535 tipc_node_unlock(node); 490 tipc_node_unlock(node);
536 tipc_link_recv_bundle(buf); 491 tipc_link_recv_bundle(buf);
537 } else if (msg_user(msg) == MSG_FRAGMENTER) { 492 } else if (msg_user(msg) == MSG_FRAGMENTER) {
493 int ret = tipc_link_recv_fragment(&node->bclink.defragm,
494 &buf, &msg);
495 if (ret < 0)
496 goto unlock;
497 spin_lock_bh(&bc_lock);
498 bclink_accept_pkt(node, seqno);
538 bcl->stats.recv_fragments++; 499 bcl->stats.recv_fragments++;
539 if (tipc_link_recv_fragment(&node->bclink.defragm, 500 if (ret > 0)
540 &buf, &msg))
541 bcl->stats.recv_fragmented++; 501 bcl->stats.recv_fragmented++;
502 spin_unlock_bh(&bc_lock);
542 tipc_node_unlock(node); 503 tipc_node_unlock(node);
543 tipc_net_route_msg(buf); 504 tipc_net_route_msg(buf);
544 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 505 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
506 spin_lock_bh(&bc_lock);
507 bclink_accept_pkt(node, seqno);
508 spin_unlock_bh(&bc_lock);
545 tipc_node_unlock(node); 509 tipc_node_unlock(node);
546 tipc_named_recv(buf); 510 tipc_named_recv(buf);
547 } else { 511 } else {
512 spin_lock_bh(&bc_lock);
513 bclink_accept_pkt(node, seqno);
514 spin_unlock_bh(&bc_lock);
548 tipc_node_unlock(node); 515 tipc_node_unlock(node);
549 buf_discard(buf); 516 buf_discard(buf);
550 } 517 }
551 buf = NULL; 518 buf = NULL;
519
520 /* Determine new synchronization state */
521
552 tipc_node_lock(node); 522 tipc_node_lock(node);
553 deferred = node->bclink.deferred_head; 523 if (unlikely(!tipc_node_is_up(node)))
554 if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { 524 goto unlock;
555 buf = deferred; 525
556 msg = buf_msg(buf); 526 if (node->bclink.last_in == node->bclink.last_sent)
557 node->bclink.deferred_head = deferred->next; 527 goto unlock;
558 goto receive; 528
559 } 529 if (!node->bclink.deferred_head) {
560 } else if (less(next_in, seqno)) { 530 node->bclink.oos_state = 1;
561 u32 gap_after = node->bclink.gap_after; 531 goto unlock;
562 u32 gap_to = node->bclink.gap_to;
563
564 if (tipc_link_defer_pkt(&node->bclink.deferred_head,
565 &node->bclink.deferred_tail,
566 buf)) {
567 node->bclink.nack_sync++;
568 bcl->stats.deferred_recv++;
569 if (seqno == mod(gap_after + 1))
570 node->bclink.gap_after = seqno;
571 else if (less(gap_after, seqno) && less(seqno, gap_to))
572 node->bclink.gap_to = seqno;
573 } 532 }
533
534 msg = buf_msg(node->bclink.deferred_head);
535 seqno = msg_seqno(msg);
536 next_in = mod(next_in + 1);
537 if (seqno != next_in)
538 goto unlock;
539
540 /* Take in-sequence message from deferred queue & deliver it */
541
542 buf = node->bclink.deferred_head;
543 node->bclink.deferred_head = buf->next;
544 node->bclink.deferred_size--;
545 goto receive;
546 }
547
548 /* Handle out-of-sequence broadcast message */
549
550 if (less(next_in, seqno)) {
551 deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
552 &node->bclink.deferred_tail,
553 buf);
554 node->bclink.deferred_size += deferred;
555 bclink_update_last_sent(node, seqno);
574 buf = NULL; 556 buf = NULL;
575 if (bclink_ack_allowed(node->bclink.nack_sync)) { 557 } else
576 if (gap_to != gap_after) 558 deferred = 0;
577 bclink_send_nack(node); 559
578 bclink_set_gap(node); 560 spin_lock_bh(&bc_lock);
579 } 561
580 } else { 562 if (deferred)
563 bcl->stats.deferred_recv++;
564 else
581 bcl->stats.duplicates++; 565 bcl->stats.duplicates++;
582 } 566
567 spin_unlock_bh(&bc_lock);
568
583unlock: 569unlock:
584 tipc_node_unlock(node); 570 tipc_node_unlock(node);
585exit: 571exit:
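
The reworked accept path above replaces the old tipc_own_tag scheme: bclink_accept_pkt() unicasts an ACK only when the received sequence number, offset by the node's own address, lands on a TIPC_MIN_LINK_WIN boundary, so different nodes ACK on different sequence numbers, and NACKs are deferred via the oos_state counter. A minimal stand-alone sketch of that staggering idea (not kernel code; ACK_WIN and the node addresses below are assumptions, with 16 chosen to match the old "divide by 16" comment):

```c
/*
 * Stand-alone sketch of the ACK staggering used in bclink_accept_pkt():
 * a node ACKs only when (seqno - own_addr) is a multiple of the window,
 * so peers spread their ACKs across different sequence numbers.
 */
#include <stdint.h>
#include <stdio.h>

#define ACK_WIN 16	/* stand-in for TIPC_MIN_LINK_WIN (assumed value) */

static int should_ack(uint32_t seqno, uint32_t own_addr)
{
	return ((seqno - own_addr) % ACK_WIN) == 0;
}

int main(void)
{
	uint32_t nodes[] = { 0x01001001, 0x01001002, 0x01001003 };	/* made-up addresses */

	for (uint32_t seq = 0x1001000; seq < 0x1001020; seq++)
		for (int i = 0; i < 3; i++)
			if (should_ack(seq, nodes[i]))
				printf("node %d ACKs seqno %u\n", i + 1, seq);
	return 0;
}
```

Each node prints ACKs at a different residue class, which is the behaviour the "don't ACK at the same time" comment in the diff is after.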
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index b009666c60b0..5571394098f9 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,7 +96,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf);
96void tipc_bclink_recv_pkt(struct sk_buff *buf); 96void tipc_bclink_recv_pkt(struct sk_buff *buf);
97u32 tipc_bclink_get_last_sent(void); 97u32 tipc_bclink_get_last_sent(void);
98u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); 98u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
99void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno); 99void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
100int tipc_bclink_stats(char *stats_buf, const u32 buf_size); 100int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
101int tipc_bclink_reset_stats(void); 101int tipc_bclink_reset_stats(void);
102int tipc_bclink_set_queue_limits(u32 limit); 102int tipc_bclink_set_queue_limits(u32 limit);
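
Several of the checks above (for example the "stale link state" test in tipc_bclink_update_link_state() and the last_sent update) rely on TIPC's wrap-safe sequence helpers less(), less_eq() and mod(). The following is a small illustration of that comparison idiom on 16-bit sequence numbers, written from scratch here; it is not the kernel's exact implementation:

```c
/*
 * Wrap-safe sequence comparison in the spirit of TIPC's less()/less_eq()/mod().
 * Illustrative only; the real helpers live in net/tipc and may differ in detail.
 */
#include <stdint.h>
#include <stdio.h>

static uint16_t seq_mod(uint32_t n)
{
	return (uint16_t)n;				/* keep the low 16 bits */
}

static int seq_less_eq(uint16_t left, uint16_t right)
{
	return (uint16_t)(right - left) < 0x8000u;	/* left..right spans less than half the space */
}

static int seq_less(uint16_t left, uint16_t right)
{
	return left != right && seq_less_eq(left, right);
}

int main(void)
{
	printf("%d\n", seq_less(65530, 5));	/* 1: 5 follows 65530 after wrap-around */
	printf("%d\n", seq_less_eq(7, 7));	/* 1 */
	printf("%d\n", seq_less(10, 3));	/* 0 */
	printf("%u\n", seq_mod(65536 + 42));	/* 42 */
	return 0;
}
```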
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac1832a66f8a..d8b0a22367b6 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1501,13 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
1501 tipc_node_lock(n_ptr); 1501 tipc_node_lock(n_ptr);
1502 1502
1503 tipc_addr_string_fill(addr_string, n_ptr->addr); 1503 tipc_addr_string_fill(addr_string, n_ptr->addr);
1504 info("Multicast link info for %s\n", addr_string); 1504 info("Broadcast link info for %s\n", addr_string);
1505 info("Supportable: %d, ", n_ptr->bclink.supportable);
1505 info("Supported: %d, ", n_ptr->bclink.supported); 1506 info("Supported: %d, ", n_ptr->bclink.supported);
1506 info("Acked: %u\n", n_ptr->bclink.acked); 1507 info("Acked: %u\n", n_ptr->bclink.acked);
1507 info("Last in: %u, ", n_ptr->bclink.last_in); 1508 info("Last in: %u, ", n_ptr->bclink.last_in);
1508 info("Gap after: %u, ", n_ptr->bclink.gap_after); 1509 info("Oos state: %u, ", n_ptr->bclink.oos_state);
1509 info("Gap to: %u\n", n_ptr->bclink.gap_to); 1510 info("Last sent: %u\n", n_ptr->bclink.last_sent);
1510 info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1511 1511
1512 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); 1512 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1513 1513
@@ -1736,7 +1736,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1736 1736
1737 /* Release acked messages */ 1737 /* Release acked messages */
1738 1738
1739 if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) 1739 if (n_ptr->bclink.supported)
1740 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1740 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1741 1741
1742 crs = l_ptr->first_out; 1742 crs = l_ptr->first_out;
@@ -1774,6 +1774,7 @@ protocol_check:
1774 head = link_insert_deferred_queue(l_ptr, 1774 head = link_insert_deferred_queue(l_ptr,
1775 head); 1775 head);
1776 if (likely(msg_is_dest(msg, tipc_own_addr))) { 1776 if (likely(msg_is_dest(msg, tipc_own_addr))) {
1777 int ret;
1777deliver: 1778deliver:
1778 if (likely(msg_isdata(msg))) { 1779 if (likely(msg_isdata(msg))) {
1779 tipc_node_unlock(n_ptr); 1780 tipc_node_unlock(n_ptr);
@@ -1798,11 +1799,15 @@ deliver:
1798 continue; 1799 continue;
1799 case MSG_FRAGMENTER: 1800 case MSG_FRAGMENTER:
1800 l_ptr->stats.recv_fragments++; 1801 l_ptr->stats.recv_fragments++;
1801 if (tipc_link_recv_fragment(&l_ptr->defragm_buf, 1802 ret = tipc_link_recv_fragment(
1802 &buf, &msg)) { 1803 &l_ptr->defragm_buf,
1804 &buf, &msg);
1805 if (ret == 1) {
1803 l_ptr->stats.recv_fragmented++; 1806 l_ptr->stats.recv_fragmented++;
1804 goto deliver; 1807 goto deliver;
1805 } 1808 }
1809 if (ret == -1)
1810 l_ptr->next_in_no--;
1806 break; 1811 break;
1807 case CHANGEOVER_PROTOCOL: 1812 case CHANGEOVER_PROTOCOL:
1808 type = msg_type(msg); 1813 type = msg_type(msg);
@@ -1853,17 +1858,16 @@ cont:
1853} 1858}
1854 1859
1855/* 1860/*
1856 * link_defer_buf(): Sort a received out-of-sequence packet 1861 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1857 * into the deferred reception queue. 1862 *
1858 * Returns the increase of the queue length,i.e. 0 or 1 1863 * Returns increase in queue length (i.e. 0 or 1)
1859 */ 1864 */
1860 1865
1861u32 tipc_link_defer_pkt(struct sk_buff **head, 1866u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
1862 struct sk_buff **tail,
1863 struct sk_buff *buf) 1867 struct sk_buff *buf)
1864{ 1868{
1865 struct sk_buff *prev = NULL; 1869 struct sk_buff *queue_buf;
1866 struct sk_buff *crs = *head; 1870 struct sk_buff **prev;
1867 u32 seq_no = buf_seqno(buf); 1871 u32 seq_no = buf_seqno(buf);
1868 1872
1869 buf->next = NULL; 1873 buf->next = NULL;
@@ -1881,31 +1885,30 @@ u32 tipc_link_defer_pkt(struct sk_buff **head,
1881 return 1; 1885 return 1;
1882 } 1886 }
1883 1887
1884 /* Scan through queue and sort it in */ 1888 /* Locate insertion point in queue, then insert; discard if duplicate */
1885 do { 1889 prev = head;
1886 struct tipc_msg *msg = buf_msg(crs); 1890 queue_buf = *head;
1891 for (;;) {
1892 u32 curr_seqno = buf_seqno(queue_buf);
1887 1893
1888 if (less(seq_no, msg_seqno(msg))) { 1894 if (seq_no == curr_seqno) {
1889 buf->next = crs; 1895 buf_discard(buf);
1890 if (prev) 1896 return 0;
1891 prev->next = buf;
1892 else
1893 *head = buf;
1894 return 1;
1895 } 1897 }
1896 if (seq_no == msg_seqno(msg)) 1898
1899 if (less(seq_no, curr_seqno))
1897 break; 1900 break;
1898 prev = crs;
1899 crs = crs->next;
1900 } while (crs);
1901 1901
1902 /* Message is a duplicate of an existing message */ 1902 prev = &queue_buf->next;
1903 queue_buf = queue_buf->next;
1904 }
1903 1905
1904 buf_discard(buf); 1906 buf->next = queue_buf;
1905 return 0; 1907 *prev = buf;
1908 return 1;
1906} 1909}
1907 1910
1908/** 1911/*
1909 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet 1912 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1910 */ 1913 */
1911 1914
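
The rewritten tipc_link_defer_pkt() above switches from a prev/crs pointer pair to a single indirect pointer (struct sk_buff **prev), so one store covers both the insert-at-head and insert-mid-queue cases. The same idiom on a plain singly linked list, as a self-contained sketch (struct pkt and insert_sorted() are invented names, and the plain '<' stands in for TIPC's wrap-safe less()):

```c
/* Pointer-to-pointer sorted insertion, the idiom used by tipc_link_defer_pkt(). */
#include <stdio.h>
#include <stdlib.h>

struct pkt {
	unsigned int seqno;
	struct pkt *next;
};

/* Insert in ascending seqno order, drop duplicates; returns queue growth (0 or 1). */
static int insert_sorted(struct pkt **head, struct pkt *buf)
{
	struct pkt **prev = head;
	struct pkt *cur = *head;

	while (cur) {
		if (buf->seqno == cur->seqno) {
			free(buf);		/* duplicate of a queued packet */
			return 0;
		}
		if (buf->seqno < cur->seqno)
			break;			/* insertion point found */
		prev = &cur->next;
		cur = cur->next;
	}
	buf->next = cur;
	*prev = buf;				/* one store handles head and mid-list alike */
	return 1;
}

int main(void)
{
	struct pkt *head = NULL;
	unsigned int in[] = { 5, 2, 9, 2, 7 };

	for (int i = 0; i < 5; i++) {
		struct pkt *p = malloc(sizeof(*p));
		p->seqno = in[i];
		p->next = NULL;
		insert_sorted(&head, p);
	}
	for (struct pkt *p = head; p; p = p->next)
		printf("%u ", p->seqno);	/* prints: 2 5 7 9 */
	printf("\n");
	return 0;
}
```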
@@ -1956,6 +1959,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1956 u32 msg_size = sizeof(l_ptr->proto_msg); 1959 u32 msg_size = sizeof(l_ptr->proto_msg);
1957 int r_flag; 1960 int r_flag;
1958 1961
1962 /* Discard any previous message that was deferred due to congestion */
1963
1964 if (l_ptr->proto_msg_queue) {
1965 buf_discard(l_ptr->proto_msg_queue);
1966 l_ptr->proto_msg_queue = NULL;
1967 }
1968
1959 if (link_blocked(l_ptr)) 1969 if (link_blocked(l_ptr))
1960 return; 1970 return;
1961 1971
@@ -1964,9 +1974,11 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1964 if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) 1974 if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
1965 return; 1975 return;
1966 1976
1977 /* Create protocol message with "out-of-sequence" sequence number */
1978
1967 msg_set_type(msg, msg_typ); 1979 msg_set_type(msg, msg_typ);
1968 msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); 1980 msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
1969 msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); 1981 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1970 msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); 1982 msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1971 1983
1972 if (msg_typ == STATE_MSG) { 1984 if (msg_typ == STATE_MSG) {
@@ -2020,44 +2032,36 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
2020 r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); 2032 r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
2021 msg_set_redundant_link(msg, r_flag); 2033 msg_set_redundant_link(msg, r_flag);
2022 msg_set_linkprio(msg, l_ptr->priority); 2034 msg_set_linkprio(msg, l_ptr->priority);
2023 2035 msg_set_size(msg, msg_size);
2024 /* Ensure sequence number will not fit : */
2025 2036
2026 msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); 2037 msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2027 2038
2028 /* Congestion? */
2029
2030 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2031 if (!l_ptr->proto_msg_queue) {
2032 l_ptr->proto_msg_queue =
2033 tipc_buf_acquire(sizeof(l_ptr->proto_msg));
2034 }
2035 buf = l_ptr->proto_msg_queue;
2036 if (!buf)
2037 return;
2038 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2039 return;
2040 }
2041
2042 /* Message can be sent */
2043
2044 buf = tipc_buf_acquire(msg_size); 2039 buf = tipc_buf_acquire(msg_size);
2045 if (!buf) 2040 if (!buf)
2046 return; 2041 return;
2047 2042
2048 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2043 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2049 msg_set_size(buf_msg(buf), msg_size);
2050 2044
2051 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 2045 /* Defer message if bearer is already congested */
2052 l_ptr->unacked_window = 0; 2046
2053 buf_discard(buf); 2047 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2048 l_ptr->proto_msg_queue = buf;
2054 return; 2049 return;
2055 } 2050 }
2056 2051
2057 /* New congestion */ 2052 /* Defer message if attempting to send results in bearer congestion */
2058 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 2053
2059 l_ptr->proto_msg_queue = buf; 2054 if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2060 l_ptr->stats.bearer_congs++; 2055 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2056 l_ptr->proto_msg_queue = buf;
2057 l_ptr->stats.bearer_congs++;
2058 return;
2059 }
2060
2061 /* Discard message if it was sent successfully */
2062
2063 l_ptr->unacked_window = 0;
2064 buf_discard(buf);
2061} 2065}
2062 2066
2063/* 2067/*
@@ -2105,6 +2109,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
2105 l_ptr->owner->block_setup = WAIT_NODE_DOWN; 2109 l_ptr->owner->block_setup = WAIT_NODE_DOWN;
2106 } 2110 }
2107 2111
2112 link_state_event(l_ptr, RESET_MSG);
2113
2108 /* fall thru' */ 2114 /* fall thru' */
2109 case ACTIVATE_MSG: 2115 case ACTIVATE_MSG:
2110 /* Update link settings according other endpoint's values */ 2116 /* Update link settings according other endpoint's values */
@@ -2127,16 +2133,22 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
2127 } else { 2133 } else {
2128 l_ptr->max_pkt = l_ptr->max_pkt_target; 2134 l_ptr->max_pkt = l_ptr->max_pkt_target;
2129 } 2135 }
2130 l_ptr->owner->bclink.supported = (max_pkt_info != 0); 2136 l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
2131 2137
2132 link_state_event(l_ptr, msg_type(msg)); 2138 /* Synchronize broadcast link info, if not done previously */
2139
2140 if (!tipc_node_is_up(l_ptr->owner)) {
2141 l_ptr->owner->bclink.last_sent =
2142 l_ptr->owner->bclink.last_in =
2143 msg_last_bcast(msg);
2144 l_ptr->owner->bclink.oos_state = 0;
2145 }
2133 2146
2134 l_ptr->peer_session = msg_session(msg); 2147 l_ptr->peer_session = msg_session(msg);
2135 l_ptr->peer_bearer_id = msg_bearer_id(msg); 2148 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2136 2149
2137 /* Synchronize broadcast sequence numbers */ 2150 if (msg_type(msg) == ACTIVATE_MSG)
2138 if (!tipc_node_redundant_links(l_ptr->owner)) 2151 link_state_event(l_ptr, ACTIVATE_MSG);
2139 l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2140 break; 2152 break;
2141 case STATE_MSG: 2153 case STATE_MSG:
2142 2154
@@ -2177,7 +2189,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
2177 2189
2178 /* Protocol message before retransmits, reduce loss risk */ 2190 /* Protocol message before retransmits, reduce loss risk */
2179 2191
2180 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); 2192 if (l_ptr->owner->bclink.supported)
2193 tipc_bclink_update_link_state(l_ptr->owner,
2194 msg_last_bcast(msg));
2181 2195
2182 if (rec_gap || (msg_probe(msg))) { 2196 if (rec_gap || (msg_probe(msg))) {
2183 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2197 tipc_link_send_proto_msg(l_ptr, STATE_MSG,
@@ -2623,7 +2637,9 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2623 set_fragm_size(pbuf, fragm_sz); 2637 set_fragm_size(pbuf, fragm_sz);
2624 set_expected_frags(pbuf, exp_fragm_cnt - 1); 2638 set_expected_frags(pbuf, exp_fragm_cnt - 1);
2625 } else { 2639 } else {
2626 warn("Link unable to reassemble fragmented message\n"); 2640 dbg("Link unable to reassemble fragmented message\n");
2641 buf_discard(fbuf);
2642 return -1;
2627 } 2643 }
2628 buf_discard(fbuf); 2644 buf_discard(fbuf);
2629 return 0; 2645 return 0;
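
From the hunks above, tipc_link_recv_fragment() appears to distinguish three outcomes: a fully reassembled message (1), a fragment accepted but still pending (0), and a rejected, already-discarded fragment (-1), and both the broadcast and unicast receive paths now branch on all three. A toy, self-contained illustration of dispatching on that kind of convention (the reassembler below is invented, not the kernel's):

```c
/*
 * Toy three-way return convention:
 *   1  -> last fragment arrived, message complete
 *   0  -> fragment accepted, reassembly still in progress
 *  -1  -> fragment rejected and discarded
 */
#include <stdio.h>

struct reasm {
	int expected;	/* fragments still missing */
};

static int toy_recv_fragment(struct reasm *r, int first, int frag_cnt)
{
	if (first) {
		if (frag_cnt <= 0)
			return -1;	/* bogus header: reject */
		r->expected = frag_cnt;
	}
	if (--r->expected > 0)
		return 0;		/* keep waiting */
	return 1;			/* message complete */
}

int main(void)
{
	struct reasm r = { 0 };

	printf("ret=%d\n", toy_recv_fragment(&r, 1, 3));	/* 0 */
	printf("ret=%d\n", toy_recv_fragment(&r, 0, 0));	/* 0 */
	printf("ret=%d\n", toy_recv_fragment(&r, 0, 0));	/* 1 */
	printf("ret=%d\n", toy_recv_fragment(&r, 1, 0));	/* -1 */
	return 0;
}
```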
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 98ebb37f1808..acecfda82f37 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -239,9 +239,6 @@ exit:
239 * 239 *
240 * Invoked for each publication issued by a newly failed node. 240 * Invoked for each publication issued by a newly failed node.
241 * Removes publication structure from name table & deletes it. 241 * Removes publication structure from name table & deletes it.
242 * In rare cases the link may have come back up again when this
243 * function is called, and we have two items representing the same
244 * publication. Nudge this item's key to distinguish it from the other.
245 */ 242 */
246 243
247static void named_purge_publ(struct publication *publ) 244static void named_purge_publ(struct publication *publ)
@@ -249,7 +246,6 @@ static void named_purge_publ(struct publication *publ)
249 struct publication *p; 246 struct publication *p;
250 247
251 write_lock_bh(&tipc_nametbl_lock); 248 write_lock_bh(&tipc_nametbl_lock);
252 publ->key += 1222345;
253 p = tipc_nametbl_remove_publ(publ->type, publ->lower, 249 p = tipc_nametbl_remove_publ(publ->type, publ->lower,
254 publ->node, publ->ref, publ->key); 250 publ->node, publ->ref, publ->key);
255 if (p) 251 if (p)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6b226faad89f..7bc45e135fb4 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -49,9 +49,8 @@ LIST_HEAD(tipc_node_list);
49static u32 tipc_num_nodes; 49static u32 tipc_num_nodes;
50 50
51static atomic_t tipc_num_links = ATOMIC_INIT(0); 51static atomic_t tipc_num_links = ATOMIC_INIT(0);
52u32 tipc_own_tag;
53 52
54/** 53/*
55 * tipc_node_find - locate specified node object, if it exists 54 * tipc_node_find - locate specified node object, if it exists
56 */ 55 */
57 56
@@ -306,10 +305,9 @@ static void node_established_contact(struct tipc_node *n_ptr)
306 /* Syncronize broadcast acks */ 305 /* Syncronize broadcast acks */
307 n_ptr->bclink.acked = tipc_bclink_get_last_sent(); 306 n_ptr->bclink.acked = tipc_bclink_get_last_sent();
308 307
309 if (n_ptr->bclink.supported) { 308 if (n_ptr->bclink.supportable) {
310 tipc_bclink_add_node(n_ptr->addr); 309 tipc_bclink_add_node(n_ptr->addr);
311 if (n_ptr->addr < tipc_own_addr) 310 n_ptr->bclink.supported = 1;
312 tipc_own_tag++;
313 } 311 }
314} 312}
315 313
@@ -338,12 +336,12 @@ static void node_lost_contact(struct tipc_node *n_ptr)
338 /* Flush broadcast link info associated with lost node */ 336 /* Flush broadcast link info associated with lost node */
339 337
340 if (n_ptr->bclink.supported) { 338 if (n_ptr->bclink.supported) {
341 n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
342 while (n_ptr->bclink.deferred_head) { 339 while (n_ptr->bclink.deferred_head) {
343 struct sk_buff *buf = n_ptr->bclink.deferred_head; 340 struct sk_buff *buf = n_ptr->bclink.deferred_head;
344 n_ptr->bclink.deferred_head = buf->next; 341 n_ptr->bclink.deferred_head = buf->next;
345 buf_discard(buf); 342 buf_discard(buf);
346 } 343 }
344 n_ptr->bclink.deferred_size = 0;
347 345
348 if (n_ptr->bclink.defragm) { 346 if (n_ptr->bclink.defragm) {
349 buf_discard(n_ptr->bclink.defragm); 347 buf_discard(n_ptr->bclink.defragm);
@@ -352,8 +350,6 @@ static void node_lost_contact(struct tipc_node *n_ptr)
352 350
353 tipc_bclink_remove_node(n_ptr->addr); 351 tipc_bclink_remove_node(n_ptr->addr);
354 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); 352 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
355 if (n_ptr->addr < tipc_own_addr)
356 tipc_own_tag--;
357 353
358 n_ptr->bclink.supported = 0; 354 n_ptr->bclink.supported = 0;
359 } 355 }
@@ -449,7 +445,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
449 445
450 read_lock_bh(&tipc_net_lock); 446 read_lock_bh(&tipc_net_lock);
451 447
452 /* Get space for all unicast links + multicast link */ 448 /* Get space for all unicast links + broadcast link */
453 449
454 payload_size = TLV_SPACE(sizeof(link_info)) * 450 payload_size = TLV_SPACE(sizeof(link_info)) *
455 (atomic_read(&tipc_num_links) + 1); 451 (atomic_read(&tipc_num_links) + 1);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 0b1c5f8b6996..e1b78a2199c2 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -62,12 +62,13 @@
62 * @link_cnt: number of links to node 62 * @link_cnt: number of links to node
63 * @permit_changeover: non-zero if node has redundant links to this system 63 * @permit_changeover: non-zero if node has redundant links to this system
64 * @bclink: broadcast-related info 64 * @bclink: broadcast-related info
65 * @supportable: non-zero if node supports TIPC b'cast link capability
65 * @supported: non-zero if node supports TIPC b'cast capability 66 * @supported: non-zero if node supports TIPC b'cast capability
66 * @acked: sequence # of last outbound b'cast message acknowledged by node 67 * @acked: sequence # of last outbound b'cast message acknowledged by node
67 * @last_in: sequence # of last in-sequence b'cast message received from node 68 * @last_in: sequence # of last in-sequence b'cast message received from node
68 * @gap_after: sequence # of last message not requiring a NAK request 69 * @last_sent: sequence # of last b'cast message sent by node
69 * @gap_to: sequence # of last message requiring a NAK request 70 * @oos_state: state tracker for handling OOS b'cast messages
70 * @nack_sync: counter that determines when NAK requests should be sent 71 * @deferred_size: number of OOS b'cast messages in deferred queue
71 * @deferred_head: oldest OOS b'cast message received from node 72 * @deferred_head: oldest OOS b'cast message received from node
72 * @deferred_tail: newest OOS b'cast message received from node 73 * @deferred_tail: newest OOS b'cast message received from node
73 * @defragm: list of partially reassembled b'cast message fragments from node 74 * @defragm: list of partially reassembled b'cast message fragments from node
@@ -86,12 +87,13 @@ struct tipc_node {
86 int block_setup; 87 int block_setup;
87 int permit_changeover; 88 int permit_changeover;
88 struct { 89 struct {
89 int supported; 90 u8 supportable;
91 u8 supported;
90 u32 acked; 92 u32 acked;
91 u32 last_in; 93 u32 last_in;
92 u32 gap_after; 94 u32 last_sent;
93 u32 gap_to; 95 u32 oos_state;
94 u32 nack_sync; 96 u32 deferred_size;
95 struct sk_buff *deferred_head; 97 struct sk_buff *deferred_head;
96 struct sk_buff *deferred_tail; 98 struct sk_buff *deferred_tail;
97 struct sk_buff *defragm; 99 struct sk_buff *defragm;
@@ -112,8 +114,6 @@ static inline unsigned int tipc_hashfn(u32 addr)
112 return addr & (NODE_HTABLE_SIZE - 1); 114 return addr & (NODE_HTABLE_SIZE - 1);
113} 115}
114 116
115extern u32 tipc_own_tag;
116
117struct tipc_node *tipc_node_find(u32 addr); 117struct tipc_node *tipc_node_find(u32 addr);
118struct tipc_node *tipc_node_create(u32 addr); 118struct tipc_node *tipc_node_create(u32 addr);
119void tipc_node_delete(struct tipc_node *n_ptr); 119void tipc_node_delete(struct tipc_node *n_ptr);
diff --git a/net/tipc/port.c b/net/tipc/port.c
index d91efc69e6f9..ba3268b8da42 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -400,15 +400,16 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
400 400
401 /* send self-abort message when rejecting on a connected port */ 401 /* send self-abort message when rejecting on a connected port */
402 if (msg_connected(msg)) { 402 if (msg_connected(msg)) {
403 struct sk_buff *abuf = NULL;
404 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); 403 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
405 404
406 if (p_ptr) { 405 if (p_ptr) {
406 struct sk_buff *abuf = NULL;
407
407 if (p_ptr->connected) 408 if (p_ptr->connected)
408 abuf = port_build_self_abort_msg(p_ptr, err); 409 abuf = port_build_self_abort_msg(p_ptr, err);
409 tipc_port_unlock(p_ptr); 410 tipc_port_unlock(p_ptr);
411 tipc_net_route_msg(abuf);
410 } 412 }
411 tipc_net_route_msg(abuf);
412 } 413 }
413 414
414 /* send returned message & dispose of rejected message */ 415 /* send returned message & dispose of rejected message */