author		Jon Maloy <jon.maloy@ericsson.com>	2019-08-15 10:42:50 -0400
committer	David S. Miller <davem@davemloft.net>	2019-08-18 17:01:07 -0400
commit		e654f9f53b45fde3fcc8051830b212c7a8f36148 (patch)
tree		1b3b3af06442bae94101ca20f1c0b6ca06d54e44
parent		10086b345385bc1ca67b260aff236e742e2e630e (diff)
tipc: clean up skb list lock handling on send path
The policy for handling the skb list locks on the send and receive
paths is simple.

- On the send path we never need to grab the lock on the 'xmitq'
  list when the destination is an external node.

- On the receive path we always need to grab the lock on the
  'inputq' list, irrespective of source node.

However, when transmitting node local messages those will eventually
end up on the receive path of a local socket, meaning that the
argument 'xmitq' in tipc_node_xmit() will become the 'inputq'
argument in the function tipc_sk_rcv(). This has been handled by
always initializing the spinlock of the 'xmitq' list at message
creation, just in case it may end up on the receive path later, and
despite knowing that the lock in most cases never will be used.

This approach is inaccurate and confusing, and has also concealed the
fact that the stated 'no lock grabbing' policy for the send path is
violated in some cases.

We now clean this up by never initializing the lock at message
creation, instead doing so at the moment we find that the message
actually will enter the receive path. At the same time we fix the
four locations where we incorrectly access the spinlock on the
send/error path.

This patch also reverts commit d12cffe9329f ("tipc: ensure head->lock
is initialised"), which has now become redundant.

CC: Eric Dumazet <edumazet@google.com>
Reported-by: Chris Packham <chris.packham@alliedtelesis.co.nz>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Xin Long <lucien.xin@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
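For context, the resulting locking pattern can be sketched in a few
lines of C. This is a minimal illustration modeled on the
tipc_node_xmit() hunk below, not code from the patch: example_xmit()
and destination_reachable() are hypothetical names, while
__skb_queue_purge(), spin_lock_init(), in_own_node() and tipc_sk_rcv()
are the real primitives used in the diff.

/* Sketch (hypothetical): send-path queues stay lock-free; the spinlock
 * is initialized only when a list crosses over to the receive path.
 */
static int example_xmit(struct net *net, struct sk_buff_head *list, u32 dnode)
{
	if (in_own_node(net, dnode)) {
		/* The 'xmitq' now becomes an 'inputq' on the local
		 * receive path, where the lock is always taken;
		 * initialize it at this handover point, and only here.
		 */
		spin_lock_init(&list->lock);
		tipc_sk_rcv(net, list);
		return 0;
	}

	if (!destination_reachable(net, dnode)) { /* hypothetical check */
		/* Error path on the send side: the list is still private
		 * to this thread, so purge without touching the lock.
		 */
		__skb_queue_purge(list);
		return -EHOSTUNREACH;
	}

	/* ... hand 'list' to the link layer, never touching list->lock ... */
	return 0;
}

Correspondingly, every send-path caller now builds its list with
__skb_queue_head_init() and __skb_queue_tail() rather than the
lock-initializing skb_queue_head_init()/skb_queue_tail(); that is
exactly the substitution the hunks below perform.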
-rw-r--r--	net/tipc/bcast.c	10
-rw-r--r--	net/tipc/group.c	4
-rw-r--r--	net/tipc/link.c	14
-rw-r--r--	net/tipc/name_distr.c	2
-rw-r--r--	net/tipc/node.c	7
-rw-r--r--	net/tipc/socket.c	14
6 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 34f3e5641438..6ef1abdd525f 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -185,7 +185,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
 	}
 
 	/* We have to transmit across all bearers */
-	skb_queue_head_init(&_xmitq);
+	__skb_queue_head_init(&_xmitq);
 	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
 		if (!bb->dests[bearer_id])
 			continue;
@@ -256,7 +256,7 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	struct sk_buff_head xmitq;
 	int rc = 0;
 
-	skb_queue_head_init(&xmitq);
+	__skb_queue_head_init(&xmitq);
 	tipc_bcast_lock(net);
 	if (tipc_link_bc_peers(l))
 		rc = tipc_link_xmit(l, pkts, &xmitq);
@@ -286,7 +286,7 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	u32 dnode, selector;
 
 	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
-	skb_queue_head_init(&_pkts);
+	__skb_queue_head_init(&_pkts);
 
 	list_for_each_entry_safe(dst, tmp, &dests->list, list) {
 		dnode = dst->node;
@@ -344,7 +344,7 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
 	msg_set_size(_hdr, MCAST_H_SIZE);
 	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
 
-	skb_queue_head_init(&tmpq);
+	__skb_queue_head_init(&tmpq);
 	__skb_queue_tail(&tmpq, _skb);
 	if (method->rcast)
 		tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
@@ -378,7 +378,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	int rc = 0;
 
 	skb_queue_head_init(&inputq);
-	skb_queue_head_init(&localq);
+	__skb_queue_head_init(&localq);
 
 	/* Clone packets before they are consumed by next call */
 	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5f98d38bcf08..89257e2a980d 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -199,7 +199,7 @@ void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
 	struct tipc_member *m, *tmp;
 	struct sk_buff_head xmitq;
 
-	skb_queue_head_init(&xmitq);
+	__skb_queue_head_init(&xmitq);
 	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
 		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
 		tipc_group_update_member(m, 0);
@@ -435,7 +435,7 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
 		return true;
 	if (state == MBR_PENDING && adv == ADV_IDLE)
 		return true;
-	skb_queue_head_init(&xmitq);
+	__skb_queue_head_init(&xmitq);
 	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
 	tipc_node_distr_xmit(grp->net, &xmitq);
 	return true;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index dd3155b14654..289e848084ac 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -959,7 +959,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
 			skb_queue_len(list), msg_user(hdr),
 			msg_type(hdr), msg_size(hdr), mtu);
-		skb_queue_purge(list);
+		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
@@ -988,7 +988,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		if (likely(skb_queue_len(transmq) < maxwin)) {
 			_skb = skb_clone(skb, GFP_ATOMIC);
 			if (!_skb) {
-				skb_queue_purge(list);
+				__skb_queue_purge(list);
 				return -ENOBUFS;
 			}
 			__skb_dequeue(list);
@@ -1668,7 +1668,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
 	struct sk_buff *skb;
 	u32 dnode = l->addr;
 
-	skb_queue_head_init(&tnlq);
+	__skb_queue_head_init(&tnlq);
 	skb = tipc_msg_create(TUNNEL_PROTOCOL, FAILOVER_MSG,
 			      INT_H_SIZE, BASIC_H_SIZE,
 			      dnode, onode, 0, 0, 0);
@@ -1708,9 +1708,9 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 	if (!tnl)
 		return;
 
-	skb_queue_head_init(&tnlq);
-	skb_queue_head_init(&tmpxq);
-	skb_queue_head_init(&frags);
+	__skb_queue_head_init(&tnlq);
+	__skb_queue_head_init(&tmpxq);
+	__skb_queue_head_init(&frags);
 
 	/* At least one packet required for safe algorithm => add dummy */
 	skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
@@ -1720,7 +1720,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 		pr_warn("%sunable to create tunnel packet\n", link_co_err);
 		return;
 	}
-	skb_queue_tail(&tnlq, skb);
+	__skb_queue_tail(&tnlq, skb);
 	tipc_link_xmit(l, &tnlq, &tmpxq);
 	__skb_queue_purge(&tmpxq);
 
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 44abc8e9c990..61219f0b9677 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -190,7 +190,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
 	struct name_table *nt = tipc_name_table(net);
 	struct sk_buff_head head;
 
-	skb_queue_head_init(&head);
+	__skb_queue_head_init(&head);
 
 	read_lock_bh(&nt->cluster_scope_lock);
 	named_distribute(net, &head, dnode, &nt->cluster_scope);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1bdcf0fc1a4d..c8f6177dd5a2 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1444,13 +1444,14 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 
 	if (in_own_node(net, dnode)) {
 		tipc_loopback_trace(net, list);
+		spin_lock_init(&list->lock);
 		tipc_sk_rcv(net, list);
 		return 0;
 	}
 
 	n = tipc_node_find(net, dnode);
 	if (unlikely(!n)) {
-		skb_queue_purge(list);
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -1459,7 +1460,7 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 	if (unlikely(bearer_id == INVALID_BEARER_ID)) {
 		tipc_node_read_unlock(n);
 		tipc_node_put(n);
-		skb_queue_purge(list);
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -1491,7 +1492,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 {
 	struct sk_buff_head head;
 
-	skb_queue_head_init(&head);
+	__skb_queue_head_init(&head);
 	__skb_queue_tail(&head, skb);
 	tipc_node_xmit(net, &head, dnode, selector);
 	return 0;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 83ae41d7e554..3b9f8cc328f5 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -809,7 +809,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
 	msg_set_nameupper(hdr, seq->upper);
 
 	/* Build message as chain of buffers */
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
 	/* Send message if build was successful */
@@ -853,7 +853,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
 
 	/* Build message as chain of buffers */
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
@@ -1058,7 +1058,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	msg_set_grp_bc_ack_req(hdr, ack);
 
 	/* Build message as chain of buffers */
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
 		return rc;
@@ -1387,7 +1387,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(rc))
 		return rc;
 
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
@@ -1445,7 +1445,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 	int send, sent = 0;
 	int rc = 0;
 
-	skb_queue_head_init(&pkts);
+	__skb_queue_head_init(&pkts);
 
 	if (unlikely(dlen > INT_MAX))
 		return -EMSGSIZE;
@@ -1805,7 +1805,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 
 	/* Send group flow control advertisement when applicable */
 	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
-		skb_queue_head_init(&xmitq);
+		__skb_queue_head_init(&xmitq);
 		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
 					  msg_orignode(hdr), msg_origport(hdr),
 					  &xmitq);
@@ -2674,7 +2674,7 @@ static void tipc_sk_timeout(struct timer_list *t)
 	struct sk_buff_head list;
 	int rc = 0;
 
-	skb_queue_head_init(&list);
+	__skb_queue_head_init(&list);
 	bh_lock_sock(sk);
 
 	/* Try again later if socket is busy */