author		Jon Paul Maloy <jon.maloy@ericsson.com>	2015-03-25 12:07:25 -0400
committer	David S. Miller <davem@davemloft.net>	2015-03-25 14:05:56 -0400
commit		3127a0200d4a46cf279bb388cc0f71827cd60699 (patch)
tree		c0bf142814ecd5cea45db33cd8706c118ea6e45d
parent		1f66d161ab3d8b518903fa6c3f9c1f48d6919e74 (diff)
tipc: clean up handling of link congestion
After the recent changes in message importance handling it becomes
possible to simplify handling of messages and sockets when we
encounter link congestion.

We merge the function tipc_link_cong() into link_schedule_user(),
and simplify the code of the latter. The code should now be easier
to follow, especially regarding return codes and handling of the
message that caused the situation.

In case the scheduling function is unable to pre-allocate a wakeup
message buffer, it now returns -ENOBUFS, which is a more correct
code than the previously used -EHOSTUNREACH.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--	net/tipc/link.c	104
-rw-r--r--	net/tipc/msg.h	28
2 files changed, 60 insertions(+), 72 deletions(-)
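[Editor's note] To illustrate the return-code contract this patch establishes, here is a minimal, hypothetical caller-side sketch (not part of the patch; example_send() and example_wait_for_wakeup() are assumptions modeled on the socket layer's behavior). -ELINKCONG means the buffer chain was kept and a wakeup is pending, so a blocking sender may wait and retry; -ENOBUFS and -EMSGSIZE mean the chain was consumed and the attempt is over.

/* Hypothetical sender loop; example_wait_for_wakeup() stands in for
 * whatever the caller uses to sleep until the SOCK_WAKEUP pseudo
 * message arrives (e.g. the socket layer's wait helpers).
 */
static int example_send(struct net *net, struct tipc_link *link,
			struct sk_buff_head *list)
{
	int rc;

	for (;;) {
		rc = __tipc_link_xmit(net, link, list);
		if (rc != -ELINKCONG)
			return rc;	/* 0, or -EMSGSIZE/-ENOBUFS; chain consumed on error */
		/* chain was kept; sleep until congestion abates, then retry */
		rc = example_wait_for_wakeup();
		if (rc)
			return rc;
	}
}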
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b9325a1bddaa..58e2460682da 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -367,28 +367,43 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
 }
 
 /**
- * link_schedule_user - schedule user for wakeup after congestion
+ * link_schedule_user - schedule a message sender for wakeup after congestion
  * @link: congested link
- * @oport: sending port
- * @chain_sz: size of buffer chain that was attempted sent
- * @imp: importance of message attempted sent
+ * @list: message that was attempted sent
  * Create pseudo msg to send back to user when congestion abates
+ * Only consumes message if there is an error
  */
-static bool link_schedule_user(struct tipc_link *link, u32 oport,
-			       uint chain_sz, uint imp)
+static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct sk_buff *buf;
+	struct tipc_msg *msg = buf_msg(skb_peek(list));
+	int imp = msg_importance(msg);
+	u32 oport = msg_origport(msg);
+	u32 addr = link_own_addr(link);
+	struct sk_buff *skb;
 
-	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
-			      link_own_addr(link), link_own_addr(link),
-			      oport, 0, 0);
-	if (!buf)
-		return false;
-	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
-	TIPC_SKB_CB(buf)->chain_imp = imp;
-	skb_queue_tail(&link->wakeupq, buf);
+	/* This really cannot happen... */
+	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
+		goto err;
+	}
+	/* Non-blocking sender: */
+	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
+		return -ELINKCONG;
+
+	/* Create and schedule wakeup pseudo message */
+	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
+			      addr, addr, oport, 0, 0);
+	if (!skb)
+		goto err;
+	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
+	TIPC_SKB_CB(skb)->chain_imp = imp;
+	skb_queue_tail(&link->wakeupq, skb);
 	link->stats.link_congs++;
-	return true;
+	return -ELINKCONG;
+err:
+	__skb_queue_purge(list);
+	return -ENOBUFS;
 }
 
 /**
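[Editor's note] The pseudo message queued above is what later wakes the sender. For context, a simplified sketch of the consuming side (loosely modeled on link.c's wakeup handling; the drain policy and example_deliver_to_socket() are assumptions): when congestion abates, entries are popped from link->wakeupq while the stored chain_sz still fits under the backlog limit for their chain_imp, and delivered back to the originating socket as SOCK_WAKEUP messages.

/* Sketch only: drain the wakeup queue when there is again room in
 * the backlog for each waiting sender's importance level.
 */
static void example_prepare_wakeup(struct tipc_link *link)
{
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
		int imp = TIPC_SKB_CB(skb)->chain_imp;

		if (link->backlog[imp].len + TIPC_SKB_CB(skb)->chain_sz >
		    link->backlog[imp].limit)
			break;	/* still congested at this level */
		skb_unlink(skb, &link->wakeupq);
		/* hand SOCK_WAKEUP to the socket named by msg_origport() */
		example_deliver_to_socket(link, skb);
	}
}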
@@ -708,48 +723,15 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/* tipc_link_cong: determine return value and how to treat the
- * sent buffer during link congestion.
- * - For plain, errorless user data messages we keep the buffer and
- *   return -ELINKONG.
- * - For all other messages we discard the buffer and return -EHOSTUNREACH
- * - For TIPC internal messages we also reset the link
- */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
-{
-	struct sk_buff *skb = skb_peek(list);
-	struct tipc_msg *msg = buf_msg(skb);
-	int imp = msg_importance(msg);
-	u32 oport = msg_tot_origport(msg);
-
-	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
-		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
-		goto drop;
-	}
-	if (unlikely(msg_errcode(msg)))
-		goto drop;
-	if (unlikely(msg_reroute_cnt(msg)))
-		goto drop;
-	if (TIPC_SKB_CB(skb)->wakeup_pending)
-		return -ELINKCONG;
-	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
-		return -ELINKCONG;
-drop:
-	__skb_queue_purge(list);
-	return -EHOSTUNREACH;
-}
-
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @list: chain of buffers containing message
  *
- * Consumes the buffer chain, except when returning -ELINKCONG
- * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
- * user data messages) or -EHOSTUNREACH (all other messages/senders)
- * Only the socket functions tipc_send_stream() and tipc_send_packet() need
- * to act on the return value, since they may need to do more send attempts.
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list)
@@ -768,7 +750,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 
 	/* Match backlog limit against msg importance: */
 	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
-		return tipc_link_cong(link, list);
+		return link_schedule_user(link, list);
 
 	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
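[Editor's note] As a worked example of the backlog check above (the limit value is illustrative, not TIPC's actual default): if backlog[TIPC_LOW_IMPORTANCE].limit were 300 and 300 low-importance buffers were already backlogged, the next low-importance chain would be diverted to link_schedule_user(), while a higher-importance chain could still pass because it is matched against its own, larger per-level limit.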
@@ -820,13 +802,25 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
+/* tipc_link_xmit_skb(): send single buffer to destination
+ * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
 int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 		       u32 selector)
 {
 	struct sk_buff_head head;
+	int rc;
 
 	skb2list(skb, &head);
-	return tipc_link_xmit(net, &head, dnode, selector);
+	rc = tipc_link_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
 }
 
 /**
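[Editor's note] A hedged sketch of what this change means for callers (the caller function below is hypothetical, and the selector choice is illustrative): since tipc_link_xmit_skb() now frees the skb itself on -ELINKCONG and returns 0 unconditionally, the buffer is owned by the function on every path and callers need no cleanup.

/* Hypothetical caller: fire-and-forget send of an internal message.
 * No return-value handling and no kfree_skb() in the caller: the
 * buffer is consumed whether it was sent, dropped after a secondary
 * lookup reroute, or freed because the link was congested.
 */
static void example_send_internal(struct net *net, struct sk_buff *skb,
				  u32 dnode)
{
	tipc_link_xmit_skb(net, skb, dnode, 0);
}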
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index bd3969a80dd4..6445db09c0c4 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -240,6 +240,15 @@ static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
 }
 
+static inline unchar *msg_data(struct tipc_msg *m)
+{
+	return ((unchar *)m) + msg_hdr_sz(m);
+}
+
+static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
+{
+	return (struct tipc_msg *)msg_data(m);
+}
 
 /*
  * Word 1
@@ -372,6 +381,8 @@ static inline void msg_set_prevnode(struct tipc_msg *m, u32 a)
 
 static inline u32 msg_origport(struct tipc_msg *m)
 {
+	if (msg_user(m) == MSG_FRAGMENTER)
+		m = msg_get_wrapped(m);
 	return msg_word(m, 4);
 }
 
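[Editor's note] To make the unwrapping above concrete, a sketch of the assumed buffer layout (illustrative, not from the patch): for a fragmented message the outer header belongs to the MSG_FRAGMENTER user, and the original header, including the originating port in word 4, travels as the first bytes of the payload, which is exactly where msg_get_wrapped() points.

/*
 * Assumed layout of the first fragment of a long message:
 *
 *   | outer MSG_FRAGMENTER header | original user header | user data |
 *   ^ m                           ^ msg_get_wrapped(m)
 *
 * msg_data(m) == (unchar *)m + msg_hdr_sz(m), so msg_get_wrapped(m)
 * lands on the original header, whose word 4 is the originating port.
 */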
@@ -467,16 +478,6 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 	msg_set_word(m, 10, n);
 }
 
-static inline unchar *msg_data(struct tipc_msg *m)
-{
-	return ((unchar *)m) + msg_hdr_sz(m);
-}
-
-static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
-{
-	return (struct tipc_msg *)msg_data(m);
-}
-
 /*
  * Constants and routines used to read and write TIPC internal message headers
  */
@@ -753,13 +754,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-static inline u32 msg_tot_origport(struct tipc_msg *m)
-{
-	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
-		return msg_origport(msg_get_wrapped(m));
-	return msg_origport(m);
-}
-
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,