aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/msg.h
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:41 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 19:00:02 -0500
commitc637c1035534867b85b78b453c38c495b58e2c5a (patch)
tree77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc/msg.h
parent94153e36e709e78fc4e1f93dc4e4da785690c7d1 (diff)
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before passing messages upwards to the destination sockets. During the upcall from link to socket no locks are held. It is therefore possible, and we see it happen occasionally, that messages arriving in different threads and delivered in sequence still bypass each other before they reach the destination socket. This must not happen, since it violates the sequentiality guarantee. We solve this by adding a new input buffer queue to the link structure. Arriving messages are added safely to the tail of that queue by the link, while the head of the queue is consumed, also safely, by the receiving socket. Sequentiality is secured per socket by only allowing buffers to be dequeued inside the socket lock. Since there may be multiple simultaneous readers of the queue, we use a 'filter' parameter to reduce the risk that they peek the same buffer from the queue, hence also reducing the risk of contention on the receiving socket locks. This solves the sequentiality problem, and seems to cause no measurable performance degradation. A nice side effect of this change is that lock handling in the functions tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will enable future simplifications of those functions. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/msg.h')
-rw-r--r--net/tipc/msg.h73
1 files changed, 73 insertions, 0 deletions
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 60702992933d..ab467261bd9d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -45,6 +45,7 @@
45 * Note: Some items are also used with TIPC internal message headers 45 * Note: Some items are also used with TIPC internal message headers
46 */ 46 */
47#define TIPC_VERSION 2 47#define TIPC_VERSION 2
48struct plist;
48 49
49/* 50/*
50 * Payload message users are defined in TIPC's public API: 51 * Payload message users are defined in TIPC's public API:
@@ -759,10 +760,82 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
759bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); 760bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
760bool tipc_msg_make_bundle(struct sk_buff_head *list, 761bool tipc_msg_make_bundle(struct sk_buff_head *list,
761 struct sk_buff *skb, u32 mtu, u32 dnode); 762 struct sk_buff *skb, u32 mtu, u32 dnode);
763bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
762int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 764int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
763 int offset, int dsz, int mtu, struct sk_buff_head *list); 765 int offset, int dsz, int mtu, struct sk_buff_head *list);
764bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, 766bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
765 int *err); 767 int *err);
766struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
767 769
770/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
771 * up to and including 'filter'.
772 * Note: ignoring previously tried destinations minimizes the risk of
773 * contention on the socket lock
774 * @list: list to be peeked in
775 * @filter: last destination to be ignored from search
776 * Returns a destination port number, if applicable; 0 if the list is empty
777 */
778static inline u32 tipc_skb_peek_port(struct sk_buff_head *list, u32 filter)
779{
780	struct sk_buff *skb;
781	u32 dport = 0;
782	bool ignore = true;
783
784	spin_lock_bh(&list->lock);
785	skb_queue_walk(list, skb) {
786		dport = msg_destport(buf_msg(skb));
787		if (!filter || skb_queue_is_last(list, skb))
788			break;
789		if (dport == filter)
790			ignore = false;
791		else if (!ignore)
792			break;
793	}
794	spin_unlock_bh(&list->lock);
795	return dport;
796}
797
798/* tipc_skb_dequeue(): unlink first buffer with dest 'dport' from list
799 * @list: list to be unlinked from
800 * @dport: selection criteria for buffer to unlink
801 * Returns the unlinked buffer, or NULL if no buffer matches 'dport'
802 */
802static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,
803						  u32 dport)
804{
805	struct sk_buff *_skb, *tmp, *skb = NULL;
806
807	spin_lock_bh(&list->lock);
808	skb_queue_walk_safe(list, _skb, tmp) {
809		if (msg_destport(buf_msg(_skb)) == dport) {
810			__skb_unlink(_skb, list);
811			skb = _skb;
812			break;
813		}
814	}
815	spin_unlock_bh(&list->lock);
816	return skb;
817}
818
819/* tipc_skb_queue_tail(): add buffer to tail of list
820 * @list: list to be appended to
821 * @skb: buffer to append. Always appended
822 * @dport: the destination port of the buffer
823 * returns true if the list was empty, if the tail buffer's destination
824 * port differs from 'dport', or if the list already holds more than 32
825 * buffers (i.e., the caller should notify the receiving socket)
826 */
825static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
826					 struct sk_buff *skb, u32 dport)
827{
828	struct sk_buff *_skb = NULL;
829	bool rv = false;
830
831	spin_lock_bh(&list->lock);
832	_skb = skb_peek_tail(list);
833	if (!_skb || (msg_destport(buf_msg(_skb)) != dport) ||
834	    (skb_queue_len(list) > 32))
835		rv = true;
836	__skb_queue_tail(list, skb);
837	spin_unlock_bh(&list->lock);
838	return rv;
839}
840
768#endif 841#endif