author	Tuong Lien <tuong.t.lien@dektech.com.au>	2019-10-02 07:49:43 -0400
committer	David S. Miller <davem@davemloft.net>	2019-10-02 11:02:05 -0400
commit	e95584a889e1902fdf1ded9712e2c3c3083baf96 (patch)
tree	f6f9f0b1c9f475b0012656e8df0071c11f24fee3 /net/tipc
parent	a761129e3625688310aecf26e1be9e98e85f8eb5 (diff)
tipc: fix unlimited bundling of small messages
We have identified a problem with the "oversubscription" policy in the link transmission code.

When small messages are transmitted and the sending link has reached the transmit window limit, those messages are bundled and put into the link backlog queue. However, bundles of data messages are counted at the 'CRITICAL' level, so the counter for that level, instead of the counter for the real, bundled message's level, is the one being increased. Subsequent to-be-bundled data messages at non-CRITICAL levels continue to be tested against the unchanged counters for their own levels, while contributing to an unrestrained increase of the CRITICAL backlog level. This leaves a gap in the congestion control algorithm for small messages that can result in starvation of other users or of a "real" CRITICAL user, and can eventually lead to buffer exhaustion and link reset.

We fix this by keeping a 'target_bskb' buffer pointer at each level and, when bundling, only bundling messages of the same importance level. This way we know exactly how many slots a certain level occupies in the queue, so level congestion can be managed accurately.

Bundling messages at the same level also brings further benefits. Consider this scenario:
- one socket sends 64-byte messages at the 'CRITICAL' level;
- another sends 4096-byte messages at the 'LOW' level.

When a 64-byte message arrives and is bundled for the first time, we add the message-bundle overhead to it (an extra 40-byte header, data copy, etc.) for later use, but the next message may be a 4096-byte one that cannot be bundled with the previous one. The resulting bundle then carries only one payload message, which is totally inefficient, for the receiver as well. Later on, another 64-byte message arrives, a new bundle is made, and the same story repeats...

With the new bundling algorithm this does not happen: the 64-byte messages are bundled together even when 4096-byte messages come in between. However, if the 4096-byte messages are sent at the same level, i.e. 'CRITICAL', the bundling algorithm again causes the same overhead. The same also happens with only one socket sending small messages at a rate close to the link transmit rate, so that as soon as one message is bundled it is transmitted; then another message arrives, a new bundle is created, and so on... We will solve this issue radically in another patch.

Fixes: 365ad353c256 ("tipc: reduce risk of user starvation during link congestion")
Reported-by: Hoang Le <hoang.h.le@dektech.com.au>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
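To make the counting change concrete, below is a minimal user-space sketch of the per-level bundling idea described above. It is not the kernel code: the names (struct backlog_level, backlog_queue, enum importance) are invented for illustration, and real TIPC bundling involves skb handling and header rewriting that is omitted here. The point shown is that each importance level owns its open bundle and its own backlog counter, so both the congestion test and the slot accounting happen at the sender's own level rather than leaking into CRITICAL.

/* Illustrative sketch only -- hypothetical names, not net/tipc code. */
#include <stdbool.h>
#include <stddef.h>

enum importance { LOW, MEDIUM, HIGH, CRITICAL, SYSTEM, NUM_LEVELS };

struct msg {
	enum importance imp;
	size_t len;		/* payload bytes already in this buffer */
};

struct backlog_level {
	unsigned int len;	/* slots this level occupies in the backlog     */
	unsigned int limit;	/* congestion threshold for this level          */
	struct msg *target;	/* open bundle accepting this level, or NULL    */
};

static struct backlog_level backlog[NUM_LEVELS];

/* Queue one small message when the transmit window is full.
 * Returns false if the sender's own level is congested.
 */
static bool backlog_queue(struct msg *m, size_t mtu)
{
	struct backlog_level *bl = &backlog[m->imp];

	if (bl->len >= bl->limit)
		return false;			/* test the sender's own counter   */

	if (bl->target && bl->target->len + m->len <= mtu) {
		bl->target->len += m->len;	/* joins the same-level bundle;    */
		return true;			/* no extra backlog slot is used   */
	}

	bl->target = m;				/* message opens a new bundle...   */
	bl->len++;				/* ...counted at its own level,    */
	return true;				/* never at CRITICAL               */
}

In the actual patch the same bookkeeping is done through the backlog[imp].target_bskb pointer added to struct tipc_link; the pointer is cleared in tipc_link_reset() and again when the bundle leaves the backlog in tipc_link_advance_backlog(), as shown in the diff below.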
Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/link.c | 29
-rw-r--r--  net/tipc/msg.c  |  5
2 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 6cc75ffd9e2c..999eab592de8 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -160,6 +160,7 @@ struct tipc_link {
 	struct {
 		u16 len;
 		u16 limit;
+		struct sk_buff *target_bskb;
 	} backlog[5];
 	u16 snd_nxt;
 	u16 window;
@@ -880,6 +881,7 @@ static void link_prepare_wakeup(struct tipc_link *l)
 void tipc_link_reset(struct tipc_link *l)
 {
 	struct sk_buff_head list;
+	u32 imp;
 
 	__skb_queue_head_init(&list);
 
@@ -901,11 +903,10 @@ void tipc_link_reset(struct tipc_link *l)
 	__skb_queue_purge(&l->deferdq);
 	__skb_queue_purge(&l->backlogq);
 	__skb_queue_purge(&l->failover_deferdq);
-	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
-	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
-	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
-	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
-	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
+	for (imp = 0; imp <= TIPC_SYSTEM_IMPORTANCE; imp++) {
+		l->backlog[imp].len = 0;
+		l->backlog[imp].target_bskb = NULL;
+	}
 	kfree_skb(l->reasm_buf);
 	kfree_skb(l->reasm_tnlmsg);
 	kfree_skb(l->failover_reasm_skb);
@@ -947,7 +948,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
 	struct sk_buff_head *transmq = &l->transmq;
 	struct sk_buff_head *backlogq = &l->backlogq;
-	struct sk_buff *skb, *_skb, *bskb;
+	struct sk_buff *skb, *_skb, **tskb;
 	int pkt_cnt = skb_queue_len(list);
 	int rc = 0;
 
@@ -999,19 +1000,21 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 			seqno++;
 			continue;
 		}
-		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+		tskb = &l->backlog[imp].target_bskb;
+		if (tipc_msg_bundle(*tskb, hdr, mtu)) {
 			kfree_skb(__skb_dequeue(list));
 			l->stats.sent_bundled++;
 			continue;
 		}
-		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+		if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
 			kfree_skb(__skb_dequeue(list));
-			__skb_queue_tail(backlogq, bskb);
-			l->backlog[msg_importance(buf_msg(bskb))].len++;
+			__skb_queue_tail(backlogq, *tskb);
+			l->backlog[imp].len++;
 			l->stats.sent_bundled++;
 			l->stats.sent_bundles++;
 			continue;
 		}
+		l->backlog[imp].target_bskb = NULL;
 		l->backlog[imp].len += skb_queue_len(list);
 		skb_queue_splice_tail_init(list, backlogq);
 	}
@@ -1027,6 +1030,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
 	u16 seqno = l->snd_nxt;
 	u16 ack = l->rcv_nxt - 1;
 	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+	u32 imp;
 
 	while (skb_queue_len(&l->transmq) < l->window) {
 		skb = skb_peek(&l->backlogq);
@@ -1037,7 +1041,10 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
 			break;
 		__skb_dequeue(&l->backlogq);
 		hdr = buf_msg(skb);
-		l->backlog[msg_importance(hdr)].len--;
+		imp = msg_importance(hdr);
+		l->backlog[imp].len--;
+		if (unlikely(skb == l->backlog[imp].target_bskb))
+			l->backlog[imp].target_bskb = NULL;
 		__skb_queue_tail(&l->transmq, skb);
 		/* next retransmit attempt */
 		if (link_is_bc_sndlink(l))
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index e6d49cdc61b4..922d262e153f 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -543,10 +543,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
 	bmsg = buf_msg(_skb);
 	tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
 		      INT_H_SIZE, dnode);
-	if (msg_isdata(msg))
-		msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE);
-	else
-		msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE);
+	msg_set_importance(bmsg, msg_importance(msg));
 	msg_set_seqno(bmsg, msg_seqno(msg));
 	msg_set_ack(bmsg, msg_ack(msg));
 	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));