author		James Morris <james.l.morris@oracle.com>	2014-11-19 05:32:12 -0500
committer	James Morris <james.l.morris@oracle.com>	2014-11-19 05:32:12 -0500
commit		b10778a00d40b3d9fdaaf5891e802794781ff71c (patch)
tree		6ba4cbac86eecedc3f30650e7f764ecf00c83898 /net/tipc/link.c
parent		594081ee7145cc30a3977cb4e218f81213b63dc5 (diff)
parent		bfe01a5ba2490f299e1d2d5508cbbbadd897bbe9 (diff)
Merge commit 'v3.17' into next
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--	net/tipc/link.c	794
1 file changed, 220 insertions(+), 574 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f5868d..fb1485dc6736 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -82,15 +82,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
 static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
 				struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
 
 /*
  * Simple link routines
@@ -335,13 +333,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
335static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) 333static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
336{ 334{
337 struct tipc_port *p_ptr; 335 struct tipc_port *p_ptr;
336 struct tipc_sock *tsk;
338 337
339 spin_lock_bh(&tipc_port_list_lock); 338 spin_lock_bh(&tipc_port_list_lock);
340 p_ptr = tipc_port_lock(origport); 339 p_ptr = tipc_port_lock(origport);
341 if (p_ptr) { 340 if (p_ptr) {
342 if (!list_empty(&p_ptr->wait_list)) 341 if (!list_empty(&p_ptr->wait_list))
343 goto exit; 342 goto exit;
344 p_ptr->congested = 1; 343 tsk = tipc_port_to_sock(p_ptr);
344 tsk->link_cong = 1;
345 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 345 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
346 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 346 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
347 l_ptr->stats.link_congs++; 347 l_ptr->stats.link_congs++;
@@ -355,6 +355,7 @@ exit:
 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 {
 	struct tipc_port *p_ptr;
+	struct tipc_sock *tsk;
 	struct tipc_port *temp_p_ptr;
 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 
@@ -370,10 +371,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 				 wait_list) {
 		if (win <= 0)
 			break;
+		tsk = tipc_port_to_sock(p_ptr);
 		list_del_init(&p_ptr->wait_list);
 		spin_lock_bh(p_ptr->lock);
-		p_ptr->congested = 0;
-		tipc_port_wakeup(p_ptr);
+		tsk->link_cong = 0;
+		tipc_sock_wakeup(tsk);
 		win -= p_ptr->waiting_pkts;
 		spin_unlock_bh(p_ptr->lock);
 	}
@@ -676,178 +678,142 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKCONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
  */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-			   struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
-	struct tipc_msg *bundler_msg = buf_msg(bundler);
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 bundle_size = msg_size(bundler_msg);
-	u32 to_pos = align(bundle_size);
-	u32 pad = to_pos - bundle_size;
-
-	if (msg_user(bundler_msg) != MSG_BUNDLER)
-		return 0;
-	if (msg_type(bundler_msg) != OPEN_MSG)
-		return 0;
-	if (skb_tailroom(bundler) < (pad + size))
-		return 0;
-	if (l_ptr->max_pkt < (to_pos + size))
-		return 0;
-
-	skb_put(bundler, pad + size);
-	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-	msg_set_size(bundler_msg, to_pos + size);
-	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	kfree_skb(buf);
-	l_ptr->stats.sent_bundled++;
-	return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-				 struct sk_buff *buf,
-				 struct tipc_msg *msg)
-{
-	u32 ack = mod(l_ptr->next_in_no - 1);
-	u32 seqno = mod(l_ptr->next_out_no++);
+	uint psz = msg_size(msg);
+	uint imp = tipc_msg_tot_importance(msg);
+	u32 oport = msg_tot_origport(msg);
 
-	msg_set_word(msg, 2, ((ack << 16) | seqno));
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	buf->next = NULL;
-	if (l_ptr->first_out) {
-		l_ptr->last_out->next = buf;
-		l_ptr->last_out = buf;
-	} else
-		l_ptr->first_out = l_ptr->last_out = buf;
-
-	l_ptr->out_queue_size++;
-	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-				       struct sk_buff *buf_chain,
-				       u32 long_msgno)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
-
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf_chain;
-	while (buf_chain) {
-		buf = buf_chain;
-		buf_chain = buf_chain->next;
-
-		msg = buf_msg(buf);
-		msg_set_long_msgno(msg, long_msgno);
-		link_add_to_outqueue(l_ptr, buf, msg);
+	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+			link_schedule_port(link, oport, psz);
+			return -ELINKCONG;
+		}
+	} else {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
 	}
+	kfree_skb_list(buf);
+	return -EHOSTUNREACH;
 }
 
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
+/**
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 dsz = msg_data_sz(msg);
-	u32 queue_size = l_ptr->out_queue_size;
-	u32 imp = tipc_msg_tot_importance(msg);
-	u32 queue_limit = l_ptr->queue_limit[imp];
-	u32 max_packet = l_ptr->max_pkt;
-
-	/* Match msg importance against queue limits: */
-	if (unlikely(queue_size >= queue_limit)) {
-		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
-			link_schedule_port(l_ptr, msg_origport(msg), size);
-			kfree_skb(buf);
-			return -ELINKCONG;
-		}
-		kfree_skb(buf);
-		if (imp > CONN_MANAGER) {
-			pr_warn("%s<%s>, send queue full", link_rst_msg,
-				l_ptr->name);
-			tipc_link_reset(l_ptr);
-		}
-		return dsz;
+	uint psz = msg_size(msg);
+	uint qsz = link->out_queue_size;
+	uint sndlim = link->queue_limit[0];
+	uint imp = tipc_msg_tot_importance(msg);
+	uint mtu = link->max_pkt;
+	uint ack = mod(link->next_in_no - 1);
+	uint seqno = link->next_out_no;
+	uint bc_last_in = link->owner->bclink.last_in;
+	struct tipc_media_addr *addr = &link->media_addr;
+	struct sk_buff *next = buf->next;
+
+	/* Match queue limits against msg importance: */
+	if (unlikely(qsz >= link->queue_limit[imp]))
+		return tipc_link_cong(link, buf);
+
+	/* Has valid packet limit been used ? */
+	if (unlikely(psz > mtu)) {
+		kfree_skb_list(buf);
+		return -EMSGSIZE;
 	}
 
-	/* Fragmentation needed ? */
-	if (size > max_packet)
-		return tipc_link_frag_xmit(l_ptr, buf);
-
-	/* Packet can be queued or sent. */
-	if (likely(!link_congested(l_ptr))) {
-		link_add_to_outqueue(l_ptr, buf, msg);
+	/* Prepare each packet for sending, and add to outqueue: */
+	while (buf) {
+		next = buf->next;
+		msg = buf_msg(buf);
+		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_bcast_ack(msg, bc_last_in);
+
+		if (!link->first_out) {
+			link->first_out = buf;
+		} else if (qsz < sndlim) {
+			link->last_out->next = buf;
+		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+			link->stats.sent_bundled++;
+			buf = next;
+			next = buf->next;
+			continue;
+		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+			link->stats.sent_bundled++;
+			link->stats.sent_bundles++;
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		} else {
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		}
 
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->unacked_window = 0;
-		return dsz;
-	}
-	/* Congestion: can message be bundled ? */
-	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
-	    (msg_user(msg) != MSG_FRAGMENTER)) {
-
-		/* Try adding message to an existing bundle */
-		if (l_ptr->next_out &&
-		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
-			return dsz;
-
-		/* Try creating a new bundle */
-		if (size <= max_packet * 2 / 3) {
-			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
-			struct tipc_msg bundler_hdr;
-
-			if (bundler) {
-				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
-					      INT_H_SIZE, l_ptr->addr);
-				skb_copy_to_linear_data(bundler, &bundler_hdr,
-							INT_H_SIZE);
-				skb_trim(bundler, INT_H_SIZE);
-				link_bundle_buf(l_ptr, bundler, buf);
-				buf = bundler;
-				msg = buf_msg(buf);
-				l_ptr->stats.sent_bundles++;
-			}
+		/* Send packet if possible: */
+		if (likely(++qsz <= sndlim)) {
+			tipc_bearer_send(link->bearer_id, buf, addr);
+			link->next_out = next;
+			link->unacked_window = 0;
 		}
+		seqno++;
+		link->last_out = buf;
+		buf = next;
 	}
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf;
-	link_add_to_outqueue(l_ptr, buf, msg);
-	return dsz;
+	link->next_out_no = seqno;
+	link->out_queue_size = qsz;
+	return 0;
 }
 
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
+/**
+ * tipc_link_xmit() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 {
-	struct tipc_link *l_ptr;
-	struct tipc_node *n_ptr;
-	int res = -ELINKCONG;
+	struct tipc_link *link = NULL;
+	struct tipc_node *node;
+	int rc = -EHOSTUNREACH;
 
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[selector & 1];
-		if (l_ptr)
-			res = __tipc_link_xmit(l_ptr, buf);
-		else
-			kfree_skb(buf);
-		tipc_node_unlock(n_ptr);
-	} else {
-		kfree_skb(buf);
+	node = tipc_node_find(dnode);
+	if (node) {
+		tipc_node_lock(node);
+		link = node->active_links[selector & 1];
+		if (link)
+			rc = __tipc_link_xmit(link, buf);
+		tipc_node_unlock(node);
 	}
-	return res;
+
+	if (link)
+		return rc;
+
+	if (likely(in_own_node(dnode)))
+		return tipc_sk_rcv(buf);
+
+	kfree_skb_list(buf);
+	return rc;
 }
 
 /*
@@ -858,7 +824,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
  *
  * Called with node locked
  */
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
 {
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
@@ -868,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
 		return;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
-	msg_set_last_bcast(msg, l->owner->bclink.acked);
-	link_add_chain_to_outqueue(l, buf, 0);
-	tipc_link_push_queue(l);
+	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+	msg_set_last_bcast(msg, link->owner->bclink.acked);
+	__tipc_link_xmit(link, buf);
 }
 
 /*
@@ -892,293 +857,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 }
 
 /*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct sk_buff *temp_buf;
-
-	if (list_empty(message_list))
-		return;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[0];
-		if (l_ptr) {
-			/* convert circular list to linear list */
-			((struct sk_buff *)message_list->prev)->next = NULL;
-			link_add_chain_to_outqueue(l_ptr,
-				(struct sk_buff *)message_list->next, 0);
-			tipc_link_push_queue(l_ptr);
-			INIT_LIST_HEAD(message_list);
-		}
-		tipc_node_unlock(n_ptr);
-	}
-
-	/* discard the messages if they couldn't be sent */
-	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-		list_del((struct list_head *)buf);
-		kfree_skb(buf);
-	}
-}
-
-/*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-			       u32 *used_max_pkt)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	int res = msg_data_sz(msg);
-
-	if (likely(!link_congested(l_ptr))) {
-		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-			link_add_to_outqueue(l_ptr, buf, msg);
-			tipc_bearer_send(l_ptr->bearer_id, buf,
-					 &l_ptr->media_addr);
-			l_ptr->unacked_window = 0;
-			return res;
-		}
-		else
-			*used_max_pkt = l_ptr->max_pkt;
-	}
-	return __tipc_link_xmit(l_ptr, buf); /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destaddr)
-{
-	struct tipc_msg *hdr = &sender->phdr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct tipc_node *node;
-	int res;
-	u32 selector = msg_origport(hdr) & 1;
-
-again:
-	/*
-	 * Try building message using port's max_pkt hint.
-	 * (Must not hold any locks while building message.)
-	 */
-	res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
-	/* Exit if build request was invalid */
-	if (unlikely(res < 0))
-		return res;
-
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[selector];
-		if (likely(l_ptr)) {
-			if (likely(buf)) {
-				res = tipc_link_xmit_fast(l_ptr, buf,
-							  &sender->max_pkt);
-exit:
-				tipc_node_unlock(node);
-				return res;
-			}
-
-			/* Exit if link (or bearer) is congested */
-			if (link_congested(l_ptr)) {
-				res = link_schedule_port(l_ptr,
-							 sender->ref, res);
-				goto exit;
-			}
-
-			/*
-			 * Message size exceeds max_pkt hint; update hint,
-			 * then re-try fast path or fragment the message
-			 */
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-
-
-			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
-				goto again;
-
-			return tipc_link_iovec_long_xmit(sender, msg_sect,
-							 len, destaddr);
-		}
-		tipc_node_unlock(node);
-	}
-
-	/* Couldn't find a link to the destination node */
-	kfree_skb(buf);
-	tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
-	return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destaddr)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *node;
-	struct tipc_msg *hdr = &sender->phdr;
-	u32 dsz = len;
-	u32 max_pkt, fragm_sz, rest;
-	struct tipc_msg fragm_hdr;
-	struct sk_buff *buf, *buf_chain, *prev;
-	u32 fragm_crs, fragm_rest, hsz, sect_rest;
-	const unchar __user *sect_crs;
-	int curr_sect;
-	u32 fragm_no;
-	int res = 0;
-
-again:
-	fragm_no = 1;
-	max_pkt = sender->max_pkt - INT_H_SIZE;
-	/* leave room for tunnel header in case of link changeover */
-	fragm_sz = max_pkt - INT_H_SIZE;
-	/* leave room for fragmentation header in each fragment */
-	rest = dsz;
-	fragm_crs = 0;
-	fragm_rest = 0;
-	sect_rest = 0;
-	sect_crs = NULL;
-	curr_sect = -1;
-
-	/* Prepare reusable fragment header */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, msg_destnode(hdr));
-	msg_set_size(&fragm_hdr, max_pkt);
-	msg_set_fragm_no(&fragm_hdr, 1);
-
-	/* Prepare header of first fragment */
-	buf_chain = buf = tipc_buf_acquire(max_pkt);
-	if (!buf)
-		return -ENOMEM;
-	buf->next = NULL;
-	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-	hsz = msg_hdr_sz(hdr);
-	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
-	/* Chop up message */
-	fragm_crs = INT_H_SIZE + hsz;
-	fragm_rest = fragm_sz - hsz;
-
-	do { /* For all sections */
-		u32 sz;
-
-		if (!sect_rest) {
-			sect_rest = msg_sect[++curr_sect].iov_len;
-			sect_crs = msg_sect[curr_sect].iov_base;
-		}
-
-		if (sect_rest < fragm_rest)
-			sz = sect_rest;
-		else
-			sz = fragm_rest;
-
-		if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
-			res = -EFAULT;
-error:
-			kfree_skb_list(buf_chain);
-			return res;
-		}
-		sect_crs += sz;
-		sect_rest -= sz;
-		fragm_crs += sz;
-		fragm_rest -= sz;
-		rest -= sz;
-
-		if (!fragm_rest && rest) {
-
-			/* Initiate new fragment: */
-			if (rest <= fragm_sz) {
-				fragm_sz = rest;
-				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-			} else {
-				msg_set_type(&fragm_hdr, FRAGMENT);
-			}
-			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
-			prev = buf;
-			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-			if (!buf) {
-				res = -ENOMEM;
-				goto error;
-			}
-
-			buf->next = NULL;
-			prev->next = buf;
-			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-			fragm_crs = INT_H_SIZE;
-			fragm_rest = fragm_sz;
-		}
-	} while (rest > 0);
-
-	/*
-	 * Now we have a buffer chain. Select a link and check
-	 * that packet size is still OK
-	 */
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[sender->ref & 1];
-		if (!l_ptr) {
-			tipc_node_unlock(node);
-			goto reject;
-		}
-		if (l_ptr->max_pkt < max_pkt) {
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-			kfree_skb_list(buf_chain);
-			goto again;
-		}
-	} else {
-reject:
-		kfree_skb_list(buf_chain);
-		tipc_port_iovec_reject(sender, hdr, msg_sect, len,
-				       TIPC_ERR_NO_NODE);
-		return -ENETUNREACH;
-	}
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-	tipc_node_unlock(node);
-	return dsz;
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
@@ -1238,7 +916,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 			tipc_bearer_send(l_ptr->bearer_id, buf,
 					 &l_ptr->media_addr);
 			if (msg_user(msg) == MSG_BUNDLER)
-				msg_set_type(msg, CLOSED_MSG);
+				msg_set_type(msg, BUNDLE_CLOSED);
 			l_ptr->next_out = buf->next;
 			return 0;
 		}
@@ -1527,11 +1205,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
 			tipc_link_wakeup_ports(l_ptr, 0);
 
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
-			l_ptr->stats.sent_acks++;
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-		}
-
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {
 			if (msg_user(msg) == LINK_PROTOCOL) {
@@ -1565,57 +1238,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(l_ptr->oldest_deferred_in))
 			head = link_insert_deferred_queue(l_ptr, head);
 
-		/* Deliver packet/message to correct user: */
-		if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) {
-			if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
-				tipc_node_unlock(n_ptr);
-				continue;
-			}
-			msg = buf_msg(buf);
-		} else if (msg_user(msg) == MSG_FRAGMENTER) {
-			l_ptr->stats.recv_fragments++;
-			if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) {
-				l_ptr->stats.recv_fragmented++;
-				msg = buf_msg(buf);
-			} else {
-				if (!l_ptr->reasm_buf)
-					tipc_link_reset(l_ptr);
-				tipc_node_unlock(n_ptr);
-				continue;
-			}
+		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+			l_ptr->stats.sent_acks++;
+			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
 
-		switch (msg_user(msg)) {
-		case TIPC_LOW_IMPORTANCE:
-		case TIPC_MEDIUM_IMPORTANCE:
-		case TIPC_HIGH_IMPORTANCE:
-		case TIPC_CRITICAL_IMPORTANCE:
+		if (tipc_link_prepare_input(l_ptr, &buf)) {
 			tipc_node_unlock(n_ptr);
-			tipc_sk_rcv(buf);
 			continue;
-		case MSG_BUNDLER:
-			l_ptr->stats.recv_bundles++;
-			l_ptr->stats.recv_bundled += msg_msgcnt(msg);
-			tipc_node_unlock(n_ptr);
-			tipc_link_bundle_rcv(buf);
-			continue;
-		case NAME_DISTRIBUTOR:
-			n_ptr->bclink.recv_permitted = true;
-			tipc_node_unlock(n_ptr);
-			tipc_named_rcv(buf);
-			continue;
-		case CONN_MANAGER:
-			tipc_node_unlock(n_ptr);
-			tipc_port_proto_rcv(buf);
-			continue;
-		case BCAST_PROTOCOL:
-			tipc_link_sync_rcv(n_ptr, buf);
-			break;
-		default:
-			kfree_skb(buf);
-			break;
 		}
 		tipc_node_unlock(n_ptr);
+		msg = buf_msg(buf);
+		if (tipc_link_input(l_ptr, buf) != 0)
+			goto discard;
 		continue;
 unlock_discard:
 		tipc_node_unlock(n_ptr);
@@ -1625,6 +1260,80 @@ discard:
 }
 
 /**
+ * tipc_link_prepare_input - process TIPC link messages
+ *
+ * returns nonzero if the message was consumed
+ *
+ * Node lock must be held
+ */
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
+{
+	struct tipc_node *n;
+	struct tipc_msg *msg;
+	int res = -EINVAL;
+
+	n = l->owner;
+	msg = buf_msg(*buf);
+	switch (msg_user(msg)) {
+	case CHANGEOVER_PROTOCOL:
+		if (tipc_link_tunnel_rcv(n, buf))
+			res = 0;
+		break;
+	case MSG_FRAGMENTER:
+		l->stats.recv_fragments++;
+		if (tipc_buf_append(&l->reasm_buf, buf)) {
+			l->stats.recv_fragmented++;
+			res = 0;
+		} else if (!l->reasm_buf) {
+			tipc_link_reset(l);
+		}
+		break;
+	case MSG_BUNDLER:
+		l->stats.recv_bundles++;
+		l->stats.recv_bundled += msg_msgcnt(msg);
+		res = 0;
+		break;
+	case NAME_DISTRIBUTOR:
+		n->bclink.recv_permitted = true;
+		res = 0;
+		break;
+	case BCAST_PROTOCOL:
+		tipc_link_sync_rcv(n, *buf);
+		break;
+	default:
+		res = 0;
+	}
+	return res;
+}
+/**
+ * tipc_link_input - Deliver message to higher layers
+ */
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	int res = 0;
+
+	switch (msg_user(msg)) {
+	case TIPC_LOW_IMPORTANCE:
+	case TIPC_MEDIUM_IMPORTANCE:
+	case TIPC_HIGH_IMPORTANCE:
+	case TIPC_CRITICAL_IMPORTANCE:
+	case CONN_MANAGER:
+		tipc_sk_rcv(buf);
+		break;
+	case NAME_DISTRIBUTOR:
+		tipc_named_rcv(buf);
+		break;
+	case MSG_BUNDLER:
+		tipc_link_bundle_rcv(buf);
+		break;
+	default:
+		res = -EINVAL;
+	}
+	return res;
+}
+
+/**
  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
  *
  * Returns increase in queue length (i.e. 0 or 1)
@@ -2217,6 +1926,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 	u32 msgcount = msg_msgcnt(buf_msg(buf));
 	u32 pos = INT_H_SIZE;
 	struct sk_buff *obuf;
+	struct tipc_msg *omsg;
 
 	while (msgcount--) {
 		obuf = buf_extract(buf, pos);
@@ -2224,82 +1934,18 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 			pr_warn("Link unable to unbundle message(s)\n");
 			break;
 		}
-		pos += align(msg_size(buf_msg(obuf)));
-		tipc_net_route_msg(obuf);
-	}
-	kfree_skb(buf);
-}
-
-/*
- * Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct sk_buff *buf_chain = NULL;
-	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
-	struct tipc_msg *inmsg = buf_msg(buf);
-	struct tipc_msg fragm_hdr;
-	u32 insize = msg_size(inmsg);
-	u32 dsz = msg_data_sz(inmsg);
-	unchar *crs = buf->data;
-	u32 rest = insize;
-	u32 pack_sz = l_ptr->max_pkt;
-	u32 fragm_sz = pack_sz - INT_H_SIZE;
-	u32 fragm_no = 0;
-	u32 destaddr;
-
-	if (msg_short(inmsg))
-		destaddr = l_ptr->addr;
-	else
-		destaddr = msg_destnode(inmsg);
-
-	/* Prepare reusable fragment header: */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, destaddr);
-
-	/* Chop up message: */
-	while (rest > 0) {
-		struct sk_buff *fragm;
-
-		if (rest <= fragm_sz) {
-			fragm_sz = rest;
-			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-		}
-		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-		if (fragm == NULL) {
-			kfree_skb(buf);
-			kfree_skb_list(buf_chain);
-			return -ENOMEM;
+		omsg = buf_msg(obuf);
+		pos += align(msg_size(omsg));
+		if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
+			tipc_sk_rcv(obuf);
+		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
+			tipc_named_rcv(obuf);
+		} else {
+			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
+			kfree_skb(obuf);
 		}
-		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-		fragm_no++;
-		msg_set_fragm_no(&fragm_hdr, fragm_no);
-		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
-					       fragm_sz);
-		buf_chain_tail->next = fragm;
-		buf_chain_tail = fragm;
-
-		rest -= fragm_sz;
-		crs += fragm_sz;
-		msg_set_type(&fragm_hdr, FRAGMENT);
 	}
 	kfree_skb(buf);
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-
-	return dsz;
 }
 
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)