Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/bcast.c       |  87
-rw-r--r--  net/tipc/bcast.h       |   5
-rw-r--r--  net/tipc/link.c        | 794
-rw-r--r--  net/tipc/link.h        |   7
-rw-r--r--  net/tipc/msg.c         | 381
-rw-r--r--  net/tipc/msg.h         |  35
-rw-r--r--  net/tipc/name_distr.c  |  76
-rw-r--r--  net/tipc/name_distr.h  |   2
-rw-r--r--  net/tipc/net.c         |  63
-rw-r--r--  net/tipc/net.h         |   2
-rw-r--r--  net/tipc/node.c        |  38
-rw-r--r--  net/tipc/node.h        |  17
-rw-r--r--  net/tipc/node_subscr.c |   6
-rw-r--r--  net/tipc/port.c        | 440
-rw-r--r--  net/tipc/port.h        |  55
-rw-r--r--  net/tipc/socket.c      | 553
-rw-r--r--  net/tipc/socket.h      |  16
17 files changed, 1117 insertions(+), 1460 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 55c6c9d3e1ce..dd13bfa09333 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -38,6 +38,8 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
+#include "socket.h"
+#include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
 
@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
 		tipc_link_reset_all(node);
 }
 
+uint tipc_bclink_get_mtu(void)
+{
+	return MAX_PKT_DEFAULT_MCAST;
+}
+
 void tipc_bclink_set_flags(unsigned int flags)
 {
 	bclink->flags |= flags;
@@ -382,30 +389,50 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 	tipc_node_unlock(n_ptr);
 }
 
-/*
- * tipc_bclink_xmit - broadcast a packet to all nodes in cluster
+/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+ *                    and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
 int tipc_bclink_xmit(struct sk_buff *buf)
 {
-	int res;
+	int rc = 0;
+	int bc = 0;
+	struct sk_buff *clbuf;
 
-	tipc_bclink_lock();
-
-	if (!bclink->bcast_nodes.count) {
-		res = msg_data_sz(buf_msg(buf));
-		kfree_skb(buf);
-		goto exit;
+	/* Prepare clone of message for local node */
+	clbuf = tipc_msg_reassemble(buf);
+	if (unlikely(!clbuf)) {
+		kfree_skb_list(buf);
+		return -EHOSTUNREACH;
 	}
 
-	res = __tipc_link_xmit(bcl, buf);
-	if (likely(res >= 0)) {
-		bclink_set_last_sent();
-		bcl->stats.queue_sz_counts++;
-		bcl->stats.accu_queue_sz += bcl->out_queue_size;
+	/* Broadcast to all other nodes */
+	if (likely(bclink)) {
+		tipc_bclink_lock();
+		if (likely(bclink->bcast_nodes.count)) {
+			rc = __tipc_link_xmit(bcl, buf);
+			if (likely(!rc)) {
+				bclink_set_last_sent();
+				bcl->stats.queue_sz_counts++;
+				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+			}
+			bc = 1;
+		}
+		tipc_bclink_unlock();
 	}
-exit:
-	tipc_bclink_unlock();
-	return res;
+
+	if (unlikely(!bc))
+		kfree_skb_list(buf);
+
+	/* Deliver message clone */
+	if (likely(!rc))
+		tipc_sk_mcast_rcv(clbuf);
+	else
+		kfree_skb(clbuf);
+
+	return rc;
 }
 
 /**
@@ -443,7 +470,7 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	struct tipc_node *node;
 	u32 next_in;
 	u32 seqno;
-	int deferred;
+	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
 
@@ -494,7 +521,7 @@ receive:
 		tipc_bclink_unlock();
 		tipc_node_unlock(node);
 		if (likely(msg_mcast(msg)))
-			tipc_port_mcast_rcv(buf, NULL);
+			tipc_sk_mcast_rcv(buf);
 		else
 			kfree_skb(buf);
 	} else if (msg_user(msg) == MSG_BUNDLER) {
@@ -573,8 +600,7 @@ receive:
 		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-	} else
-		deferred = 0;
+	}
 
 	tipc_bclink_lock();
 
@@ -611,6 +637,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 				  struct tipc_media_addr *unused2)
 {
 	int bp_index;
+	struct tipc_msg *msg = buf_msg(buf);
 
 	/* Prepare broadcast link message for reliable transmission,
 	 * if first time trying to send it;
@@ -618,10 +645,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 	 * since they are sent in an unreliable manner and don't need it
 	 */
 	if (likely(!msg_non_seq(buf_msg(buf)))) {
-		struct tipc_msg *msg;
-
 		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
-		msg = buf_msg(buf);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
 		bcl->stats.sent_info++;
@@ -638,12 +662,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
 		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
 		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
-		struct tipc_bearer *b = p;
+		struct tipc_bearer *bp[2] = {p, s};
+		struct tipc_bearer *b = bp[msg_link_selector(msg)];
 		struct sk_buff *tbuf;
 
 		if (!p)
 			break; /* No more bearers to try */
-
+		if (!b)
+			b = p;
 		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
 			       &bcbearer->remains_new);
 		if (bcbearer->remains_new.count == bcbearer->remains.count)
@@ -660,13 +686,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 			tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
 			kfree_skb(tbuf); /* Bearer keeps a clone */
 		}
-
-		/* Swap bearers for next packet */
-		if (s) {
-			bcbearer->bpairs[bp_index].primary = s;
-			bcbearer->bpairs[bp_index].secondary = p;
-		}
-
 		if (bcbearer->remains_new.count == 0)
 			break; /* All targets reached */
 
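
The bcast.c hunks above change how tipc_bcbearer_send() spreads broadcast traffic over a bearer pair: instead of swapping primary and secondary after every send, the transmitting bearer is now chosen per packet from the message's link selector, with a fallback to the primary when no secondary is configured. A minimal userspace sketch of that selection rule, assuming only that msg_link_selector() yields a 0/1 bit (the struct and names below are illustrative stand-ins, not kernel API):

#include <stdio.h>

struct bearer { const char *name; };

/* Stand-in for the kernel's bpairs[] entry: choose between the
 * primary and secondary bearer using the per-message selector
 * bit, as the new code does with bp[msg_link_selector(msg)].
 */
static const struct bearer *pick_bearer(const struct bearer *primary,
					const struct bearer *secondary,
					unsigned int selector)
{
	const struct bearer *bp[2] = { primary, secondary };
	const struct bearer *b = bp[selector & 1];

	if (!b)		/* no secondary configured: fall back */
		b = primary;
	return b;
}

int main(void)
{
	struct bearer eth0 = { "eth0" }, eth1 = { "eth1" };
	unsigned int sel;

	for (sel = 0; sel < 4; sel++)
		printf("selector %u -> %s\n", sel,
		       pick_bearer(&eth0, &eth1, sel)->name);
	return 0;
}

Because the selector is derived from the message rather than from mutable bearer state, a retransmitted packet keeps using the same bearer, which the old swap-after-send scheme could not guarantee.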
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 00330c45df3e..4875d9536aee 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -89,7 +89,6 @@ void tipc_bclink_add_node(u32 addr);
 void tipc_bclink_remove_node(u32 addr);
 struct tipc_node *tipc_bclink_retransmit_to(void);
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
-int tipc_bclink_xmit(struct sk_buff *buf);
 void tipc_bclink_rcv(struct sk_buff *buf);
 u32 tipc_bclink_get_last_sent(void);
 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
@@ -98,5 +97,7 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int tipc_bclink_reset_stats(void);
 int tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit(struct sk_buff *buf);
 
 #endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f5868d..fb1485dc6736 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -82,15 +82,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
 static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
 				 struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int  tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				      struct iovec const *msg_sect,
-				      unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
 
 /*
  * Simple link routines
@@ -335,13 +333,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 
 	spin_lock_bh(&tipc_port_list_lock);
 	p_ptr = tipc_port_lock(origport);
 	if (p_ptr) {
 		if (!list_empty(&p_ptr->wait_list))
 			goto exit;
-		p_ptr->congested = 1;
+		tsk = tipc_port_to_sock(p_ptr);
+		tsk->link_cong = 1;
 		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
 		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
 		l_ptr->stats.link_congs++;
@@ -355,6 +355,7 @@ exit:
 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 	struct tipc_port *temp_p_ptr;
 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 
@@ -370,10 +371,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 			 wait_list) {
 		if (win <= 0)
 			break;
+		tsk = tipc_port_to_sock(p_ptr);
 		list_del_init(&p_ptr->wait_list);
 		spin_lock_bh(p_ptr->lock);
-		p_ptr->congested = 0;
-		tipc_port_wakeup(p_ptr);
+		tsk->link_cong = 0;
+		tipc_sock_wakeup(tsk);
 		win -= p_ptr->waiting_pkts;
 		spin_unlock_bh(p_ptr->lock);
 	}
@@ -676,178 +678,142 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKCONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
  */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-			   struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
-	struct tipc_msg *bundler_msg = buf_msg(bundler);
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 bundle_size = msg_size(bundler_msg);
-	u32 to_pos = align(bundle_size);
-	u32 pad = to_pos - bundle_size;
-
-	if (msg_user(bundler_msg) != MSG_BUNDLER)
-		return 0;
-	if (msg_type(bundler_msg) != OPEN_MSG)
-		return 0;
-	if (skb_tailroom(bundler) < (pad + size))
-		return 0;
-	if (l_ptr->max_pkt < (to_pos + size))
-		return 0;
-
-	skb_put(bundler, pad + size);
-	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-	msg_set_size(bundler_msg, to_pos + size);
-	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	kfree_skb(buf);
-	l_ptr->stats.sent_bundled++;
-	return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-				 struct sk_buff *buf,
-				 struct tipc_msg *msg)
-{
-	u32 ack = mod(l_ptr->next_in_no - 1);
-	u32 seqno = mod(l_ptr->next_out_no++);
+	uint psz = msg_size(msg);
+	uint imp = tipc_msg_tot_importance(msg);
+	u32 oport = msg_tot_origport(msg);
 
-	msg_set_word(msg, 2, ((ack << 16) | seqno));
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	buf->next = NULL;
-	if (l_ptr->first_out) {
-		l_ptr->last_out->next = buf;
-		l_ptr->last_out = buf;
-	} else
-		l_ptr->first_out = l_ptr->last_out = buf;
-
-	l_ptr->out_queue_size++;
-	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-				       struct sk_buff *buf_chain,
-				       u32 long_msgno)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
-
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf_chain;
-	while (buf_chain) {
-		buf = buf_chain;
-		buf_chain = buf_chain->next;
-
-		msg = buf_msg(buf);
-		msg_set_long_msgno(msg, long_msgno);
-		link_add_to_outqueue(l_ptr, buf, msg);
+	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+			link_schedule_port(link, oport, psz);
+			return -ELINKCONG;
+		}
+	} else {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
 	}
+	kfree_skb_list(buf);
+	return -EHOSTUNREACH;
 }
 
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
+/**
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 dsz = msg_data_sz(msg);
-	u32 queue_size = l_ptr->out_queue_size;
-	u32 imp = tipc_msg_tot_importance(msg);
-	u32 queue_limit = l_ptr->queue_limit[imp];
-	u32 max_packet = l_ptr->max_pkt;
-
-	/* Match msg importance against queue limits: */
-	if (unlikely(queue_size >= queue_limit)) {
-		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
-			link_schedule_port(l_ptr, msg_origport(msg), size);
-			kfree_skb(buf);
-			return -ELINKCONG;
-		}
-		kfree_skb(buf);
-		if (imp > CONN_MANAGER) {
-			pr_warn("%s<%s>, send queue full", link_rst_msg,
-				l_ptr->name);
-			tipc_link_reset(l_ptr);
-		}
-		return dsz;
+	uint psz = msg_size(msg);
+	uint qsz = link->out_queue_size;
+	uint sndlim = link->queue_limit[0];
+	uint imp = tipc_msg_tot_importance(msg);
+	uint mtu = link->max_pkt;
+	uint ack = mod(link->next_in_no - 1);
+	uint seqno = link->next_out_no;
+	uint bc_last_in = link->owner->bclink.last_in;
+	struct tipc_media_addr *addr = &link->media_addr;
+	struct sk_buff *next = buf->next;
+
+	/* Match queue limits against msg importance: */
+	if (unlikely(qsz >= link->queue_limit[imp]))
+		return tipc_link_cong(link, buf);
+
+	/* Has valid packet limit been used ? */
+	if (unlikely(psz > mtu)) {
+		kfree_skb_list(buf);
+		return -EMSGSIZE;
 	}
 
-	/* Fragmentation needed ? */
-	if (size > max_packet)
-		return tipc_link_frag_xmit(l_ptr, buf);
-
-	/* Packet can be queued or sent. */
-	if (likely(!link_congested(l_ptr))) {
-		link_add_to_outqueue(l_ptr, buf, msg);
+	/* Prepare each packet for sending, and add to outqueue: */
+	while (buf) {
+		next = buf->next;
+		msg = buf_msg(buf);
+		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_bcast_ack(msg, bc_last_in);
+
+		if (!link->first_out) {
+			link->first_out = buf;
+		} else if (qsz < sndlim) {
+			link->last_out->next = buf;
+		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+			link->stats.sent_bundled++;
+			buf = next;
+			next = buf->next;
+			continue;
+		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+			link->stats.sent_bundled++;
+			link->stats.sent_bundles++;
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		} else {
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		}
 
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->unacked_window = 0;
-		return dsz;
-	}
-	/* Congestion: can message be bundled ? */
-	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
-	    (msg_user(msg) != MSG_FRAGMENTER)) {
-
-		/* Try adding message to an existing bundle */
-		if (l_ptr->next_out &&
-		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
-			return dsz;
-
-		/* Try creating a new bundle */
-		if (size <= max_packet * 2 / 3) {
-			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
-			struct tipc_msg bundler_hdr;
-
-			if (bundler) {
-				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
-					      INT_H_SIZE, l_ptr->addr);
-				skb_copy_to_linear_data(bundler, &bundler_hdr,
-							INT_H_SIZE);
-				skb_trim(bundler, INT_H_SIZE);
-				link_bundle_buf(l_ptr, bundler, buf);
-				buf = bundler;
-				msg = buf_msg(buf);
-				l_ptr->stats.sent_bundles++;
-			}
+		/* Send packet if possible: */
+		if (likely(++qsz <= sndlim)) {
+			tipc_bearer_send(link->bearer_id, buf, addr);
+			link->next_out = next;
+			link->unacked_window = 0;
 		}
+		seqno++;
+		link->last_out = buf;
+		buf = next;
 	}
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf;
-	link_add_to_outqueue(l_ptr, buf, msg);
-	return dsz;
+	link->next_out_no = seqno;
+	link->out_queue_size = qsz;
+	return 0;
 }
 
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
+/**
+ * tipc_link_xmit() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 {
-	struct tipc_link *l_ptr;
-	struct tipc_node *n_ptr;
-	int res = -ELINKCONG;
+	struct tipc_link *link = NULL;
+	struct tipc_node *node;
+	int rc = -EHOSTUNREACH;
 
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[selector & 1];
-		if (l_ptr)
-			res = __tipc_link_xmit(l_ptr, buf);
-		else
-			kfree_skb(buf);
-		tipc_node_unlock(n_ptr);
-	} else {
-		kfree_skb(buf);
+	node = tipc_node_find(dnode);
+	if (node) {
+		tipc_node_lock(node);
+		link = node->active_links[selector & 1];
+		if (link)
+			rc = __tipc_link_xmit(link, buf);
+		tipc_node_unlock(node);
 	}
-	return res;
+
+	if (link)
+		return rc;
+
+	if (likely(in_own_node(dnode)))
+		return tipc_sk_rcv(buf);
+
+	kfree_skb_list(buf);
+	return rc;
 }
 
 /*
@@ -858,7 +824,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
  *
  * Called with node locked
  */
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
 {
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
@@ -868,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
 		return;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
-	msg_set_last_bcast(msg, l->owner->bclink.acked);
-	link_add_chain_to_outqueue(l, buf, 0);
-	tipc_link_push_queue(l);
+	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+	msg_set_last_bcast(msg, link->owner->bclink.acked);
+	__tipc_link_xmit(link, buf);
 }
 
 /*
@@ -892,293 +857,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 }
 
 /*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct sk_buff *temp_buf;
-
-	if (list_empty(message_list))
-		return;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[0];
-		if (l_ptr) {
-			/* convert circular list to linear list */
-			((struct sk_buff *)message_list->prev)->next = NULL;
-			link_add_chain_to_outqueue(l_ptr,
-				(struct sk_buff *)message_list->next, 0);
-			tipc_link_push_queue(l_ptr);
-			INIT_LIST_HEAD(message_list);
-		}
-		tipc_node_unlock(n_ptr);
-	}
-
-	/* discard the messages if they couldn't be sent */
-	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-		list_del((struct list_head *)buf);
-		kfree_skb(buf);
-	}
-}
-
-/*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-			       u32 *used_max_pkt)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	int res = msg_data_sz(msg);
-
-	if (likely(!link_congested(l_ptr))) {
-		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-			link_add_to_outqueue(l_ptr, buf, msg);
-			tipc_bearer_send(l_ptr->bearer_id, buf,
-					 &l_ptr->media_addr);
-			l_ptr->unacked_window = 0;
-			return res;
-		}
-		else
-			*used_max_pkt = l_ptr->max_pkt;
-	}
-	return __tipc_link_xmit(l_ptr, buf);  /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destaddr)
-{
-	struct tipc_msg *hdr = &sender->phdr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct tipc_node *node;
-	int res;
-	u32 selector = msg_origport(hdr) & 1;
-
-again:
-	/*
-	 * Try building message using port's max_pkt hint.
-	 * (Must not hold any locks while building message.)
-	 */
-	res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
-	/* Exit if build request was invalid */
-	if (unlikely(res < 0))
-		return res;
-
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[selector];
-		if (likely(l_ptr)) {
-			if (likely(buf)) {
-				res = tipc_link_xmit_fast(l_ptr, buf,
-							  &sender->max_pkt);
-exit:
-				tipc_node_unlock(node);
-				return res;
-			}
-
-			/* Exit if link (or bearer) is congested */
-			if (link_congested(l_ptr)) {
-				res = link_schedule_port(l_ptr,
-							 sender->ref, res);
-				goto exit;
-			}
-
-			/*
-			 * Message size exceeds max_pkt hint; update hint,
-			 * then re-try fast path or fragment the message
-			 */
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-
-
-			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
-				goto again;
-
-			return tipc_link_iovec_long_xmit(sender, msg_sect,
-							 len, destaddr);
-		}
-		tipc_node_unlock(node);
-	}
-
-	/* Couldn't find a link to the destination node */
-	kfree_skb(buf);
-	tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
-	return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destaddr)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *node;
-	struct tipc_msg *hdr = &sender->phdr;
-	u32 dsz = len;
-	u32 max_pkt, fragm_sz, rest;
-	struct tipc_msg fragm_hdr;
-	struct sk_buff *buf, *buf_chain, *prev;
-	u32 fragm_crs, fragm_rest, hsz, sect_rest;
-	const unchar __user *sect_crs;
-	int curr_sect;
-	u32 fragm_no;
-	int res = 0;
-
-again:
-	fragm_no = 1;
-	max_pkt = sender->max_pkt - INT_H_SIZE;
-		/* leave room for tunnel header in case of link changeover */
-	fragm_sz = max_pkt - INT_H_SIZE;
-		/* leave room for fragmentation header in each fragment */
-	rest = dsz;
-	fragm_crs = 0;
-	fragm_rest = 0;
-	sect_rest = 0;
-	sect_crs = NULL;
-	curr_sect = -1;
-
-	/* Prepare reusable fragment header */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, msg_destnode(hdr));
-	msg_set_size(&fragm_hdr, max_pkt);
-	msg_set_fragm_no(&fragm_hdr, 1);
-
-	/* Prepare header of first fragment */
-	buf_chain = buf = tipc_buf_acquire(max_pkt);
-	if (!buf)
-		return -ENOMEM;
-	buf->next = NULL;
-	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-	hsz = msg_hdr_sz(hdr);
-	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
-	/* Chop up message */
-	fragm_crs = INT_H_SIZE + hsz;
-	fragm_rest = fragm_sz - hsz;
-
-	do {		/* For all sections */
-		u32 sz;
-
-		if (!sect_rest) {
-			sect_rest = msg_sect[++curr_sect].iov_len;
-			sect_crs = msg_sect[curr_sect].iov_base;
-		}
-
-		if (sect_rest < fragm_rest)
-			sz = sect_rest;
-		else
-			sz = fragm_rest;
-
-		if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
-			res = -EFAULT;
-error:
-			kfree_skb_list(buf_chain);
-			return res;
-		}
-		sect_crs += sz;
-		sect_rest -= sz;
-		fragm_crs += sz;
-		fragm_rest -= sz;
-		rest -= sz;
-
-		if (!fragm_rest && rest) {
-
-			/* Initiate new fragment: */
-			if (rest <= fragm_sz) {
-				fragm_sz = rest;
-				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-			} else {
-				msg_set_type(&fragm_hdr, FRAGMENT);
-			}
-			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
-			prev = buf;
-			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-			if (!buf) {
-				res = -ENOMEM;
-				goto error;
-			}
-
-			buf->next = NULL;
-			prev->next = buf;
-			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-			fragm_crs = INT_H_SIZE;
-			fragm_rest = fragm_sz;
-		}
-	} while (rest > 0);
-
-	/*
-	 * Now we have a buffer chain. Select a link and check
-	 * that packet size is still OK
-	 */
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[sender->ref & 1];
-		if (!l_ptr) {
-			tipc_node_unlock(node);
-			goto reject;
-		}
-		if (l_ptr->max_pkt < max_pkt) {
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-			kfree_skb_list(buf_chain);
-			goto again;
-		}
-	} else {
-reject:
-		kfree_skb_list(buf_chain);
-		tipc_port_iovec_reject(sender, hdr, msg_sect, len,
-				       TIPC_ERR_NO_NODE);
-		return -ENETUNREACH;
-	}
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-	tipc_node_unlock(node);
-	return dsz;
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
@@ -1238,7 +916,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 			tipc_bearer_send(l_ptr->bearer_id, buf,
 					 &l_ptr->media_addr);
 			if (msg_user(msg) == MSG_BUNDLER)
-				msg_set_type(msg, CLOSED_MSG);
+				msg_set_type(msg, BUNDLE_CLOSED);
 			l_ptr->next_out = buf->next;
 			return 0;
 		}
@@ -1527,11 +1205,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
 			tipc_link_wakeup_ports(l_ptr, 0);
 
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
-			l_ptr->stats.sent_acks++;
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-		}
-
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {
 			if (msg_user(msg) == LINK_PROTOCOL) {
@@ -1565,57 +1238,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(l_ptr->oldest_deferred_in))
 			head = link_insert_deferred_queue(l_ptr, head);
 
-		/* Deliver packet/message to correct user: */
-		if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) {
-			if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
-				tipc_node_unlock(n_ptr);
-				continue;
-			}
-			msg = buf_msg(buf);
-		} else if (msg_user(msg) == MSG_FRAGMENTER) {
-			l_ptr->stats.recv_fragments++;
-			if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) {
-				l_ptr->stats.recv_fragmented++;
-				msg = buf_msg(buf);
-			} else {
-				if (!l_ptr->reasm_buf)
-					tipc_link_reset(l_ptr);
-				tipc_node_unlock(n_ptr);
-				continue;
-			}
+		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+			l_ptr->stats.sent_acks++;
+			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
 
-		switch (msg_user(msg)) {
-		case TIPC_LOW_IMPORTANCE:
-		case TIPC_MEDIUM_IMPORTANCE:
-		case TIPC_HIGH_IMPORTANCE:
-		case TIPC_CRITICAL_IMPORTANCE:
+		if (tipc_link_prepare_input(l_ptr, &buf)) {
 			tipc_node_unlock(n_ptr);
-			tipc_sk_rcv(buf);
 			continue;
-		case MSG_BUNDLER:
-			l_ptr->stats.recv_bundles++;
-			l_ptr->stats.recv_bundled += msg_msgcnt(msg);
-			tipc_node_unlock(n_ptr);
-			tipc_link_bundle_rcv(buf);
-			continue;
-		case NAME_DISTRIBUTOR:
-			n_ptr->bclink.recv_permitted = true;
-			tipc_node_unlock(n_ptr);
-			tipc_named_rcv(buf);
-			continue;
-		case CONN_MANAGER:
-			tipc_node_unlock(n_ptr);
-			tipc_port_proto_rcv(buf);
-			continue;
-		case BCAST_PROTOCOL:
-			tipc_link_sync_rcv(n_ptr, buf);
-			break;
-		default:
-			kfree_skb(buf);
-			break;
 		}
 		tipc_node_unlock(n_ptr);
+		msg = buf_msg(buf);
+		if (tipc_link_input(l_ptr, buf) != 0)
+			goto discard;
 		continue;
 unlock_discard:
 	tipc_node_unlock(n_ptr);
@@ -1625,6 +1260,80 @@ discard:
 }
 
 /**
+ * tipc_link_prepare_input - process TIPC link messages
+ *
+ * returns nonzero if the message was consumed
+ *
+ * Node lock must be held
+ */
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
+{
+	struct tipc_node *n;
+	struct tipc_msg *msg;
+	int res = -EINVAL;
+
+	n = l->owner;
+	msg = buf_msg(*buf);
+	switch (msg_user(msg)) {
+	case CHANGEOVER_PROTOCOL:
+		if (tipc_link_tunnel_rcv(n, buf))
+			res = 0;
+		break;
+	case MSG_FRAGMENTER:
+		l->stats.recv_fragments++;
+		if (tipc_buf_append(&l->reasm_buf, buf)) {
+			l->stats.recv_fragmented++;
+			res = 0;
+		} else if (!l->reasm_buf) {
+			tipc_link_reset(l);
+		}
+		break;
+	case MSG_BUNDLER:
+		l->stats.recv_bundles++;
+		l->stats.recv_bundled += msg_msgcnt(msg);
+		res = 0;
+		break;
+	case NAME_DISTRIBUTOR:
+		n->bclink.recv_permitted = true;
+		res = 0;
+		break;
+	case BCAST_PROTOCOL:
+		tipc_link_sync_rcv(n, *buf);
+		break;
+	default:
+		res = 0;
+	}
+	return res;
+}
+/**
+ * tipc_link_input - deliver message to higher layers
+ */
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	int res = 0;
+
+	switch (msg_user(msg)) {
+	case TIPC_LOW_IMPORTANCE:
+	case TIPC_MEDIUM_IMPORTANCE:
+	case TIPC_HIGH_IMPORTANCE:
+	case TIPC_CRITICAL_IMPORTANCE:
+	case CONN_MANAGER:
+		tipc_sk_rcv(buf);
+		break;
+	case NAME_DISTRIBUTOR:
+		tipc_named_rcv(buf);
+		break;
+	case MSG_BUNDLER:
+		tipc_link_bundle_rcv(buf);
+		break;
+	default:
+		res = -EINVAL;
+	}
+	return res;
+}
+
+/**
 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
 *
 * Returns increase in queue length (i.e. 0 or 1)
@@ -2217,6 +1926,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 	u32 msgcount = msg_msgcnt(buf_msg(buf));
 	u32 pos = INT_H_SIZE;
 	struct sk_buff *obuf;
+	struct tipc_msg *omsg;
 
 	while (msgcount--) {
 		obuf = buf_extract(buf, pos);
@@ -2224,82 +1934,18 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 			pr_warn("Link unable to unbundle message(s)\n");
 			break;
 		}
-		pos += align(msg_size(buf_msg(obuf)));
-		tipc_net_route_msg(obuf);
-	}
-	kfree_skb(buf);
-}
-
-/*
- * Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct sk_buff *buf_chain = NULL;
-	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
-	struct tipc_msg *inmsg = buf_msg(buf);
-	struct tipc_msg fragm_hdr;
-	u32 insize = msg_size(inmsg);
-	u32 dsz = msg_data_sz(inmsg);
-	unchar *crs = buf->data;
-	u32 rest = insize;
-	u32 pack_sz = l_ptr->max_pkt;
-	u32 fragm_sz = pack_sz - INT_H_SIZE;
-	u32 fragm_no = 0;
-	u32 destaddr;
-
-	if (msg_short(inmsg))
-		destaddr = l_ptr->addr;
-	else
-		destaddr = msg_destnode(inmsg);
-
-	/* Prepare reusable fragment header: */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, destaddr);
-
-	/* Chop up message: */
-	while (rest > 0) {
-		struct sk_buff *fragm;
-
-		if (rest <= fragm_sz) {
-			fragm_sz = rest;
-			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-		}
-		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-		if (fragm == NULL) {
-			kfree_skb(buf);
-			kfree_skb_list(buf_chain);
-			return -ENOMEM;
+		omsg = buf_msg(obuf);
+		pos += align(msg_size(omsg));
+		if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
+			tipc_sk_rcv(obuf);
+		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
+			tipc_named_rcv(obuf);
+		} else {
+			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
+			kfree_skb(obuf);
 		}
-		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-		fragm_no++;
-		msg_set_fragm_no(&fragm_hdr, fragm_no);
-		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
-					       fragm_sz);
-		buf_chain_tail->next = fragm;
-		buf_chain_tail = fragm;
-
-		rest -= fragm_sz;
-		crs += fragm_sz;
-		msg_set_type(&fragm_hdr, FRAGMENT);
 	}
 	kfree_skb(buf);
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-
-	return dsz;
 }
 
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
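
The rewritten __tipc_link_xmit() above applies one decision ladder to every buffer in the chain: packets inside the send window (queue_limit[0]) are queued and transmitted immediately; beyond the window the code first tries tipc_msg_bundle() on the last open bundle, then tipc_msg_make_bundle() to start a new one, and otherwise appends the packet behind next_out to wait for acknowledgments. A toy model of that ordering — all sizes, limits and the simplified fit tests below are illustrative, and congestion (queue_limit[imp]) is assumed to have been checked already:

#include <stdio.h>

#define INT_H_SIZE 40	/* TIPC internal header size used in the diff */

static const char *classify(unsigned int qsz,    /* packets queued    */
			    unsigned int sndlim, /* queue_limit[0]    */
			    unsigned int msz,    /* message size      */
			    unsigned int room,   /* tailroom in last,
						    still-open bundle */
			    unsigned int mtu)
{
	if (qsz < sndlim)
		return "append and transmit now";   /* inside window  */
	if (msz <= room)
		return "bundle into last packet";   /* sent_bundled++ */
	if (msz <= (mtu - INT_H_SIZE) / 2)
		return "open a new bundle";         /* sent_bundles++ */
	return "append behind next_out";            /* wait for acks  */
}

int main(void)
{
	printf("%s\n", classify(10, 50, 300, 0, 1500));
	printf("%s\n", classify(50, 50, 300, 700, 1500));
	printf("%s\n", classify(50, 50, 300, 0, 1500));
	printf("%s\n", classify(50, 50, 1200, 0, 1500));
	return 0;
}

The real function also stamps each outgoing packet with its sequence number and broadcast acknowledge before queueing, as the hunk shows.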
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 200d518b218e..782983ccd323 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -227,13 +227,8 @@ void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
 int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
-int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf);
 u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destnode);
 void tipc_link_bundle_rcv(struct sk_buff *buf);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 0a37a472c29f..9680be6d388a 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -36,21 +36,16 @@
 
 #include "core.h"
 #include "msg.h"
+#include "addr.h"
+#include "name_table.h"
 
-u32 tipc_msg_tot_importance(struct tipc_msg *m)
+#define MAX_FORWARD_SIZE 1024
+
+static unsigned int align(unsigned int i)
 {
-	if (likely(msg_isdata(m))) {
-		if (likely(msg_orignode(m) == tipc_own_addr))
-			return msg_importance(m);
-		return msg_importance(m) + 4;
-	}
-	if ((msg_user(m) == MSG_FRAGMENTER) &&
-	    (msg_type(m) == FIRST_FRAGMENT))
-		return msg_importance(msg_get_wrapped(m));
-	return msg_importance(m);
+	return (i + 3) & ~3u;
 }
 
-
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode)
 {
@@ -65,41 +60,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 	msg_set_destnode(m, destnode);
 }
 
-/**
- * tipc_msg_build - create message using specified header and data
- *
- * Note: Caller must not hold any locks in case copy_from_user() is interrupted!
- *
- * Returns message data size or errno
- */
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf)
-{
-	int dsz, sz, hsz;
-	unsigned char *to;
-
-	dsz = len;
-	hsz = msg_hdr_sz(hdr);
-	sz = hsz + dsz;
-	msg_set_size(hdr, sz);
-	if (unlikely(sz > max_size)) {
-		*buf = NULL;
-		return dsz;
-	}
-
-	*buf = tipc_buf_acquire(sz);
-	if (!(*buf))
-		return -ENOMEM;
-	skb_copy_to_linear_data(*buf, hdr, hsz);
-	to = (*buf)->data + hsz;
-	if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) {
-		kfree_skb(*buf);
-		*buf = NULL;
-		return -EFAULT;
-	}
-	return dsz;
-}
-
 /* tipc_buf_append(): Append a buffer to the fragment list of another buffer
 * @*headbuf: in: NULL for first frag, otherwise value returned from prev call
 *            out: set when successful non-complete reassembly, otherwise NULL
@@ -112,27 +72,38 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	struct sk_buff *head = *headbuf;
 	struct sk_buff *frag = *buf;
 	struct sk_buff *tail;
-	struct tipc_msg *msg = buf_msg(frag);
-	u32 fragid = msg_type(msg);
-	bool headstolen;
+	struct tipc_msg *msg;
+	u32 fragid;
 	int delta;
+	bool headstolen;
 
+	if (!frag)
+		goto err;
+
+	msg = buf_msg(frag);
+	fragid = msg_type(msg);
+	frag->next = NULL;
 	skb_pull(frag, msg_hdr_sz(msg));
 
 	if (fragid == FIRST_FRAGMENT) {
-		if (head || skb_unclone(frag, GFP_ATOMIC))
-			goto out_free;
+		if (unlikely(head))
+			goto err;
+		if (unlikely(skb_unclone(frag, GFP_ATOMIC)))
+			goto err;
 		head = *headbuf = frag;
 		skb_frag_list_init(head);
+		TIPC_SKB_CB(head)->tail = NULL;
 		*buf = NULL;
 		return 0;
 	}
+
 	if (!head)
-		goto out_free;
-	tail = TIPC_SKB_CB(head)->tail;
+		goto err;
+
 	if (skb_try_coalesce(head, frag, &headstolen, &delta)) {
 		kfree_skb_partial(frag, headstolen);
 	} else {
+		tail = TIPC_SKB_CB(head)->tail;
 		if (!skb_has_frag_list(head))
 			skb_shinfo(head)->frag_list = frag;
 		else
@@ -142,6 +113,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 		head->len += frag->len;
 		TIPC_SKB_CB(head)->tail = frag;
 	}
+
 	if (fragid == LAST_FRAGMENT) {
 		*buf = head;
 		TIPC_SKB_CB(head)->tail = NULL;
@@ -150,10 +122,311 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	}
 	*buf = NULL;
 	return 0;
-out_free:
+
+err:
 	pr_warn_ratelimited("Unable to build fragment list\n");
 	kfree_skb(*buf);
 	kfree_skb(*headbuf);
 	*buf = *headbuf = NULL;
 	return 0;
 }
+
+
+/**
+ * tipc_msg_build - create buffer chain containing specified header and data
+ * @mhdr: Message header, to be prepended to data
+ * @iov: User data
+ * @offset: Position in iov to start copying from
+ * @dsz: Total length of user data
+ * @pktmax: Max packet size that can be used
+ * @chain: Buffer or chain of buffers to be returned to caller
+ * Returns message data size or errno: -ENOMEM, -EFAULT
+ */
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+		   int offset, int dsz, int pktmax, struct sk_buff **chain)
+{
+	int mhsz = msg_hdr_sz(mhdr);
+	int msz = mhsz + dsz;
+	int pktno = 1;
+	int pktsz;
+	int pktrem = pktmax;
+	int drem = dsz;
+	struct tipc_msg pkthdr;
+	struct sk_buff *buf, *prev;
+	char *pktpos;
+	int rc;
+
+	msg_set_size(mhdr, msz);
+
+	/* No fragmentation needed? */
+	if (likely(msz <= pktmax)) {
+		buf = tipc_buf_acquire(msz);
+		*chain = buf;
+		if (unlikely(!buf))
+			return -ENOMEM;
+		skb_copy_to_linear_data(buf, mhdr, mhsz);
+		pktpos = buf->data + mhsz;
+		if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
+			return dsz;
+		rc = -EFAULT;
+		goto error;
+	}
+
+	/* Prepare reusable fragment header */
+	tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
+		      INT_H_SIZE, msg_destnode(mhdr));
+	msg_set_size(&pkthdr, pktmax);
+	msg_set_fragm_no(&pkthdr, pktno);
+
+	/* Prepare first fragment */
+	*chain = buf = tipc_buf_acquire(pktmax);
+	if (!buf)
+		return -ENOMEM;
+	pktpos = buf->data;
+	skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
+	pktpos += INT_H_SIZE;
+	pktrem -= INT_H_SIZE;
+	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz);
+	pktpos += mhsz;
+	pktrem -= mhsz;
+
+	do {
+		if (drem < pktrem)
+			pktrem = drem;
+
+		if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) {
+			rc = -EFAULT;
+			goto error;
+		}
+		drem -= pktrem;
+		offset += pktrem;
+
+		if (!drem)
+			break;
+
+		/* Prepare new fragment: */
+		if (drem < (pktmax - INT_H_SIZE))
+			pktsz = drem + INT_H_SIZE;
+		else
+			pktsz = pktmax;
+		prev = buf;
+		buf = tipc_buf_acquire(pktsz);
+		if (!buf) {
+			rc = -ENOMEM;
+			goto error;
+		}
+		prev->next = buf;
+		msg_set_type(&pkthdr, FRAGMENT);
+		msg_set_size(&pkthdr, pktsz);
+		msg_set_fragm_no(&pkthdr, ++pktno);
+		skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
+		pktpos = buf->data + INT_H_SIZE;
+		pktrem = pktsz - INT_H_SIZE;
+
+	} while (1);
+
+	msg_set_type(buf_msg(buf), LAST_FRAGMENT);
+	return dsz;
+error:
+	kfree_skb_list(*chain);
+	*chain = NULL;
+	return rc;
+}
+
+/**
+ * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
+ * @bbuf: the existing buffer ("bundle")
+ * @buf: buffer to be appended
+ * @mtu: max allowable size for the bundle buffer
+ * Consumes buffer if successful
+ * Returns true if bundling could be performed, otherwise false
+ */
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
+{
+	struct tipc_msg *bmsg = buf_msg(bbuf);
+	struct tipc_msg *msg = buf_msg(buf);
+	unsigned int bsz = msg_size(bmsg);
+	unsigned int msz = msg_size(msg);
+	u32 start = align(bsz);
+	u32 max = mtu - INT_H_SIZE;
+	u32 pad = start - bsz;
+
+	if (likely(msg_user(msg) == MSG_FRAGMENTER))
+		return false;
+	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
+		return false;
+	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
+		return false;
+	if (likely(msg_user(bmsg) != MSG_BUNDLER))
+		return false;
+	if (likely(msg_type(bmsg) != BUNDLE_OPEN))
+		return false;
+	if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
+		return false;
+	if (unlikely(max < (start + msz)))
+		return false;
+
+	skb_put(bbuf, pad + msz);
+	skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
+	msg_set_size(bmsg, start + msz);
+	msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
+	bbuf->next = buf->next;
+	kfree_skb(buf);
+	return true;
+}
+
+/**
+ * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
+ * @buf: buffer to be appended and replaced
+ * @mtu: max allowable size for the bundle buffer, inclusive header
+ * @dnode: destination node for message. (Not always present in header)
+ * Replaces buffer if successful
+ * Returns true if success, otherwise false
+ */
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
+{
+	struct sk_buff *bbuf;
+	struct tipc_msg *bmsg;
+	struct tipc_msg *msg = buf_msg(*buf);
+	u32 msz = msg_size(msg);
+	u32 max = mtu - INT_H_SIZE;
+
+	if (msg_user(msg) == MSG_FRAGMENTER)
+		return false;
+	if (msg_user(msg) == CHANGEOVER_PROTOCOL)
+		return false;
+	if (msg_user(msg) == BCAST_PROTOCOL)
+		return false;
+	if (msz > (max / 2))
+		return false;
+
+	bbuf = tipc_buf_acquire(max);
+	if (!bbuf)
+		return false;
+
+	skb_trim(bbuf, INT_H_SIZE);
+	bmsg = buf_msg(bbuf);
+	tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
+	msg_set_seqno(bmsg, msg_seqno(msg));
+	msg_set_ack(bmsg, msg_ack(msg));
+	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
+	bbuf->next = (*buf)->next;
+	tipc_msg_bundle(bbuf, *buf, mtu);
+	*buf = bbuf;
+	return true;
+}
+
+/**
+ * tipc_msg_reverse(): swap source and destination addresses and add error code
+ * @buf: buffer containing message to be reversed
+ * @dnode: return value: node where to send message after reversal
+ * @err: error code to be set in message
+ * Consumes buffer if failure
+ * Returns true if success, otherwise false
+ */
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint imp = msg_importance(msg);
+	struct tipc_msg ohdr;
+	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
+
+	if (skb_linearize(buf))
+		goto exit;
+	if (msg_dest_droppable(msg))
+		goto exit;
+	if (msg_errcode(msg))
+		goto exit;
+
+	memcpy(&ohdr, msg, msg_hdr_sz(msg));
+	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
+	if (msg_isdata(msg))
+		msg_set_importance(msg, imp);
+	msg_set_errcode(msg, err);
+	msg_set_origport(msg, msg_destport(&ohdr));
+	msg_set_destport(msg, msg_origport(&ohdr));
+	msg_set_prevnode(msg, tipc_own_addr);
+	if (!msg_short(msg)) {
+		msg_set_orignode(msg, msg_destnode(&ohdr));
+		msg_set_destnode(msg, msg_orignode(&ohdr));
+	}
+	msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
+	skb_trim(buf, msg_size(msg));
+	skb_orphan(buf);
+	*dnode = msg_orignode(&ohdr);
+	return true;
+exit:
+	kfree_skb(buf);
+	return false;
+}
+
+/**
+ * tipc_msg_eval: determine fate of message that found no destination
+ * @buf: the buffer containing the message.
+ * @dnode: return value: next-hop node, if message to be forwarded
+ * @err: error code to use, if message to be rejected
+ *
+ * Does not consume buffer
+ * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error
+ * code if message to be rejected
+ */
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	u32 dport;
+
+	if (msg_type(msg) != TIPC_NAMED_MSG)
+		return -TIPC_ERR_NO_PORT;
+	if (skb_linearize(buf))
+		return -TIPC_ERR_NO_NAME;
+	if (msg_data_sz(msg) > MAX_FORWARD_SIZE)
+		return -TIPC_ERR_NO_NAME;
+	if (msg_reroute_cnt(msg) > 0)
+		return -TIPC_ERR_NO_NAME;
+
+	*dnode = addr_domain(msg_lookup_scope(msg));
+	dport = tipc_nametbl_translate(msg_nametype(msg),
+				       msg_nameinst(msg),
+				       dnode);
+	if (!dport)
+		return -TIPC_ERR_NO_NAME;
+	msg_incr_reroute_cnt(msg);
+	msg_set_destnode(msg, *dnode);
+	msg_set_destport(msg, dport);
+	return TIPC_OK;
+}
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ *                         reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+	struct sk_buff *buf = chain;
+	struct sk_buff *frag = buf;
+	struct sk_buff *head = NULL;
+	int hdr_sz;
+
+	/* Copy header if single buffer */
+	if (!buf->next) {
+		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+	}
+
+	/* Clone all fragments and reassemble */
+	while (buf) {
+		frag = skb_clone(buf, GFP_ATOMIC);
+		if (!frag)
+			goto error;
+		frag->next = NULL;
+		if (tipc_buf_append(&head, &frag))
+			break;
+		if (!head)
+			goto error;
+		buf = buf->next;
+	}
+	return frag;
+error:
+	pr_warn("Failed to clone local mcast rcv buffer\n");
+	kfree_skb(head);
+	return NULL;
+}
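
tipc_msg_build() above only fragments when header plus data exceed pktmax; the first fragment then carries the fragment header (INT_H_SIZE) plus the complete user header, and every later fragment carries just the fragment header in front of the data. The resulting fragment count follows from simple arithmetic, sketched below with INT_H_SIZE taken as TIPC's 40-byte internal header (the sample message sizes are arbitrary):

#include <stdio.h>

#define INT_H_SIZE 40

/* Mirror of the capacity bookkeeping in tipc_msg_build():
 * the first packet loses INT_H_SIZE + mhsz bytes to headers,
 * each following packet loses only INT_H_SIZE.
 */
static int num_fragments(int mhsz, int dsz, int pktmax)
{
	int first_data = pktmax - INT_H_SIZE - mhsz;
	int later_data = pktmax - INT_H_SIZE;
	int rem, n;

	if (mhsz + dsz <= pktmax)
		return 1;	/* no fragmentation needed */
	rem = dsz - first_data;
	n = 1 + rem / later_data;
	if (rem % later_data)
		n++;
	return n;
}

int main(void)
{
	printf("%d\n", num_fragments(24, 1000, 1500));	/* -> 1  */
	printf("%d\n", num_fragments(24, 66000, 1500));	/* -> 46 */
	return 0;
}

This layout is also what lets tipc_msg_reassemble() rebuild the original message from clones: tipc_buf_append() strips each fragment header and chains the remaining data back together.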
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 503511903d1d..462fa194a6af 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
 #define FRAGMENT		1
 #define LAST_FRAGMENT		2
 
+/* Bundling protocol message types
+ */
+#define BUNDLE_OPEN		0
+#define BUNDLE_CLOSED		1
+
 /*
  * Link management protocol message types
  */
@@ -706,12 +711,36 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
706 msg_set_bits(m, 9, 0, 0xffff, n); 711 msg_set_bits(m, 9, 0, 0xffff, n);
707} 712}
708 713
709u32 tipc_msg_tot_importance(struct tipc_msg *m); 714static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
715{
716 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
717 return msg_importance(msg_get_wrapped(m));
718 return msg_importance(m);
719}
720
721static inline u32 msg_tot_origport(struct tipc_msg *m)
722{
723 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
724 return msg_origport(msg_get_wrapped(m));
725 return msg_origport(m);
726}
727
728bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
729
730int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
731
710void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, 732void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
711 u32 destnode); 733 u32 destnode);
712int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
713 unsigned int len, int max_size, struct sk_buff **buf);
714 734
715int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); 735int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
716 736
737bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
738
739bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
740
741int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
742 int offset, int dsz, int mtu, struct sk_buff **chain);
743
744struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
745
717#endif 746#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce730984aa1..dcc15bcd5692 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
101 101
102void named_cluster_distribute(struct sk_buff *buf) 102void named_cluster_distribute(struct sk_buff *buf)
103{ 103{
104 struct sk_buff *buf_copy; 104 struct sk_buff *obuf;
105 struct tipc_node *n_ptr; 105 struct tipc_node *node;
106 struct tipc_link *l_ptr; 106 u32 dnode;
107 107
108 rcu_read_lock(); 108 rcu_read_lock();
109 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { 109 list_for_each_entry_rcu(node, &tipc_node_list, list) {
110 tipc_node_lock(n_ptr); 110 dnode = node->addr;
111 l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 111 if (in_own_node(dnode))
112 if (l_ptr) { 112 continue;
113 buf_copy = skb_copy(buf, GFP_ATOMIC); 113 if (!tipc_node_active_links(node))
114 if (!buf_copy) { 114 continue;
115 tipc_node_unlock(n_ptr); 115 obuf = skb_copy(buf, GFP_ATOMIC);
116 break; 116 if (!obuf)
117 } 117 break;
118 msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); 118 msg_set_destnode(buf_msg(obuf), dnode);
119 __tipc_link_xmit(l_ptr, buf_copy); 119 tipc_link_xmit(obuf, dnode, dnode);
120 }
121 tipc_node_unlock(n_ptr);
122 } 120 }
123 rcu_read_unlock(); 121 rcu_read_unlock();
124 122
@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
175 return buf; 173 return buf;
176} 174}
177 175
178/* 176/**
179 * named_distribute - prepare name info for bulk distribution to another node 177 * named_distribute - prepare name info for bulk distribution to another node
178 * @msg_list: list of messages (buffers) to be returned from this function
179 * @dnode: node to be updated
180 * @pls: linked list of publication items to be packed into buffer chain
180 */ 181 */
181static void named_distribute(struct list_head *message_list, u32 node, 182static void named_distribute(struct list_head *msg_list, u32 dnode,
182 struct publ_list *pls, u32 max_item_buf) 183 struct publ_list *pls)
183{ 184{
184 struct publication *publ; 185 struct publication *publ;
185 struct sk_buff *buf = NULL; 186 struct sk_buff *buf = NULL;
186 struct distr_item *item = NULL; 187 struct distr_item *item = NULL;
187 u32 left = 0; 188 uint dsz = pls->size * ITEM_SIZE;
188 u32 rest = pls->size * ITEM_SIZE; 189 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
190 uint rem = dsz;
191 uint msg_rem = 0;
189 192
190 list_for_each_entry(publ, &pls->list, local_list) { 193 list_for_each_entry(publ, &pls->list, local_list) {
194 /* Prepare next buffer: */
191 if (!buf) { 195 if (!buf) {
192 left = (rest <= max_item_buf) ? rest : max_item_buf; 196 msg_rem = min_t(uint, rem, msg_dsz);
193 rest -= left; 197 rem -= msg_rem;
194 buf = named_prepare_buf(PUBLICATION, left, node); 198 buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
195 if (!buf) { 199 if (!buf) {
196 pr_warn("Bulk publication failure\n"); 200 pr_warn("Bulk publication failure\n");
197 return; 201 return;
198 } 202 }
199 item = (struct distr_item *)msg_data(buf_msg(buf)); 203 item = (struct distr_item *)msg_data(buf_msg(buf));
200 } 204 }
205
206 /* Pack publication into message: */
201 publ_to_item(item, publ); 207 publ_to_item(item, publ);
202 item++; 208 item++;
203 left -= ITEM_SIZE; 209 msg_rem -= ITEM_SIZE;
204 if (!left) { 210
205 list_add_tail((struct list_head *)buf, message_list); 211 /* Append full buffer to list: */
212 if (!msg_rem) {
213 list_add_tail((struct list_head *)buf, msg_list);
206 buf = NULL; 214 buf = NULL;
207 } 215 }
208 } 216 }
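
The rewritten loop sizes each PUBLICATION message to a whole number of distribution items per destination-link MTU, instead of the fixed max_item_buf previously passed in. A hedged, user-space illustration of the chunking arithmetic (the ITEM_SIZE value is chosen for the example only):

	#include <stdio.h>

	#define ITEM_SIZE 20			/* illustrative item size */

	int main(void)
	{
		unsigned int mtu = 1500;		/* assumed link MTU */
		unsigned int dsz = 100 * ITEM_SIZE;	/* 100 publications */
		unsigned int msg_dsz = (mtu / ITEM_SIZE) * ITEM_SIZE;
		unsigned int rem = dsz, msg_rem;

		while (rem) {
			msg_rem = rem < msg_dsz ? rem : msg_dsz;
			rem -= msg_rem;
			printf("buffer carries %u bytes (%u items)\n",
			       msg_rem, msg_rem / ITEM_SIZE);
		}
		return 0;
	}

Rounding msg_dsz down to an ITEM_SIZE multiple guarantees no publication item is ever split across two buffers.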
@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
211/** 219/**
212 * tipc_named_node_up - tell specified node about all publications by this node 220 * tipc_named_node_up - tell specified node about all publications by this node
213 */ 221 */
214void tipc_named_node_up(u32 max_item_buf, u32 node) 222void tipc_named_node_up(u32 dnode)
215{ 223{
216 LIST_HEAD(message_list); 224 LIST_HEAD(msg_list);
225 struct sk_buff *buf_chain;
217 226
218 read_lock_bh(&tipc_nametbl_lock); 227 read_lock_bh(&tipc_nametbl_lock);
219 named_distribute(&message_list, node, &publ_cluster, max_item_buf); 228 named_distribute(&msg_list, dnode, &publ_cluster);
220 named_distribute(&message_list, node, &publ_zone, max_item_buf); 229 named_distribute(&msg_list, dnode, &publ_zone);
221 read_unlock_bh(&tipc_nametbl_lock); 230 read_unlock_bh(&tipc_nametbl_lock);
222 231
223 tipc_link_names_xmit(&message_list, node); 232 /* Convert circular list to linear list and send: */
233 buf_chain = (struct sk_buff *)msg_list.next;
234 ((struct sk_buff *)msg_list.prev)->next = NULL;
235 tipc_link_xmit(buf_chain, dnode, dnode);
224} 236}
225 237
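
The cast-based conversion above works because struct sk_buff in this kernel begins with its 'next' and 'prev' pointers, giving it the same layout as struct list_head; terminating through msg_list.prev turns the circular list into a NULL-terminated buffer chain that tipc_link_xmit() consumes whole. A hedged restatement with the layout assumption spelled out:

	/* Assumption: sk_buff.next/prev are the first members, so a
	 * list_head threaded through buffers aliases an sk_buff chain.
	 */
	struct sk_buff *buf_chain = (struct sk_buff *)msg_list.next;

	((struct sk_buff *)msg_list.prev)->next = NULL;	/* break the circle */
	tipc_link_xmit(buf_chain, dnode, dnode);	/* consumes the chain */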
226/** 238/**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index b2eed4ec1526..8afe32b7fc9a 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -70,7 +70,7 @@ struct distr_item {
70struct sk_buff *tipc_named_publish(struct publication *publ); 70struct sk_buff *tipc_named_publish(struct publication *publ);
71struct sk_buff *tipc_named_withdraw(struct publication *publ); 71struct sk_buff *tipc_named_withdraw(struct publication *publ);
72void named_cluster_distribute(struct sk_buff *buf); 72void named_cluster_distribute(struct sk_buff *buf);
73void tipc_named_node_up(u32 max_item_buf, u32 node); 73void tipc_named_node_up(u32 dnode);
74void tipc_named_rcv(struct sk_buff *buf); 74void tipc_named_rcv(struct sk_buff *buf);
75void tipc_named_reinit(void); 75void tipc_named_reinit(void);
76 76
diff --git a/net/tipc/net.c b/net/tipc/net.c
index f64375e7f99f..7fcc94998fea 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/net.c: TIPC network routing code 2 * net/tipc/net.c: TIPC network routing code
3 * 3 *
4 * Copyright (c) 1995-2006, Ericsson AB 4 * Copyright (c) 1995-2006, 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -104,67 +104,6 @@
104 * - A local spin_lock protecting the queue of subscriber events. 104 * - A local spin_lock protecting the queue of subscriber events.
105*/ 105*/
106 106
107static void net_route_named_msg(struct sk_buff *buf)
108{
109 struct tipc_msg *msg = buf_msg(buf);
110 u32 dnode;
111 u32 dport;
112
113 if (!msg_named(msg)) {
114 kfree_skb(buf);
115 return;
116 }
117
118 dnode = addr_domain(msg_lookup_scope(msg));
119 dport = tipc_nametbl_translate(msg_nametype(msg), msg_nameinst(msg), &dnode);
120 if (dport) {
121 msg_set_destnode(msg, dnode);
122 msg_set_destport(msg, dport);
123 tipc_net_route_msg(buf);
124 return;
125 }
126 tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
127}
128
129void tipc_net_route_msg(struct sk_buff *buf)
130{
131 struct tipc_msg *msg;
132 u32 dnode;
133
134 if (!buf)
135 return;
136 msg = buf_msg(buf);
137
138 /* Handle message for this node */
139 dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);
140 if (tipc_in_scope(dnode, tipc_own_addr)) {
141 if (msg_isdata(msg)) {
142 if (msg_mcast(msg))
143 tipc_port_mcast_rcv(buf, NULL);
144 else if (msg_destport(msg))
145 tipc_sk_rcv(buf);
146 else
147 net_route_named_msg(buf);
148 return;
149 }
150 switch (msg_user(msg)) {
151 case NAME_DISTRIBUTOR:
152 tipc_named_rcv(buf);
153 break;
154 case CONN_MANAGER:
155 tipc_port_proto_rcv(buf);
156 break;
157 default:
158 kfree_skb(buf);
159 }
160 return;
161 }
162
163 /* Handle message for another node */
164 skb_trim(buf, msg_size(msg));
165 tipc_link_xmit(buf, dnode, msg_link_selector(msg));
166}
167
168int tipc_net_start(u32 addr) 107int tipc_net_start(u32 addr)
169{ 108{
170 char addr_string[16]; 109 char addr_string[16];
diff --git a/net/tipc/net.h b/net/tipc/net.h
index c6c2b46f7c28..59ef3388be2c 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -37,8 +37,6 @@
37#ifndef _TIPC_NET_H 37#ifndef _TIPC_NET_H
38#define _TIPC_NET_H 38#define _TIPC_NET_H
39 39
40void tipc_net_route_msg(struct sk_buff *buf);
41
42int tipc_net_start(u32 addr); 40int tipc_net_start(u32 addr);
43void tipc_net_stop(void); 41void tipc_net_stop(void);
44 42
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 5b44c3041be4..f7069299943f 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/node.c: TIPC node management routines 2 * net/tipc/node.c: TIPC node management routines
3 * 3 *
4 * Copyright (c) 2000-2006, 2012 Ericsson AB 4 * Copyright (c) 2000-2006, 2012-2014, Ericsson AB
5 * Copyright (c) 2005-2006, 2010-2014, Wind River Systems 5 * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -155,21 +155,25 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
155 if (!active[0]) { 155 if (!active[0]) {
156 active[0] = active[1] = l_ptr; 156 active[0] = active[1] = l_ptr;
157 node_established_contact(n_ptr); 157 node_established_contact(n_ptr);
158 return; 158 goto exit;
159 } 159 }
160 if (l_ptr->priority < active[0]->priority) { 160 if (l_ptr->priority < active[0]->priority) {
161 pr_info("New link <%s> becomes standby\n", l_ptr->name); 161 pr_info("New link <%s> becomes standby\n", l_ptr->name);
162 return; 162 goto exit;
163 } 163 }
164 tipc_link_dup_queue_xmit(active[0], l_ptr); 164 tipc_link_dup_queue_xmit(active[0], l_ptr);
165 if (l_ptr->priority == active[0]->priority) { 165 if (l_ptr->priority == active[0]->priority) {
166 active[0] = l_ptr; 166 active[0] = l_ptr;
167 return; 167 goto exit;
168 } 168 }
169 pr_info("Old link <%s> becomes standby\n", active[0]->name); 169 pr_info("Old link <%s> becomes standby\n", active[0]->name);
170 if (active[1] != active[0]) 170 if (active[1] != active[0])
171 pr_info("Old link <%s> becomes standby\n", active[1]->name); 171 pr_info("Old link <%s> becomes standby\n", active[1]->name);
172 active[0] = active[1] = l_ptr; 172 active[0] = active[1] = l_ptr;
173exit:
174 /* Leave room for changeover header when returning 'mtu' to users: */
175 n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
176 n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
173} 177}
174 178
175/** 179/**
@@ -229,6 +233,19 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
229 tipc_link_failover_send_queue(l_ptr); 233 tipc_link_failover_send_queue(l_ptr);
230 else 234 else
231 node_lost_contact(n_ptr); 235 node_lost_contact(n_ptr);
236
237 /* Leave room for changeover header when returning 'mtu' to users: */
238 if (active[0]) {
239 n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
240 n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
241 return;
242 }
243
244 /* Loopback link went down? No fragmentation needed from now on. */
245 if (n_ptr->addr == tipc_own_addr) {
246 n_ptr->act_mtus[0] = MAX_MSG_SIZE;
247 n_ptr->act_mtus[1] = MAX_MSG_SIZE;
248 }
232} 249}
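
Both the link-up and link-down paths now cache per-node MTUs with INT_H_SIZE already subtracted, so packets fragmented against the advertised value can later take a changeover (failover) header without exceeding the bearer limit. A hedged sketch of the invariant (INT_H_SIZE is TIPC's internal header size, 40 bytes at the time of this patch):

	/* Invariant maintained above: for every active link i,
	 *
	 *	act_mtus[i] + INT_H_SIZE <= active[i]->max_pkt
	 *
	 * so any queued packet built to act_mtus[i] can be tunneled to
	 * the standby link inside a changeover message without
	 * refragmentation.
	 */
	n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
	n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;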
233 250
234int tipc_node_active_links(struct tipc_node *n_ptr) 251int tipc_node_active_links(struct tipc_node *n_ptr)
@@ -457,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
457void tipc_node_unlock(struct tipc_node *node) 474void tipc_node_unlock(struct tipc_node *node)
458{ 475{
459 LIST_HEAD(nsub_list); 476 LIST_HEAD(nsub_list);
460 struct tipc_link *link;
461 int pkt_sz = 0;
462 u32 addr = 0; 477 u32 addr = 0;
463 478
464 if (likely(!node->action_flags)) { 479 if (likely(!node->action_flags)) {
@@ -471,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node)
471 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; 486 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
472 } 487 }
473 if (node->action_flags & TIPC_NOTIFY_NODE_UP) { 488 if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
474 link = node->active_links[0];
475 node->action_flags &= ~TIPC_NOTIFY_NODE_UP; 489 node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
476 if (link) { 490 addr = node->addr;
477 pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
478 ITEM_SIZE;
479 addr = node->addr;
480 }
481 } 491 }
482 spin_unlock_bh(&node->lock); 492 spin_unlock_bh(&node->lock);
483 493
484 if (!list_empty(&nsub_list)) 494 if (!list_empty(&nsub_list))
485 tipc_nodesub_notify(&nsub_list); 495 tipc_nodesub_notify(&nsub_list);
486 if (pkt_sz) 496 if (addr)
487 tipc_named_node_up(pkt_sz, addr); 497 tipc_named_node_up(addr);
488} 498}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 9087063793f2..b61716a8218e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -41,6 +41,7 @@
41#include "addr.h" 41#include "addr.h"
42#include "net.h" 42#include "net.h"
43#include "bearer.h" 43#include "bearer.h"
44#include "msg.h"
44 45
45/* 46/*
46 * Out-of-range value for node signature 47 * Out-of-range value for node signature
@@ -105,6 +106,7 @@ struct tipc_node {
105 spinlock_t lock; 106 spinlock_t lock;
106 struct hlist_node hash; 107 struct hlist_node hash;
107 struct tipc_link *active_links[2]; 108 struct tipc_link *active_links[2];
109 u32 act_mtus[2];
108 struct tipc_link *links[MAX_BEARERS]; 110 struct tipc_link *links[MAX_BEARERS];
109 unsigned int action_flags; 111 unsigned int action_flags;
110 struct tipc_node_bclink bclink; 112 struct tipc_node_bclink bclink;
@@ -143,4 +145,19 @@ static inline bool tipc_node_blocked(struct tipc_node *node)
143 TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN)); 145 TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
144} 146}
145 147
148static inline uint tipc_node_get_mtu(u32 addr, u32 selector)
149{
150 struct tipc_node *node;
151 u32 mtu;
152
153 node = tipc_node_find(addr);
154
155 if (likely(node))
156 mtu = node->act_mtus[selector & 1];
157 else
158 mtu = MAX_MSG_SIZE;
159
160 return mtu;
161}
162
146#endif 163#endif
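
tipc_node_get_mtu() is the lookup senders use in place of the old tipc_link_get_max_pkt(); an unknown destination falls back to MAX_MSG_SIZE, meaning no fragmentation, and a stale value is self-correcting because the send paths below retry on -EMSGSIZE. A hedged caller sketch matching those paths:

	/* Sketch: size the outgoing chain to the cached per-node MTU. */
	uint mtu = tipc_node_get_mtu(dnode, port_ref);
	int rc = tipc_msg_build(mhdr, m->msg_iov, 0, dsz, mtu, &buf);

	/* If the link later reports -EMSGSIZE, re-read the MTU and
	 * rebuild -- see the new_mtu: label in tipc_sendmsg() below.
	 */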
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 7c59ab1d6ecb..2d13eea8574a 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
84void tipc_nodesub_notify(struct list_head *nsub_list) 84void tipc_nodesub_notify(struct list_head *nsub_list)
85{ 85{
86 struct tipc_node_subscr *ns, *safe; 86 struct tipc_node_subscr *ns, *safe;
87 net_ev_handler handle_node_down;
87 88
88 list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { 89 list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
89 if (ns->handle_node_down) { 90 handle_node_down = ns->handle_node_down;
90 ns->handle_node_down(ns->usr_handle); 91 if (handle_node_down) {
91 ns->handle_node_down = NULL; 92 ns->handle_node_down = NULL;
93 handle_node_down(ns->usr_handle);
92 } 94 }
93 } 95 }
94} 96}
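
The reordering above is subtle but deliberate: the handler pointer is cleared before the callback runs, so a handler that re-enters the subscription machinery (for instance by unsubscribing itself) cannot be invoked a second time. The idiom in isolation:

	/* "Take then call": detach the callback first, then invoke the
	 * local copy, so re-entry finds the slot already NULL.
	 */
	net_ev_handler cb = ns->handle_node_down;

	if (cb) {
		ns->handle_node_down = NULL;
		cb(ns->usr_handle);
	}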
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 5fd7acce01ea..7e096a5e7701 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -42,8 +42,6 @@
42 42
43/* Connection management: */ 43/* Connection management: */
44#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 44#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */
45#define CONFIRMED 0
46#define PROBING 1
47 45
48#define MAX_REJECT_SIZE 1024 46#define MAX_REJECT_SIZE 1024
49 47
@@ -76,124 +74,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
76 (!peernode && (orignode == tipc_own_addr)); 74 (!peernode && (orignode == tipc_own_addr));
77} 75}
78 76
79/**
80 * tipc_port_mcast_xmit - send a multicast message to local and remote
81 * destinations
82 */
83int tipc_port_mcast_xmit(struct tipc_port *oport,
84 struct tipc_name_seq const *seq,
85 struct iovec const *msg_sect,
86 unsigned int len)
87{
88 struct tipc_msg *hdr;
89 struct sk_buff *buf;
90 struct sk_buff *ibuf = NULL;
91 struct tipc_port_list dports = {0, NULL, };
92 int ext_targets;
93 int res;
94
95 /* Create multicast message */
96 hdr = &oport->phdr;
97 msg_set_type(hdr, TIPC_MCAST_MSG);
98 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
99 msg_set_destport(hdr, 0);
100 msg_set_destnode(hdr, 0);
101 msg_set_nametype(hdr, seq->type);
102 msg_set_namelower(hdr, seq->lower);
103 msg_set_nameupper(hdr, seq->upper);
104 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
105 res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
106 if (unlikely(!buf))
107 return res;
108
109 /* Figure out where to send multicast message */
110 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
111 TIPC_NODE_SCOPE, &dports);
112
113 /* Send message to destinations (duplicate it only if necessary) */
114 if (ext_targets) {
115 if (dports.count != 0) {
116 ibuf = skb_copy(buf, GFP_ATOMIC);
117 if (ibuf == NULL) {
118 tipc_port_list_free(&dports);
119 kfree_skb(buf);
120 return -ENOMEM;
121 }
122 }
123 res = tipc_bclink_xmit(buf);
124 if ((res < 0) && (dports.count != 0))
125 kfree_skb(ibuf);
126 } else {
127 ibuf = buf;
128 }
129
130 if (res >= 0) {
131 if (ibuf)
132 tipc_port_mcast_rcv(ibuf, &dports);
133 } else {
134 tipc_port_list_free(&dports);
135 }
136 return res;
137}
138
139/**
140 * tipc_port_mcast_rcv - deliver multicast message to all destination ports
141 *
142 * If there is no port list, perform a lookup to create one
143 */
144void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
145{
146 struct tipc_msg *msg;
147 struct tipc_port_list dports = {0, NULL, };
148 struct tipc_port_list *item = dp;
149 int cnt = 0;
150
151 msg = buf_msg(buf);
152
153 /* Create destination port list, if one wasn't supplied */
154 if (dp == NULL) {
155 tipc_nametbl_mc_translate(msg_nametype(msg),
156 msg_namelower(msg),
157 msg_nameupper(msg),
158 TIPC_CLUSTER_SCOPE,
159 &dports);
160 item = dp = &dports;
161 }
162
163 /* Deliver a copy of message to each destination port */
164 if (dp->count != 0) {
165 msg_set_destnode(msg, tipc_own_addr);
166 if (dp->count == 1) {
167 msg_set_destport(msg, dp->ports[0]);
168 tipc_sk_rcv(buf);
169 tipc_port_list_free(dp);
170 return;
171 }
172 for (; cnt < dp->count; cnt++) {
173 int index = cnt % PLSIZE;
174 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
175
176 if (b == NULL) {
177 pr_warn("Unable to deliver multicast message(s)\n");
178 goto exit;
179 }
180 if ((index == 0) && (cnt != 0))
181 item = item->next;
182 msg_set_destport(buf_msg(b), item->ports[index]);
183 tipc_sk_rcv(b);
184 }
185 }
186exit:
187 kfree_skb(buf);
188 tipc_port_list_free(dp);
189}
190
191
192void tipc_port_wakeup(struct tipc_port *port)
193{
194 tipc_sock_wakeup(tipc_port_to_sock(port));
195}
196
197/* tipc_port_init - initiate TIPC port and lock it 77/* tipc_port_init - initiate TIPC port and lock it
198 * 78 *
199 * Returns obtained reference if initialization is successful, zero otherwise 79 * Returns obtained reference if initialization is successful, zero otherwise
@@ -235,6 +115,8 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
235void tipc_port_destroy(struct tipc_port *p_ptr) 115void tipc_port_destroy(struct tipc_port *p_ptr)
236{ 116{
237 struct sk_buff *buf = NULL; 117 struct sk_buff *buf = NULL;
118 struct tipc_msg *msg = NULL;
119 u32 peer;
238 120
239 tipc_withdraw(p_ptr, 0, NULL); 121 tipc_withdraw(p_ptr, 0, NULL);
240 122
@@ -246,14 +128,15 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
246 if (p_ptr->connected) { 128 if (p_ptr->connected) {
247 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 129 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
248 tipc_nodesub_unsubscribe(&p_ptr->subscription); 130 tipc_nodesub_unsubscribe(&p_ptr->subscription);
131 msg = buf_msg(buf);
132 peer = msg_destnode(msg);
133 tipc_link_xmit(buf, peer, msg_link_selector(msg));
249 } 134 }
250
251 spin_lock_bh(&tipc_port_list_lock); 135 spin_lock_bh(&tipc_port_list_lock);
252 list_del(&p_ptr->port_list); 136 list_del(&p_ptr->port_list);
253 list_del(&p_ptr->wait_list); 137 list_del(&p_ptr->wait_list);
254 spin_unlock_bh(&tipc_port_list_lock); 138 spin_unlock_bh(&tipc_port_list_lock);
255 k_term_timer(&p_ptr->timer); 139 k_term_timer(&p_ptr->timer);
256 tipc_net_route_msg(buf);
257} 140}
258 141
259/* 142/*
@@ -275,100 +158,16 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
275 msg_set_destport(msg, tipc_port_peerport(p_ptr)); 158 msg_set_destport(msg, tipc_port_peerport(p_ptr));
276 msg_set_origport(msg, p_ptr->ref); 159 msg_set_origport(msg, p_ptr->ref);
277 msg_set_msgcnt(msg, ack); 160 msg_set_msgcnt(msg, ack);
161 buf->next = NULL;
278 } 162 }
279 return buf; 163 return buf;
280} 164}
281 165
282int tipc_reject_msg(struct sk_buff *buf, u32 err)
283{
284 struct tipc_msg *msg = buf_msg(buf);
285 struct sk_buff *rbuf;
286 struct tipc_msg *rmsg;
287 int hdr_sz;
288 u32 imp;
289 u32 data_sz = msg_data_sz(msg);
290 u32 src_node;
291 u32 rmsg_sz;
292
293 /* discard rejected message if it shouldn't be returned to sender */
294 if (WARN(!msg_isdata(msg),
295 "attempt to reject message with user=%u", msg_user(msg))) {
296 dump_stack();
297 goto exit;
298 }
299 if (msg_errcode(msg) || msg_dest_droppable(msg))
300 goto exit;
301
302 /*
303 * construct returned message by copying rejected message header and
304 * data (or subset), then updating header fields that need adjusting
305 */
306 hdr_sz = msg_hdr_sz(msg);
307 rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
308
309 rbuf = tipc_buf_acquire(rmsg_sz);
310 if (rbuf == NULL)
311 goto exit;
312
313 rmsg = buf_msg(rbuf);
314 skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
315
316 if (msg_connected(rmsg)) {
317 imp = msg_importance(rmsg);
318 if (imp < TIPC_CRITICAL_IMPORTANCE)
319 msg_set_importance(rmsg, ++imp);
320 }
321 msg_set_non_seq(rmsg, 0);
322 msg_set_size(rmsg, rmsg_sz);
323 msg_set_errcode(rmsg, err);
324 msg_set_prevnode(rmsg, tipc_own_addr);
325 msg_swap_words(rmsg, 4, 5);
326 if (!msg_short(rmsg))
327 msg_swap_words(rmsg, 6, 7);
328
329 /* send self-abort message when rejecting on a connected port */
330 if (msg_connected(msg)) {
331 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
332
333 if (p_ptr) {
334 struct sk_buff *abuf = NULL;
335
336 if (p_ptr->connected)
337 abuf = port_build_self_abort_msg(p_ptr, err);
338 tipc_port_unlock(p_ptr);
339 tipc_net_route_msg(abuf);
340 }
341 }
342
343 /* send returned message & dispose of rejected message */
344 src_node = msg_prevnode(msg);
345 if (in_own_node(src_node))
346 tipc_sk_rcv(rbuf);
347 else
348 tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
349exit:
350 kfree_skb(buf);
351 return data_sz;
352}
353
354int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr,
355 struct iovec const *msg_sect, unsigned int len,
356 int err)
357{
358 struct sk_buff *buf;
359 int res;
360
361 res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
362 if (!buf)
363 return res;
364
365 return tipc_reject_msg(buf, err);
366}
367
368static void port_timeout(unsigned long ref) 166static void port_timeout(unsigned long ref)
369{ 167{
370 struct tipc_port *p_ptr = tipc_port_lock(ref); 168 struct tipc_port *p_ptr = tipc_port_lock(ref);
371 struct sk_buff *buf = NULL; 169 struct sk_buff *buf = NULL;
170 struct tipc_msg *msg = NULL;
372 171
373 if (!p_ptr) 172 if (!p_ptr)
374 return; 173 return;
@@ -379,15 +178,16 @@ static void port_timeout(unsigned long ref)
379 } 178 }
380 179
381 /* Last probe answered ? */ 180 /* Last probe answered ? */
382 if (p_ptr->probing_state == PROBING) { 181 if (p_ptr->probing_state == TIPC_CONN_PROBING) {
383 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 182 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
384 } else { 183 } else {
385 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0); 184 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
386 p_ptr->probing_state = PROBING; 185 p_ptr->probing_state = TIPC_CONN_PROBING;
387 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 186 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
388 } 187 }
389 tipc_port_unlock(p_ptr); 188 tipc_port_unlock(p_ptr);
390 tipc_net_route_msg(buf); 189 msg = buf_msg(buf);
190 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
391} 191}
392 192
393 193
@@ -395,12 +195,14 @@ static void port_handle_node_down(unsigned long ref)
395{ 195{
396 struct tipc_port *p_ptr = tipc_port_lock(ref); 196 struct tipc_port *p_ptr = tipc_port_lock(ref);
397 struct sk_buff *buf = NULL; 197 struct sk_buff *buf = NULL;
198 struct tipc_msg *msg = NULL;
398 199
399 if (!p_ptr) 200 if (!p_ptr)
400 return; 201 return;
401 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 202 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
402 tipc_port_unlock(p_ptr); 203 tipc_port_unlock(p_ptr);
403 tipc_net_route_msg(buf); 204 msg = buf_msg(buf);
205 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
404} 206}
405 207
406 208
@@ -412,6 +214,7 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 er
412 struct tipc_msg *msg = buf_msg(buf); 214 struct tipc_msg *msg = buf_msg(buf);
413 msg_swap_words(msg, 4, 5); 215 msg_swap_words(msg, 4, 5);
414 msg_swap_words(msg, 6, 7); 216 msg_swap_words(msg, 6, 7);
217 buf->next = NULL;
415 } 218 }
416 return buf; 219 return buf;
417} 220}
@@ -436,60 +239,11 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er
436 if (imp < TIPC_CRITICAL_IMPORTANCE) 239 if (imp < TIPC_CRITICAL_IMPORTANCE)
437 msg_set_importance(msg, ++imp); 240 msg_set_importance(msg, ++imp);
438 msg_set_errcode(msg, err); 241 msg_set_errcode(msg, err);
242 buf->next = NULL;
439 } 243 }
440 return buf; 244 return buf;
441} 245}
442 246
443void tipc_port_proto_rcv(struct sk_buff *buf)
444{
445 struct tipc_msg *msg = buf_msg(buf);
446 struct tipc_port *p_ptr;
447 struct sk_buff *r_buf = NULL;
448 u32 destport = msg_destport(msg);
449 int wakeable;
450
451 /* Validate connection */
452 p_ptr = tipc_port_lock(destport);
453 if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
454 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
455 if (r_buf) {
456 msg = buf_msg(r_buf);
457 tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
458 BASIC_H_SIZE, msg_orignode(msg));
459 msg_set_errcode(msg, TIPC_ERR_NO_PORT);
460 msg_set_origport(msg, destport);
461 msg_set_destport(msg, msg_origport(msg));
462 }
463 if (p_ptr)
464 tipc_port_unlock(p_ptr);
465 goto exit;
466 }
467
468 /* Process protocol message sent by peer */
469 switch (msg_type(msg)) {
470 case CONN_ACK:
471 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested;
472 p_ptr->acked += msg_msgcnt(msg);
473 if (!tipc_port_congested(p_ptr)) {
474 p_ptr->congested = 0;
475 if (wakeable)
476 tipc_port_wakeup(p_ptr);
477 }
478 break;
479 case CONN_PROBE:
480 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
481 break;
482 default:
483 /* CONN_PROBE_REPLY or unrecognized - no action required */
484 break;
485 }
486 p_ptr->probing_state = CONFIRMED;
487 tipc_port_unlock(p_ptr);
488exit:
489 tipc_net_route_msg(r_buf);
490 kfree_skb(buf);
491}
492
493static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id) 247static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
494{ 248{
495 struct publication *publ; 249 struct publication *publ;
@@ -581,16 +335,19 @@ void tipc_acknowledge(u32 ref, u32 ack)
581{ 335{
582 struct tipc_port *p_ptr; 336 struct tipc_port *p_ptr;
583 struct sk_buff *buf = NULL; 337 struct sk_buff *buf = NULL;
338 struct tipc_msg *msg;
584 339
585 p_ptr = tipc_port_lock(ref); 340 p_ptr = tipc_port_lock(ref);
586 if (!p_ptr) 341 if (!p_ptr)
587 return; 342 return;
588 if (p_ptr->connected) { 343 if (p_ptr->connected)
589 p_ptr->conn_unacked -= ack;
590 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); 344 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
591 } 345
592 tipc_port_unlock(p_ptr); 346 tipc_port_unlock(p_ptr);
593 tipc_net_route_msg(buf); 347 if (!buf)
348 return;
349 msg = buf_msg(buf);
350 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
594} 351}
595 352
596int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, 353int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
@@ -689,7 +446,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
689 msg_set_hdr_sz(msg, SHORT_H_SIZE); 446 msg_set_hdr_sz(msg, SHORT_H_SIZE);
690 447
691 p_ptr->probing_interval = PROBING_INTERVAL; 448 p_ptr->probing_interval = PROBING_INTERVAL;
692 p_ptr->probing_state = CONFIRMED; 449 p_ptr->probing_state = TIPC_CONN_OK;
693 p_ptr->connected = 1; 450 p_ptr->connected = 1;
694 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 451 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
695 452
@@ -698,7 +455,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
698 (net_ev_handler)port_handle_node_down); 455 (net_ev_handler)port_handle_node_down);
699 res = 0; 456 res = 0;
700exit: 457exit:
701 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 458 p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
702 return res; 459 return res;
703} 460}
704 461
@@ -741,6 +498,7 @@ int tipc_port_disconnect(u32 ref)
741 */ 498 */
742int tipc_port_shutdown(u32 ref) 499int tipc_port_shutdown(u32 ref)
743{ 500{
501 struct tipc_msg *msg;
744 struct tipc_port *p_ptr; 502 struct tipc_port *p_ptr;
745 struct sk_buff *buf = NULL; 503 struct sk_buff *buf = NULL;
746 504
@@ -750,149 +508,7 @@ int tipc_port_shutdown(u32 ref)
750 508
751 buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); 509 buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
752 tipc_port_unlock(p_ptr); 510 tipc_port_unlock(p_ptr);
753 tipc_net_route_msg(buf); 511 msg = buf_msg(buf);
512 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
754 return tipc_port_disconnect(ref); 513 return tipc_port_disconnect(ref);
755} 514}
756
757/*
758 * tipc_port_iovec_rcv: Concatenate and deliver sectioned
759 * message for this node.
760 */
761static int tipc_port_iovec_rcv(struct tipc_port *sender,
762 struct iovec const *msg_sect,
763 unsigned int len)
764{
765 struct sk_buff *buf;
766 int res;
767
768 res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
769 if (likely(buf))
770 tipc_sk_rcv(buf);
771 return res;
772}
773
774/**
775 * tipc_send - send message sections on connection
776 */
777int tipc_send(struct tipc_port *p_ptr,
778 struct iovec const *msg_sect,
779 unsigned int len)
780{
781 u32 destnode;
782 int res;
783
784 if (!p_ptr->connected)
785 return -EINVAL;
786
787 p_ptr->congested = 1;
788 if (!tipc_port_congested(p_ptr)) {
789 destnode = tipc_port_peernode(p_ptr);
790 if (likely(!in_own_node(destnode)))
791 res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
792 destnode);
793 else
794 res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
795
796 if (likely(res != -ELINKCONG)) {
797 p_ptr->congested = 0;
798 if (res > 0)
799 p_ptr->sent++;
800 return res;
801 }
802 }
803 if (tipc_port_unreliable(p_ptr)) {
804 p_ptr->congested = 0;
805 return len;
806 }
807 return -ELINKCONG;
808}
809
810/**
811 * tipc_send2name - send message sections to port name
812 */
813int tipc_send2name(struct tipc_port *p_ptr,
814 struct tipc_name const *name,
815 unsigned int domain,
816 struct iovec const *msg_sect,
817 unsigned int len)
818{
819 struct tipc_msg *msg;
820 u32 destnode = domain;
821 u32 destport;
822 int res;
823
824 if (p_ptr->connected)
825 return -EINVAL;
826
827 msg = &p_ptr->phdr;
828 msg_set_type(msg, TIPC_NAMED_MSG);
829 msg_set_hdr_sz(msg, NAMED_H_SIZE);
830 msg_set_nametype(msg, name->type);
831 msg_set_nameinst(msg, name->instance);
832 msg_set_lookup_scope(msg, tipc_addr_scope(domain));
833 destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
834 msg_set_destnode(msg, destnode);
835 msg_set_destport(msg, destport);
836
837 if (likely(destport || destnode)) {
838 if (likely(in_own_node(destnode)))
839 res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
840 else if (tipc_own_addr)
841 res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
842 destnode);
843 else
844 res = tipc_port_iovec_reject(p_ptr, msg, msg_sect,
845 len, TIPC_ERR_NO_NODE);
846 if (likely(res != -ELINKCONG)) {
847 if (res > 0)
848 p_ptr->sent++;
849 return res;
850 }
851 if (tipc_port_unreliable(p_ptr))
852 return len;
853
854 return -ELINKCONG;
855 }
856 return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
857 TIPC_ERR_NO_NAME);
858}
859
860/**
861 * tipc_send2port - send message sections to port identity
862 */
863int tipc_send2port(struct tipc_port *p_ptr,
864 struct tipc_portid const *dest,
865 struct iovec const *msg_sect,
866 unsigned int len)
867{
868 struct tipc_msg *msg;
869 int res;
870
871 if (p_ptr->connected)
872 return -EINVAL;
873
874 msg = &p_ptr->phdr;
875 msg_set_type(msg, TIPC_DIRECT_MSG);
876 msg_set_lookup_scope(msg, 0);
877 msg_set_destnode(msg, dest->node);
878 msg_set_destport(msg, dest->ref);
879 msg_set_hdr_sz(msg, BASIC_H_SIZE);
880
881 if (in_own_node(dest->node))
882 res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
883 else if (tipc_own_addr)
884 res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
885 dest->node);
886 else
887 res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
888 TIPC_ERR_NO_NODE);
889 if (likely(res != -ELINKCONG)) {
890 if (res > 0)
891 p_ptr->sent++;
892 return res;
893 }
894 if (tipc_port_unreliable(p_ptr))
895 return len;
896
897 return -ELINKCONG;
898}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index cf4ca5b1d9a4..3087da39ee47 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -53,17 +53,13 @@
53 * @connected: non-zero if port is currently connected to a peer port 53 * @connected: non-zero if port is currently connected to a peer port
54 * @conn_type: TIPC type used when connection was established 54 * @conn_type: TIPC type used when connection was established
55 * @conn_instance: TIPC instance used when connection was established 55 * @conn_instance: TIPC instance used when connection was established
56 * @conn_unacked: number of unacknowledged messages received from peer port
57 * @published: non-zero if port has one or more associated names 56 * @published: non-zero if port has one or more associated names
58 * @congested: non-zero if cannot send because of link or port congestion
59 * @max_pkt: maximum packet size "hint" used when building messages sent by port 57 * @max_pkt: maximum packet size "hint" used when building messages sent by port
60 * @ref: unique reference to port in TIPC object registry 58 * @ref: unique reference to port in TIPC object registry
61 * @phdr: preformatted message header used when sending messages 59 * @phdr: preformatted message header used when sending messages
62 * @port_list: adjacent ports in TIPC's global list of ports 60 * @port_list: adjacent ports in TIPC's global list of ports
63 * @wait_list: adjacent ports in list of ports waiting on link congestion 61 * @wait_list: adjacent ports in list of ports waiting on link congestion
64 * @waiting_pkts: 62 * @waiting_pkts:
65 * @sent: # of non-empty messages sent by port
66 * @acked: # of non-empty message acknowledgements from connected port's peer
67 * @publications: list of publications for port 63 * @publications: list of publications for port
68 * @pub_count: total # of publications port has made during its lifetime 64 * @pub_count: total # of publications port has made during its lifetime
69 * @probing_state: 65 * @probing_state:
@@ -76,17 +72,13 @@ struct tipc_port {
76 int connected; 72 int connected;
77 u32 conn_type; 73 u32 conn_type;
78 u32 conn_instance; 74 u32 conn_instance;
79 u32 conn_unacked;
80 int published; 75 int published;
81 u32 congested;
82 u32 max_pkt; 76 u32 max_pkt;
83 u32 ref; 77 u32 ref;
84 struct tipc_msg phdr; 78 struct tipc_msg phdr;
85 struct list_head port_list; 79 struct list_head port_list;
86 struct list_head wait_list; 80 struct list_head wait_list;
87 u32 waiting_pkts; 81 u32 waiting_pkts;
88 u32 sent;
89 u32 acked;
90 struct list_head publications; 82 struct list_head publications;
91 u32 pub_count; 83 u32 pub_count;
92 u32 probing_state; 84 u32 probing_state;
@@ -104,8 +96,6 @@ struct tipc_port_list;
104u32 tipc_port_init(struct tipc_port *p_ptr, 96u32 tipc_port_init(struct tipc_port *p_ptr,
105 const unsigned int importance); 97 const unsigned int importance);
106 98
107int tipc_reject_msg(struct sk_buff *buf, u32 err);
108
109void tipc_acknowledge(u32 port_ref, u32 ack); 99void tipc_acknowledge(u32 port_ref, u32 ack);
110 100
111void tipc_port_destroy(struct tipc_port *p_ptr); 101void tipc_port_destroy(struct tipc_port *p_ptr);
@@ -122,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
122 112
123int tipc_port_shutdown(u32 ref); 113int tipc_port_shutdown(u32 ref);
124 114
125void tipc_port_wakeup(struct tipc_port *port);
126
127/* 115/*
128 * The following routines require that the port be locked on entry 116 * The following routines require that the port be locked on entry
129 */ 117 */
@@ -132,39 +120,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
132 struct tipc_portid const *peer); 120 struct tipc_portid const *peer);
133int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); 121int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
134 122
135/*
136 * TIPC messaging routines
137 */
138
139int tipc_send(struct tipc_port *port,
140 struct iovec const *msg_sect,
141 unsigned int len);
142
143int tipc_send2name(struct tipc_port *port,
144 struct tipc_name const *name,
145 u32 domain,
146 struct iovec const *msg_sect,
147 unsigned int len);
148
149int tipc_send2port(struct tipc_port *port,
150 struct tipc_portid const *dest,
151 struct iovec const *msg_sect,
152 unsigned int len);
153
154int tipc_port_mcast_xmit(struct tipc_port *port,
155 struct tipc_name_seq const *seq,
156 struct iovec const *msg,
157 unsigned int len);
158
159int tipc_port_iovec_reject(struct tipc_port *p_ptr,
160 struct tipc_msg *hdr,
161 struct iovec const *msg_sect,
162 unsigned int len,
163 int err);
164
165struct sk_buff *tipc_port_get_ports(void); 123struct sk_buff *tipc_port_get_ports(void);
166void tipc_port_proto_rcv(struct sk_buff *buf);
167void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
168void tipc_port_reinit(void); 124void tipc_port_reinit(void);
169 125
170/** 126/**
@@ -185,12 +141,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
185 spin_unlock_bh(p_ptr->lock); 141 spin_unlock_bh(p_ptr->lock);
186} 142}
187 143
188static inline int tipc_port_congested(struct tipc_port *p_ptr)
189{
190 return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
191}
192
193
194static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) 144static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
195{ 145{
196 return msg_destnode(&p_ptr->phdr); 146 return msg_destnode(&p_ptr->phdr);
@@ -229,9 +179,12 @@ static inline int tipc_port_importance(struct tipc_port *port)
229 return msg_importance(&port->phdr); 179 return msg_importance(&port->phdr);
230} 180}
231 181
232static inline void tipc_port_set_importance(struct tipc_port *port, int imp) 182static inline int tipc_port_set_importance(struct tipc_port *port, int imp)
233{ 183{
184 if (imp > TIPC_CRITICAL_IMPORTANCE)
185 return -EINVAL;
234 msg_set_importance(&port->phdr, (u32)imp); 186 msg_set_importance(&port->phdr, (u32)imp);
187 return 0;
235} 188}
236 189
237#endif 190#endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ef0475568f9e..ff8c8118d56e 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -36,20 +36,23 @@
36 36
37#include "core.h" 37#include "core.h"
38#include "port.h" 38#include "port.h"
39#include "name_table.h"
39#include "node.h" 40#include "node.h"
40 41#include "link.h"
41#include <linux/export.h> 42#include <linux/export.h>
42 43
43#define SS_LISTENING -1 /* socket is listening */ 44#define SS_LISTENING -1 /* socket is listening */
44#define SS_READY -2 /* socket is connectionless */ 45#define SS_READY -2 /* socket is connectionless */
45 46
46#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
48#define TIPC_FWD_MSG 1
47 49
48static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); 50static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
49static void tipc_data_ready(struct sock *sk); 51static void tipc_data_ready(struct sock *sk);
50static void tipc_write_space(struct sock *sk); 52static void tipc_write_space(struct sock *sk);
51static int tipc_release(struct socket *sock); 53static int tipc_release(struct socket *sock);
52static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); 54static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
55static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
53 56
54static const struct proto_ops packet_ops; 57static const struct proto_ops packet_ops;
55static const struct proto_ops stream_ops; 58static const struct proto_ops stream_ops;
@@ -123,9 +126,12 @@ static void advance_rx_queue(struct sock *sk)
123static void reject_rx_queue(struct sock *sk) 126static void reject_rx_queue(struct sock *sk)
124{ 127{
125 struct sk_buff *buf; 128 struct sk_buff *buf;
129 u32 dnode;
126 130
127 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) 131 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
128 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 132 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
133 tipc_link_xmit(buf, dnode, 0);
134 }
129} 135}
130 136
131/** 137/**
@@ -201,6 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
201 sk->sk_data_ready = tipc_data_ready; 207 sk->sk_data_ready = tipc_data_ready;
202 sk->sk_write_space = tipc_write_space; 208 sk->sk_write_space = tipc_write_space;
203 tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; 209 tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
210 tsk->sent_unacked = 0;
204 atomic_set(&tsk->dupl_rcvcnt, 0); 211 atomic_set(&tsk->dupl_rcvcnt, 0);
205 tipc_port_unlock(port); 212 tipc_port_unlock(port);
206 213
@@ -303,6 +310,7 @@ static int tipc_release(struct socket *sock)
303 struct tipc_sock *tsk; 310 struct tipc_sock *tsk;
304 struct tipc_port *port; 311 struct tipc_port *port;
305 struct sk_buff *buf; 312 struct sk_buff *buf;
313 u32 dnode;
306 314
307 /* 315 /*
308 * Exit if socket isn't fully initialized (occurs when a failed accept() 316 * Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -331,7 +339,8 @@ static int tipc_release(struct socket *sock)
331 sock->state = SS_DISCONNECTING; 339 sock->state = SS_DISCONNECTING;
332 tipc_port_disconnect(port->ref); 340 tipc_port_disconnect(port->ref);
333 } 341 }
334 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 342 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
343 tipc_link_xmit(buf, dnode, 0);
335 } 344 }
336 } 345 }
337 346
@@ -504,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
504 513
505 switch ((int)sock->state) { 514 switch ((int)sock->state) {
506 case SS_UNCONNECTED: 515 case SS_UNCONNECTED:
507 if (!tsk->port.congested) 516 if (!tsk->link_cong)
508 mask |= POLLOUT; 517 mask |= POLLOUT;
509 break; 518 break;
510 case SS_READY: 519 case SS_READY:
511 case SS_CONNECTED: 520 case SS_CONNECTED:
512 if (!tsk->port.congested) 521 if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
513 mask |= POLLOUT; 522 mask |= POLLOUT;
514 /* fall thru' */ 523 /* fall thru' */
515 case SS_CONNECTING: 524 case SS_CONNECTING:
@@ -526,6 +535,136 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
526} 535}
527 536
528/** 537/**
538 * tipc_sendmcast - send multicast message
539 * @sock: socket structure
540 * @seq: destination address
541 * @iov: message data to send
542 * @dsz: total length of message data
543 * @timeo: timeout to wait for wakeup
544 *
545 * Called from function tipc_sendmsg(), which has done all sanity checks
546 * Returns the number of bytes sent on success, or errno
547 */
548static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
549 struct iovec *iov, size_t dsz, long timeo)
550{
551 struct sock *sk = sock->sk;
552 struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr;
553 struct sk_buff *buf;
554 uint mtu;
555 int rc;
556
557 msg_set_type(mhdr, TIPC_MCAST_MSG);
558 msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
559 msg_set_destport(mhdr, 0);
560 msg_set_destnode(mhdr, 0);
561 msg_set_nametype(mhdr, seq->type);
562 msg_set_namelower(mhdr, seq->lower);
563 msg_set_nameupper(mhdr, seq->upper);
564 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
565
566new_mtu:
567 mtu = tipc_bclink_get_mtu();
568 rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
569 if (unlikely(rc < 0))
570 return rc;
571
572 do {
573 rc = tipc_bclink_xmit(buf);
574 if (likely(rc >= 0)) {
575 rc = dsz;
576 break;
577 }
578 if (rc == -EMSGSIZE)
579 goto new_mtu;
580 if (rc != -ELINKCONG)
581 break;
582 rc = tipc_wait_for_sndmsg(sock, &timeo);
583 if (rc)
584 kfree_skb_list(buf);
585 } while (!rc);
586 return rc;
587}
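
The loop above encodes the send-retry contract that all three send paths in this file rely on. Stated once, hedged to the behavior visible in this patch (tipc_bclink_xmit() here, tipc_link_xmit() in the unicast paths):

	/* Return-code contract assumed by the retry loops:
	 *
	 *   rc >= 0     buffer chain consumed; report dsz bytes sent
	 *   -EMSGSIZE   chain consumed; MTU changed, rebuild and retry
	 *   -ELINKCONG  chain NOT consumed; block in
	 *               tipc_wait_for_sndmsg() and resend, or
	 *               kfree_skb_list() the chain if the wait fails
	 *   other errno chain consumed; propagate the error
	 */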
588
589/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
590 */
591void tipc_sk_mcast_rcv(struct sk_buff *buf)
592{
593 struct tipc_msg *msg = buf_msg(buf);
594 struct tipc_port_list dports = {0, NULL, };
595 struct tipc_port_list *item;
596 struct sk_buff *b;
597 uint i, last, dst = 0;
598 u32 scope = TIPC_CLUSTER_SCOPE;
599
600 if (in_own_node(msg_orignode(msg)))
601 scope = TIPC_NODE_SCOPE;
602
603 /* Create destination port list: */
604 tipc_nametbl_mc_translate(msg_nametype(msg),
605 msg_namelower(msg),
606 msg_nameupper(msg),
607 scope,
608 &dports);
609 last = dports.count;
610 if (!last) {
611 kfree_skb(buf);
612 return;
613 }
614
615 for (item = &dports; item; item = item->next) {
616 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
617 b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
618 if (!b) {
619 pr_warn("Failed to clone mcast rcv buffer\n");
620 continue;
621 }
622 msg_set_destport(msg, item->ports[i]);
623 tipc_sk_rcv(b);
624 }
625 }
626 tipc_port_list_free(&dports);
627}
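
Delivery above follows a clone-per-destination rule: with N destination ports the buffer needs N references, so every port but the last gets an skb_clone() and the final one receives the original, leaving no reference leaked or double-freed. The rule in isolation, as a hedged sketch:

	/* Sketch: deliver 'buf' to n_dests local ports, consuming
	 * exactly one reference per port and the original on the last.
	 */
	for (i = 1; i <= n_dests; i++) {
		struct sk_buff *b = (i < n_dests) ?
				    skb_clone(buf, GFP_ATOMIC) : buf;
		if (!b)
			continue;	/* clone failed: skip this port */
		msg_set_destport(buf_msg(b), ports[i - 1]);
		tipc_sk_rcv(b);
	}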
628
629/**
630 * tipc_sk_proto_rcv - receive a connection mng protocol message
631 * @tsk: receiving socket
632 * @dnode: node to send response message to, if any
633 * @buf: buffer containing protocol message
634 * Returns 0 (TIPC_OK) if message was consumed, or 1 (TIPC_FWD_MSG) if
635 * it was converted to a CONN_PROBE_REPLY that should be forwarded
636 */
637static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
638 struct sk_buff *buf)
639{
640 struct tipc_msg *msg = buf_msg(buf);
641 struct tipc_port *port = &tsk->port;
642 int conn_cong;
643
644 /* Ignore if connection cannot be validated: */
645 if (!port->connected || !tipc_port_peer_msg(port, msg))
646 goto exit;
647
648 port->probing_state = TIPC_CONN_OK;
649
650 if (msg_type(msg) == CONN_ACK) {
651 conn_cong = tipc_sk_conn_cong(tsk);
652 tsk->sent_unacked -= msg_msgcnt(msg);
653 if (conn_cong)
654 tipc_sock_wakeup(tsk);
655 } else if (msg_type(msg) == CONN_PROBE) {
656 if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
657 return TIPC_OK;
658 msg_set_type(msg, CONN_PROBE_REPLY);
659 return TIPC_FWD_MSG;
660 }
661 /* Do nothing if msg_type() == CONN_PROBE_REPLY */
662exit:
663 kfree_skb(buf);
664 return TIPC_OK;
665}
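
Connection-level flow control moves here from the port layer: the sender counts outbound packets in sent_unacked, the peer acknowledges a batch with a CONN_ACK carrying a message count, and congestion is now judged per socket. A hedged sketch of the pieces; the body of tipc_sk_conn_cong() is an assumption modeled on the removed tipc_port_congested() test against TIPC_FLOWCTRL_WIN (the real definition lives in socket.h, not shown in this section):

	/* Assumed congestion test, mirroring the removed
	 * tipc_port_congested():
	 */
	static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
	{
		return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; /* assumption */
	}

	/* Sender, per packet accepted by the link (tipc_send_stream): */
	tsk->sent_unacked++;

	/* Receiver of CONN_ACK (tipc_sk_proto_rcv() above): */
	tsk->sent_unacked -= msg_msgcnt(msg);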
666
667/**
529 * dest_name_check - verify user is permitted to send to specified port name 668 * dest_name_check - verify user is permitted to send to specified port name
530 * @dest: destination address 669 * @dest: destination address
531 * @m: descriptor for message to be sent 670 * @m: descriptor for message to be sent
@@ -539,6 +678,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
539{ 678{
540 struct tipc_cfg_msg_hdr hdr; 679 struct tipc_cfg_msg_hdr hdr;
541 680
681 if (unlikely(dest->addrtype == TIPC_ADDR_ID))
682 return 0;
542 if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES)) 683 if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
543 return 0; 684 return 0;
544 if (likely(dest->addr.name.name.type == TIPC_TOP_SRV)) 685 if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
@@ -575,19 +716,18 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
575 return sock_intr_errno(*timeo_p); 716 return sock_intr_errno(*timeo_p);
576 717
577 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); 718 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
578 done = sk_wait_event(sk, timeo_p, !tsk->port.congested); 719 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
579 finish_wait(sk_sleep(sk), &wait); 720 finish_wait(sk_sleep(sk), &wait);
580 } while (!done); 721 } while (!done);
581 return 0; 722 return 0;
582} 723}
583 724
584
585/** 725/**
586 * tipc_sendmsg - send message in connectionless manner 726 * tipc_sendmsg - send message in connectionless manner
587 * @iocb: if NULL, indicates that socket lock is already held 727 * @iocb: if NULL, indicates that socket lock is already held
588 * @sock: socket structure 728 * @sock: socket structure
589 * @m: message to send 729 * @m: message to send
590 * @total_len: length of message 730 * @dsz: amount of user data to be sent
591 * 731 *
592 * Message must have a destination specified explicitly. 732 * Message must have a destination specified explicitly.
593 * Used for SOCK_RDM and SOCK_DGRAM messages, 733 * Used for SOCK_RDM and SOCK_DGRAM messages,
@@ -597,100 +737,123 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
597 * Returns the number of bytes sent on success, or errno otherwise 737 * Returns the number of bytes sent on success, or errno otherwise
598 */ 738 */
599static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, 739static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
600 struct msghdr *m, size_t total_len) 740 struct msghdr *m, size_t dsz)
601{ 741{
742 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
602 struct sock *sk = sock->sk; 743 struct sock *sk = sock->sk;
603 struct tipc_sock *tsk = tipc_sk(sk); 744 struct tipc_sock *tsk = tipc_sk(sk);
604 struct tipc_port *port = &tsk->port; 745 struct tipc_port *port = &tsk->port;
605 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 746 struct tipc_msg *mhdr = &port->phdr;
606 int needs_conn; 747 struct iovec *iov = m->msg_iov;
748 u32 dnode, dport;
749 struct sk_buff *buf;
750 struct tipc_name_seq *seq = &dest->addr.nameseq;
751 u32 mtu;
607 long timeo; 752 long timeo;
608 int res = -EINVAL; 753 int rc = -EINVAL;
609 754
610 if (unlikely(!dest)) 755 if (unlikely(!dest))
611 return -EDESTADDRREQ; 756 return -EDESTADDRREQ;
757
612 if (unlikely((m->msg_namelen < sizeof(*dest)) || 758 if (unlikely((m->msg_namelen < sizeof(*dest)) ||
613 (dest->family != AF_TIPC))) 759 (dest->family != AF_TIPC)))
614 return -EINVAL; 760 return -EINVAL;
615 if (total_len > TIPC_MAX_USER_MSG_SIZE) 761
762 if (dsz > TIPC_MAX_USER_MSG_SIZE)
616 return -EMSGSIZE; 763 return -EMSGSIZE;
617 764
618 if (iocb) 765 if (iocb)
619 lock_sock(sk); 766 lock_sock(sk);
620 767
621 needs_conn = (sock->state != SS_READY); 768 if (unlikely(sock->state != SS_READY)) {
622 if (unlikely(needs_conn)) {
623 if (sock->state == SS_LISTENING) { 769 if (sock->state == SS_LISTENING) {
624 res = -EPIPE; 770 rc = -EPIPE;
625 goto exit; 771 goto exit;
626 } 772 }
627 if (sock->state != SS_UNCONNECTED) { 773 if (sock->state != SS_UNCONNECTED) {
628 res = -EISCONN; 774 rc = -EISCONN;
629 goto exit; 775 goto exit;
630 } 776 }
631 if (tsk->port.published) { 777 if (tsk->port.published) {
632 res = -EOPNOTSUPP; 778 rc = -EOPNOTSUPP;
633 goto exit; 779 goto exit;
634 } 780 }
635 if (dest->addrtype == TIPC_ADDR_NAME) { 781 if (dest->addrtype == TIPC_ADDR_NAME) {
636 tsk->port.conn_type = dest->addr.name.name.type; 782 tsk->port.conn_type = dest->addr.name.name.type;
637 tsk->port.conn_instance = dest->addr.name.name.instance; 783 tsk->port.conn_instance = dest->addr.name.name.instance;
638 } 784 }
639
640 /* Abort any pending connection attempts (very unlikely) */
641 reject_rx_queue(sk);
642 } 785 }
786 rc = dest_name_check(dest, m);
787 if (rc)
788 goto exit;
643 789
644 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 790 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
645 do { 791
646 if (dest->addrtype == TIPC_ADDR_NAME) { 792 if (dest->addrtype == TIPC_ADDR_MCAST) {
647 res = dest_name_check(dest, m); 793 rc = tipc_sendmcast(sock, seq, iov, dsz, timeo);
648 if (res) 794 goto exit;
649 break; 795 } else if (dest->addrtype == TIPC_ADDR_NAME) {
650 res = tipc_send2name(port, 796 u32 type = dest->addr.name.name.type;
651 &dest->addr.name.name, 797 u32 inst = dest->addr.name.name.instance;
652 dest->addr.name.domain, 798 u32 domain = dest->addr.name.domain;
653 m->msg_iov, 799
654 total_len); 800 dnode = domain;
655 } else if (dest->addrtype == TIPC_ADDR_ID) { 801 msg_set_type(mhdr, TIPC_NAMED_MSG);
656 res = tipc_send2port(port, 802 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
657 &dest->addr.id, 803 msg_set_nametype(mhdr, type);
658 m->msg_iov, 804 msg_set_nameinst(mhdr, inst);
659 total_len); 805 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
660 } else if (dest->addrtype == TIPC_ADDR_MCAST) { 806 dport = tipc_nametbl_translate(type, inst, &dnode);
661 if (needs_conn) { 807 msg_set_destnode(mhdr, dnode);
662 res = -EOPNOTSUPP; 808 msg_set_destport(mhdr, dport);
663 break; 809 if (unlikely(!dport && !dnode)) {
664 } 810 rc = -EHOSTUNREACH;
665 res = dest_name_check(dest, m); 811 goto exit;
666 if (res)
667 break;
668 res = tipc_port_mcast_xmit(port,
669 &dest->addr.nameseq,
670 m->msg_iov,
671 total_len);
672 } 812 }
673 if (likely(res != -ELINKCONG)) { 813 } else if (dest->addrtype == TIPC_ADDR_ID) {
674 if (needs_conn && (res >= 0)) 814 dnode = dest->addr.id.node;
815 msg_set_type(mhdr, TIPC_DIRECT_MSG);
816 msg_set_lookup_scope(mhdr, 0);
817 msg_set_destnode(mhdr, dnode);
818 msg_set_destport(mhdr, dest->addr.id.ref);
819 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
820 }
821
822new_mtu:
823 mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
824 rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
825 if (rc < 0)
826 goto exit;
827
828 do {
829 rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
830 if (likely(rc >= 0)) {
831 if (sock->state != SS_READY)
675 sock->state = SS_CONNECTING; 832 sock->state = SS_CONNECTING;
833 rc = dsz;
676 break; 834 break;
677 } 835 }
678 res = tipc_wait_for_sndmsg(sock, &timeo); 836 if (rc == -EMSGSIZE)
679 if (res) 837 goto new_mtu;
838
839 if (rc != -ELINKCONG)
680 break; 840 break;
681 } while (1);
682 841
842 rc = tipc_wait_for_sndmsg(sock, &timeo);
843 if (rc)
844 kfree_skb_list(buf);
845 } while (!rc);
683exit: 846exit:
684 if (iocb) 847 if (iocb)
685 release_sock(sk); 848 release_sock(sk);
686 return res; 849
850 return rc;
687} 851}
688 852
689static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) 853static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
690{ 854{
691 struct sock *sk = sock->sk; 855 struct sock *sk = sock->sk;
692 struct tipc_sock *tsk = tipc_sk(sk); 856 struct tipc_sock *tsk = tipc_sk(sk);
693 struct tipc_port *port = &tsk->port;
694 DEFINE_WAIT(wait); 857 DEFINE_WAIT(wait);
695 int done; 858 int done;
696 859
@@ -709,37 +872,49 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
709 872
710 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); 873 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
711 done = sk_wait_event(sk, timeo_p, 874 done = sk_wait_event(sk, timeo_p,
712 (!port->congested || !port->connected)); 875 (!tsk->link_cong &&
876 !tipc_sk_conn_cong(tsk)) ||
877 !tsk->port.connected);
713 finish_wait(sk_sleep(sk), &wait); 878 finish_wait(sk_sleep(sk), &wait);
714 } while (!done); 879 } while (!done);
715 return 0; 880 return 0;
716} 881}
717 882
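The wait predicate above now distinguishes two congestion sources: link congestion (tsk->link_cong, set when the link returns -ELINKCONG) and the connection send window (tipc_sk_conn_cong()); a dropped connection also ends the wait. A standalone model of that condition, using the field names from the socket.h hunk below and constants assumed from the same kernel's port.h:

	#define TIPC_CONNACK_INTV 256                     /* assumed value */
	#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) /* assumed value */

	struct tipc_sock_model {
		int link_cong;          /* last transmit hit -ELINKCONG */
		unsigned sent_unacked;  /* sent but not yet acked by peer */
		int connected;
	};

	/* Mirrors the sk_wait_event() condition in tipc_wait_for_sndpkt() */
	static int send_may_proceed(const struct tipc_sock_model *tsk)
	{
		return (!tsk->link_cong &&
			tsk->sent_unacked < TIPC_FLOWCTRL_WIN) ||
		       !tsk->connected;
	}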
718/** 883/**
719 * tipc_send_packet - send a connection-oriented message 884 * tipc_send_stream - send stream-oriented data
720 * @iocb: if NULL, indicates that socket lock is already held 885 * @iocb: (unused)
721 * @sock: socket structure 886 * @sock: socket structure
722 * @m: message to send 887 * @m: data to send
723 * @total_len: length of message 888 * @dsz: total length of data to be transmitted
724 * 889 *
725 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data. 890 * Used for SOCK_STREAM data.
726 * 891 *
727 * Returns the number of bytes sent on success, or errno otherwise 892 * Returns the number of bytes sent on success (or partial success),
893 * or errno if no data sent
728 */ 894 */
729static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, 895static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
730 struct msghdr *m, size_t total_len) 896 struct msghdr *m, size_t dsz)
731{ 897{
732 struct sock *sk = sock->sk; 898 struct sock *sk = sock->sk;
733 struct tipc_sock *tsk = tipc_sk(sk); 899 struct tipc_sock *tsk = tipc_sk(sk);
900 struct tipc_port *port = &tsk->port;
901 struct tipc_msg *mhdr = &port->phdr;
902 struct sk_buff *buf;
734 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 903 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
735 int res = -EINVAL; 904 u32 ref = port->ref;
905 int rc = -EINVAL;
736 long timeo; 906 long timeo;
907 u32 dnode;
908 uint mtu, send, sent = 0;
737 909
738 /* Handle implied connection establishment */ 910 /* Handle implied connection establishment */
739 if (unlikely(dest)) 911 if (unlikely(dest)) {
740 return tipc_sendmsg(iocb, sock, m, total_len); 912 rc = tipc_sendmsg(iocb, sock, m, dsz);
741 913 if (dsz && (dsz == rc))
742 if (total_len > TIPC_MAX_USER_MSG_SIZE) 914 tsk->sent_unacked = 1;
915 return rc;
916 }
917 if (dsz > (uint)INT_MAX)
743 return -EMSGSIZE; 918 return -EMSGSIZE;
744 919
745 if (iocb) 920 if (iocb)
@@ -747,123 +922,66 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
747 922
748 if (unlikely(sock->state != SS_CONNECTED)) { 923 if (unlikely(sock->state != SS_CONNECTED)) {
749 if (sock->state == SS_DISCONNECTING) 924 if (sock->state == SS_DISCONNECTING)
750 res = -EPIPE; 925 rc = -EPIPE;
751 else 926 else
752 res = -ENOTCONN; 927 rc = -ENOTCONN;
753 goto exit; 928 goto exit;
754 } 929 }
755 930
756 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 931 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
932 dnode = tipc_port_peernode(port);
933
934next:
935 mtu = port->max_pkt;
936 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
937 rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
938 if (unlikely(rc < 0))
939 goto exit;
757 do { 940 do {
758 res = tipc_send(&tsk->port, m->msg_iov, total_len); 941 if (likely(!tipc_sk_conn_cong(tsk))) {
759 if (likely(res != -ELINKCONG)) 942 rc = tipc_link_xmit(buf, dnode, ref);
760 break; 943 if (likely(!rc)) {
761 res = tipc_wait_for_sndpkt(sock, &timeo); 944 tsk->sent_unacked++;
762 if (res) 945 sent += send;
763 break; 946 if (sent == dsz)
764 } while (1); 947 break;
948 goto next;
949 }
950 if (rc == -EMSGSIZE) {
951 port->max_pkt = tipc_node_get_mtu(dnode, ref);
952 goto next;
953 }
954 if (rc != -ELINKCONG)
955 break;
956 }
957 rc = tipc_wait_for_sndpkt(sock, &timeo);
958 if (rc)
959 kfree_skb_list(buf);
960 } while (!rc);
765exit: 961exit:
766 if (iocb) 962 if (iocb)
767 release_sock(sk); 963 release_sock(sk);
768 return res; 964 return sent ? sent : rc;
769} 965}
770 966
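tipc_send_stream() now fragments the user data itself: each pass builds at most TIPC_MAX_USER_MSG_SIZE bytes at the connection's current max_pkt, and an error after partial progress reports the bytes already sent rather than the error. A simplified model of that loop, where send_chunk() stands in for the tipc_msg_build()/tipc_link_xmit() pair:

	#include <stddef.h>

	#define TIPC_MAX_USER_MSG_SIZE 66000 /* as in the TIPC headers */

	typedef int (*send_chunk_fn)(const char *data, size_t len);

	/* Returns bytes sent on (partial) success, negative errno otherwise */
	static long stream_send(send_chunk_fn send_chunk,
				const char *data, size_t dsz)
	{
		size_t sent = 0;

		while (sent < dsz) {
			size_t chunk = dsz - sent;
			int rc;

			if (chunk > TIPC_MAX_USER_MSG_SIZE)
				chunk = TIPC_MAX_USER_MSG_SIZE;
			rc = send_chunk(data + sent, chunk);
			if (rc < 0)
				return sent ? (long)sent : rc;
			sent += chunk;
		}
		return (long)sent;
	}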
771/** 967/**
772 * tipc_send_stream - send stream-oriented data 968 * tipc_send_packet - send a connection-oriented message
773 * @iocb: (unused) 969 * @iocb: if NULL, indicates that socket lock is already held
774 * @sock: socket structure 970 * @sock: socket structure
775 * @m: data to send 971 * @m: message to send
776 * @total_len: total length of data to be sent 972 * @dsz: length of data to be transmitted
777 * 973 *
778 * Used for SOCK_STREAM data. 974 * Used for SOCK_SEQPACKET messages.
779 * 975 *
780 * Returns the number of bytes sent on success (or partial success), 976 * Returns the number of bytes sent on success, or errno otherwise
781 * or errno if no data sent
782 */ 977 */
783static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, 978static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
784 struct msghdr *m, size_t total_len) 979 struct msghdr *m, size_t dsz)
785{ 980{
786 struct sock *sk = sock->sk; 981 if (dsz > TIPC_MAX_USER_MSG_SIZE)
787 struct tipc_sock *tsk = tipc_sk(sk); 982 return -EMSGSIZE;
788 struct msghdr my_msg;
789 struct iovec my_iov;
790 struct iovec *curr_iov;
791 int curr_iovlen;
792 char __user *curr_start;
793 u32 hdr_size;
794 int curr_left;
795 int bytes_to_send;
796 int bytes_sent;
797 int res;
798
799 lock_sock(sk);
800
801 /* Handle special cases where there is no connection */
802 if (unlikely(sock->state != SS_CONNECTED)) {
803 if (sock->state == SS_UNCONNECTED)
804 res = tipc_send_packet(NULL, sock, m, total_len);
805 else
806 res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
807 goto exit;
808 }
809
810 if (unlikely(m->msg_name)) {
811 res = -EISCONN;
812 goto exit;
813 }
814
815 if (total_len > (unsigned int)INT_MAX) {
816 res = -EMSGSIZE;
817 goto exit;
818 }
819
820 /*
821 * Send each iovec entry using one or more messages
822 *
823 * Note: This algorithm is good for the most likely case
824 * (i.e. one large iovec entry), but could be improved to pass sets
825 * of small iovec entries into send_packet().
826 */
827 curr_iov = m->msg_iov;
828 curr_iovlen = m->msg_iovlen;
829 my_msg.msg_iov = &my_iov;
830 my_msg.msg_iovlen = 1;
831 my_msg.msg_flags = m->msg_flags;
832 my_msg.msg_name = NULL;
833 bytes_sent = 0;
834
835 hdr_size = msg_hdr_sz(&tsk->port.phdr);
836
837 while (curr_iovlen--) {
838 curr_start = curr_iov->iov_base;
839 curr_left = curr_iov->iov_len;
840
841 while (curr_left) {
842 bytes_to_send = tsk->port.max_pkt - hdr_size;
843 if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
844 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
845 if (curr_left < bytes_to_send)
846 bytes_to_send = curr_left;
847 my_iov.iov_base = curr_start;
848 my_iov.iov_len = bytes_to_send;
849 res = tipc_send_packet(NULL, sock, &my_msg,
850 bytes_to_send);
851 if (res < 0) {
852 if (bytes_sent)
853 res = bytes_sent;
854 goto exit;
855 }
856 curr_left -= bytes_to_send;
857 curr_start += bytes_to_send;
858 bytes_sent += bytes_to_send;
859 }
860 983
861 curr_iov++; 984 return tipc_send_stream(iocb, sock, m, dsz);
862 }
863 res = bytes_sent;
864exit:
865 release_sock(sk);
866 return res;
867} 985}
868 986
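After the swap, tipc_send_packet() is only a bounds check in front of the stream path, so a SOCK_SEQPACKET send() keeps its record boundary (one call, one message) while anything above TIPC_MAX_USER_MSG_SIZE is refused up front instead of being chunked like stream data. A hedged userspace sketch, with the same invented service address as above:

	#include <linux/tipc.h>
	#include <stdio.h>
	#include <string.h>
	#include <sys/socket.h>
	#include <unistd.h>

	int main(void)
	{
		struct sockaddr_tipc srv;
		char record[1024] = "one record";
		int sd = socket(AF_TIPC, SOCK_SEQPACKET, 0);

		if (sd < 0)
			return 1;
		memset(&srv, 0, sizeof(srv));
		srv.family = AF_TIPC;
		srv.addrtype = TIPC_ADDR_NAME;
		srv.addr.name.name.type = 18888;  /* hypothetical service type */
		srv.addr.name.name.instance = 17; /* hypothetical instance */
		if (connect(sd, (struct sockaddr *)&srv, sizeof(srv)) == 0 &&
		    send(sd, record, sizeof(record), 0) < 0)
			perror("send"); /* EMSGSIZE if the record is oversized */
		close(sd);
		return 0;
	}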
869/** 987/**
@@ -1104,8 +1222,10 @@ restart:
1104 /* Consume received message (optional) */ 1222 /* Consume received message (optional) */
1105 if (likely(!(flags & MSG_PEEK))) { 1223 if (likely(!(flags & MSG_PEEK))) {
1106 if ((sock->state != SS_READY) && 1224 if ((sock->state != SS_READY) &&
1107 (++port->conn_unacked >= TIPC_CONNACK_INTV)) 1225 (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1108 tipc_acknowledge(port->ref, port->conn_unacked); 1226 tipc_acknowledge(port->ref, tsk->rcv_unacked);
1227 tsk->rcv_unacked = 0;
1228 }
1109 advance_rx_queue(sk); 1229 advance_rx_queue(sk);
1110 } 1230 }
1111exit: 1231exit:
@@ -1213,8 +1333,10 @@ restart:
1213 1333
1214 /* Consume received message (optional) */ 1334 /* Consume received message (optional) */
1215 if (likely(!(flags & MSG_PEEK))) { 1335 if (likely(!(flags & MSG_PEEK))) {
1216 if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) 1336 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1217 tipc_acknowledge(port->ref, port->conn_unacked); 1337 tipc_acknowledge(port->ref, tsk->rcv_unacked);
1338 tsk->rcv_unacked = 0;
1339 }
1218 advance_rx_queue(sk); 1340 advance_rx_queue(sk);
1219 } 1341 }
1220 1342
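Both receive paths now batch acknowledgments in tsk->rcv_unacked: each consumed message bumps the counter, and once TIPC_CONNACK_INTV is reached one ack carrying the whole count goes back and the counter resets. A standalone model of that bookkeeping (the interval value is assumed):

	#define TIPC_CONNACK_INTV 256 /* assumed; one ack per this many messages */

	struct rx_state {
		unsigned rcv_unacked; /* read by user, not yet acked to peer */
	};

	/* Call per consumed message; returns the count to acknowledge now,
	 * or 0 while the batch is still filling. Mirrors the hunks above. */
	static unsigned consume_one(struct rx_state *rx)
	{
		unsigned acked;

		if (++rx->rcv_unacked < TIPC_CONNACK_INTV)
			return 0;
		acked = rx->rcv_unacked;
		rx->rcv_unacked = 0;
		return acked;
	}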
@@ -1269,17 +1391,16 @@ static void tipc_data_ready(struct sock *sk)
1269 * @tsk: TIPC socket 1391 * @tsk: TIPC socket
1270 * @msg: message 1392 * @msg: message
1271 * 1393 *
1272 * Returns TIPC error status code and socket error status code 1394 * Returns 0 (TIPC_OK) if everything is OK, -TIPC_ERR_NO_PORT otherwise
1273 * once it encounters some errors
1274 */ 1395 */
1275static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) 1396static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1276{ 1397{
1277 struct sock *sk = &tsk->sk; 1398 struct sock *sk = &tsk->sk;
1278 struct tipc_port *port = &tsk->port; 1399 struct tipc_port *port = &tsk->port;
1279 struct socket *sock = sk->sk_socket; 1400 struct socket *sock = sk->sk_socket;
1280 struct tipc_msg *msg = buf_msg(*buf); 1401 struct tipc_msg *msg = buf_msg(*buf);
1281 1402
1282 u32 retval = TIPC_ERR_NO_PORT; 1403 int retval = -TIPC_ERR_NO_PORT;
1283 int res; 1404 int res;
1284 1405
1285 if (msg_mcast(msg)) 1406 if (msg_mcast(msg))
@@ -1382,32 +1503,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1382 * 1503 *
1383 * Called with socket lock already taken; port lock may also be taken. 1504 * Called with socket lock already taken; port lock may also be taken.
1384 * 1505 *
1385 * Returns TIPC error status code (TIPC_OK if message is not to be rejected) 1506 * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
1507 * is to be rejected, 1 (TIPC_FWD_MSG) if a (CONN_MANAGER) message is to be forwarded
1386 */ 1508 */
1387static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) 1509static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1388{ 1510{
1389 struct socket *sock = sk->sk_socket; 1511 struct socket *sock = sk->sk_socket;
1390 struct tipc_sock *tsk = tipc_sk(sk); 1512 struct tipc_sock *tsk = tipc_sk(sk);
1391 struct tipc_msg *msg = buf_msg(buf); 1513 struct tipc_msg *msg = buf_msg(buf);
1392 unsigned int limit = rcvbuf_limit(sk, buf); 1514 unsigned int limit = rcvbuf_limit(sk, buf);
1393 u32 res = TIPC_OK; 1515 u32 onode;
1516 int rc = TIPC_OK;
1517
1518 if (unlikely(msg_user(msg) == CONN_MANAGER))
1519 return tipc_sk_proto_rcv(tsk, &onode, buf);
1394 1520
1395 /* Reject message if it is wrong sort of message for socket */ 1521 /* Reject message if it is wrong sort of message for socket */
1396 if (msg_type(msg) > TIPC_DIRECT_MSG) 1522 if (msg_type(msg) > TIPC_DIRECT_MSG)
1397 return TIPC_ERR_NO_PORT; 1523 return -TIPC_ERR_NO_PORT;
1398 1524
1399 if (sock->state == SS_READY) { 1525 if (sock->state == SS_READY) {
1400 if (msg_connected(msg)) 1526 if (msg_connected(msg))
1401 return TIPC_ERR_NO_PORT; 1527 return -TIPC_ERR_NO_PORT;
1402 } else { 1528 } else {
1403 res = filter_connect(tsk, &buf); 1529 rc = filter_connect(tsk, &buf);
1404 if (res != TIPC_OK || buf == NULL) 1530 if (rc != TIPC_OK || buf == NULL)
1405 return res; 1531 return rc;
1406 } 1532 }
1407 1533
1408 /* Reject message if there isn't room to queue it */ 1534 /* Reject message if there isn't room to queue it */
1409 if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) 1535 if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1410 return TIPC_ERR_OVERLOAD; 1536 return -TIPC_ERR_OVERLOAD;
1411 1537
1412 /* Enqueue message */ 1538 /* Enqueue message */
1413 TIPC_SKB_CB(buf)->handle = NULL; 1539 TIPC_SKB_CB(buf)->handle = NULL;
@@ -1429,16 +1555,23 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1429 */ 1555 */
1430static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) 1556static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1431{ 1557{
1432 u32 res; 1558 int rc;
1559 u32 onode;
1433 struct tipc_sock *tsk = tipc_sk(sk); 1560 struct tipc_sock *tsk = tipc_sk(sk);
1434 uint truesize = buf->truesize; 1561 uint truesize = buf->truesize;
1435 1562
1436 res = filter_rcv(sk, buf); 1563 rc = filter_rcv(sk, buf);
1437 if (unlikely(res))
1438 tipc_reject_msg(buf, res);
1439 1564
1440 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) 1565 if (likely(!rc)) {
1441 atomic_add(truesize, &tsk->dupl_rcvcnt); 1566 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1567 atomic_add(truesize, &tsk->dupl_rcvcnt);
1568 return 0;
1569 }
1570
1571 if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
1572 return 0;
1573
1574 tipc_link_xmit(buf, onode, 0);
1442 1575
1443 return 0; 1576 return 0;
1444} 1577}
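filter_rcv()'s new sign convention drives everything here: 0 means consumed, a positive value (TIPC_FWD_MSG, for CONN_MANAGER traffic) means forward, and a negative value carries the TIPC error that tipc_backlog_rcv() bounces back via tipc_msg_reverse() plus tipc_link_xmit(). A standalone sketch of that dispatch; the two helpers are stubs, not kernel API:

	#include <stdio.h>

	static void forward_msg(void)        { puts("forward to new dest"); }
	static void bounce_msg(int tipc_err) { printf("bounce, code %d\n", tipc_err); }

	static void dispatch(int rc)
	{
		if (rc == 0)
			return;          /* consumed and queued */
		if (rc > 0)
			forward_msg();   /* TIPC_FWD_MSG */
		else
			bounce_msg(-rc); /* reverse header, return to sender */
	}

	int main(void)
	{
		dispatch(0);
		dispatch(1);  /* TIPC_FWD_MSG */
		dispatch(-2); /* e.g. a -TIPC_ERR_* code */
		return 0;
	}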
@@ -1455,19 +1588,14 @@ int tipc_sk_rcv(struct sk_buff *buf)
1455 struct tipc_port *port; 1588 struct tipc_port *port;
1456 struct sock *sk; 1589 struct sock *sk;
1457 u32 dport = msg_destport(buf_msg(buf)); 1590 u32 dport = msg_destport(buf_msg(buf));
1458 int err = TIPC_OK; 1591 int rc = TIPC_OK;
1459 uint limit; 1592 uint limit;
1593 u32 dnode;
1460 1594
1461 /* Forward unresolved named message */ 1595 /* Validate destination and message */
1462 if (unlikely(!dport)) {
1463 tipc_net_route_msg(buf);
1464 return 0;
1465 }
1466
1467 /* Validate destination */
1468 port = tipc_port_lock(dport); 1596 port = tipc_port_lock(dport);
1469 if (unlikely(!port)) { 1597 if (unlikely(!port)) {
1470 err = TIPC_ERR_NO_PORT; 1598 rc = tipc_msg_eval(buf, &dnode);
1471 goto exit; 1599 goto exit;
1472 } 1600 }
1473 1601
@@ -1478,23 +1606,25 @@ int tipc_sk_rcv(struct sk_buff *buf)
1478 bh_lock_sock(sk); 1606 bh_lock_sock(sk);
1479 1607
1480 if (!sock_owned_by_user(sk)) { 1608 if (!sock_owned_by_user(sk)) {
1481 err = filter_rcv(sk, buf); 1609 rc = filter_rcv(sk, buf);
1482 } else { 1610 } else {
1483 if (sk->sk_backlog.len == 0) 1611 if (sk->sk_backlog.len == 0)
1484 atomic_set(&tsk->dupl_rcvcnt, 0); 1612 atomic_set(&tsk->dupl_rcvcnt, 0);
1485 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); 1613 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1486 if (sk_add_backlog(sk, buf, limit)) 1614 if (sk_add_backlog(sk, buf, limit))
1487 err = TIPC_ERR_OVERLOAD; 1615 rc = -TIPC_ERR_OVERLOAD;
1488 } 1616 }
1489
1490 bh_unlock_sock(sk); 1617 bh_unlock_sock(sk);
1491 tipc_port_unlock(port); 1618 tipc_port_unlock(port);
1492 1619
1493 if (likely(!err)) 1620 if (likely(!rc))
1494 return 0; 1621 return 0;
1495exit: 1622exit:
1496 tipc_reject_msg(buf, err); 1623 if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
1497 return -EHOSTUNREACH; 1624 return -EHOSTUNREACH;
1625
1626 tipc_link_xmit(buf, dnode, 0);
1627 return (rc < 0) ? -EHOSTUNREACH : 0;
1498} 1628}
1499 1629
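tipc_sk_rcv() admits to the backlog with a widened limit: rcvbuf_limit() plus dupl_rcvcnt, the bytes momentarily counted in both the backlog and the receive queue, so a socket busy in user context is not charged for them twice; the counter restarts whenever the backlog drains. A rough model of that admission test (the real check lives inside sk_add_backlog()):

	struct backlog_model {
		unsigned len;          /* bytes currently in the backlog */
		unsigned dupl_rcvcnt;  /* double-counted bytes */
	};

	/* Returns 1 if admitted, 0 on overload (-> TIPC_ERR_OVERLOAD reject) */
	static int backlog_admit(struct backlog_model *b, unsigned truesize,
				 unsigned rcvbuf_limit)
	{
		if (b->len == 0)
			b->dupl_rcvcnt = 0;
		if (b->len + truesize > rcvbuf_limit + b->dupl_rcvcnt)
			return 0;
		b->len += truesize;
		return 1;
	}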
1500static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) 1630static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -1758,6 +1888,7 @@ static int tipc_shutdown(struct socket *sock, int how)
1758 struct tipc_sock *tsk = tipc_sk(sk); 1888 struct tipc_sock *tsk = tipc_sk(sk);
1759 struct tipc_port *port = &tsk->port; 1889 struct tipc_port *port = &tsk->port;
1760 struct sk_buff *buf; 1890 struct sk_buff *buf;
1891 u32 peer;
1761 int res; 1892 int res;
1762 1893
1763 if (how != SHUT_RDWR) 1894 if (how != SHUT_RDWR)
@@ -1778,7 +1909,8 @@ restart:
1778 goto restart; 1909 goto restart;
1779 } 1910 }
1780 tipc_port_disconnect(port->ref); 1911 tipc_port_disconnect(port->ref);
1781 tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN); 1912 if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
1913 tipc_link_xmit(buf, peer, 0);
1782 } else { 1914 } else {
1783 tipc_port_shutdown(port->ref); 1915 tipc_port_shutdown(port->ref);
1784 } 1916 }
@@ -1841,7 +1973,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1841 1973
1842 switch (opt) { 1974 switch (opt) {
1843 case TIPC_IMPORTANCE: 1975 case TIPC_IMPORTANCE:
1844 tipc_port_set_importance(port, value); 1976 res = tipc_port_set_importance(port, value);
1845 break; 1977 break;
1846 case TIPC_SRC_DROPPABLE: 1978 case TIPC_SRC_DROPPABLE:
1847 if (sock->type != SOCK_STREAM) 1979 if (sock->type != SOCK_STREAM)
@@ -1936,7 +2068,7 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1936 return put_user(sizeof(value), ol); 2068 return put_user(sizeof(value), ol);
1937} 2069}
1938 2070
1939int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) 2071static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1940{ 2072{
1941 struct tipc_sioc_ln_req lnr; 2073 struct tipc_sioc_ln_req lnr;
1942 void __user *argp = (void __user *)arg; 2074 void __user *argp = (void __user *)arg;
@@ -1952,7 +2084,6 @@ int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1952 return 0; 2084 return 0;
1953 } 2085 }
1954 return -EADDRNOTAVAIL; 2086 return -EADDRNOTAVAIL;
1955 break;
1956 default: 2087 default:
1957 return -ENOIOCTLCMD; 2088 return -ENOIOCTLCMD;
1958 } 2089 }
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 3afcd2a70b31..43b75b3ceced 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -38,6 +38,9 @@
38#include "port.h" 38#include "port.h"
39#include <net/sock.h> 39#include <net/sock.h>
40 40
41#define TIPC_CONN_OK 0
42#define TIPC_CONN_PROBING 1
43
41/** 44/**
42 * struct tipc_sock - TIPC socket structure 45 * struct tipc_sock - TIPC socket structure
43 * @sk: socket - interacts with 'port' and with user via the socket API 46 * @sk: socket - interacts with 'port' and with user via the socket API
@@ -45,6 +48,9 @@
45 * @peer_name: the peer of the connection, if any 48 * @peer_name: the peer of the connection, if any
46 * @conn_timeout: the time we can wait for an unresponded setup request 49 * @conn_timeout: the time we can wait for an unresponded setup request
47 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 50 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
51 * @link_cong: non-zero if owner must sleep because of link congestion
52 * @sent_unacked: # messages sent by socket, and not yet acked by peer
53 * @rcv_unacked: # messages read by user, but not yet acked back to peer
48 */ 54 */
49 55
50struct tipc_sock { 56struct tipc_sock {
@@ -52,6 +58,9 @@ struct tipc_sock {
52 struct tipc_port port; 58 struct tipc_port port;
53 unsigned int conn_timeout; 59 unsigned int conn_timeout;
54 atomic_t dupl_rcvcnt; 60 atomic_t dupl_rcvcnt;
61 int link_cong;
62 uint sent_unacked;
63 uint rcv_unacked;
55}; 64};
56 65
57static inline struct tipc_sock *tipc_sk(const struct sock *sk) 66static inline struct tipc_sock *tipc_sk(const struct sock *sk)
@@ -69,6 +78,13 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
69 tsk->sk.sk_write_space(&tsk->sk); 78 tsk->sk.sk_write_space(&tsk->sk);
70} 79}
71 80
81static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
82{
83 return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
84}
85
72int tipc_sk_rcv(struct sk_buff *buf); 86int tipc_sk_rcv(struct sk_buff *buf);
73 87
88void tipc_sk_mcast_rcv(struct sk_buff *buf);
89
74#endif 90#endif
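Taken together, sent_unacked and rcv_unacked form a message-based credit window: tipc_sk_conn_cong() stalls the sender once TIPC_FLOWCTRL_WIN messages are outstanding, and the receiver hands credit back in TIPC_CONNACK_INTV-sized batches through tipc_acknowledge(). A toy simulation of the window with a receiver that stops reading after 300 messages (constants assumed, as above):

	#include <stdio.h>

	#define TIPC_CONNACK_INTV 256                     /* assumed */
	#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) /* assumed */

	int main(void)
	{
		unsigned sent_unacked = 0, rcv_unacked = 0;
		unsigned total_sent = 0, rcvd = 0;

		for (;;) {
			if (sent_unacked >= TIPC_FLOWCTRL_WIN) { /* conn_cong */
				printf("sender blocks after %u msgs\n",
				       total_sent);
				break;
			}
			sent_unacked++;
			total_sent++;
			if (rcvd < 300) { /* receiver reads only 300 msgs */
				rcvd++;
				if (++rcv_unacked >= TIPC_CONNACK_INTV) {
					sent_unacked -= rcv_unacked;
					rcv_unacked = 0;
				}
			}
		}
		return 0; /* prints: sender blocks after 768 msgs */
	}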