author		Jon Paul Maloy <jon.maloy@ericsson.com>		2015-07-16 16:54:24 -0400
committer	David S. Miller <davem@davemloft.net>		2015-07-20 23:41:15 -0400
commit		af9b028e270fda6fb812d70d17d902297df1ceb5 (patch)
tree		1a204c6d10d597d5db18908dc2066e980a78120d /net/tipc/link.c
parent		22d85c79428b8ca9a01623aa3e3a1fe29a30a119 (diff)
tipc: make media xmit call outside node spinlock context
Currently, message sending is performed through a deep call chain, where the node spinlock is grabbed and held during a significant part of the transmission time. This is clearly detrimental to overall throughput performance; it would be better if we could send the message after the spinlock has been released.

In this commit, we instead let the call return back up the stack once the buffer chain has been added to the transmission queue, after which clones of the buffers are transmitted to the device layer outside the spinlock scope.

As a further step in our effort to separate the roles of the node and link entities, we also move the function tipc_link_xmit() to node.c and rename it to tipc_node_xmit().

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
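In other words, sending becomes a two-step operation: the caller enqueues the chain and collects clones of the immediately sendable packets into a local queue while holding the node lock, then hands that queue to the media layer after the lock is dropped. Below is a minimal sketch of a caller following this pattern; the function name, the bearer_id/maddr parameters and the tipc_bearer_xmit() helper are assumptions based on the commit message, not code shown in this diff.

/* Illustrative sketch only -- not the actual node.c code from this series.
 * Assumes the TIPC internal headers (node.h, link.h, bearer.h); the
 * tipc_bearer_xmit() helper and the bearer_id/maddr parameters are assumed.
 */
static int tipc_node_xmit_sketch(struct net *net, struct tipc_node *node,
				 struct tipc_link *link, u32 bearer_id,
				 struct tipc_media_addr *maddr,
				 struct sk_buff_head *list)
{
	struct sk_buff_head xmitq;	/* clones to be sent after unlock */
	int rc;

	__skb_queue_head_init(&xmitq);

	/* Step 1: under the node lock, tipc_link_xmit() moves the originals
	 * onto the link's transmit/backlog queues and fills xmitq with
	 * clones of the packets that fit inside the send window.
	 */
	tipc_node_lock(node);
	rc = tipc_link_xmit(link, list, &xmitq);
	tipc_node_unlock(node);

	/* Step 2: the media xmit call now happens outside the spinlock */
	if (!skb_queue_empty(&xmitq))
		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);

	return rc;
}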
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--	net/tipc/link.c	132
1 file changed, 72 insertions(+), 60 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ea32679b6737..c052437a7cfa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 	/* This really cannot happen... */
 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
 		return -ENOBUFS;
 	}
 	/* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	return 0;
 }
 
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @link: link to use
+ * @list: chain of buffers containing message
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr = buf_msg(skb_peek(list));
+	unsigned int maxwin = l->window;
+	unsigned int i, imp = msg_importance(hdr);
+	unsigned int mtu = l->mtu;
+	u16 ack = l->rcv_nxt - 1;
+	u16 seqno = l->snd_nxt;
+	u16 bc_last_in = l->owner->bclink.last_in;
+	struct sk_buff_head *transmq = &l->transmq;
+	struct sk_buff_head *backlogq = &l->backlogq;
+	struct sk_buff *skb, *_skb, *bskb;
+
+	/* Match msg importance against this and all higher backlog limits: */
+	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+			return link_schedule_user(l, list);
+	}
+	if (unlikely(msg_size(hdr) > mtu))
+		return -EMSGSIZE;
+
+	/* Prepare each packet for sending, and add to relevant queue: */
+	while (skb_queue_len(list)) {
+		skb = skb_peek(list);
+		hdr = buf_msg(skb);
+		msg_set_seqno(hdr, seqno);
+		msg_set_ack(hdr, ack);
+		msg_set_bcast_ack(hdr, bc_last_in);
+
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			_skb = skb_clone(skb, GFP_ATOMIC);
+			if (!_skb)
+				return -ENOBUFS;
+			__skb_dequeue(list);
+			__skb_queue_tail(transmq, skb);
+			__skb_queue_tail(xmitq, _skb);
+			l->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+			kfree_skb(__skb_dequeue(list));
+			l->stats.sent_bundled++;
+			continue;
+		}
+		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+			kfree_skb(__skb_dequeue(list));
+			__skb_queue_tail(backlogq, bskb);
+			l->backlog[msg_importance(buf_msg(bskb))].len++;
+			l->stats.sent_bundled++;
+			l->stats.sent_bundles++;
+			continue;
+		}
+		l->backlog[imp].len += skb_queue_len(list);
+		skb_queue_splice_tail_init(list, backlogq);
+	}
+	l->snd_nxt = seqno;
+	return 0;
+}
+
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
 	skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
-		       u32 selector)
-{
-	struct sk_buff_head head;
-	int rc;
-
-	skb2list(skb, &head);
-	rc = tipc_link_xmit(net, &head, dnode, selector);
-	if (rc)
-		kfree_skb(skb);
-	return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
-		   u32 selector)
-{
-	struct tipc_link *link = NULL;
-	struct tipc_node *node;
-	int rc = -EHOSTUNREACH;
-
-	node = tipc_node_find(net, dnode);
-	if (node) {
-		tipc_node_lock(node);
-		link = node_active_link(node, selector & 1);
-		if (link)
-			rc = __tipc_link_xmit(net, link, list);
-		tipc_node_unlock(node);
-		tipc_node_put(node);
-	}
-	if (link)
-		return rc;
-
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-
-	__skb_queue_purge(list);
-	return rc;
-}
-
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *