aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c87
1 files changed, 43 insertions, 44 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 4e8647aef01c..ddd2dd6f77aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
515 if (link_is_bc_sndlink(l)) 515 if (link_is_bc_sndlink(l))
516 l->state = LINK_ESTABLISHED; 516 l->state = LINK_ESTABLISHED;
517 517
518 /* Disable replicast if even a single peer doesn't support it */
519 if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
520 tipc_bcast_disable_rcast(net);
521
518 return true; 522 return true;
519} 523}
520 524
@@ -776,60 +780,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
776 780
777/** 781/**
778 * link_schedule_user - schedule a message sender for wakeup after congestion 782 * link_schedule_user - schedule a message sender for wakeup after congestion
779 * @link: congested link 783 * @l: congested link
780 * @list: message that was attempted sent 784 * @hdr: header of message that is being sent
781 * Create pseudo msg to send back to user when congestion abates 785 * Create pseudo msg to send back to user when congestion abates
782 * Does not consume buffer list
783 */ 786 */
784static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) 787static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
785{ 788{
786 struct tipc_msg *msg = buf_msg(skb_peek(list)); 789 u32 dnode = tipc_own_addr(l->net);
787 int imp = msg_importance(msg); 790 u32 dport = msg_origport(hdr);
788 u32 oport = msg_origport(msg);
789 u32 addr = tipc_own_addr(link->net);
790 struct sk_buff *skb; 791 struct sk_buff *skb;
791 792
792 /* This really cannot happen... */
793 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
794 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
795 return -ENOBUFS;
796 }
797 /* Non-blocking sender: */
798 if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
799 return -ELINKCONG;
800
801 /* Create and schedule wakeup pseudo message */ 793 /* Create and schedule wakeup pseudo message */
802 skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, 794 skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
803 addr, addr, oport, 0, 0); 795 dnode, l->addr, dport, 0, 0);
804 if (!skb) 796 if (!skb)
805 return -ENOBUFS; 797 return -ENOBUFS;
806 TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); 798 msg_set_dest_droppable(buf_msg(skb), true);
807 TIPC_SKB_CB(skb)->chain_imp = imp; 799 TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
808 skb_queue_tail(&link->wakeupq, skb); 800 skb_queue_tail(&l->wakeupq, skb);
809 link->stats.link_congs++; 801 l->stats.link_congs++;
810 return -ELINKCONG; 802 return -ELINKCONG;
811} 803}
812 804
813/** 805/**
814 * link_prepare_wakeup - prepare users for wakeup after congestion 806 * link_prepare_wakeup - prepare users for wakeup after congestion
815 * @link: congested link 807 * @l: congested link
816 * Move a number of waiting users, as permitted by available space in 808 * Wake up a number of waiting users, as permitted by available space
817 * the send queue, from link wait queue to node wait queue for wakeup 809 * in the send queue
818 */ 810 */
819void link_prepare_wakeup(struct tipc_link *l) 811void link_prepare_wakeup(struct tipc_link *l)
820{ 812{
821 int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
822 int imp, lim;
823 struct sk_buff *skb, *tmp; 813 struct sk_buff *skb, *tmp;
814 int imp, i = 0;
824 815
825 skb_queue_walk_safe(&l->wakeupq, skb, tmp) { 816 skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
826 imp = TIPC_SKB_CB(skb)->chain_imp; 817 imp = TIPC_SKB_CB(skb)->chain_imp;
827 lim = l->backlog[imp].limit; 818 if (l->backlog[imp].len < l->backlog[imp].limit) {
828 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; 819 skb_unlink(skb, &l->wakeupq);
829 if ((pnd[imp] + l->backlog[imp].len) >= lim) 820 skb_queue_tail(l->inputq, skb);
821 } else if (i++ > 10) {
830 break; 822 break;
831 skb_unlink(skb, &l->wakeupq); 823 }
832 skb_queue_tail(l->inputq, skb);
833 } 824 }
834} 825}
835 826
@@ -869,8 +860,7 @@ void tipc_link_reset(struct tipc_link *l)
869 * @list: chain of buffers containing message 860 * @list: chain of buffers containing message
870 * @xmitq: returned list of packets to be sent by caller 861 * @xmitq: returned list of packets to be sent by caller
871 * 862 *
872 * Consumes the buffer chain, except when returning -ELINKCONG, 863 * Consumes the buffer chain.
873 * since the caller then may want to make more send attempts.
874 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS 864 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
875 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted 865 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
876 */ 866 */
@@ -879,7 +869,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
879{ 869{
880 struct tipc_msg *hdr = buf_msg(skb_peek(list)); 870 struct tipc_msg *hdr = buf_msg(skb_peek(list));
881 unsigned int maxwin = l->window; 871 unsigned int maxwin = l->window;
882 unsigned int i, imp = msg_importance(hdr); 872 int imp = msg_importance(hdr);
883 unsigned int mtu = l->mtu; 873 unsigned int mtu = l->mtu;
884 u16 ack = l->rcv_nxt - 1; 874 u16 ack = l->rcv_nxt - 1;
885 u16 seqno = l->snd_nxt; 875 u16 seqno = l->snd_nxt;
@@ -888,19 +878,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
888 struct sk_buff_head *backlogq = &l->backlogq; 878 struct sk_buff_head *backlogq = &l->backlogq;
889 struct sk_buff *skb, *_skb, *bskb; 879 struct sk_buff *skb, *_skb, *bskb;
890 int pkt_cnt = skb_queue_len(list); 880 int pkt_cnt = skb_queue_len(list);
881 int rc = 0;
891 882
892 /* Match msg importance against this and all higher backlog limits: */
893 if (!skb_queue_empty(backlogq)) {
894 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
895 if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
896 return link_schedule_user(l, list);
897 }
898 }
899 if (unlikely(msg_size(hdr) > mtu)) { 883 if (unlikely(msg_size(hdr) > mtu)) {
900 skb_queue_purge(list); 884 skb_queue_purge(list);
901 return -EMSGSIZE; 885 return -EMSGSIZE;
902 } 886 }
903 887
888 /* Allow oversubscription of one data msg per source at congestion */
889 if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
890 if (imp == TIPC_SYSTEM_IMPORTANCE) {
891 pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
892 return -ENOBUFS;
893 }
894 rc = link_schedule_user(l, hdr);
895 }
896
904 if (pkt_cnt > 1) { 897 if (pkt_cnt > 1) {
905 l->stats.sent_fragmented++; 898 l->stats.sent_fragmented++;
906 l->stats.sent_fragments += pkt_cnt; 899 l->stats.sent_fragments += pkt_cnt;
@@ -946,7 +939,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
946 skb_queue_splice_tail_init(list, backlogq); 939 skb_queue_splice_tail_init(list, backlogq);
947 } 940 }
948 l->snd_nxt = seqno; 941 l->snd_nxt = seqno;
949 return 0; 942 return rc;
950} 943}
951 944
952void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 945void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
@@ -1043,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1043static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, 1036static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1044 struct sk_buff_head *inputq) 1037 struct sk_buff_head *inputq)
1045{ 1038{
1046 switch (msg_user(buf_msg(skb))) { 1039 struct tipc_msg *hdr = buf_msg(skb);
1040
1041 switch (msg_user(hdr)) {
1047 case TIPC_LOW_IMPORTANCE: 1042 case TIPC_LOW_IMPORTANCE:
1048 case TIPC_MEDIUM_IMPORTANCE: 1043 case TIPC_MEDIUM_IMPORTANCE:
1049 case TIPC_HIGH_IMPORTANCE: 1044 case TIPC_HIGH_IMPORTANCE:
1050 case TIPC_CRITICAL_IMPORTANCE: 1045 case TIPC_CRITICAL_IMPORTANCE:
1046 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
1047 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1048 return true;
1049 }
1051 case CONN_MANAGER: 1050 case CONN_MANAGER:
1052 skb_queue_tail(inputq, skb); 1051 skb_queue_tail(inputq, skb);
1053 return true; 1052 return true;