Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--  net/tipc/link.c | 620
1 file changed, 338 insertions(+), 282 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index da6018beb6eb..c5190ab75290 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -77,19 +77,19 @@ static const char *link_unk_evt = "Unknown link event ";
77 77
78static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, 78static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
79 struct sk_buff *buf); 79 struct sk_buff *buf);
80static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); 80static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
81static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, 81static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
82 struct sk_buff **buf); 82 struct sk_buff **buf);
83static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); 83static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
84static int link_send_sections_long(struct tipc_port *sender, 84static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
85 struct iovec const *msg_sect, 85 struct iovec const *msg_sect,
86 unsigned int len, u32 destnode); 86 unsigned int len, u32 destnode);
87static void link_state_event(struct tipc_link *l_ptr, u32 event); 87static void link_state_event(struct tipc_link *l_ptr, u32 event);
88static void link_reset_statistics(struct tipc_link *l_ptr); 88static void link_reset_statistics(struct tipc_link *l_ptr);
89static void link_print(struct tipc_link *l_ptr, const char *str); 89static void link_print(struct tipc_link *l_ptr, const char *str);
90static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); 90static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
91static void tipc_link_send_sync(struct tipc_link *l); 91static void tipc_link_sync_xmit(struct tipc_link *l);
92static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); 92static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
93 93
94/* 94/*
95 * Simple link routines 95 * Simple link routines
@@ -147,11 +147,6 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
147/** 147/**
148 * link_timeout - handle expiration of link timer 148 * link_timeout - handle expiration of link timer
149 * @l_ptr: pointer to link 149 * @l_ptr: pointer to link
150 *
151 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
152 * with tipc_link_delete(). (There is no risk that the node will be deleted by
153 * another thread because tipc_link_delete() always cancels the link timer before
154 * tipc_node_delete() is called.)
155 */ 150 */
156static void link_timeout(struct tipc_link *l_ptr) 151static void link_timeout(struct tipc_link *l_ptr)
157{ 152{
@@ -213,8 +208,8 @@ static void link_set_timer(struct tipc_link *l_ptr, u32 time)
213 * Returns pointer to link. 208 * Returns pointer to link.
214 */ 209 */
215struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, 210struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
216 struct tipc_bearer *b_ptr, 211 struct tipc_bearer *b_ptr,
217 const struct tipc_media_addr *media_addr) 212 const struct tipc_media_addr *media_addr)
218{ 213{
219 struct tipc_link *l_ptr; 214 struct tipc_link *l_ptr;
220 struct tipc_msg *msg; 215 struct tipc_msg *msg;
@@ -279,41 +274,44 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
279 274
280 k_init_timer(&l_ptr->timer, (Handler)link_timeout, 275 k_init_timer(&l_ptr->timer, (Handler)link_timeout,
281 (unsigned long)l_ptr); 276 (unsigned long)l_ptr);
282 list_add_tail(&l_ptr->link_list, &b_ptr->links);
283 277
284 link_state_event(l_ptr, STARTING_EVT); 278 link_state_event(l_ptr, STARTING_EVT);
285 279
286 return l_ptr; 280 return l_ptr;
287} 281}
288 282
289/** 283void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
290 * tipc_link_delete - delete a link
291 * @l_ptr: pointer to link
292 *
293 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
294 * This routine must not grab the node lock until after link timer cancellation
295 * to avoid a potential deadlock situation.
296 */
297void tipc_link_delete(struct tipc_link *l_ptr)
298{ 284{
299 if (!l_ptr) { 285 struct tipc_link *l_ptr;
300 pr_err("Attempt to delete non-existent link\n"); 286 struct tipc_node *n_ptr;
301 return;
302 }
303
304 k_cancel_timer(&l_ptr->timer);
305 287
306 tipc_node_lock(l_ptr->owner); 288 rcu_read_lock();
307 tipc_link_reset(l_ptr); 289 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
308 tipc_node_detach_link(l_ptr->owner, l_ptr); 290 spin_lock_bh(&n_ptr->lock);
309 tipc_link_purge_queues(l_ptr); 291 l_ptr = n_ptr->links[bearer_id];
310 list_del_init(&l_ptr->link_list); 292 if (l_ptr) {
311 tipc_node_unlock(l_ptr->owner); 293 tipc_link_reset(l_ptr);
312 k_term_timer(&l_ptr->timer); 294 if (shutting_down || !tipc_node_is_up(n_ptr)) {
313 kfree(l_ptr); 295 tipc_node_detach_link(l_ptr->owner, l_ptr);
296 tipc_link_reset_fragments(l_ptr);
297 spin_unlock_bh(&n_ptr->lock);
298
299 /* Nobody else can access this link now: */
300 del_timer_sync(&l_ptr->timer);
301 kfree(l_ptr);
302 } else {
303 /* Detach/delete when failover is finished: */
304 l_ptr->flags |= LINK_STOPPED;
305 spin_unlock_bh(&n_ptr->lock);
306 del_timer_sync(&l_ptr->timer);
307 }
308 continue;
309 }
310 spin_unlock_bh(&n_ptr->lock);
311 }
312 rcu_read_unlock();
314} 313}
315 314
316
317/** 315/**
318 * link_schedule_port - schedule port for deferred sending 316 * link_schedule_port - schedule port for deferred sending
319 * @l_ptr: pointer to link 317 * @l_ptr: pointer to link
@@ -330,8 +328,6 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
330 spin_lock_bh(&tipc_port_list_lock); 328 spin_lock_bh(&tipc_port_list_lock);
331 p_ptr = tipc_port_lock(origport); 329 p_ptr = tipc_port_lock(origport);
332 if (p_ptr) { 330 if (p_ptr) {
333 if (!p_ptr->wakeup)
334 goto exit;
335 if (!list_empty(&p_ptr->wait_list)) 331 if (!list_empty(&p_ptr->wait_list))
336 goto exit; 332 goto exit;
337 p_ptr->congested = 1; 333 p_ptr->congested = 1;
@@ -366,7 +362,7 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
366 list_del_init(&p_ptr->wait_list); 362 list_del_init(&p_ptr->wait_list);
367 spin_lock_bh(p_ptr->lock); 363 spin_lock_bh(p_ptr->lock);
368 p_ptr->congested = 0; 364 p_ptr->congested = 0;
369 p_ptr->wakeup(p_ptr); 365 tipc_port_wakeup(p_ptr);
370 win -= p_ptr->waiting_pkts; 366 win -= p_ptr->waiting_pkts;
371 spin_unlock_bh(p_ptr->lock); 367 spin_unlock_bh(p_ptr->lock);
372 } 368 }
@@ -461,6 +457,21 @@ void tipc_link_reset(struct tipc_link *l_ptr)
461 link_reset_statistics(l_ptr); 457 link_reset_statistics(l_ptr);
462} 458}
463 459
460void tipc_link_reset_list(unsigned int bearer_id)
461{
462 struct tipc_link *l_ptr;
463 struct tipc_node *n_ptr;
464
465 rcu_read_lock();
466 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
467 spin_lock_bh(&n_ptr->lock);
468 l_ptr = n_ptr->links[bearer_id];
469 if (l_ptr)
470 tipc_link_reset(l_ptr);
471 spin_unlock_bh(&n_ptr->lock);
472 }
473 rcu_read_unlock();
474}
464 475
465static void link_activate(struct tipc_link *l_ptr) 476static void link_activate(struct tipc_link *l_ptr)
466{ 477{
@@ -479,7 +490,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
479 struct tipc_link *other; 490 struct tipc_link *other;
480 u32 cont_intv = l_ptr->continuity_interval; 491 u32 cont_intv = l_ptr->continuity_interval;
481 492
482 if (!l_ptr->started && (event != STARTING_EVT)) 493 if (l_ptr->flags & LINK_STOPPED)
494 return;
495
496 if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
483 return; /* Not yet. */ 497 return; /* Not yet. */
484 498
485 /* Check whether changeover is going on */ 499 /* Check whether changeover is going on */
@@ -499,12 +513,12 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
499 if (l_ptr->next_in_no != l_ptr->checkpoint) { 513 if (l_ptr->next_in_no != l_ptr->checkpoint) {
500 l_ptr->checkpoint = l_ptr->next_in_no; 514 l_ptr->checkpoint = l_ptr->next_in_no;
501 if (tipc_bclink_acks_missing(l_ptr->owner)) { 515 if (tipc_bclink_acks_missing(l_ptr->owner)) {
502 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 516 tipc_link_proto_xmit(l_ptr, STATE_MSG,
503 0, 0, 0, 0, 0); 517 0, 0, 0, 0, 0);
504 l_ptr->fsm_msg_cnt++; 518 l_ptr->fsm_msg_cnt++;
505 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { 519 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
506 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 520 tipc_link_proto_xmit(l_ptr, STATE_MSG,
507 1, 0, 0, 0, 0); 521 1, 0, 0, 0, 0);
508 l_ptr->fsm_msg_cnt++; 522 l_ptr->fsm_msg_cnt++;
509 } 523 }
510 link_set_timer(l_ptr, cont_intv); 524 link_set_timer(l_ptr, cont_intv);
@@ -512,7 +526,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
512 } 526 }
513 l_ptr->state = WORKING_UNKNOWN; 527 l_ptr->state = WORKING_UNKNOWN;
514 l_ptr->fsm_msg_cnt = 0; 528 l_ptr->fsm_msg_cnt = 0;
515 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 529 tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
516 l_ptr->fsm_msg_cnt++; 530 l_ptr->fsm_msg_cnt++;
517 link_set_timer(l_ptr, cont_intv / 4); 531 link_set_timer(l_ptr, cont_intv / 4);
518 break; 532 break;
@@ -522,7 +536,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
522 tipc_link_reset(l_ptr); 536 tipc_link_reset(l_ptr);
523 l_ptr->state = RESET_RESET; 537 l_ptr->state = RESET_RESET;
524 l_ptr->fsm_msg_cnt = 0; 538 l_ptr->fsm_msg_cnt = 0;
525 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 539 tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
540 0, 0, 0, 0, 0);
526 l_ptr->fsm_msg_cnt++; 541 l_ptr->fsm_msg_cnt++;
527 link_set_timer(l_ptr, cont_intv); 542 link_set_timer(l_ptr, cont_intv);
528 break; 543 break;
@@ -544,7 +559,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
544 tipc_link_reset(l_ptr); 559 tipc_link_reset(l_ptr);
545 l_ptr->state = RESET_RESET; 560 l_ptr->state = RESET_RESET;
546 l_ptr->fsm_msg_cnt = 0; 561 l_ptr->fsm_msg_cnt = 0;
547 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 562 tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
563 0, 0, 0, 0, 0);
548 l_ptr->fsm_msg_cnt++; 564 l_ptr->fsm_msg_cnt++;
549 link_set_timer(l_ptr, cont_intv); 565 link_set_timer(l_ptr, cont_intv);
550 break; 566 break;
@@ -554,14 +570,14 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
554 l_ptr->fsm_msg_cnt = 0; 570 l_ptr->fsm_msg_cnt = 0;
555 l_ptr->checkpoint = l_ptr->next_in_no; 571 l_ptr->checkpoint = l_ptr->next_in_no;
556 if (tipc_bclink_acks_missing(l_ptr->owner)) { 572 if (tipc_bclink_acks_missing(l_ptr->owner)) {
557 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 573 tipc_link_proto_xmit(l_ptr, STATE_MSG,
558 0, 0, 0, 0, 0); 574 0, 0, 0, 0, 0);
559 l_ptr->fsm_msg_cnt++; 575 l_ptr->fsm_msg_cnt++;
560 } 576 }
561 link_set_timer(l_ptr, cont_intv); 577 link_set_timer(l_ptr, cont_intv);
562 } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { 578 } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
563 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 579 tipc_link_proto_xmit(l_ptr, STATE_MSG,
564 1, 0, 0, 0, 0); 580 1, 0, 0, 0, 0);
565 l_ptr->fsm_msg_cnt++; 581 l_ptr->fsm_msg_cnt++;
566 link_set_timer(l_ptr, cont_intv / 4); 582 link_set_timer(l_ptr, cont_intv / 4);
567 } else { /* Link has failed */ 583 } else { /* Link has failed */
@@ -570,8 +586,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
570 tipc_link_reset(l_ptr); 586 tipc_link_reset(l_ptr);
571 l_ptr->state = RESET_UNKNOWN; 587 l_ptr->state = RESET_UNKNOWN;
572 l_ptr->fsm_msg_cnt = 0; 588 l_ptr->fsm_msg_cnt = 0;
573 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 589 tipc_link_proto_xmit(l_ptr, RESET_MSG,
574 0, 0, 0, 0, 0); 590 0, 0, 0, 0, 0);
575 l_ptr->fsm_msg_cnt++; 591 l_ptr->fsm_msg_cnt++;
576 link_set_timer(l_ptr, cont_intv); 592 link_set_timer(l_ptr, cont_intv);
577 } 593 }
@@ -591,24 +607,25 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
591 l_ptr->state = WORKING_WORKING; 607 l_ptr->state = WORKING_WORKING;
592 l_ptr->fsm_msg_cnt = 0; 608 l_ptr->fsm_msg_cnt = 0;
593 link_activate(l_ptr); 609 link_activate(l_ptr);
594 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 610 tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
595 l_ptr->fsm_msg_cnt++; 611 l_ptr->fsm_msg_cnt++;
596 if (l_ptr->owner->working_links == 1) 612 if (l_ptr->owner->working_links == 1)
597 tipc_link_send_sync(l_ptr); 613 tipc_link_sync_xmit(l_ptr);
598 link_set_timer(l_ptr, cont_intv); 614 link_set_timer(l_ptr, cont_intv);
599 break; 615 break;
600 case RESET_MSG: 616 case RESET_MSG:
601 l_ptr->state = RESET_RESET; 617 l_ptr->state = RESET_RESET;
602 l_ptr->fsm_msg_cnt = 0; 618 l_ptr->fsm_msg_cnt = 0;
603 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); 619 tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
620 1, 0, 0, 0, 0);
604 l_ptr->fsm_msg_cnt++; 621 l_ptr->fsm_msg_cnt++;
605 link_set_timer(l_ptr, cont_intv); 622 link_set_timer(l_ptr, cont_intv);
606 break; 623 break;
607 case STARTING_EVT: 624 case STARTING_EVT:
608 l_ptr->started = 1; 625 l_ptr->flags |= LINK_STARTED;
609 /* fall through */ 626 /* fall through */
610 case TIMEOUT_EVT: 627 case TIMEOUT_EVT:
611 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); 628 tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
612 l_ptr->fsm_msg_cnt++; 629 l_ptr->fsm_msg_cnt++;
613 link_set_timer(l_ptr, cont_intv); 630 link_set_timer(l_ptr, cont_intv);
614 break; 631 break;
@@ -626,16 +643,17 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
626 l_ptr->state = WORKING_WORKING; 643 l_ptr->state = WORKING_WORKING;
627 l_ptr->fsm_msg_cnt = 0; 644 l_ptr->fsm_msg_cnt = 0;
628 link_activate(l_ptr); 645 link_activate(l_ptr);
629 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 646 tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
630 l_ptr->fsm_msg_cnt++; 647 l_ptr->fsm_msg_cnt++;
631 if (l_ptr->owner->working_links == 1) 648 if (l_ptr->owner->working_links == 1)
632 tipc_link_send_sync(l_ptr); 649 tipc_link_sync_xmit(l_ptr);
633 link_set_timer(l_ptr, cont_intv); 650 link_set_timer(l_ptr, cont_intv);
634 break; 651 break;
635 case RESET_MSG: 652 case RESET_MSG:
636 break; 653 break;
637 case TIMEOUT_EVT: 654 case TIMEOUT_EVT:
638 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 655 tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
656 0, 0, 0, 0, 0);
639 l_ptr->fsm_msg_cnt++; 657 l_ptr->fsm_msg_cnt++;
640 link_set_timer(l_ptr, cont_intv); 658 link_set_timer(l_ptr, cont_intv);
641 break; 659 break;
@@ -721,11 +739,11 @@ static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
721} 739}
722 740
723/* 741/*
724 * tipc_link_send_buf() is the 'full path' for messages, called from 742 * tipc_link_xmit() is the 'full path' for messages, called from
725 * inside TIPC when the 'fast path' in tipc_send_buf 743 * inside TIPC when the 'fast path' in tipc_send_xmit
726 * has failed, and from link_send() 744 * has failed, and from link_send()
727 */ 745 */
728int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) 746int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
729{ 747{
730 struct tipc_msg *msg = buf_msg(buf); 748 struct tipc_msg *msg = buf_msg(buf);
731 u32 size = msg_size(msg); 749 u32 size = msg_size(msg);
@@ -753,7 +771,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
753 771
754 /* Fragmentation needed ? */ 772 /* Fragmentation needed ? */
755 if (size > max_packet) 773 if (size > max_packet)
756 return link_send_long_buf(l_ptr, buf); 774 return tipc_link_frag_xmit(l_ptr, buf);
757 775
758 /* Packet can be queued or sent. */ 776 /* Packet can be queued or sent. */
759 if (likely(!link_congested(l_ptr))) { 777 if (likely(!link_congested(l_ptr))) {
@@ -797,11 +815,11 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
797} 815}
798 816
799/* 817/*
800 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has 818 * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
801 * not been selected yet, and the the owner node is not locked 819 * has not been selected yet, and the the owner node is not locked
802 * Called by TIPC internal users, e.g. the name distributor 820 * Called by TIPC internal users, e.g. the name distributor
803 */ 821 */
804int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) 822int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
805{ 823{
806 struct tipc_link *l_ptr; 824 struct tipc_link *l_ptr;
807 struct tipc_node *n_ptr; 825 struct tipc_node *n_ptr;
@@ -813,7 +831,7 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
813 tipc_node_lock(n_ptr); 831 tipc_node_lock(n_ptr);
814 l_ptr = n_ptr->active_links[selector & 1]; 832 l_ptr = n_ptr->active_links[selector & 1];
815 if (l_ptr) 833 if (l_ptr)
816 res = tipc_link_send_buf(l_ptr, buf); 834 res = __tipc_link_xmit(l_ptr, buf);
817 else 835 else
818 kfree_skb(buf); 836 kfree_skb(buf);
819 tipc_node_unlock(n_ptr); 837 tipc_node_unlock(n_ptr);
@@ -825,14 +843,14 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
825} 843}
826 844
827/* 845/*
828 * tipc_link_send_sync - synchronize broadcast link endpoints. 846 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
829 * 847 *
830 * Give a newly added peer node the sequence number where it should 848 * Give a newly added peer node the sequence number where it should
831 * start receiving and acking broadcast packets. 849 * start receiving and acking broadcast packets.
832 * 850 *
833 * Called with node locked 851 * Called with node locked
834 */ 852 */
835static void tipc_link_send_sync(struct tipc_link *l) 853static void tipc_link_sync_xmit(struct tipc_link *l)
836{ 854{
837 struct sk_buff *buf; 855 struct sk_buff *buf;
838 struct tipc_msg *msg; 856 struct tipc_msg *msg;
@@ -849,14 +867,14 @@ static void tipc_link_send_sync(struct tipc_link *l)
849} 867}
850 868
851/* 869/*
852 * tipc_link_recv_sync - synchronize broadcast link endpoints. 870 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
853 * Receive the sequence number where we should start receiving and 871 * Receive the sequence number where we should start receiving and
854 * acking broadcast packets from a newly added peer node, and open 872 * acking broadcast packets from a newly added peer node, and open
855 * up for reception of such packets. 873 * up for reception of such packets.
856 * 874 *
857 * Called with node locked 875 * Called with node locked
858 */ 876 */
859static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) 877static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
860{ 878{
861 struct tipc_msg *msg = buf_msg(buf); 879 struct tipc_msg *msg = buf_msg(buf);
862 880
@@ -866,7 +884,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
866} 884}
867 885
868/* 886/*
869 * tipc_link_send_names - send name table entries to new neighbor 887 * tipc_link_names_xmit - send name table entries to new neighbor
870 * 888 *
871 * Send routine for bulk delivery of name table messages when contact 889 * Send routine for bulk delivery of name table messages when contact
872 * with a new neighbor occurs. No link congestion checking is performed 890 * with a new neighbor occurs. No link congestion checking is performed
@@ -874,7 +892,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
874 * small enough not to require fragmentation. 892 * small enough not to require fragmentation.
875 * Called without any locks held. 893 * Called without any locks held.
876 */ 894 */
877void tipc_link_send_names(struct list_head *message_list, u32 dest) 895void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
878{ 896{
879 struct tipc_node *n_ptr; 897 struct tipc_node *n_ptr;
880 struct tipc_link *l_ptr; 898 struct tipc_link *l_ptr;
@@ -909,13 +927,13 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest)
909} 927}
910 928
911/* 929/*
912 * link_send_buf_fast: Entry for data messages where the 930 * tipc_link_xmit_fast: Entry for data messages where the
913 * destination link is known and the header is complete, 931 * destination link is known and the header is complete,
914 * inclusive total message length. Very time critical. 932 * inclusive total message length. Very time critical.
915 * Link is locked. Returns user data length. 933 * Link is locked. Returns user data length.
916 */ 934 */
917static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, 935static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
918 u32 *used_max_pkt) 936 u32 *used_max_pkt)
919{ 937{
920 struct tipc_msg *msg = buf_msg(buf); 938 struct tipc_msg *msg = buf_msg(buf);
921 int res = msg_data_sz(msg); 939 int res = msg_data_sz(msg);
@@ -931,18 +949,18 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
931 else 949 else
932 *used_max_pkt = l_ptr->max_pkt; 950 *used_max_pkt = l_ptr->max_pkt;
933 } 951 }
934 return tipc_link_send_buf(l_ptr, buf); /* All other cases */ 952 return __tipc_link_xmit(l_ptr, buf); /* All other cases */
935} 953}
936 954
937/* 955/*
938 * tipc_link_send_sections_fast: Entry for messages where the 956 * tipc_link_iovec_xmit_fast: Entry for messages where the
939 * destination processor is known and the header is complete, 957 * destination processor is known and the header is complete,
940 * except for total message length. 958 * except for total message length.
941 * Returns user data length or errno. 959 * Returns user data length or errno.
942 */ 960 */
943int tipc_link_send_sections_fast(struct tipc_port *sender, 961int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
944 struct iovec const *msg_sect, 962 struct iovec const *msg_sect,
945 unsigned int len, u32 destaddr) 963 unsigned int len, u32 destaddr)
946{ 964{
947 struct tipc_msg *hdr = &sender->phdr; 965 struct tipc_msg *hdr = &sender->phdr;
948 struct tipc_link *l_ptr; 966 struct tipc_link *l_ptr;
@@ -968,8 +986,8 @@ again:
968 l_ptr = node->active_links[selector]; 986 l_ptr = node->active_links[selector];
969 if (likely(l_ptr)) { 987 if (likely(l_ptr)) {
970 if (likely(buf)) { 988 if (likely(buf)) {
971 res = link_send_buf_fast(l_ptr, buf, 989 res = tipc_link_xmit_fast(l_ptr, buf,
972 &sender->max_pkt); 990 &sender->max_pkt);
973exit: 991exit:
974 tipc_node_unlock(node); 992 tipc_node_unlock(node);
975 read_unlock_bh(&tipc_net_lock); 993 read_unlock_bh(&tipc_net_lock);
@@ -995,24 +1013,21 @@ exit:
995 if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) 1013 if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
996 goto again; 1014 goto again;
997 1015
998 return link_send_sections_long(sender, msg_sect, len, 1016 return tipc_link_iovec_long_xmit(sender, msg_sect,
999 destaddr); 1017 len, destaddr);
1000 } 1018 }
1001 tipc_node_unlock(node); 1019 tipc_node_unlock(node);
1002 } 1020 }
1003 read_unlock_bh(&tipc_net_lock); 1021 read_unlock_bh(&tipc_net_lock);
1004 1022
1005 /* Couldn't find a link to the destination node */ 1023 /* Couldn't find a link to the destination node */
1006 if (buf) 1024 kfree_skb(buf);
1007 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1025 tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
1008 if (res >= 0) 1026 return -ENETUNREACH;
1009 return tipc_port_reject_sections(sender, hdr, msg_sect,
1010 len, TIPC_ERR_NO_NODE);
1011 return res;
1012} 1027}
1013 1028
1014/* 1029/*
1015 * link_send_sections_long(): Entry for long messages where the 1030 * tipc_link_iovec_long_xmit(): Entry for long messages where the
1016 * destination node is known and the header is complete, 1031 * destination node is known and the header is complete,
1017 * inclusive total message length. 1032 * inclusive total message length.
1018 * Link and bearer congestion status have been checked to be ok, 1033 * Link and bearer congestion status have been checked to be ok,
@@ -1025,9 +1040,9 @@ exit:
1025 * 1040 *
1026 * Returns user data length or errno. 1041 * Returns user data length or errno.
1027 */ 1042 */
1028static int link_send_sections_long(struct tipc_port *sender, 1043static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
1029 struct iovec const *msg_sect, 1044 struct iovec const *msg_sect,
1030 unsigned int len, u32 destaddr) 1045 unsigned int len, u32 destaddr)
1031{ 1046{
1032 struct tipc_link *l_ptr; 1047 struct tipc_link *l_ptr;
1033 struct tipc_node *node; 1048 struct tipc_node *node;
@@ -1146,8 +1161,9 @@ error:
1146 } else { 1161 } else {
1147reject: 1162reject:
1148 kfree_skb_list(buf_chain); 1163 kfree_skb_list(buf_chain);
1149 return tipc_port_reject_sections(sender, hdr, msg_sect, 1164 tipc_port_iovec_reject(sender, hdr, msg_sect, len,
1150 len, TIPC_ERR_NO_NODE); 1165 TIPC_ERR_NO_NODE);
1166 return -ENETUNREACH;
1151 } 1167 }
1152 1168
1153 /* Append chain of fragments to send queue & send them */ 1169 /* Append chain of fragments to send queue & send them */
@@ -1441,15 +1457,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1441 u32 seq_no; 1457 u32 seq_no;
1442 u32 ackd; 1458 u32 ackd;
1443 u32 released = 0; 1459 u32 released = 0;
1444 int type;
1445 1460
1446 head = head->next; 1461 head = head->next;
1447 buf->next = NULL; 1462 buf->next = NULL;
1448 1463
1449 /* Ensure bearer is still enabled */
1450 if (unlikely(!b_ptr->active))
1451 goto discard;
1452
1453 /* Ensure message is well-formed */ 1464 /* Ensure message is well-formed */
1454 if (unlikely(!link_recv_buf_validate(buf))) 1465 if (unlikely(!link_recv_buf_validate(buf)))
1455 goto discard; 1466 goto discard;
@@ -1463,9 +1474,9 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1463 1474
1464 if (unlikely(msg_non_seq(msg))) { 1475 if (unlikely(msg_non_seq(msg))) {
1465 if (msg_user(msg) == LINK_CONFIG) 1476 if (msg_user(msg) == LINK_CONFIG)
1466 tipc_disc_recv_msg(buf, b_ptr); 1477 tipc_disc_rcv(buf, b_ptr);
1467 else 1478 else
1468 tipc_bclink_recv_pkt(buf); 1479 tipc_bclink_rcv(buf);
1469 continue; 1480 continue;
1470 } 1481 }
1471 1482
@@ -1489,7 +1500,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1489 if ((n_ptr->block_setup & WAIT_PEER_DOWN) && 1500 if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
1490 msg_user(msg) == LINK_PROTOCOL && 1501 msg_user(msg) == LINK_PROTOCOL &&
1491 (msg_type(msg) == RESET_MSG || 1502 (msg_type(msg) == RESET_MSG ||
1492 msg_type(msg) == ACTIVATE_MSG) && 1503 msg_type(msg) == ACTIVATE_MSG) &&
1493 !msg_redundant_link(msg)) 1504 !msg_redundant_link(msg))
1494 n_ptr->block_setup &= ~WAIT_PEER_DOWN; 1505 n_ptr->block_setup &= ~WAIT_PEER_DOWN;
1495 1506
@@ -1508,7 +1519,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1508 while ((crs != l_ptr->next_out) && 1519 while ((crs != l_ptr->next_out) &&
1509 less_eq(buf_seqno(crs), ackd)) { 1520 less_eq(buf_seqno(crs), ackd)) {
1510 struct sk_buff *next = crs->next; 1521 struct sk_buff *next = crs->next;
1511
1512 kfree_skb(crs); 1522 kfree_skb(crs);
1513 crs = next; 1523 crs = next;
1514 released++; 1524 released++;
@@ -1521,18 +1531,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1521 /* Try sending any messages link endpoint has pending */ 1531 /* Try sending any messages link endpoint has pending */
1522 if (unlikely(l_ptr->next_out)) 1532 if (unlikely(l_ptr->next_out))
1523 tipc_link_push_queue(l_ptr); 1533 tipc_link_push_queue(l_ptr);
1534
1524 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1535 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1525 tipc_link_wakeup_ports(l_ptr, 0); 1536 tipc_link_wakeup_ports(l_ptr, 0);
1537
1526 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { 1538 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1527 l_ptr->stats.sent_acks++; 1539 l_ptr->stats.sent_acks++;
1528 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1540 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1529 } 1541 }
1530 1542
1531 /* Now (finally!) process the incoming message */ 1543 /* Process the incoming packet */
1532protocol_check:
1533 if (unlikely(!link_working_working(l_ptr))) { 1544 if (unlikely(!link_working_working(l_ptr))) {
1534 if (msg_user(msg) == LINK_PROTOCOL) { 1545 if (msg_user(msg) == LINK_PROTOCOL) {
1535 link_recv_proto_msg(l_ptr, buf); 1546 tipc_link_proto_rcv(l_ptr, buf);
1536 head = link_insert_deferred_queue(l_ptr, head); 1547 head = link_insert_deferred_queue(l_ptr, head);
1537 tipc_node_unlock(n_ptr); 1548 tipc_node_unlock(n_ptr);
1538 continue; 1549 continue;
@@ -1561,67 +1572,65 @@ protocol_check:
1561 l_ptr->next_in_no++; 1572 l_ptr->next_in_no++;
1562 if (unlikely(l_ptr->oldest_deferred_in)) 1573 if (unlikely(l_ptr->oldest_deferred_in))
1563 head = link_insert_deferred_queue(l_ptr, head); 1574 head = link_insert_deferred_queue(l_ptr, head);
1564deliver: 1575
1565 if (likely(msg_isdata(msg))) { 1576 /* Deliver packet/message to correct user: */
1566 tipc_node_unlock(n_ptr); 1577 if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) {
1567 tipc_port_recv_msg(buf); 1578 if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
1568 continue; 1579 tipc_node_unlock(n_ptr);
1580 continue;
1581 }
1582 msg = buf_msg(buf);
1583 } else if (msg_user(msg) == MSG_FRAGMENTER) {
1584 int rc;
1585
1586 l_ptr->stats.recv_fragments++;
1587 rc = tipc_link_frag_rcv(&l_ptr->reasm_head,
1588 &l_ptr->reasm_tail,
1589 &buf);
1590 if (rc == LINK_REASM_COMPLETE) {
1591 l_ptr->stats.recv_fragmented++;
1592 msg = buf_msg(buf);
1593 } else {
1594 if (rc == LINK_REASM_ERROR)
1595 tipc_link_reset(l_ptr);
1596 tipc_node_unlock(n_ptr);
1597 continue;
1598 }
1569 } 1599 }
1600
1570 switch (msg_user(msg)) { 1601 switch (msg_user(msg)) {
1571 int ret; 1602 case TIPC_LOW_IMPORTANCE:
1603 case TIPC_MEDIUM_IMPORTANCE:
1604 case TIPC_HIGH_IMPORTANCE:
1605 case TIPC_CRITICAL_IMPORTANCE:
1606 tipc_node_unlock(n_ptr);
1607 tipc_port_rcv(buf);
1608 continue;
1572 case MSG_BUNDLER: 1609 case MSG_BUNDLER:
1573 l_ptr->stats.recv_bundles++; 1610 l_ptr->stats.recv_bundles++;
1574 l_ptr->stats.recv_bundled += msg_msgcnt(msg); 1611 l_ptr->stats.recv_bundled += msg_msgcnt(msg);
1575 tipc_node_unlock(n_ptr); 1612 tipc_node_unlock(n_ptr);
1576 tipc_link_recv_bundle(buf); 1613 tipc_link_bundle_rcv(buf);
1577 continue; 1614 continue;
1578 case NAME_DISTRIBUTOR: 1615 case NAME_DISTRIBUTOR:
1579 n_ptr->bclink.recv_permitted = true; 1616 n_ptr->bclink.recv_permitted = true;
1580 tipc_node_unlock(n_ptr); 1617 tipc_node_unlock(n_ptr);
1581 tipc_named_recv(buf); 1618 tipc_named_rcv(buf);
1582 continue;
1583 case BCAST_PROTOCOL:
1584 tipc_link_recv_sync(n_ptr, buf);
1585 tipc_node_unlock(n_ptr);
1586 continue; 1619 continue;
1587 case CONN_MANAGER: 1620 case CONN_MANAGER:
1588 tipc_node_unlock(n_ptr); 1621 tipc_node_unlock(n_ptr);
1589 tipc_port_recv_proto_msg(buf); 1622 tipc_port_proto_rcv(buf);
1590 continue; 1623 continue;
1591 case MSG_FRAGMENTER: 1624 case BCAST_PROTOCOL:
1592 l_ptr->stats.recv_fragments++; 1625 tipc_link_sync_rcv(n_ptr, buf);
1593 ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
1594 &l_ptr->reasm_tail,
1595 &buf);
1596 if (ret == LINK_REASM_COMPLETE) {
1597 l_ptr->stats.recv_fragmented++;
1598 msg = buf_msg(buf);
1599 goto deliver;
1600 }
1601 if (ret == LINK_REASM_ERROR)
1602 tipc_link_reset(l_ptr);
1603 tipc_node_unlock(n_ptr);
1604 continue;
1605 case CHANGEOVER_PROTOCOL:
1606 type = msg_type(msg);
1607 if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {
1608 msg = buf_msg(buf);
1609 seq_no = msg_seqno(msg);
1610 if (type == ORIGINAL_MSG)
1611 goto deliver;
1612 goto protocol_check;
1613 }
1614 break; 1626 break;
1615 default: 1627 default:
1616 kfree_skb(buf); 1628 kfree_skb(buf);
1617 buf = NULL;
1618 break; 1629 break;
1619 } 1630 }
1620 tipc_node_unlock(n_ptr); 1631 tipc_node_unlock(n_ptr);
1621 tipc_net_route_msg(buf);
1622 continue; 1632 continue;
1623unlock_discard: 1633unlock_discard:
1624
1625 tipc_node_unlock(n_ptr); 1634 tipc_node_unlock(n_ptr);
1626discard: 1635discard:
1627 kfree_skb(buf); 1636 kfree_skb(buf);
@@ -1688,7 +1697,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1688 u32 seq_no = buf_seqno(buf); 1697 u32 seq_no = buf_seqno(buf);
1689 1698
1690 if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { 1699 if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1691 link_recv_proto_msg(l_ptr, buf); 1700 tipc_link_proto_rcv(l_ptr, buf);
1692 return; 1701 return;
1693 } 1702 }
1694 1703
@@ -1711,7 +1720,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1711 l_ptr->stats.deferred_recv++; 1720 l_ptr->stats.deferred_recv++;
1712 TIPC_SKB_CB(buf)->deferred = true; 1721 TIPC_SKB_CB(buf)->deferred = true;
1713 if ((l_ptr->deferred_inqueue_sz % 16) == 1) 1722 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1714 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1723 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1715 } else 1724 } else
1716 l_ptr->stats.duplicates++; 1725 l_ptr->stats.duplicates++;
1717} 1726}
@@ -1719,9 +1728,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1719/* 1728/*
1720 * Send protocol message to the other endpoint. 1729 * Send protocol message to the other endpoint.
1721 */ 1730 */
1722void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, 1731void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1723 int probe_msg, u32 gap, u32 tolerance, 1732 u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1724 u32 priority, u32 ack_mtu)
1725{ 1733{
1726 struct sk_buff *buf = NULL; 1734 struct sk_buff *buf = NULL;
1727 struct tipc_msg *msg = l_ptr->pmsg; 1735 struct tipc_msg *msg = l_ptr->pmsg;
@@ -1820,7 +1828,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1820 * Note that network plane id propagates through the network, and may 1828 * Note that network plane id propagates through the network, and may
1821 * change at any time. The node with lowest address rules 1829 * change at any time. The node with lowest address rules
1822 */ 1830 */
1823static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) 1831static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1824{ 1832{
1825 u32 rec_gap = 0; 1833 u32 rec_gap = 0;
1826 u32 max_pkt_info; 1834 u32 max_pkt_info;
@@ -1939,8 +1947,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
1939 msg_last_bcast(msg)); 1947 msg_last_bcast(msg));
1940 1948
1941 if (rec_gap || (msg_probe(msg))) { 1949 if (rec_gap || (msg_probe(msg))) {
1942 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1950 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
1943 0, rec_gap, 0, 0, max_pkt_ack); 1951 0, max_pkt_ack);
1944 } 1952 }
1945 if (msg_seq_gap(msg)) { 1953 if (msg_seq_gap(msg)) {
1946 l_ptr->stats.recv_nacks++; 1954 l_ptr->stats.recv_nacks++;
@@ -1979,7 +1987,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1979 } 1987 }
1980 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 1988 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
1981 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 1989 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
1982 tipc_link_send_buf(tunnel, buf); 1990 __tipc_link_xmit(tunnel, buf);
1983} 1991}
1984 1992
1985 1993
@@ -2012,7 +2020,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
2012 if (buf) { 2020 if (buf) {
2013 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); 2021 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2014 msg_set_size(&tunnel_hdr, INT_H_SIZE); 2022 msg_set_size(&tunnel_hdr, INT_H_SIZE);
2015 tipc_link_send_buf(tunnel, buf); 2023 __tipc_link_xmit(tunnel, buf);
2016 } else { 2024 } else {
2017 pr_warn("%sunable to send changeover msg\n", 2025 pr_warn("%sunable to send changeover msg\n",
2018 link_co_err); 2026 link_co_err);
@@ -2046,7 +2054,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
2046 } 2054 }
2047} 2055}
2048 2056
2049/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a 2057/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
2050 * duplicate of the first link's send queue via the new link. This way, we 2058 * duplicate of the first link's send queue via the new link. This way, we
2051 * are guaranteed that currently queued packets from a socket are delivered 2059 * are guaranteed that currently queued packets from a socket are delivered
2052 * before future traffic from the same socket, even if this is using the 2060 * before future traffic from the same socket, even if this is using the
@@ -2055,7 +2063,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
2055 * and sequence order is preserved per sender/receiver socket pair. 2063 * and sequence order is preserved per sender/receiver socket pair.
2056 * Owner node is locked. 2064 * Owner node is locked.
2057 */ 2065 */
2058void tipc_link_dup_send_queue(struct tipc_link *l_ptr, 2066void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
2059 struct tipc_link *tunnel) 2067 struct tipc_link *tunnel)
2060{ 2068{
2061 struct sk_buff *iter; 2069 struct sk_buff *iter;
@@ -2085,7 +2093,7 @@ void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
2085 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 2093 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2086 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 2094 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2087 length); 2095 length);
2088 tipc_link_send_buf(tunnel, outbuf); 2096 __tipc_link_xmit(tunnel, outbuf);
2089 if (!tipc_link_is_up(l_ptr)) 2097 if (!tipc_link_is_up(l_ptr))
2090 return; 2098 return;
2091 iter = iter->next; 2099 iter = iter->next;
@@ -2112,89 +2120,114 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2112 return eb; 2120 return eb;
2113} 2121}
2114 2122
2115/* tipc_link_tunnel_rcv(): Receive a tunneled packet, sent 2123
2116 * via other link as result of a failover (ORIGINAL_MSG) or 2124
2117 * a new active link (DUPLICATE_MSG). Failover packets are 2125/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
2118 * returned to the active link for delivery upwards. 2126 * Owner node is locked.
2127 */
2128static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
2129 struct sk_buff *t_buf)
2130{
2131 struct sk_buff *buf;
2132
2133 if (!tipc_link_is_up(l_ptr))
2134 return;
2135
2136 buf = buf_extract(t_buf, INT_H_SIZE);
2137 if (buf == NULL) {
2138 pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
2139 return;
2140 }
2141
2142 /* Add buffer to deferred queue, if applicable: */
2143 link_handle_out_of_seq_msg(l_ptr, buf);
2144}
2145
2146/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
2119 * Owner node is locked. 2147 * Owner node is locked.
2120 */ 2148 */
2121static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, 2149static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
2122 struct sk_buff **buf) 2150 struct sk_buff *t_buf)
2123{ 2151{
2124 struct sk_buff *tunnel_buf = *buf; 2152 struct tipc_msg *t_msg = buf_msg(t_buf);
2125 struct tipc_link *dest_link; 2153 struct sk_buff *buf = NULL;
2126 struct tipc_msg *msg; 2154 struct tipc_msg *msg;
2127 struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2128 u32 msg_typ = msg_type(tunnel_msg);
2129 u32 msg_count = msg_msgcnt(tunnel_msg);
2130 u32 bearer_id = msg_bearer_id(tunnel_msg);
2131 2155
2132 if (bearer_id >= MAX_BEARERS) 2156 if (tipc_link_is_up(l_ptr))
2133 goto exit; 2157 tipc_link_reset(l_ptr);
2134 dest_link = (*l_ptr)->owner->links[bearer_id];
2135 if (!dest_link)
2136 goto exit;
2137 if (dest_link == *l_ptr) {
2138 pr_err("Unexpected changeover message on link <%s>\n",
2139 (*l_ptr)->name);
2140 goto exit;
2141 }
2142 *l_ptr = dest_link;
2143 msg = msg_get_wrapped(tunnel_msg);
2144 2158
2145 if (msg_typ == DUPLICATE_MSG) { 2159 /* First failover packet? */
2146 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) 2160 if (l_ptr->exp_msg_count == START_CHANGEOVER)
2147 goto exit; 2161 l_ptr->exp_msg_count = msg_msgcnt(t_msg);
2148 *buf = buf_extract(tunnel_buf, INT_H_SIZE); 2162
2149 if (*buf == NULL) { 2163 /* Should there be an inner packet? */
2150 pr_warn("%sduplicate msg dropped\n", link_co_err); 2164 if (l_ptr->exp_msg_count) {
2165 l_ptr->exp_msg_count--;
2166 buf = buf_extract(t_buf, INT_H_SIZE);
2167 if (buf == NULL) {
2168 pr_warn("%sno inner failover pkt\n", link_co_err);
2151 goto exit; 2169 goto exit;
2152 } 2170 }
2153 kfree_skb(tunnel_buf); 2171 msg = buf_msg(buf);
2154 return 1;
2155 }
2156 2172
2157 /* First original message ?: */ 2173 if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
2158 if (tipc_link_is_up(dest_link)) { 2174 kfree_skb(buf);
2159 pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg, 2175 buf = NULL;
2160 dest_link->name);
2161 tipc_link_reset(dest_link);
2162 dest_link->exp_msg_count = msg_count;
2163 if (!msg_count)
2164 goto exit;
2165 } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2166 dest_link->exp_msg_count = msg_count;
2167 if (!msg_count)
2168 goto exit; 2176 goto exit;
2177 }
2178 if (msg_user(msg) == MSG_FRAGMENTER) {
2179 l_ptr->stats.recv_fragments++;
2180 tipc_link_frag_rcv(&l_ptr->reasm_head,
2181 &l_ptr->reasm_tail,
2182 &buf);
2183 }
2184 }
2185exit:
2186 if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
2187 tipc_node_detach_link(l_ptr->owner, l_ptr);
2188 kfree(l_ptr);
2169 } 2189 }
2190 return buf;
2191}
2170 2192
2171 /* Receive original message */ 2193/* tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
2172 if (dest_link->exp_msg_count == 0) { 2194 * via other link as result of a failover (ORIGINAL_MSG) or
2173 pr_warn("%sgot too many tunnelled messages\n", link_co_err); 2195 * a new active link (DUPLICATE_MSG). Failover packets are
2196 * returned to the active link for delivery upwards.
2197 * Owner node is locked.
2198 */
2199static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
2200 struct sk_buff **buf)
2201{
2202 struct sk_buff *t_buf = *buf;
2203 struct tipc_link *l_ptr;
2204 struct tipc_msg *t_msg = buf_msg(t_buf);
2205 u32 bearer_id = msg_bearer_id(t_msg);
2206
2207 *buf = NULL;
2208
2209 if (bearer_id >= MAX_BEARERS)
2174 goto exit; 2210 goto exit;
2175 } 2211
2176 dest_link->exp_msg_count--; 2212 l_ptr = n_ptr->links[bearer_id];
2177 if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { 2213 if (!l_ptr)
2178 goto exit; 2214 goto exit;
2179 } else { 2215
2180 *buf = buf_extract(tunnel_buf, INT_H_SIZE); 2216 if (msg_type(t_msg) == DUPLICATE_MSG)
2181 if (*buf != NULL) { 2217 tipc_link_dup_rcv(l_ptr, t_buf);
2182 kfree_skb(tunnel_buf); 2218 else if (msg_type(t_msg) == ORIGINAL_MSG)
2183 return 1; 2219 *buf = tipc_link_failover_rcv(l_ptr, t_buf);
2184 } else { 2220 else
2185 pr_warn("%soriginal msg dropped\n", link_co_err); 2221 pr_warn("%sunknown tunnel pkt received\n", link_co_err);
2186 }
2187 }
2188exit: 2222exit:
2189 *buf = NULL; 2223 kfree_skb(t_buf);
2190 kfree_skb(tunnel_buf); 2224 return *buf != NULL;
2191 return 0;
2192} 2225}
2193 2226
2194/* 2227/*
2195 * Bundler functionality: 2228 * Bundler functionality:
2196 */ 2229 */
2197void tipc_link_recv_bundle(struct sk_buff *buf) 2230void tipc_link_bundle_rcv(struct sk_buff *buf)
2198{ 2231{
2199 u32 msgcount = msg_msgcnt(buf_msg(buf)); 2232 u32 msgcount = msg_msgcnt(buf_msg(buf));
2200 u32 pos = INT_H_SIZE; 2233 u32 pos = INT_H_SIZE;
@@ -2217,11 +2250,11 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
2217 */ 2250 */
2218 2251
2219/* 2252/*
2220 * link_send_long_buf: Entry for buffers needing fragmentation. 2253 * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
2221 * The buffer is complete, inclusive total message length. 2254 * The buffer is complete, inclusive total message length.
2222 * Returns user data length. 2255 * Returns user data length.
2223 */ 2256 */
2224static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) 2257static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
2225{ 2258{
2226 struct sk_buff *buf_chain = NULL; 2259 struct sk_buff *buf_chain = NULL;
2227 struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; 2260 struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
@@ -2284,12 +2317,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2284 return dsz; 2317 return dsz;
2285} 2318}
2286 2319
2287/* 2320/* tipc_link_frag_rcv(): Called with node lock on. Returns
2288 * tipc_link_recv_fragment(): Called with node lock on. Returns
2289 * the reassembled buffer if message is complete. 2321 * the reassembled buffer if message is complete.
2290 */ 2322 */
2291int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail, 2323int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail,
2292 struct sk_buff **fbuf) 2324 struct sk_buff **fbuf)
2293{ 2325{
2294 struct sk_buff *frag = *fbuf; 2326 struct sk_buff *frag = *fbuf;
2295 struct tipc_msg *msg = buf_msg(frag); 2327 struct tipc_msg *msg = buf_msg(frag);
@@ -2303,6 +2335,7 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
2303 goto out_free; 2335 goto out_free;
2304 *head = frag; 2336 *head = frag;
2305 skb_frag_list_init(*head); 2337 skb_frag_list_init(*head);
2338 *fbuf = NULL;
2306 return 0; 2339 return 0;
2307 } else if (*head && 2340 } else if (*head &&
2308 skb_try_coalesce(*head, frag, &headstolen, &delta)) { 2341 skb_try_coalesce(*head, frag, &headstolen, &delta)) {
@@ -2322,10 +2355,12 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
2322 *tail = *head = NULL; 2355 *tail = *head = NULL;
2323 return LINK_REASM_COMPLETE; 2356 return LINK_REASM_COMPLETE;
2324 } 2357 }
2358 *fbuf = NULL;
2325 return 0; 2359 return 0;
2326out_free: 2360out_free:
2327 pr_warn_ratelimited("Link unable to reassemble fragmented message\n"); 2361 pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
2328 kfree_skb(*fbuf); 2362 kfree_skb(*fbuf);
2363 *fbuf = NULL;
2329 return LINK_REASM_ERROR; 2364 return LINK_REASM_ERROR;
2330} 2365}
2331 2366
@@ -2359,35 +2394,41 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
2359 l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; 2394 l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2360} 2395}
2361 2396
2362/** 2397/* tipc_link_find_owner - locate owner node of link by link's name
2363 * link_find_link - locate link by name 2398 * @name: pointer to link name string
2364 * @name: ptr to link name string 2399 * @bearer_id: pointer to index in 'node->links' array where the link was found.
2365 * @node: ptr to area to be filled with ptr to associated node
2366 *
2367 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; 2400 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2368 * this also prevents link deletion. 2401 * this also prevents link deletion.
2369 * 2402 *
2370 * Returns pointer to link (or 0 if invalid link name). 2403 * Returns pointer to node owning the link, or 0 if no matching link is found.
2371 */ 2404 */
2372static struct tipc_link *link_find_link(const char *name, 2405static struct tipc_node *tipc_link_find_owner(const char *link_name,
2373 struct tipc_node **node) 2406 unsigned int *bearer_id)
2374{ 2407{
2375 struct tipc_link *l_ptr; 2408 struct tipc_link *l_ptr;
2376 struct tipc_node *n_ptr; 2409 struct tipc_node *n_ptr;
2410 struct tipc_node *found_node = 0;
2377 int i; 2411 int i;
2378 2412
2379 list_for_each_entry(n_ptr, &tipc_node_list, list) { 2413 *bearer_id = 0;
2414 rcu_read_lock();
2415 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
2416 tipc_node_lock(n_ptr);
2380 for (i = 0; i < MAX_BEARERS; i++) { 2417 for (i = 0; i < MAX_BEARERS; i++) {
2381 l_ptr = n_ptr->links[i]; 2418 l_ptr = n_ptr->links[i];
2382 if (l_ptr && !strcmp(l_ptr->name, name)) 2419 if (l_ptr && !strcmp(l_ptr->name, link_name)) {
2383 goto found; 2420 *bearer_id = i;
2421 found_node = n_ptr;
2422 break;
2423 }
2384 } 2424 }
2425 tipc_node_unlock(n_ptr);
2426 if (found_node)
2427 break;
2385 } 2428 }
2386 l_ptr = NULL; 2429 rcu_read_unlock();
2387 n_ptr = NULL; 2430
2388found: 2431 return found_node;
2389 *node = n_ptr;
2390 return l_ptr;
2391} 2432}
2392 2433
2393/** 2434/**
@@ -2429,32 +2470,33 @@ static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
2429 struct tipc_link *l_ptr; 2470 struct tipc_link *l_ptr;
2430 struct tipc_bearer *b_ptr; 2471 struct tipc_bearer *b_ptr;
2431 struct tipc_media *m_ptr; 2472 struct tipc_media *m_ptr;
2473 int bearer_id;
2432 int res = 0; 2474 int res = 0;
2433 2475
2434 l_ptr = link_find_link(name, &node); 2476 node = tipc_link_find_owner(name, &bearer_id);
2435 if (l_ptr) { 2477 if (node) {
2436 /*
2437 * acquire node lock for tipc_link_send_proto_msg().
2438 * see "TIPC locking policy" in net.c.
2439 */
2440 tipc_node_lock(node); 2478 tipc_node_lock(node);
2441 switch (cmd) { 2479 l_ptr = node->links[bearer_id];
2442 case TIPC_CMD_SET_LINK_TOL: 2480
2443 link_set_supervision_props(l_ptr, new_value); 2481 if (l_ptr) {
2444 tipc_link_send_proto_msg(l_ptr, 2482 switch (cmd) {
2445 STATE_MSG, 0, 0, new_value, 0, 0); 2483 case TIPC_CMD_SET_LINK_TOL:
2446 break; 2484 link_set_supervision_props(l_ptr, new_value);
2447 case TIPC_CMD_SET_LINK_PRI: 2485 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2448 l_ptr->priority = new_value; 2486 new_value, 0, 0);
2449 tipc_link_send_proto_msg(l_ptr, 2487 break;
2450 STATE_MSG, 0, 0, 0, new_value, 0); 2488 case TIPC_CMD_SET_LINK_PRI:
2451 break; 2489 l_ptr->priority = new_value;
2452 case TIPC_CMD_SET_LINK_WINDOW: 2490 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2453 tipc_link_set_queue_limits(l_ptr, new_value); 2491 0, new_value, 0);
2454 break; 2492 break;
2455 default: 2493 case TIPC_CMD_SET_LINK_WINDOW:
2456 res = -EINVAL; 2494 tipc_link_set_queue_limits(l_ptr, new_value);
2457 break; 2495 break;
2496 default:
2497 res = -EINVAL;
2498 break;
2499 }
2458 } 2500 }
2459 tipc_node_unlock(node); 2501 tipc_node_unlock(node);
2460 return res; 2502 return res;
@@ -2549,6 +2591,7 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
2549 char *link_name; 2591 char *link_name;
2550 struct tipc_link *l_ptr; 2592 struct tipc_link *l_ptr;
2551 struct tipc_node *node; 2593 struct tipc_node *node;
2594 unsigned int bearer_id;
2552 2595
2553 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 2596 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2554 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 2597 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -2559,15 +2602,19 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
2559 return tipc_cfg_reply_error_string("link not found"); 2602 return tipc_cfg_reply_error_string("link not found");
2560 return tipc_cfg_reply_none(); 2603 return tipc_cfg_reply_none();
2561 } 2604 }
2562
2563 read_lock_bh(&tipc_net_lock); 2605 read_lock_bh(&tipc_net_lock);
2564 l_ptr = link_find_link(link_name, &node); 2606 node = tipc_link_find_owner(link_name, &bearer_id);
2565 if (!l_ptr) { 2607 if (!node) {
2566 read_unlock_bh(&tipc_net_lock); 2608 read_unlock_bh(&tipc_net_lock);
2567 return tipc_cfg_reply_error_string("link not found"); 2609 return tipc_cfg_reply_error_string("link not found");
2568 } 2610 }
2569
2570 tipc_node_lock(node); 2611 tipc_node_lock(node);
2612 l_ptr = node->links[bearer_id];
2613 if (!l_ptr) {
2614 tipc_node_unlock(node);
2615 read_unlock_bh(&tipc_net_lock);
2616 return tipc_cfg_reply_error_string("link not found");
2617 }
2571 link_reset_statistics(l_ptr); 2618 link_reset_statistics(l_ptr);
2572 tipc_node_unlock(node); 2619 tipc_node_unlock(node);
2573 read_unlock_bh(&tipc_net_lock); 2620 read_unlock_bh(&tipc_net_lock);
@@ -2597,18 +2644,27 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2597 struct tipc_node *node; 2644 struct tipc_node *node;
2598 char *status; 2645 char *status;
2599 u32 profile_total = 0; 2646 u32 profile_total = 0;
2647 unsigned int bearer_id;
2600 int ret; 2648 int ret;
2601 2649
2602 if (!strcmp(name, tipc_bclink_name)) 2650 if (!strcmp(name, tipc_bclink_name))
2603 return tipc_bclink_stats(buf, buf_size); 2651 return tipc_bclink_stats(buf, buf_size);
2604 2652
2605 read_lock_bh(&tipc_net_lock); 2653 read_lock_bh(&tipc_net_lock);
2606 l = link_find_link(name, &node); 2654 node = tipc_link_find_owner(name, &bearer_id);
2607 if (!l) { 2655 if (!node) {
2608 read_unlock_bh(&tipc_net_lock); 2656 read_unlock_bh(&tipc_net_lock);
2609 return 0; 2657 return 0;
2610 } 2658 }
2611 tipc_node_lock(node); 2659 tipc_node_lock(node);
2660
2661 l = node->links[bearer_id];
2662 if (!l) {
2663 tipc_node_unlock(node);
2664 read_unlock_bh(&tipc_net_lock);
2665 return 0;
2666 }
2667
2612 s = &l->stats; 2668 s = &l->stats;
2613 2669
2614 if (tipc_link_is_active(l)) 2670 if (tipc_link_is_active(l))