author	Jon Paul Maloy <jon.maloy@ericsson.com>	2015-03-25 12:07:24 -0400
committer	David S. Miller <davem@davemloft.net>	2015-03-25 14:05:56 -0400
commit	1f66d161ab3d8b518903fa6c3f9c1f48d6919e74 (patch)
tree	31275d3b6836126fc76c944b65693d83458dcb93
parent	b06b107a4c190299e9e3f8dbcccfc7fe9e10c8cb (diff)
tipc: introduce starvation free send algorithm
Currently we use only a single counter, the length of the backlog queue, to determine whether a message should be accepted to the queue or not. Each time a message is sent, the queue length is compared to a threshold value for that message's importance priority. If the queue length exceeds this threshold, the message is rejected. This algorithm implies a risk of starvation of low-importance senders during very high load, because it may take a long time before the backlog queue has drained enough to accept a lower-level message.

We now eliminate this risk by introducing a length counter for each importance priority. When a message is sent, we check only the queue level for that particular message's priority. If that level is below its threshold, the message is added to the backlog, irrespective of the queue levels of the other priorities. This way, each level is guaranteed a certain portion of the total bandwidth, and the risk of starvation is eliminated.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
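As orientation before the diff, here is a small stand-alone C sketch of the idea: per-importance len/limit pairs replace the single backlog count, and admission consults only the sender's own level. The names (link_backlog, set_limits, backlog_admit) and the SYSTEM limit value are illustrative only; the actual kernel changes are in __tipc_link_xmit(), tipc_link_set_queue_limits() and link_prepare_wakeup() below.

#include <stdbool.h>
#include <stdio.h>

/* Importance levels, mirroring TIPC_LOW_IMPORTANCE..TIPC_SYSTEM_IMPORTANCE */
enum { LOW, MEDIUM, HIGH, CRITICAL, SYSTEM, NUM_LEVELS };

/* One counter pair per importance level, like the new l->backlog[] array */
struct backlog_level {
	unsigned int len;	/* messages of this importance currently queued */
	unsigned int limit;	/* admission threshold for this importance */
};

struct link_backlog {
	struct backlog_level level[NUM_LEVELS];
};

/* Derive per-level limits from the link window, following the ratios used
 * in tipc_link_set_queue_limits(); SYSTEM uses a stand-in value here,
 * whereas the kernel derives it from the publication bulk size.
 */
static void set_limits(struct link_backlog *b, unsigned int win)
{
	b->level[LOW].limit      = win / 2;
	b->level[MEDIUM].limit   = win;
	b->level[HIGH].limit     = win / 2 * 3;
	b->level[CRITICAL].limit = win * 2;
	b->level[SYSTEM].limit   = win * 4;	/* stand-in for max_bulk */
}

/* Admit a message only if its own importance level has room; the occupancy
 * of the other levels is ignored, so no level can starve another.
 */
static bool backlog_admit(struct link_backlog *b, int imp)
{
	if (b->level[imp].len >= b->level[imp].limit)
		return false;	/* caller would report link congestion */
	b->level[imp].len++;
	return true;
}

int main(void)
{
	struct link_backlog b = { 0 };
	unsigned int i;

	set_limits(&b, 50);

	/* Saturate the LOW level ... */
	for (i = 0; backlog_admit(&b, LOW); i++)
		;
	printf("LOW accepted %u messages, now congested\n", i);

	/* ... yet a CRITICAL message is still accepted immediately */
	printf("CRITICAL admitted: %s\n",
	       backlog_admit(&b, CRITICAL) ? "yes" : "no");
	return 0;
}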
-rw-r--r--	net/tipc/bcast.c	2
-rw-r--r--	net/tipc/link.c	58
-rw-r--r--	net/tipc/link.h	7
3 files changed, 42 insertions(+), 25 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 52307397e0b1..79355531c3e2 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -831,7 +831,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
 	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
 	if (!prop)
 		goto attr_msg_full;
-	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
 		goto prop_msg_full;
 	nla_nest_end(msg->skb, prop);

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 8c98c4d00ad6..b9325a1bddaa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -310,7 +310,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 	l_ptr->priority = b_ptr->priority;
 	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
-
 	l_ptr->next_out_no = 1;
 	__skb_queue_head_init(&l_ptr->transmq);
 	__skb_queue_head_init(&l_ptr->backlogq);
@@ -398,19 +397,22 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  * Move a number of waiting users, as permitted by available space in
  * the send queue, from link wait queue to node wait queue for wakeup
  */
-void link_prepare_wakeup(struct tipc_link *link)
+void link_prepare_wakeup(struct tipc_link *l)
 {
-	uint pend_qsz = skb_queue_len(&link->backlogq);
+	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
+	int imp, lim;
 	struct sk_buff *skb, *tmp;
 
-	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
-		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
+	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+		imp = TIPC_SKB_CB(skb)->chain_imp;
+		lim = l->window + l->backlog[imp].limit;
+		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
+		if ((pnd[imp] + l->backlog[imp].len) >= lim)
 			break;
-		pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
-		skb_unlink(skb, &link->wakeupq);
-		skb_queue_tail(&link->inputq, skb);
-		link->owner->inputq = &link->inputq;
-		link->owner->action_flags |= TIPC_MSG_EVT;
+		skb_unlink(skb, &l->wakeupq);
+		skb_queue_tail(&l->inputq, skb);
+		l->owner->inputq = &l->inputq;
+		l->owner->action_flags |= TIPC_MSG_EVT;
 	}
 }

@@ -424,6 +426,16 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 	l_ptr->reasm_buf = NULL;
 }
 
+static void tipc_link_purge_backlog(struct tipc_link *l)
+{
+	__skb_queue_purge(&l->backlogq);
+	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
+	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
+	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
+	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
+	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
+}
+
 /**
  * tipc_link_purge_queues - purge all pkt queues associated with link
  * @l_ptr: pointer to link
@@ -432,7 +444,7 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
 	__skb_queue_purge(&l_ptr->deferdq);
 	__skb_queue_purge(&l_ptr->transmq);
-	__skb_queue_purge(&l_ptr->backlogq);
+	tipc_link_purge_backlog(l_ptr);
 	tipc_link_reset_fragments(l_ptr);
 }

@@ -466,13 +478,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 
 	/* Clean up all queues, except inputq: */
 	__skb_queue_purge(&l_ptr->transmq);
-	__skb_queue_purge(&l_ptr->backlogq);
 	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
+	tipc_link_purge_backlog(l_ptr);
 	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
@@ -754,16 +766,14 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
-	/* Match queue limit against msg importance: */
-	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
+	/* Match backlog limit against msg importance: */
+	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
 		return tipc_link_cong(link, list);
 
-	/* Has valid packet limit been used ? */
 	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
-
 	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
@@ -786,8 +796,10 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
+			imp = msg_importance(buf_msg(skb));
 		}
 		__skb_queue_tail(backlogq, skb);
+		link->backlog[imp].len++;
 		seqno++;
 	}
 	link->next_out_no = seqno;
@@ -914,6 +926,7 @@ void tipc_link_push_packets(struct tipc_link *link)
 		if (!skb)
 			break;
 		msg = buf_msg(skb);
+		link->backlog[msg_importance(msg)].len--;
 		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
 		link->rcv_unacked = 0;
@@ -1610,6 +1623,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
 	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	tipc_link_purge_backlog(l_ptr);
 	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
@@ -1817,11 +1831,11 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
 
 	l->window = win;
-	l->queue_limit[TIPC_LOW_IMPORTANCE] = win / 2;
-	l->queue_limit[TIPC_MEDIUM_IMPORTANCE] = win;
-	l->queue_limit[TIPC_HIGH_IMPORTANCE] = win / 2 * 3;
-	l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
-	l->queue_limit[TIPC_SYSTEM_IMPORTANCE] = max_bulk;
+	l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2;
+	l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = win;
+	l->backlog[TIPC_HIGH_IMPORTANCE].limit = win / 2 * 3;
+	l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
+	l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk;
 }
 
 /* tipc_link_find_owner - locate owner node of link by link's name
@@ -2120,7 +2134,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
 		goto prop_msg_full;
 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
-			link->queue_limit[TIPC_LOW_IMPORTANCE]))
+			link->window))
 		goto prop_msg_full;
 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
 		goto prop_msg_full;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index eec3ecf2d450..99543a46095a 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -118,7 +118,7 @@ struct tipc_stats {
  * @pmsg: convenience pointer to "proto_msg" field
  * @priority: current link priority
  * @net_plane: current link network plane ('A' through 'H')
- * @queue_limit: outbound message queue congestion thresholds (indexed by user)
+ * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
  * @exp_msg_count: # of tunnelled messages expected during link changeover
  * @reset_checkpoint: seq # of last acknowledged message at time of link reset
  * @max_pkt: current maximum packet size for this link
@@ -166,7 +166,6 @@ struct tipc_link {
 	struct tipc_msg *pmsg;
 	u32 priority;
 	char net_plane;
-	u32 queue_limit[15];	/* queue_limit[0]==window limit */
 
 	/* Changeover */
 	u32 exp_msg_count;
@@ -180,6 +179,10 @@ struct tipc_link {
 	/* Sending */
 	struct sk_buff_head transmq;
 	struct sk_buff_head backlogq;
+	struct {
+		u16 len;
+		u16 limit;
+	} backlog[5];
 	u32 next_out_no;
 	u32 window;
 	u32 last_retransmitted;