aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-03-25 12:07:24 -0400
committerDavid S. Miller <davem@davemloft.net>2015-03-25 14:05:56 -0400
commit1f66d161ab3d8b518903fa6c3f9c1f48d6919e74 (patch)
tree31275d3b6836126fc76c944b65693d83458dcb93 /net/tipc/link.c
parentb06b107a4c190299e9e3f8dbcccfc7fe9e10c8cb (diff)
tipc: introduce starvation free send algorithm
Currently, we only use a single counter; the length of the backlog queue, to determine whether a message should be accepted to the queue or not. Each time a message is being sent, the queue length is compared to a threshold value for the message's importance priority. If the queue length is beyond this threshold, the message is rejected. This algorithm implies a risk of starvation of low importance senders during very high load, because it may take a long time before the backlog queue has decreased enough to accept a lower level message. We now eliminate this risk by introducing a counter for each importance priority. When a message is sent, we check only the queue level for that particular message's priority. If that is ok, the message can be added to the backlog, irrespective of the queue level for other priorities. This way, each level is guaranteed a certain portion of the total bandwidth, and any risk of starvation is eliminated.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c58
1 file changed, 36 insertions, 22 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 8c98c4d00ad6..b9325a1bddaa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -310,7 +310,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
310 link_init_max_pkt(l_ptr); 310 link_init_max_pkt(l_ptr);
311 l_ptr->priority = b_ptr->priority; 311 l_ptr->priority = b_ptr->priority;
312 tipc_link_set_queue_limits(l_ptr, b_ptr->window); 312 tipc_link_set_queue_limits(l_ptr, b_ptr->window);
313
314 l_ptr->next_out_no = 1; 313 l_ptr->next_out_no = 1;
315 __skb_queue_head_init(&l_ptr->transmq); 314 __skb_queue_head_init(&l_ptr->transmq);
316 __skb_queue_head_init(&l_ptr->backlogq); 315 __skb_queue_head_init(&l_ptr->backlogq);
@@ -398,19 +397,22 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
398 * Move a number of waiting users, as permitted by available space in 397 * Move a number of waiting users, as permitted by available space in
399 * the send queue, from link wait queue to node wait queue for wakeup 398 * the send queue, from link wait queue to node wait queue for wakeup
400 */ 399 */
401void link_prepare_wakeup(struct tipc_link *link) 400void link_prepare_wakeup(struct tipc_link *l)
402{ 401{
403 uint pend_qsz = skb_queue_len(&link->backlogq); 402 int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
403 int imp, lim;
404 struct sk_buff *skb, *tmp; 404 struct sk_buff *skb, *tmp;
405 405
406 skb_queue_walk_safe(&link->wakeupq, skb, tmp) { 406 skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
407 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) 407 imp = TIPC_SKB_CB(skb)->chain_imp;
408 lim = l->window + l->backlog[imp].limit;
409 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
410 if ((pnd[imp] + l->backlog[imp].len) >= lim)
408 break; 411 break;
409 pend_qsz += TIPC_SKB_CB(skb)->chain_sz; 412 skb_unlink(skb, &l->wakeupq);
410 skb_unlink(skb, &link->wakeupq); 413 skb_queue_tail(&l->inputq, skb);
411 skb_queue_tail(&link->inputq, skb); 414 l->owner->inputq = &l->inputq;
412 link->owner->inputq = &link->inputq; 415 l->owner->action_flags |= TIPC_MSG_EVT;
413 link->owner->action_flags |= TIPC_MSG_EVT;
414 } 416 }
415} 417}
416 418
@@ -424,6 +426,16 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
424 l_ptr->reasm_buf = NULL; 426 l_ptr->reasm_buf = NULL;
425} 427}
426 428
429static void tipc_link_purge_backlog(struct tipc_link *l)
430{
431 __skb_queue_purge(&l->backlogq);
432 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
433 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
434 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
435 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
436 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
437}
438
427/** 439/**
428 * tipc_link_purge_queues - purge all pkt queues associated with link 440 * tipc_link_purge_queues - purge all pkt queues associated with link
429 * @l_ptr: pointer to link 441 * @l_ptr: pointer to link
@@ -432,7 +444,7 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr)
432{ 444{
433 __skb_queue_purge(&l_ptr->deferdq); 445 __skb_queue_purge(&l_ptr->deferdq);
434 __skb_queue_purge(&l_ptr->transmq); 446 __skb_queue_purge(&l_ptr->transmq);
435 __skb_queue_purge(&l_ptr->backlogq); 447 tipc_link_purge_backlog(l_ptr);
436 tipc_link_reset_fragments(l_ptr); 448 tipc_link_reset_fragments(l_ptr);
437} 449}
438 450
@@ -466,13 +478,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
466 478
467 /* Clean up all queues, except inputq: */ 479 /* Clean up all queues, except inputq: */
468 __skb_queue_purge(&l_ptr->transmq); 480 __skb_queue_purge(&l_ptr->transmq);
469 __skb_queue_purge(&l_ptr->backlogq);
470 __skb_queue_purge(&l_ptr->deferdq); 481 __skb_queue_purge(&l_ptr->deferdq);
471 if (!owner->inputq) 482 if (!owner->inputq)
472 owner->inputq = &l_ptr->inputq; 483 owner->inputq = &l_ptr->inputq;
473 skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); 484 skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
474 if (!skb_queue_empty(owner->inputq)) 485 if (!skb_queue_empty(owner->inputq))
475 owner->action_flags |= TIPC_MSG_EVT; 486 owner->action_flags |= TIPC_MSG_EVT;
487 tipc_link_purge_backlog(l_ptr);
476 l_ptr->rcv_unacked = 0; 488 l_ptr->rcv_unacked = 0;
477 l_ptr->checkpoint = 1; 489 l_ptr->checkpoint = 1;
478 l_ptr->next_out_no = 1; 490 l_ptr->next_out_no = 1;
@@ -754,16 +766,14 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
754 struct sk_buff_head *backlogq = &link->backlogq; 766 struct sk_buff_head *backlogq = &link->backlogq;
755 struct sk_buff *skb, *tmp; 767 struct sk_buff *skb, *tmp;
756 768
757 /* Match queue limit against msg importance: */ 769 /* Match backlog limit against msg importance: */
758 if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp])) 770 if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
759 return tipc_link_cong(link, list); 771 return tipc_link_cong(link, list);
760 772
761 /* Has valid packet limit been used ? */
762 if (unlikely(msg_size(msg) > mtu)) { 773 if (unlikely(msg_size(msg) > mtu)) {
763 __skb_queue_purge(list); 774 __skb_queue_purge(list);
764 return -EMSGSIZE; 775 return -EMSGSIZE;
765 } 776 }
766
767 /* Prepare each packet for sending, and add to relevant queue: */ 777 /* Prepare each packet for sending, and add to relevant queue: */
768 skb_queue_walk_safe(list, skb, tmp) { 778 skb_queue_walk_safe(list, skb, tmp) {
769 __skb_unlink(skb, list); 779 __skb_unlink(skb, list);
@@ -786,8 +796,10 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
786 if (tipc_msg_make_bundle(&skb, mtu, link->addr)) { 796 if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
787 link->stats.sent_bundled++; 797 link->stats.sent_bundled++;
788 link->stats.sent_bundles++; 798 link->stats.sent_bundles++;
799 imp = msg_importance(buf_msg(skb));
789 } 800 }
790 __skb_queue_tail(backlogq, skb); 801 __skb_queue_tail(backlogq, skb);
802 link->backlog[imp].len++;
791 seqno++; 803 seqno++;
792 } 804 }
793 link->next_out_no = seqno; 805 link->next_out_no = seqno;
@@ -914,6 +926,7 @@ void tipc_link_push_packets(struct tipc_link *link)
914 if (!skb) 926 if (!skb)
915 break; 927 break;
916 msg = buf_msg(skb); 928 msg = buf_msg(skb);
929 link->backlog[msg_importance(msg)].len--;
917 msg_set_ack(msg, ack); 930 msg_set_ack(msg, ack);
918 msg_set_bcast_ack(msg, link->owner->bclink.last_in); 931 msg_set_bcast_ack(msg, link->owner->bclink.last_in);
919 link->rcv_unacked = 0; 932 link->rcv_unacked = 0;
@@ -1610,6 +1623,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1610 tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, 1623 tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
1611 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); 1624 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
1612 skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); 1625 skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
1626 tipc_link_purge_backlog(l_ptr);
1613 msgcount = skb_queue_len(&l_ptr->transmq); 1627 msgcount = skb_queue_len(&l_ptr->transmq);
1614 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 1628 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1615 msg_set_msgcnt(&tunnel_hdr, msgcount); 1629 msg_set_msgcnt(&tunnel_hdr, msgcount);
@@ -1817,11 +1831,11 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1817 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE); 1831 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
1818 1832
1819 l->window = win; 1833 l->window = win;
1820 l->queue_limit[TIPC_LOW_IMPORTANCE] = win / 2; 1834 l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2;
1821 l->queue_limit[TIPC_MEDIUM_IMPORTANCE] = win; 1835 l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = win;
1822 l->queue_limit[TIPC_HIGH_IMPORTANCE] = win / 2 * 3; 1836 l->backlog[TIPC_HIGH_IMPORTANCE].limit = win / 2 * 3;
1823 l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2; 1837 l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
1824 l->queue_limit[TIPC_SYSTEM_IMPORTANCE] = max_bulk; 1838 l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk;
1825} 1839}
1826 1840
1827/* tipc_link_find_owner - locate owner node of link by link's name 1841/* tipc_link_find_owner - locate owner node of link by link's name
@@ -2120,7 +2134,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
2120 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance)) 2134 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2121 goto prop_msg_full; 2135 goto prop_msg_full;
2122 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, 2136 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
2123 link->queue_limit[TIPC_LOW_IMPORTANCE])) 2137 link->window))
2124 goto prop_msg_full; 2138 goto prop_msg_full;
2125 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority)) 2139 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2126 goto prop_msg_full; 2140 goto prop_msg_full;