about summary refs log tree commit diff stats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r-- net/tipc/bcast.c 204
-rw-r--r-- net/tipc/bcast.h 33
-rw-r--r-- net/tipc/bearer.c 15
-rw-r--r-- net/tipc/bearer.h 8
-rw-r--r-- net/tipc/link.c 87
-rw-r--r-- net/tipc/msg.c 17
-rw-r--r-- net/tipc/msg.h 11
-rw-r--r-- net/tipc/name_table.c 128
-rw-r--r-- net/tipc/name_table.h 24
-rw-r--r-- net/tipc/node.c 42
-rw-r--r-- net/tipc/node.h 4
-rw-r--r-- net/tipc/socket.c 495
-rw-r--r-- net/tipc/udp_media.c 8
13 files changed, 672 insertions, 404 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index aa1babbea385..7d99029df342 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/bcast.c: TIPC broadcast code 2 * net/tipc/bcast.c: TIPC broadcast code
3 * 3 *
4 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB 4 * Copyright (c) 2004-2006, 2014-2016, Ericsson AB
5 * Copyright (c) 2004, Intel Corporation. 5 * Copyright (c) 2004, Intel Corporation.
6 * Copyright (c) 2005, 2010-2011, Wind River Systems 6 * Copyright (c) 2005, 2010-2011, Wind River Systems
7 * All rights reserved. 7 * All rights reserved.
@@ -39,9 +39,8 @@
39#include "socket.h" 39#include "socket.h"
40#include "msg.h" 40#include "msg.h"
41#include "bcast.h" 41#include "bcast.h"
42#include "name_distr.h"
43#include "link.h" 42#include "link.h"
44#include "node.h" 43#include "name_table.h"
45 44
46#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 45#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
47#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 46#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
@@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";
54 * @inputq: data input queue; will only carry SOCK_WAKEUP messages 53 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
55 * @dest: array keeping number of reachable destinations per bearer 54 * @dest: array keeping number of reachable destinations per bearer
56 * @primary_bearer: a bearer having links to all broadcast destinations, if any 55 * @primary_bearer: a bearer having links to all broadcast destinations, if any
56 * @bcast_support: indicates if primary bearer, if any, supports broadcast
57 * @rcast_support: indicates if all peer nodes support replicast
58 * @rc_ratio: dest count as percentage of cluster size where send method changes
59 * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
57 */ 60 */
58struct tipc_bc_base { 61struct tipc_bc_base {
59 struct tipc_link *link; 62 struct tipc_link *link;
60 struct sk_buff_head inputq; 63 struct sk_buff_head inputq;
61 int dests[MAX_BEARERS]; 64 int dests[MAX_BEARERS];
62 int primary_bearer; 65 int primary_bearer;
66 bool bcast_support;
67 bool rcast_support;
68 int rc_ratio;
69 int bc_threshold;
63}; 70};
64 71
65static struct tipc_bc_base *tipc_bc_base(struct net *net) 72static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
69 76
70int tipc_bcast_get_mtu(struct net *net) 77int tipc_bcast_get_mtu(struct net *net)
71{ 78{
72 return tipc_link_mtu(tipc_bc_sndlink(net)); 79 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
80}
81
82void tipc_bcast_disable_rcast(struct net *net)
83{
84 tipc_bc_base(net)->rcast_support = false;
85}
86
87static void tipc_bcbase_calc_bc_threshold(struct net *net)
88{
89 struct tipc_bc_base *bb = tipc_bc_base(net);
90 int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
91
92 bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
73} 93}
74 94
75/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 95/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)
79{ 99{
80 struct tipc_bc_base *bb = tipc_bc_base(net); 100 struct tipc_bc_base *bb = tipc_bc_base(net);
81 int all_dests = tipc_link_bc_peers(bb->link); 101 int all_dests = tipc_link_bc_peers(bb->link);
82 int i, mtu; 102 int i, mtu, prim;
83 103
84 bb->primary_bearer = INVALID_BEARER_ID; 104 bb->primary_bearer = INVALID_BEARER_ID;
105 bb->bcast_support = true;
85 106
86 if (!all_dests) 107 if (!all_dests)
87 return; 108 return;
@@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)
93 mtu = tipc_bearer_mtu(net, i); 114 mtu = tipc_bearer_mtu(net, i);
94 if (mtu < tipc_link_mtu(bb->link)) 115 if (mtu < tipc_link_mtu(bb->link))
95 tipc_link_set_mtu(bb->link, mtu); 116 tipc_link_set_mtu(bb->link, mtu);
96 117 bb->bcast_support &= tipc_bearer_bcast_support(net, i);
97 if (bb->dests[i] < all_dests) 118 if (bb->dests[i] < all_dests)
98 continue; 119 continue;
99 120
@@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)
103 if ((i ^ tipc_own_addr(net)) & 1) 124 if ((i ^ tipc_own_addr(net)) & 1)
104 break; 125 break;
105 } 126 }
127 prim = bb->primary_bearer;
128 if (prim != INVALID_BEARER_ID)
129 bb->bcast_support = tipc_bearer_bcast_support(net, prim);
106} 130}
107 131
108void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) 132void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
@@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
170 __skb_queue_purge(&_xmitq); 194 __skb_queue_purge(&_xmitq);
171} 195}
172 196
173/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster 197static void tipc_bcast_select_xmit_method(struct net *net, int dests,
174 * and to identified node local sockets 198 struct tipc_mc_method *method)
199{
200 struct tipc_bc_base *bb = tipc_bc_base(net);
201 unsigned long exp = method->expires;
202
203 /* Broadcast supported by used bearer/bearers? */
204 if (!bb->bcast_support) {
205 method->rcast = true;
206 return;
207 }
208 /* Any destinations which don't support replicast ? */
209 if (!bb->rcast_support) {
210 method->rcast = false;
211 return;
212 }
213 /* Can current method be changed ? */
214 method->expires = jiffies + TIPC_METHOD_EXPIRE;
215 if (method->mandatory || time_before(jiffies, exp))
216 return;
217
218 /* Determine method to use now */
219 method->rcast = dests <= bb->bc_threshold;
220}
221
222/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
175 * @net: the applicable net namespace 223 * @net: the applicable net namespace
176 * @list: chain of buffers containing message 224 * @pkts: chain of buffers containing message
177 * Consumes the buffer chain, except when returning -ELINKCONG 225 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
178 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 226 * Consumes the buffer chain.
227 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
179 */ 228 */
180int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 229static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
230 u16 *cong_link_cnt)
181{ 231{
182 struct tipc_link *l = tipc_bc_sndlink(net); 232 struct tipc_link *l = tipc_bc_sndlink(net);
183 struct sk_buff_head xmitq, inputq, rcvq; 233 struct sk_buff_head xmitq;
184 int rc = 0; 234 int rc = 0;
185 235
186 __skb_queue_head_init(&rcvq);
187 __skb_queue_head_init(&xmitq); 236 __skb_queue_head_init(&xmitq);
188 skb_queue_head_init(&inputq);
189
190 /* Prepare message clone for local node */
191 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
192 return -EHOSTUNREACH;
193
194 tipc_bcast_lock(net); 237 tipc_bcast_lock(net);
195 if (tipc_link_bc_peers(l)) 238 if (tipc_link_bc_peers(l))
196 rc = tipc_link_xmit(l, list, &xmitq); 239 rc = tipc_link_xmit(l, pkts, &xmitq);
197 tipc_bcast_unlock(net); 240 tipc_bcast_unlock(net);
198 241 tipc_bcbase_xmit(net, &xmitq);
199 /* Don't send to local node if adding to link failed */ 242 __skb_queue_purge(pkts);
200 if (unlikely(rc)) { 243 if (rc == -ELINKCONG) {
201 __skb_queue_purge(&rcvq); 244 *cong_link_cnt = 1;
202 return rc; 245 rc = 0;
203 } 246 }
247 return rc;
248}
204 249
205 /* Broadcast to all nodes, inluding local node */ 250/* tipc_rcast_xmit - replicate and send a message to given destination nodes
206 tipc_bcbase_xmit(net, &xmitq); 251 * @net: the applicable net namespace
207 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 252 * @pkts: chain of buffers containing message
208 __skb_queue_purge(list); 253 * @dests: list of destination nodes
254 * @cong_link_cnt: returns number of congested links
255 * @cong_links: returns identities of congested links
256 * Returns 0 if success, otherwise errno
257 */
258static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
259 struct tipc_nlist *dests, u16 *cong_link_cnt)
260{
261 struct sk_buff_head _pkts;
262 struct u32_item *n, *tmp;
263 u32 dst, selector;
264
265 selector = msg_link_selector(buf_msg(skb_peek(pkts)));
266 __skb_queue_head_init(&_pkts);
267
268 list_for_each_entry_safe(n, tmp, &dests->list, list) {
269 dst = n->value;
270 if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
271 return -ENOMEM;
272
273 /* Any other return value than -ELINKCONG is ignored */
274 if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
275 (*cong_link_cnt)++;
276 }
209 return 0; 277 return 0;
210} 278}
211 279
280/* tipc_mcast_xmit - deliver message to indicated destination nodes
281 * and to identified node local sockets
282 * @net: the applicable net namespace
283 * @pkts: chain of buffers containing message
284 * @method: send method to be used
285 * @dests: destination nodes for message.
286 * @cong_link_cnt: returns number of encountered congested destination links
287 * Consumes buffer chain.
288 * Returns 0 if success, otherwise errno
289 */
290int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
291 struct tipc_mc_method *method, struct tipc_nlist *dests,
292 u16 *cong_link_cnt)
293{
294 struct sk_buff_head inputq, localq;
295 int rc = 0;
296
297 skb_queue_head_init(&inputq);
298 skb_queue_head_init(&localq);
299
300 /* Clone packets before they are consumed by next call */
301 if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
302 rc = -ENOMEM;
303 goto exit;
304 }
305 /* Send according to determined transmit method */
306 if (dests->remote) {
307 tipc_bcast_select_xmit_method(net, dests->remote, method);
308 if (method->rcast)
309 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
310 else
311 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
312 }
313
314 if (dests->local)
315 tipc_sk_mcast_rcv(net, &localq, &inputq);
316exit:
317 /* This queue should normally be empty by now */
318 __skb_queue_purge(pkts);
319 return rc;
320}
321
212/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link 322/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
213 * 323 *
214 * RCU is locked, no other locks set 324 * RCU is locked, no other locks set
@@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
313 tipc_bcast_lock(net); 423 tipc_bcast_lock(net);
314 tipc_link_add_bc_peer(snd_l, uc_l, xmitq); 424 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
315 tipc_bcbase_select_primary(net); 425 tipc_bcbase_select_primary(net);
426 tipc_bcbase_calc_bc_threshold(net);
316 tipc_bcast_unlock(net); 427 tipc_bcast_unlock(net);
317} 428}
318 429
@@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
331 tipc_bcast_lock(net); 442 tipc_bcast_lock(net);
332 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); 443 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
333 tipc_bcbase_select_primary(net); 444 tipc_bcbase_select_primary(net);
445 tipc_bcbase_calc_bc_threshold(net);
334 tipc_bcast_unlock(net); 446 tipc_bcast_unlock(net);
335 447
336 tipc_bcbase_xmit(net, &xmitq); 448 tipc_bcbase_xmit(net, &xmitq);
@@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)
413 goto enomem; 525 goto enomem;
414 bb->link = l; 526 bb->link = l;
415 tn->bcl = l; 527 tn->bcl = l;
528 bb->rc_ratio = 25;
529 bb->rcast_support = true;
416 return 0; 530 return 0;
417enomem: 531enomem:
418 kfree(bb); 532 kfree(bb);
@@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)
428 kfree(tn->bcbase); 542 kfree(tn->bcbase);
429 kfree(tn->bcl); 543 kfree(tn->bcl);
430} 544}
545
546void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
547{
548 memset(nl, 0, sizeof(*nl));
549 INIT_LIST_HEAD(&nl->list);
550 nl->self = self;
551}
552
553void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
554{
555 if (node == nl->self)
556 nl->local = true;
557 else if (u32_push(&nl->list, node))
558 nl->remote++;
559}
560
561void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
562{
563 if (node == nl->self)
564 nl->local = false;
565 else if (u32_del(&nl->list, node))
566 nl->remote--;
567}
568
569void tipc_nlist_purge(struct tipc_nlist *nl)
570{
571 u32_list_purge(&nl->list);
572 nl->remote = 0;
573 nl->local = 0;
574}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 855d53c64ab3..751530ab0c49 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -42,9 +42,35 @@
42struct tipc_node; 42struct tipc_node;
43struct tipc_msg; 43struct tipc_msg;
44struct tipc_nl_msg; 44struct tipc_nl_msg;
45struct tipc_node_map; 45struct tipc_nlist;
46struct tipc_nitem;
46extern const char tipc_bclink_name[]; 47extern const char tipc_bclink_name[];
47 48
49#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
50
51struct tipc_nlist {
52 struct list_head list;
53 u32 self;
54 u16 remote;
55 bool local;
56};
57
58void tipc_nlist_init(struct tipc_nlist *nl, u32 self);
59void tipc_nlist_purge(struct tipc_nlist *nl);
60void tipc_nlist_add(struct tipc_nlist *nl, u32 node);
61void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
62
63/* Cookie to be used between socket and broadcast layer
64 * @rcast: replicast (instead of broadcast) was used at previous xmit
65 * @mandatory: broadcast/replicast indication was set by user
66 * @expires: re-evaluate non-mandatory transmit method if we are past this
67 */
68struct tipc_mc_method {
69 bool rcast;
70 bool mandatory;
71 unsigned long expires;
72};
73
48int tipc_bcast_init(struct net *net); 74int tipc_bcast_init(struct net *net);
49void tipc_bcast_stop(struct net *net); 75void tipc_bcast_stop(struct net *net);
50void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, 76void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
@@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
53void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); 79void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
54void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); 80void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
55int tipc_bcast_get_mtu(struct net *net); 81int tipc_bcast_get_mtu(struct net *net);
56int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); 82void tipc_bcast_disable_rcast(struct net *net);
83int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
84 struct tipc_mc_method *method, struct tipc_nlist *dests,
85 u16 *cong_link_cnt);
57int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); 86int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
58void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, 87void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
59 struct tipc_msg *hdr); 88 struct tipc_msg *hdr);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 52d74760fb68..33a5bdfbef76 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
431 memset(&b->bcast_addr, 0, sizeof(b->bcast_addr)); 431 memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));
432 memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); 432 memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
433 b->bcast_addr.media_id = b->media->type_id; 433 b->bcast_addr.media_id = b->media->type_id;
434 b->bcast_addr.broadcast = 1; 434 b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
435 b->mtu = dev->mtu; 435 b->mtu = dev->mtu;
436 b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr); 436 b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);
437 rcu_assign_pointer(dev->tipc_ptr, b); 437 rcu_assign_pointer(dev->tipc_ptr, b);
@@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
482 return 0; 482 return 0;
483} 483}
484 484
485bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id)
486{
487 bool supp = false;
488 struct tipc_bearer *b;
489
490 rcu_read_lock();
491 b = bearer_get(net, bearer_id);
492 if (b)
493 supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT);
494 rcu_read_unlock();
495 return supp;
496}
497
485int tipc_bearer_mtu(struct net *net, u32 bearer_id) 498int tipc_bearer_mtu(struct net *net, u32 bearer_id)
486{ 499{
487 int mtu = 0; 500 int mtu = 0;
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 278ff7f616f9..635c9086e19a 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -60,9 +60,14 @@
60#define TIPC_MEDIA_TYPE_IB 2 60#define TIPC_MEDIA_TYPE_IB 2
61#define TIPC_MEDIA_TYPE_UDP 3 61#define TIPC_MEDIA_TYPE_UDP 3
62 62
63/* minimum bearer MTU */ 63/* Minimum bearer MTU */
64#define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE) 64#define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE)
65 65
66/* Identifiers for distinguishing between broadcast/multicast and replicast
67 */
68#define TIPC_BROADCAST_SUPPORT 1
69#define TIPC_REPLICAST_SUPPORT 2
70
66/** 71/**
67 * struct tipc_media_addr - destination address used by TIPC bearers 72 * struct tipc_media_addr - destination address used by TIPC bearers
68 * @value: address info (format defined by media) 73 * @value: address info (format defined by media)
@@ -210,6 +215,7 @@ int tipc_bearer_setup(void);
210void tipc_bearer_cleanup(void); 215void tipc_bearer_cleanup(void);
211void tipc_bearer_stop(struct net *net); 216void tipc_bearer_stop(struct net *net);
212int tipc_bearer_mtu(struct net *net, u32 bearer_id); 217int tipc_bearer_mtu(struct net *net, u32 bearer_id);
218bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id);
213void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, 219void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
214 struct sk_buff *skb, 220 struct sk_buff *skb,
215 struct tipc_media_addr *dest); 221 struct tipc_media_addr *dest);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 4e8647aef01c..ddd2dd6f77aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
515 if (link_is_bc_sndlink(l)) 515 if (link_is_bc_sndlink(l))
516 l->state = LINK_ESTABLISHED; 516 l->state = LINK_ESTABLISHED;
517 517
518 /* Disable replicast if even a single peer doesn't support it */
519 if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
520 tipc_bcast_disable_rcast(net);
521
518 return true; 522 return true;
519} 523}
520 524
@@ -776,60 +780,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
776 780
777/** 781/**
778 * link_schedule_user - schedule a message sender for wakeup after congestion 782 * link_schedule_user - schedule a message sender for wakeup after congestion
779 * @link: congested link 783 * @l: congested link
780 * @list: message that was attempted sent 784 * @hdr: header of message that is being sent
781 * Create pseudo msg to send back to user when congestion abates 785 * Create pseudo msg to send back to user when congestion abates
782 * Does not consume buffer list
783 */ 786 */
784static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) 787static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
785{ 788{
786 struct tipc_msg *msg = buf_msg(skb_peek(list)); 789 u32 dnode = tipc_own_addr(l->net);
787 int imp = msg_importance(msg); 790 u32 dport = msg_origport(hdr);
788 u32 oport = msg_origport(msg);
789 u32 addr = tipc_own_addr(link->net);
790 struct sk_buff *skb; 791 struct sk_buff *skb;
791 792
792 /* This really cannot happen... */
793 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
794 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
795 return -ENOBUFS;
796 }
797 /* Non-blocking sender: */
798 if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
799 return -ELINKCONG;
800
801 /* Create and schedule wakeup pseudo message */ 793 /* Create and schedule wakeup pseudo message */
802 skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, 794 skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
803 addr, addr, oport, 0, 0); 795 dnode, l->addr, dport, 0, 0);
804 if (!skb) 796 if (!skb)
805 return -ENOBUFS; 797 return -ENOBUFS;
806 TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); 798 msg_set_dest_droppable(buf_msg(skb), true);
807 TIPC_SKB_CB(skb)->chain_imp = imp; 799 TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
808 skb_queue_tail(&link->wakeupq, skb); 800 skb_queue_tail(&l->wakeupq, skb);
809 link->stats.link_congs++; 801 l->stats.link_congs++;
810 return -ELINKCONG; 802 return -ELINKCONG;
811} 803}
812 804
813/** 805/**
814 * link_prepare_wakeup - prepare users for wakeup after congestion 806 * link_prepare_wakeup - prepare users for wakeup after congestion
815 * @link: congested link 807 * @l: congested link
816 * Move a number of waiting users, as permitted by available space in 808 * Wake up a number of waiting users, as permitted by available space
817 * the send queue, from link wait queue to node wait queue for wakeup 809 * in the send queue
818 */ 810 */
819void link_prepare_wakeup(struct tipc_link *l) 811void link_prepare_wakeup(struct tipc_link *l)
820{ 812{
821 int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
822 int imp, lim;
823 struct sk_buff *skb, *tmp; 813 struct sk_buff *skb, *tmp;
814 int imp, i = 0;
824 815
825 skb_queue_walk_safe(&l->wakeupq, skb, tmp) { 816 skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
826 imp = TIPC_SKB_CB(skb)->chain_imp; 817 imp = TIPC_SKB_CB(skb)->chain_imp;
827 lim = l->backlog[imp].limit; 818 if (l->backlog[imp].len < l->backlog[imp].limit) {
828 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; 819 skb_unlink(skb, &l->wakeupq);
829 if ((pnd[imp] + l->backlog[imp].len) >= lim) 820 skb_queue_tail(l->inputq, skb);
821 } else if (i++ > 10) {
830 break; 822 break;
831 skb_unlink(skb, &l->wakeupq); 823 }
832 skb_queue_tail(l->inputq, skb);
833 } 824 }
834} 825}
835 826
@@ -869,8 +860,7 @@ void tipc_link_reset(struct tipc_link *l)
869 * @list: chain of buffers containing message 860 * @list: chain of buffers containing message
870 * @xmitq: returned list of packets to be sent by caller 861 * @xmitq: returned list of packets to be sent by caller
871 * 862 *
872 * Consumes the buffer chain, except when returning -ELINKCONG, 863 * Consumes the buffer chain.
873 * since the caller then may want to make more send attempts.
874 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS 864 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
875 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted 865 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
876 */ 866 */
@@ -879,7 +869,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
879{ 869{
880 struct tipc_msg *hdr = buf_msg(skb_peek(list)); 870 struct tipc_msg *hdr = buf_msg(skb_peek(list));
881 unsigned int maxwin = l->window; 871 unsigned int maxwin = l->window;
882 unsigned int i, imp = msg_importance(hdr); 872 int imp = msg_importance(hdr);
883 unsigned int mtu = l->mtu; 873 unsigned int mtu = l->mtu;
884 u16 ack = l->rcv_nxt - 1; 874 u16 ack = l->rcv_nxt - 1;
885 u16 seqno = l->snd_nxt; 875 u16 seqno = l->snd_nxt;
@@ -888,19 +878,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
888 struct sk_buff_head *backlogq = &l->backlogq; 878 struct sk_buff_head *backlogq = &l->backlogq;
889 struct sk_buff *skb, *_skb, *bskb; 879 struct sk_buff *skb, *_skb, *bskb;
890 int pkt_cnt = skb_queue_len(list); 880 int pkt_cnt = skb_queue_len(list);
881 int rc = 0;
891 882
892 /* Match msg importance against this and all higher backlog limits: */
893 if (!skb_queue_empty(backlogq)) {
894 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
895 if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
896 return link_schedule_user(l, list);
897 }
898 }
899 if (unlikely(msg_size(hdr) > mtu)) { 883 if (unlikely(msg_size(hdr) > mtu)) {
900 skb_queue_purge(list); 884 skb_queue_purge(list);
901 return -EMSGSIZE; 885 return -EMSGSIZE;
902 } 886 }
903 887
888 /* Allow oversubscription of one data msg per source at congestion */
889 if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
890 if (imp == TIPC_SYSTEM_IMPORTANCE) {
891 pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
892 return -ENOBUFS;
893 }
894 rc = link_schedule_user(l, hdr);
895 }
896
904 if (pkt_cnt > 1) { 897 if (pkt_cnt > 1) {
905 l->stats.sent_fragmented++; 898 l->stats.sent_fragmented++;
906 l->stats.sent_fragments += pkt_cnt; 899 l->stats.sent_fragments += pkt_cnt;
@@ -946,7 +939,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
946 skb_queue_splice_tail_init(list, backlogq); 939 skb_queue_splice_tail_init(list, backlogq);
947 } 940 }
948 l->snd_nxt = seqno; 941 l->snd_nxt = seqno;
949 return 0; 942 return rc;
950} 943}
951 944
952void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 945void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
@@ -1043,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1043static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, 1036static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1044 struct sk_buff_head *inputq) 1037 struct sk_buff_head *inputq)
1045{ 1038{
1046 switch (msg_user(buf_msg(skb))) { 1039 struct tipc_msg *hdr = buf_msg(skb);
1040
1041 switch (msg_user(hdr)) {
1047 case TIPC_LOW_IMPORTANCE: 1042 case TIPC_LOW_IMPORTANCE:
1048 case TIPC_MEDIUM_IMPORTANCE: 1043 case TIPC_MEDIUM_IMPORTANCE:
1049 case TIPC_HIGH_IMPORTANCE: 1044 case TIPC_HIGH_IMPORTANCE:
1050 case TIPC_CRITICAL_IMPORTANCE: 1045 case TIPC_CRITICAL_IMPORTANCE:
1046 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
1047 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1048 return true;
1049 }
1051 case CONN_MANAGER: 1050 case CONN_MANAGER:
1052 skb_queue_tail(inputq, skb); 1051 skb_queue_tail(inputq, skb);
1053 return true; 1052 return true;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ab02d0742476..312ef7de57d7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -607,6 +607,23 @@ error:
607 return false; 607 return false;
608} 608}
609 609
610bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
611 struct sk_buff_head *cpy)
612{
613 struct sk_buff *skb, *_skb;
614
615 skb_queue_walk(msg, skb) {
616 _skb = pskb_copy(skb, GFP_ATOMIC);
617 if (!_skb) {
618 __skb_queue_purge(cpy);
619 return false;
620 }
621 msg_set_destnode(buf_msg(_skb), dst);
622 __skb_queue_tail(cpy, _skb);
623 }
624 return true;
625}
626
610/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 627/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
611 * @list: list to be appended to 628 * @list: list to be appended to
612 * @seqno: sequence number of buffer to add 629 * @seqno: sequence number of buffer to add
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 2c3dc38abf9c..c843fd2bc48d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -98,8 +98,6 @@ struct tipc_skb_cb {
98 u32 bytes_read; 98 u32 bytes_read;
99 struct sk_buff *tail; 99 struct sk_buff *tail;
100 bool validated; 100 bool validated;
101 bool wakeup_pending;
102 u16 chain_sz;
103 u16 chain_imp; 101 u16 chain_imp;
104 u16 ackers; 102 u16 ackers;
105}; 103};
@@ -633,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
633 631
634static inline u32 msg_link_selector(struct tipc_msg *m) 632static inline u32 msg_link_selector(struct tipc_msg *m)
635{ 633{
634 if (msg_user(m) == MSG_FRAGMENTER)
635 m = (void *)msg_data(m);
636 return msg_bits(m, 4, 0, 1); 636 return msg_bits(m, 4, 0, 1);
637} 637}
638 638
639static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
640{
641 msg_set_bits(m, 4, 0, 1, n);
642}
643
644/* 639/*
645 * Word 5 640 * Word 5
646 */ 641 */
@@ -837,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
837 int offset, int dsz, int mtu, struct sk_buff_head *list); 832 int offset, int dsz, int mtu, struct sk_buff_head *list);
838bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 833bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
839bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); 834bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
835bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
836 struct sk_buff_head *cpy);
840void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 837void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
841 struct sk_buff *skb); 838 struct sk_buff *skb);
842 839
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index e190460fe0d3..9be6592e4a6f 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -608,7 +608,7 @@ not_found:
608 * Returns non-zero if any off-node ports overlap 608 * Returns non-zero if any off-node ports overlap
609 */ 609 */
610int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 610int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
611 u32 limit, struct tipc_plist *dports) 611 u32 limit, struct list_head *dports)
612{ 612{
613 struct name_seq *seq; 613 struct name_seq *seq;
614 struct sub_seq *sseq; 614 struct sub_seq *sseq;
@@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
633 info = sseq->info; 633 info = sseq->info;
634 list_for_each_entry(publ, &info->node_list, node_list) { 634 list_for_each_entry(publ, &info->node_list, node_list) {
635 if (publ->scope <= limit) 635 if (publ->scope <= limit)
636 tipc_plist_push(dports, publ->ref); 636 u32_push(dports, publ->ref);
637 } 637 }
638 638
639 if (info->cluster_list_size != info->node_list_size) 639 if (info->cluster_list_size != info->node_list_size)
@@ -645,6 +645,39 @@ exit:
645 return res; 645 return res;
646} 646}
647 647
648/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes
649 * - Creates list of nodes that overlap the given multicast address
650 * - Determines if any node local ports overlap
651 */
652void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
653 u32 upper, u32 domain,
654 struct tipc_nlist *nodes)
655{
656 struct sub_seq *sseq, *stop;
657 struct publication *publ;
658 struct name_info *info;
659 struct name_seq *seq;
660
661 rcu_read_lock();
662 seq = nametbl_find_seq(net, type);
663 if (!seq)
664 goto exit;
665
666 spin_lock_bh(&seq->lock);
667 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
668 stop = seq->sseqs + seq->first_free;
669 for (; sseq->lower <= upper && sseq != stop; sseq++) {
670 info = sseq->info;
671 list_for_each_entry(publ, &info->zone_list, zone_list) {
672 if (tipc_in_scope(domain, publ->node))
673 tipc_nlist_add(nodes, publ->node);
674 }
675 }
676 spin_unlock_bh(&seq->lock);
677exit:
678 rcu_read_unlock();
679}
680
648/* 681/*
649 * tipc_nametbl_publish - add name publication to network name tables 682 * tipc_nametbl_publish - add name publication to network name tables
650 */ 683 */
@@ -1022,40 +1055,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1022 return skb->len; 1055 return skb->len;
1023} 1056}
1024 1057
1025void tipc_plist_push(struct tipc_plist *pl, u32 port) 1058bool u32_find(struct list_head *l, u32 value)
1026{ 1059{
1027 struct tipc_plist *nl; 1060 struct u32_item *item;
1028 1061
1029 if (likely(!pl->port)) { 1062 list_for_each_entry(item, l, list) {
1030 pl->port = port; 1063 if (item->value == value)
1031 return; 1064 return true;
1032 } 1065 }
1033 if (pl->port == port) 1066 return false;
1034 return; 1067}
1035 list_for_each_entry(nl, &pl->list, list) { 1068
1036 if (nl->port == port) 1069bool u32_push(struct list_head *l, u32 value)
1037 return; 1070{
1071 struct u32_item *item;
1072
1073 list_for_each_entry(item, l, list) {
1074 if (item->value == value)
1075 return false;
1076 }
1077 item = kmalloc(sizeof(*item), GFP_ATOMIC);
1078 if (unlikely(!item))
1079 return false;
1080
1081 item->value = value;
1082 list_add(&item->list, l);
1083 return true;
1084}
1085
1086u32 u32_pop(struct list_head *l)
1087{
1088 struct u32_item *item;
1089 u32 value = 0;
1090
1091 if (list_empty(l))
1092 return 0;
1093 item = list_first_entry(l, typeof(*item), list);
1094 value = item->value;
1095 list_del(&item->list);
1096 kfree(item);
1097 return value;
1098}
1099
1100bool u32_del(struct list_head *l, u32 value)
1101{
1102 struct u32_item *item, *tmp;
1103
1104 list_for_each_entry_safe(item, tmp, l, list) {
1105 if (item->value != value)
1106 continue;
1107 list_del(&item->list);
1108 kfree(item);
1109 return true;
1038 } 1110 }
1039 nl = kmalloc(sizeof(*nl), GFP_ATOMIC); 1111 return false;
1040 if (nl) { 1112}
1041 nl->port = port; 1113
1042 list_add(&nl->list, &pl->list); 1114void u32_list_purge(struct list_head *l)
1115{
1116 struct u32_item *item, *tmp;
1117
1118 list_for_each_entry_safe(item, tmp, l, list) {
1119 list_del(&item->list);
1120 kfree(item);
1043 } 1121 }
1044} 1122}
1045 1123
1046u32 tipc_plist_pop(struct tipc_plist *pl) 1124int u32_list_len(struct list_head *l)
1047{ 1125{
1048 struct tipc_plist *nl; 1126 struct u32_item *item;
1049 u32 port = 0; 1127 int i = 0;
1050 1128
1051 if (likely(list_empty(&pl->list))) { 1129 list_for_each_entry(item, l, list) {
1052 port = pl->port; 1130 i++;
1053 pl->port = 0;
1054 return port;
1055 } 1131 }
1056 nl = list_first_entry(&pl->list, typeof(*nl), list); 1132 return i;
1057 port = nl->port;
1058 list_del(&nl->list);
1059 kfree(nl);
1060 return port;
1061} 1133}
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 1524a73830f7..6ebdeb1d84a5 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -39,6 +39,7 @@
39 39
40struct tipc_subscription; 40struct tipc_subscription;
41struct tipc_plist; 41struct tipc_plist;
42struct tipc_nlist;
42 43
43/* 44/*
44 * TIPC name types reserved for internal TIPC use (both current and planned) 45 * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -99,7 +100,10 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
99 100
100u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 101u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
101int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 102int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
102 u32 limit, struct tipc_plist *dports); 103 u32 limit, struct list_head *dports);
104void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
105 u32 upper, u32 domain,
106 struct tipc_nlist *nodes);
103struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, 107struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
104 u32 upper, u32 scope, u32 port_ref, 108 u32 upper, u32 scope, u32 port_ref,
105 u32 key); 109 u32 key);
@@ -116,18 +120,16 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
116int tipc_nametbl_init(struct net *net); 120int tipc_nametbl_init(struct net *net);
117void tipc_nametbl_stop(struct net *net); 121void tipc_nametbl_stop(struct net *net);
118 122
119struct tipc_plist { 123struct u32_item {
120 struct list_head list; 124 struct list_head list;
121 u32 port; 125 u32 value;
122}; 126};
123 127
124static inline void tipc_plist_init(struct tipc_plist *pl) 128bool u32_push(struct list_head *l, u32 value);
125{ 129u32 u32_pop(struct list_head *l);
126 INIT_LIST_HEAD(&pl->list); 130bool u32_find(struct list_head *l, u32 value);
127 pl->port = 0; 131bool u32_del(struct list_head *l, u32 value);
128} 132void u32_list_purge(struct list_head *l);
129 133int u32_list_len(struct list_head *l);
130void tipc_plist_push(struct tipc_plist *pl, u32 port);
131u32 tipc_plist_pop(struct tipc_plist *pl);
132 134
133#endif 135#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 27753325e06e..e9295fa3a554 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1172,7 +1172,7 @@ msg_full:
1172 * @list: chain of buffers containing message 1172 * @list: chain of buffers containing message
1173 * @dnode: address of destination node 1173 * @dnode: address of destination node
1174 * @selector: a number used for deterministic link selection 1174 * @selector: a number used for deterministic link selection
1175 * Consumes the buffer chain, except when returning -ELINKCONG 1175 * Consumes the buffer chain.
1176 * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF 1176 * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
1177 */ 1177 */
1178int tipc_node_xmit(struct net *net, struct sk_buff_head *list, 1178int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
@@ -1211,10 +1211,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1211 spin_unlock_bh(&le->lock); 1211 spin_unlock_bh(&le->lock);
1212 tipc_node_read_unlock(n); 1212 tipc_node_read_unlock(n);
1213 1213
1214 if (likely(rc == 0)) 1214 if (unlikely(rc == -ENOBUFS))
1215 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1216 else if (rc == -ENOBUFS)
1217 tipc_node_link_down(n, bearer_id, false); 1215 tipc_node_link_down(n, bearer_id, false);
1216 else
1217 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1218 1218
1219 tipc_node_put(n); 1219 tipc_node_put(n);
1220 1220
@@ -1226,20 +1226,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1226 * messages, which will not be rejected 1226 * messages, which will not be rejected
1227 * The only exception is datagram messages rerouted after secondary 1227 * The only exception is datagram messages rerouted after secondary
1228 * lookup, which are rare and safe to dispose of anyway. 1228 * lookup, which are rare and safe to dispose of anyway.
1229 * TODO: Return real return value, and let callers use
1230 * tipc_wait_for_sendpkt() where applicable
1231 */ 1229 */
1232int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, 1230int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1233 u32 selector) 1231 u32 selector)
1234{ 1232{
1235 struct sk_buff_head head; 1233 struct sk_buff_head head;
1236 int rc;
1237 1234
1238 skb_queue_head_init(&head); 1235 skb_queue_head_init(&head);
1239 __skb_queue_tail(&head, skb); 1236 __skb_queue_tail(&head, skb);
1240 rc = tipc_node_xmit(net, &head, dnode, selector); 1237 tipc_node_xmit(net, &head, dnode, selector);
1241 if (rc == -ELINKCONG)
1242 kfree_skb(skb);
1243 return 0; 1238 return 0;
1244} 1239}
1245 1240
@@ -1267,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1267 kfree_skb(skb); 1262 kfree_skb(skb);
1268} 1263}
1269 1264
1265static void tipc_node_mcast_rcv(struct tipc_node *n)
1266{
1267 struct tipc_bclink_entry *be = &n->bc_entry;
1268
1269 /* 'arrvq' is under inputq2's lock protection */
1270 spin_lock_bh(&be->inputq2.lock);
1271 spin_lock_bh(&be->inputq1.lock);
1272 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1273 spin_unlock_bh(&be->inputq1.lock);
1274 spin_unlock_bh(&be->inputq2.lock);
1275 tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
1276}
1277
1270static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr, 1278static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
1271 int bearer_id, struct sk_buff_head *xmitq) 1279 int bearer_id, struct sk_buff_head *xmitq)
1272{ 1280{
@@ -1340,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
1340 if (!skb_queue_empty(&xmitq)) 1348 if (!skb_queue_empty(&xmitq))
1341 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); 1349 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1342 1350
1343 /* Deliver. 'arrvq' is under inputq2's lock protection */ 1351 if (!skb_queue_empty(&be->inputq1))
1344 if (!skb_queue_empty(&be->inputq1)) { 1352 tipc_node_mcast_rcv(n);
1345 spin_lock_bh(&be->inputq2.lock);
1346 spin_lock_bh(&be->inputq1.lock);
1347 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1348 spin_unlock_bh(&be->inputq1.lock);
1349 spin_unlock_bh(&be->inputq2.lock);
1350 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1351 }
1352 1353
1353 if (rc & TIPC_LINK_DOWN_EVT) { 1354 if (rc & TIPC_LINK_DOWN_EVT) {
1354 /* Reception reassembly failure => reset all links to peer */ 1355 /* Reception reassembly failure => reset all links to peer */
@@ -1575,6 +1576,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1575 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) 1576 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1576 tipc_named_rcv(net, &n->bc_entry.namedq); 1577 tipc_named_rcv(net, &n->bc_entry.namedq);
1577 1578
1579 if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
1580 tipc_node_mcast_rcv(n);
1581
1578 if (!skb_queue_empty(&le->inputq)) 1582 if (!skb_queue_empty(&le->inputq))
1579 tipc_sk_rcv(net, &le->inputq); 1583 tipc_sk_rcv(net, &le->inputq);
1580 1584
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 39ef54c1f2ad..898c22916984 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,11 +47,13 @@
47enum { 47enum {
48 TIPC_BCAST_SYNCH = (1 << 1), 48 TIPC_BCAST_SYNCH = (1 << 1),
49 TIPC_BCAST_STATE_NACK = (1 << 2), 49 TIPC_BCAST_STATE_NACK = (1 << 2),
50 TIPC_BLOCK_FLOWCTL = (1 << 3) 50 TIPC_BLOCK_FLOWCTL = (1 << 3),
51 TIPC_BCAST_RCAST = (1 << 4)
51}; 52};
52 53
53#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ 54#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
54 TIPC_BCAST_STATE_NACK | \ 55 TIPC_BCAST_STATE_NACK | \
56 TIPC_BCAST_RCAST | \
55 TIPC_BLOCK_FLOWCTL) 57 TIPC_BLOCK_FLOWCTL)
56#define INVALID_BEARER_ID -1 58#define INVALID_BEARER_ID -1
57 59
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 800caaa699a1..103d1fd058c0 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -67,16 +67,19 @@ enum {
67 * @max_pkt: maximum packet size "hint" used when building messages sent by port 67 * @max_pkt: maximum packet size "hint" used when building messages sent by port
68 * @portid: unique port identity in TIPC socket hash table 68 * @portid: unique port identity in TIPC socket hash table
69 * @phdr: preformatted message header used when sending messages 69 * @phdr: preformatted message header used when sending messages
70 * @cong_links: list of congested links
70 * @publications: list of publications for port 71 * @publications: list of publications for port
72 * @blocking_link: address of the congested link we are currently sleeping on
71 * @pub_count: total # of publications port has made during its lifetime 73 * @pub_count: total # of publications port has made during its lifetime
72 * @probing_state: 74 * @probing_state:
73 * @conn_timeout: the time we can wait for an unresponded setup request 75 * @conn_timeout: the time we can wait for an unresponded setup request
74 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 76 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
75 * @link_cong: non-zero if owner must sleep because of link congestion 77 * @cong_link_cnt: number of congested links
76 * @sent_unacked: # messages sent by socket, and not yet acked by peer 78 * @sent_unacked: # messages sent by socket, and not yet acked by peer
77 * @rcv_unacked: # messages read by user, but not yet acked back to peer 79 * @rcv_unacked: # messages read by user, but not yet acked back to peer
78 * @peer: 'connected' peer for dgram/rdm 80 * @peer: 'connected' peer for dgram/rdm
79 * @node: hash table node 81 * @node: hash table node
82 * @mc_method: cookie for use between socket and broadcast layer
80 * @rcu: rcu struct for tipc_sock 83 * @rcu: rcu struct for tipc_sock
81 */ 84 */
82struct tipc_sock { 85struct tipc_sock {
@@ -87,13 +90,13 @@ struct tipc_sock {
87 u32 max_pkt; 90 u32 max_pkt;
88 u32 portid; 91 u32 portid;
89 struct tipc_msg phdr; 92 struct tipc_msg phdr;
90 struct list_head sock_list; 93 struct list_head cong_links;
91 struct list_head publications; 94 struct list_head publications;
92 u32 pub_count; 95 u32 pub_count;
93 uint conn_timeout; 96 uint conn_timeout;
94 atomic_t dupl_rcvcnt; 97 atomic_t dupl_rcvcnt;
95 bool probe_unacked; 98 bool probe_unacked;
96 bool link_cong; 99 u16 cong_link_cnt;
97 u16 snt_unacked; 100 u16 snt_unacked;
98 u16 snd_win; 101 u16 snd_win;
99 u16 peer_caps; 102 u16 peer_caps;
@@ -101,6 +104,7 @@ struct tipc_sock {
101 u16 rcv_win; 104 u16 rcv_win;
102 struct sockaddr_tipc peer; 105 struct sockaddr_tipc peer;
103 struct rhash_head node; 106 struct rhash_head node;
107 struct tipc_mc_method mc_method;
104 struct rcu_head rcu; 108 struct rcu_head rcu;
105}; 109};
106 110
@@ -110,7 +114,6 @@ static void tipc_write_space(struct sock *sk);
110static void tipc_sock_destruct(struct sock *sk); 114static void tipc_sock_destruct(struct sock *sk);
111static int tipc_release(struct socket *sock); 115static int tipc_release(struct socket *sock);
112static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); 116static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
113static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
114static void tipc_sk_timeout(unsigned long data); 117static void tipc_sk_timeout(unsigned long data);
115static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 118static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
116 struct tipc_name_seq const *seq); 119 struct tipc_name_seq const *seq);
@@ -119,8 +122,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
119static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 122static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
120static int tipc_sk_insert(struct tipc_sock *tsk); 123static int tipc_sk_insert(struct tipc_sock *tsk);
121static void tipc_sk_remove(struct tipc_sock *tsk); 124static void tipc_sk_remove(struct tipc_sock *tsk);
122static int __tipc_send_stream(struct socket *sock, struct msghdr *m, 125static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
123 size_t dsz);
124static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); 126static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
125 127
126static const struct proto_ops packet_ops; 128static const struct proto_ops packet_ops;
@@ -334,6 +336,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
334 return res; 336 return res;
335} 337}
336 338
339static int tipc_sk_sock_err(struct socket *sock, long *timeout)
340{
341 struct sock *sk = sock->sk;
342 int err = sock_error(sk);
343 int typ = sock->type;
344
345 if (err)
346 return err;
347 if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
348 if (sk->sk_state == TIPC_DISCONNECTING)
349 return -EPIPE;
350 else if (!tipc_sk_connected(sk))
351 return -ENOTCONN;
352 }
353 if (!*timeout)
354 return -EAGAIN;
355 if (signal_pending(current))
356 return sock_intr_errno(*timeout);
357
358 return 0;
359}
360
/* tipc_wait_for_cond - wait on a socket until a condition becomes true
 * @sock_: socket structure (struct socket *) to wait on
 * @timeout_: pointer to the wait time; handed to tipc_sk_sock_err() and
 *            sk_wait_event()
 * @condition_: expression to wait for; it is evaluated several times, so
 *              it must not have side effects
 *
 * Evaluates to 0 if the loop ended because @condition_ held or
 * sk_wait_event() returned non-zero, otherwise to the error code returned
 * by tipc_sk_sock_err().
 *
 * The macro refers to the socket only through its @sock_ argument, so it
 * does not depend on the caller having a local variable named 'sock'.
 */
#define tipc_wait_for_cond(sock_, timeout_, condition_)			\
({									\
	int rc_ = 0;							\
	int done_ = 0;							\
									\
	while (!(condition_) && !done_) {				\
		struct sock *sk_ = (sock_)->sk;				\
		DEFINE_WAIT_FUNC(wait_, woken_wake_function);		\
									\
		rc_ = tipc_sk_sock_err(sock_, timeout_);		\
		if (rc_)						\
			break;						\
		prepare_to_wait(sk_sleep(sk_), &wait_,			\
				TASK_INTERRUPTIBLE);			\
		done_ = sk_wait_event(sk_, timeout_,			\
				      (condition_), &wait_);		\
		remove_wait_queue(sk_sleep(sk_), &wait_);		\
	}								\
	rc_;								\
})
381
337/** 382/**
338 * tipc_sk_create - create a TIPC socket 383 * tipc_sk_create - create a TIPC socket
339 * @net: network namespace (must be default network) 384 * @net: network namespace (must be default network)
@@ -382,6 +427,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
382 tsk = tipc_sk(sk); 427 tsk = tipc_sk(sk);
383 tsk->max_pkt = MAX_PKT_DEFAULT; 428 tsk->max_pkt = MAX_PKT_DEFAULT;
384 INIT_LIST_HEAD(&tsk->publications); 429 INIT_LIST_HEAD(&tsk->publications);
430 INIT_LIST_HEAD(&tsk->cong_links);
385 msg = &tsk->phdr; 431 msg = &tsk->phdr;
386 tn = net_generic(sock_net(sk), tipc_net_id); 432 tn = net_generic(sock_net(sk), tipc_net_id);
387 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, 433 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
@@ -432,9 +478,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
432 struct sock *sk = sock->sk; 478 struct sock *sk = sock->sk;
433 struct tipc_sock *tsk = tipc_sk(sk); 479 struct tipc_sock *tsk = tipc_sk(sk);
434 struct net *net = sock_net(sk); 480 struct net *net = sock_net(sk);
481 long timeout = CONN_TIMEOUT_DEFAULT;
435 u32 dnode = tsk_peer_node(tsk); 482 u32 dnode = tsk_peer_node(tsk);
436 struct sk_buff *skb; 483 struct sk_buff *skb;
437 484
485 /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
486 tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
487 !tsk_conn_cong(tsk)));
488
438 /* Reject all unreceived messages, except on an active connection 489 /* Reject all unreceived messages, except on an active connection
439 * (which disconnects locally & sends a 'FIN+' to peer). 490 * (which disconnects locally & sends a 'FIN+' to peer).
440 */ 491 */
@@ -505,7 +556,8 @@ static int tipc_release(struct socket *sock)
505 556
506 /* Reject any messages that accumulated in backlog queue */ 557 /* Reject any messages that accumulated in backlog queue */
507 release_sock(sk); 558 release_sock(sk);
508 559 u32_list_purge(&tsk->cong_links);
560 tsk->cong_link_cnt = 0;
509 call_rcu(&tsk->rcu, tipc_sk_callback); 561 call_rcu(&tsk->rcu, tipc_sk_callback);
510 sock->sk = NULL; 562 sock->sk = NULL;
511 563
@@ -648,7 +700,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
648 700
649 switch (sk->sk_state) { 701 switch (sk->sk_state) {
650 case TIPC_ESTABLISHED: 702 case TIPC_ESTABLISHED:
651 if (!tsk->link_cong && !tsk_conn_cong(tsk)) 703 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
652 mask |= POLLOUT; 704 mask |= POLLOUT;
653 /* fall thru' */ 705 /* fall thru' */
654 case TIPC_LISTEN: 706 case TIPC_LISTEN:
@@ -657,7 +709,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
657 mask |= (POLLIN | POLLRDNORM); 709 mask |= (POLLIN | POLLRDNORM);
658 break; 710 break;
659 case TIPC_OPEN: 711 case TIPC_OPEN:
660 if (!tsk->link_cong) 712 if (!tsk->cong_link_cnt)
661 mask |= POLLOUT; 713 mask |= POLLOUT;
662 if (tipc_sk_type_connectionless(sk) && 714 if (tipc_sk_type_connectionless(sk) &&
663 (!skb_queue_empty(&sk->sk_receive_queue))) 715 (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -676,63 +728,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
676 * @sock: socket structure 728 * @sock: socket structure
677 * @seq: destination address 729 * @seq: destination address
678 * @msg: message to send 730 * @msg: message to send
679 * @dsz: total length of message data 731 * @dlen: length of data to send
680 * @timeo: timeout to wait for wakeup 732 * @timeout: timeout to wait for wakeup
681 * 733 *
682 * Called from function tipc_sendmsg(), which has done all sanity checks 734 * Called from function tipc_sendmsg(), which has done all sanity checks
683 * Returns the number of bytes sent on success, or errno 735 * Returns the number of bytes sent on success, or errno
684 */ 736 */
685static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, 737static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
686 struct msghdr *msg, size_t dsz, long timeo) 738 struct msghdr *msg, size_t dlen, long timeout)
687{ 739{
688 struct sock *sk = sock->sk; 740 struct sock *sk = sock->sk;
689 struct tipc_sock *tsk = tipc_sk(sk); 741 struct tipc_sock *tsk = tipc_sk(sk);
742 struct tipc_msg *hdr = &tsk->phdr;
690 struct net *net = sock_net(sk); 743 struct net *net = sock_net(sk);
691 struct tipc_msg *mhdr = &tsk->phdr; 744 int mtu = tipc_bcast_get_mtu(net);
692 struct sk_buff_head pktchain; 745 struct tipc_mc_method *method = &tsk->mc_method;
693 struct iov_iter save = msg->msg_iter; 746 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
694 uint mtu; 747 struct sk_buff_head pkts;
748 struct tipc_nlist dsts;
695 int rc; 749 int rc;
696 750
697 if (!timeo && tsk->link_cong) 751 /* Block or return if any destination link is congested */
698 return -ELINKCONG; 752 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
753 if (unlikely(rc))
754 return rc;
699 755
700 msg_set_type(mhdr, TIPC_MCAST_MSG); 756 /* Lookup destination nodes */
701 msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); 757 tipc_nlist_init(&dsts, tipc_own_addr(net));
702 msg_set_destport(mhdr, 0); 758 tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
703 msg_set_destnode(mhdr, 0); 759 seq->upper, domain, &dsts);
704 msg_set_nametype(mhdr, seq->type); 760 if (!dsts.local && !dsts.remote)
705 msg_set_namelower(mhdr, seq->lower); 761 return -EHOSTUNREACH;
706 msg_set_nameupper(mhdr, seq->upper);
707 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
708 762
709 skb_queue_head_init(&pktchain); 763 /* Build message header */
764 msg_set_type(hdr, TIPC_MCAST_MSG);
765 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
766 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
767 msg_set_destport(hdr, 0);
768 msg_set_destnode(hdr, 0);
769 msg_set_nametype(hdr, seq->type);
770 msg_set_namelower(hdr, seq->lower);
771 msg_set_nameupper(hdr, seq->upper);
710 772
711new_mtu: 773 /* Build message as chain of buffers */
712 mtu = tipc_bcast_get_mtu(net); 774 skb_queue_head_init(&pkts);
713 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); 775 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
714 if (unlikely(rc < 0))
715 return rc;
716 776
717 do { 777 /* Send message if build was successful */
718 rc = tipc_bcast_xmit(net, &pktchain); 778 if (unlikely(rc == dlen))
719 if (likely(!rc)) 779 rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
720 return dsz; 780 &tsk->cong_link_cnt);
721 781
722 if (rc == -ELINKCONG) { 782 tipc_nlist_purge(&dsts);
723 tsk->link_cong = 1; 783
724 rc = tipc_wait_for_sndmsg(sock, &timeo); 784 return rc ? rc : dlen;
725 if (!rc)
726 continue;
727 }
728 __skb_queue_purge(&pktchain);
729 if (rc == -EMSGSIZE) {
730 msg->msg_iter = save;
731 goto new_mtu;
732 }
733 break;
734 } while (1);
735 return rc;
736} 785}
737 786
738/** 787/**
@@ -746,7 +795,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
746 struct sk_buff_head *inputq) 795 struct sk_buff_head *inputq)
747{ 796{
748 struct tipc_msg *msg; 797 struct tipc_msg *msg;
749 struct tipc_plist dports; 798 struct list_head dports;
750 u32 portid; 799 u32 portid;
751 u32 scope = TIPC_CLUSTER_SCOPE; 800 u32 scope = TIPC_CLUSTER_SCOPE;
752 struct sk_buff_head tmpq; 801 struct sk_buff_head tmpq;
@@ -754,7 +803,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
754 struct sk_buff *skb, *_skb; 803 struct sk_buff *skb, *_skb;
755 804
756 __skb_queue_head_init(&tmpq); 805 __skb_queue_head_init(&tmpq);
757 tipc_plist_init(&dports); 806 INIT_LIST_HEAD(&dports);
758 807
759 skb = tipc_skb_peek(arrvq, &inputq->lock); 808 skb = tipc_skb_peek(arrvq, &inputq->lock);
760 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 809 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
@@ -768,8 +817,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
768 tipc_nametbl_mc_translate(net, 817 tipc_nametbl_mc_translate(net,
769 msg_nametype(msg), msg_namelower(msg), 818 msg_nametype(msg), msg_namelower(msg),
770 msg_nameupper(msg), scope, &dports); 819 msg_nameupper(msg), scope, &dports);
771 portid = tipc_plist_pop(&dports); 820 portid = u32_pop(&dports);
772 for (; portid; portid = tipc_plist_pop(&dports)) { 821 for (; portid; portid = u32_pop(&dports)) {
773 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 822 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
774 if (_skb) { 823 if (_skb) {
775 msg_set_destport(buf_msg(_skb), portid); 824 msg_set_destport(buf_msg(_skb), portid);
@@ -830,31 +879,6 @@ exit:
830 kfree_skb(skb); 879 kfree_skb(skb);
831} 880}
832 881
833static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
834{
835 DEFINE_WAIT_FUNC(wait, woken_wake_function);
836 struct sock *sk = sock->sk;
837 struct tipc_sock *tsk = tipc_sk(sk);
838 int done;
839
840 do {
841 int err = sock_error(sk);
842 if (err)
843 return err;
844 if (sk->sk_shutdown & SEND_SHUTDOWN)
845 return -EPIPE;
846 if (!*timeo_p)
847 return -EAGAIN;
848 if (signal_pending(current))
849 return sock_intr_errno(*timeo_p);
850
851 add_wait_queue(sk_sleep(sk), &wait);
852 done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
853 remove_wait_queue(sk_sleep(sk), &wait);
854 } while (!done);
855 return 0;
856}
857
858/** 882/**
859 * tipc_sendmsg - send message in connectionless manner 883 * tipc_sendmsg - send message in connectionless manner
860 * @sock: socket structure 884 * @sock: socket structure
@@ -881,35 +905,38 @@ static int tipc_sendmsg(struct socket *sock,
881 return ret; 905 return ret;
882} 906}
883 907
884static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) 908static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
885{ 909{
886 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
887 struct sock *sk = sock->sk; 910 struct sock *sk = sock->sk;
888 struct tipc_sock *tsk = tipc_sk(sk);
889 struct net *net = sock_net(sk); 911 struct net *net = sock_net(sk);
890 struct tipc_msg *mhdr = &tsk->phdr; 912 struct tipc_sock *tsk = tipc_sk(sk);
891 u32 dnode, dport; 913 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
892 struct sk_buff_head pktchain; 914 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
893 bool is_connectionless = tipc_sk_type_connectionless(sk); 915 struct list_head *clinks = &tsk->cong_links;
894 struct sk_buff *skb; 916 bool syn = !tipc_sk_type_connectionless(sk);
917 struct tipc_msg *hdr = &tsk->phdr;
895 struct tipc_name_seq *seq; 918 struct tipc_name_seq *seq;
896 struct iov_iter save; 919 struct sk_buff_head pkts;
897 u32 mtu; 920 u32 type, inst, domain;
898 long timeo; 921 u32 dnode, dport;
899 int rc; 922 int mtu, rc;
900 923
901 if (dsz > TIPC_MAX_USER_MSG_SIZE) 924 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
902 return -EMSGSIZE; 925 return -EMSGSIZE;
926
903 if (unlikely(!dest)) { 927 if (unlikely(!dest)) {
904 if (is_connectionless && tsk->peer.family == AF_TIPC) 928 dest = &tsk->peer;
905 dest = &tsk->peer; 929 if (!syn || dest->family != AF_TIPC)
906 else
907 return -EDESTADDRREQ; 930 return -EDESTADDRREQ;
908 } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
909 dest->family != AF_TIPC) {
910 return -EINVAL;
911 } 931 }
912 if (!is_connectionless) { 932
933 if (unlikely(m->msg_namelen < sizeof(*dest)))
934 return -EINVAL;
935
936 if (unlikely(dest->family != AF_TIPC))
937 return -EINVAL;
938
939 if (unlikely(syn)) {
913 if (sk->sk_state == TIPC_LISTEN) 940 if (sk->sk_state == TIPC_LISTEN)
914 return -EPIPE; 941 return -EPIPE;
915 if (sk->sk_state != TIPC_OPEN) 942 if (sk->sk_state != TIPC_OPEN)
@@ -921,102 +948,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
921 tsk->conn_instance = dest->addr.name.name.instance; 948 tsk->conn_instance = dest->addr.name.name.instance;
922 } 949 }
923 } 950 }
924 seq = &dest->addr.nameseq;
925 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
926 951
927 if (dest->addrtype == TIPC_ADDR_MCAST) { 952 seq = &dest->addr.nameseq;
928 return tipc_sendmcast(sock, seq, m, dsz, timeo); 953 if (dest->addrtype == TIPC_ADDR_MCAST)
929 } else if (dest->addrtype == TIPC_ADDR_NAME) { 954 return tipc_sendmcast(sock, seq, m, dlen, timeout);
930 u32 type = dest->addr.name.name.type;
931 u32 inst = dest->addr.name.name.instance;
932 u32 domain = dest->addr.name.domain;
933 955
956 if (dest->addrtype == TIPC_ADDR_NAME) {
957 type = dest->addr.name.name.type;
958 inst = dest->addr.name.name.instance;
959 domain = dest->addr.name.domain;
934 dnode = domain; 960 dnode = domain;
935 msg_set_type(mhdr, TIPC_NAMED_MSG); 961 msg_set_type(hdr, TIPC_NAMED_MSG);
936 msg_set_hdr_sz(mhdr, NAMED_H_SIZE); 962 msg_set_hdr_sz(hdr, NAMED_H_SIZE);
937 msg_set_nametype(mhdr, type); 963 msg_set_nametype(hdr, type);
938 msg_set_nameinst(mhdr, inst); 964 msg_set_nameinst(hdr, inst);
939 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); 965 msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
940 dport = tipc_nametbl_translate(net, type, inst, &dnode); 966 dport = tipc_nametbl_translate(net, type, inst, &dnode);
941 msg_set_destnode(mhdr, dnode); 967 msg_set_destnode(hdr, dnode);
942 msg_set_destport(mhdr, dport); 968 msg_set_destport(hdr, dport);
943 if (unlikely(!dport && !dnode)) 969 if (unlikely(!dport && !dnode))
944 return -EHOSTUNREACH; 970 return -EHOSTUNREACH;
971
945 } else if (dest->addrtype == TIPC_ADDR_ID) { 972 } else if (dest->addrtype == TIPC_ADDR_ID) {
946 dnode = dest->addr.id.node; 973 dnode = dest->addr.id.node;
947 msg_set_type(mhdr, TIPC_DIRECT_MSG); 974 msg_set_type(hdr, TIPC_DIRECT_MSG);
948 msg_set_lookup_scope(mhdr, 0); 975 msg_set_lookup_scope(hdr, 0);
949 msg_set_destnode(mhdr, dnode); 976 msg_set_destnode(hdr, dnode);
950 msg_set_destport(mhdr, dest->addr.id.ref); 977 msg_set_destport(hdr, dest->addr.id.ref);
951 msg_set_hdr_sz(mhdr, BASIC_H_SIZE); 978 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
952 } 979 }
953 980
954 skb_queue_head_init(&pktchain); 981 /* Block or return if destination link is congested */
955 save = m->msg_iter; 982 rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
956new_mtu: 983 if (unlikely(rc))
957 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
958 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
959 if (rc < 0)
960 return rc; 984 return rc;
961 985
962 do { 986 skb_queue_head_init(&pkts);
963 skb = skb_peek(&pktchain); 987 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
964 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 988 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
965 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); 989 if (unlikely(rc != dlen))
966 if (likely(!rc)) { 990 return rc;
967 if (!is_connectionless)
968 tipc_set_sk_state(sk, TIPC_CONNECTING);
969 return dsz;
970 }
971 if (rc == -ELINKCONG) {
972 tsk->link_cong = 1;
973 rc = tipc_wait_for_sndmsg(sock, &timeo);
974 if (!rc)
975 continue;
976 }
977 __skb_queue_purge(&pktchain);
978 if (rc == -EMSGSIZE) {
979 m->msg_iter = save;
980 goto new_mtu;
981 }
982 break;
983 } while (1);
984
985 return rc;
986}
987 991
988static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) 992 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
989{ 993 if (unlikely(rc == -ELINKCONG)) {
990 DEFINE_WAIT_FUNC(wait, woken_wake_function); 994 u32_push(clinks, dnode);
991 struct sock *sk = sock->sk; 995 tsk->cong_link_cnt++;
992 struct tipc_sock *tsk = tipc_sk(sk); 996 rc = 0;
993 int done; 997 }
994 998
995 do { 999 if (unlikely(syn && !rc))
996 int err = sock_error(sk); 1000 tipc_set_sk_state(sk, TIPC_CONNECTING);
997 if (err)
998 return err;
999 if (sk->sk_state == TIPC_DISCONNECTING)
1000 return -EPIPE;
1001 else if (!tipc_sk_connected(sk))
1002 return -ENOTCONN;
1003 if (!*timeo_p)
1004 return -EAGAIN;
1005 if (signal_pending(current))
1006 return sock_intr_errno(*timeo_p);
1007 1001
1008 add_wait_queue(sk_sleep(sk), &wait); 1002 return rc ? rc : dlen;
1009 done = sk_wait_event(sk, timeo_p,
1010 (!tsk->link_cong &&
1011 !tsk_conn_cong(tsk)) ||
1012 !tipc_sk_connected(sk), &wait);
1013 remove_wait_queue(sk_sleep(sk), &wait);
1014 } while (!done);
1015 return 0;
1016} 1003}
1017 1004
1018/** 1005/**
1019 * tipc_send_stream - send stream-oriented data 1006 * tipc_sendstream - send stream-oriented data
1020 * @sock: socket structure 1007 * @sock: socket structure
1021 * @m: data to send 1008 * @m: data to send
1022 * @dsz: total length of data to be transmitted 1009 * @dsz: total length of data to be transmitted
@@ -1026,94 +1013,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
1026 * Returns the number of bytes sent on success (or partial success), 1013 * Returns the number of bytes sent on success (or partial success),
1027 * or errno if no data sent 1014 * or errno if no data sent
1028 */ 1015 */
1029static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1016static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1030{ 1017{
1031 struct sock *sk = sock->sk; 1018 struct sock *sk = sock->sk;
1032 int ret; 1019 int ret;
1033 1020
1034 lock_sock(sk); 1021 lock_sock(sk);
1035 ret = __tipc_send_stream(sock, m, dsz); 1022 ret = __tipc_sendstream(sock, m, dsz);
1036 release_sock(sk); 1023 release_sock(sk);
1037 1024
1038 return ret; 1025 return ret;
1039} 1026}
1040 1027
1041static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1028static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1042{ 1029{
1043 struct sock *sk = sock->sk; 1030 struct sock *sk = sock->sk;
1044 struct net *net = sock_net(sk);
1045 struct tipc_sock *tsk = tipc_sk(sk);
1046 struct tipc_msg *mhdr = &tsk->phdr;
1047 struct sk_buff_head pktchain;
1048 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1031 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1049 u32 portid = tsk->portid; 1032 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1050 int rc = -EINVAL; 1033 struct tipc_sock *tsk = tipc_sk(sk);
1051 long timeo; 1034 struct tipc_msg *hdr = &tsk->phdr;
1052 u32 dnode; 1035 struct net *net = sock_net(sk);
1053 uint mtu, send, sent = 0; 1036 struct sk_buff_head pkts;
1054 struct iov_iter save; 1037 u32 dnode = tsk_peer_node(tsk);
1055 int hlen = MIN_H_SIZE; 1038 int send, sent = 0;
1056 1039 int rc = 0;
1057 /* Handle implied connection establishment */
1058 if (unlikely(dest)) {
1059 rc = __tipc_sendmsg(sock, m, dsz);
1060 hlen = msg_hdr_sz(mhdr);
1061 if (dsz && (dsz == rc))
1062 tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1063 return rc;
1064 }
1065 if (dsz > (uint)INT_MAX)
1066 return -EMSGSIZE;
1067
1068 if (unlikely(!tipc_sk_connected(sk))) {
1069 if (sk->sk_state == TIPC_DISCONNECTING)
1070 return -EPIPE;
1071 else
1072 return -ENOTCONN;
1073 }
1074 1040
1075 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1041 skb_queue_head_init(&pkts);
1076 if (!timeo && tsk->link_cong)
1077 return -ELINKCONG;
1078 1042
1079 dnode = tsk_peer_node(tsk); 1043 if (unlikely(dlen > INT_MAX))
1080 skb_queue_head_init(&pktchain); 1044 return -EMSGSIZE;
1081 1045
1082next: 1046 /* Handle implicit connection setup */
1083 save = m->msg_iter; 1047 if (unlikely(dest)) {
1084 mtu = tsk->max_pkt; 1048 rc = __tipc_sendmsg(sock, m, dlen);
1085 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1049 if (dlen && (dlen == rc))
1086 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); 1050 tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1087 if (unlikely(rc < 0))
1088 return rc; 1051 return rc;
1052 }
1089 1053
1090 do { 1054 do {
1091 if (likely(!tsk_conn_cong(tsk))) { 1055 rc = tipc_wait_for_cond(sock, &timeout,
1092 rc = tipc_node_xmit(net, &pktchain, dnode, portid); 1056 (!tsk->cong_link_cnt &&
1093 if (likely(!rc)) { 1057 !tsk_conn_cong(tsk) &&
1094 tsk->snt_unacked += tsk_inc(tsk, send + hlen); 1058 tipc_sk_connected(sk)));
1095 sent += send; 1059 if (unlikely(rc))
1096 if (sent == dsz) 1060 break;
1097 return dsz; 1061
1098 goto next; 1062 send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1099 } 1063 rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
1100 if (rc == -EMSGSIZE) { 1064 if (unlikely(rc != send))
1101 __skb_queue_purge(&pktchain); 1065 break;
1102 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1103 portid);
1104 m->msg_iter = save;
1105 goto next;
1106 }
1107 if (rc != -ELINKCONG)
1108 break;
1109 1066
1110 tsk->link_cong = 1; 1067 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1068 if (unlikely(rc == -ELINKCONG)) {
1069 tsk->cong_link_cnt = 1;
1070 rc = 0;
1071 }
1072 if (likely(!rc)) {
1073 tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
1074 sent += send;
1111 } 1075 }
1112 rc = tipc_wait_for_sndpkt(sock, &timeo); 1076 } while (sent < dlen && !rc);
1113 } while (!rc);
1114 1077
1115 __skb_queue_purge(&pktchain); 1078 return rc ? rc : sent;
1116 return sent ? sent : rc;
1117} 1079}
1118 1080
1119/** 1081/**
@@ -1131,7 +1093,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1131 if (dsz > TIPC_MAX_USER_MSG_SIZE) 1093 if (dsz > TIPC_MAX_USER_MSG_SIZE)
1132 return -EMSGSIZE; 1094 return -EMSGSIZE;
1133 1095
1134 return tipc_send_stream(sock, m, dsz); 1096 return tipc_sendstream(sock, m, dsz);
1135} 1097}
1136 1098
1137/* tipc_sk_finish_conn - complete the setup of a connection 1099/* tipc_sk_finish_conn - complete the setup of a connection
@@ -1698,6 +1660,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1698 unsigned int limit = rcvbuf_limit(sk, skb); 1660 unsigned int limit = rcvbuf_limit(sk, skb);
1699 int err = TIPC_OK; 1661 int err = TIPC_OK;
1700 int usr = msg_user(hdr); 1662 int usr = msg_user(hdr);
1663 u32 onode;
1701 1664
1702 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 1665 if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1703 tipc_sk_proto_rcv(tsk, skb, xmitq); 1666 tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1705,8 +1668,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1705 } 1668 }
1706 1669
1707 if (unlikely(usr == SOCK_WAKEUP)) { 1670 if (unlikely(usr == SOCK_WAKEUP)) {
1671 onode = msg_orignode(hdr);
1708 kfree_skb(skb); 1672 kfree_skb(skb);
1709 tsk->link_cong = 0; 1673 u32_del(&tsk->cong_links, onode);
1674 tsk->cong_link_cnt--;
1710 sk->sk_write_space(sk); 1675 sk->sk_write_space(sk);
1711 return false; 1676 return false;
1712 } 1677 }
@@ -2114,7 +2079,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2114 struct msghdr m = {NULL,}; 2079 struct msghdr m = {NULL,};
2115 2080
2116 tsk_advance_rx_queue(sk); 2081 tsk_advance_rx_queue(sk);
2117 __tipc_send_stream(new_sock, &m, 0); 2082 __tipc_sendstream(new_sock, &m, 0);
2118 } else { 2083 } else {
2119 __skb_dequeue(&sk->sk_receive_queue); 2084 __skb_dequeue(&sk->sk_receive_queue);
2120 __skb_queue_head(&new_sk->sk_receive_queue, buf); 2085 __skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2382,18 +2347,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2382{ 2347{
2383 struct sock *sk = sock->sk; 2348 struct sock *sk = sock->sk;
2384 struct tipc_sock *tsk = tipc_sk(sk); 2349 struct tipc_sock *tsk = tipc_sk(sk);
2385 u32 value; 2350 u32 value = 0;
2386 int res; 2351 int res = 0;
2387 2352
2388 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) 2353 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2389 return 0; 2354 return 0;
2390 if (lvl != SOL_TIPC) 2355 if (lvl != SOL_TIPC)
2391 return -ENOPROTOOPT; 2356 return -ENOPROTOOPT;
2392 if (ol < sizeof(value)) 2357
2393 return -EINVAL; 2358 switch (opt) {
2394 res = get_user(value, (u32 __user *)ov); 2359 case TIPC_IMPORTANCE:
2395 if (res) 2360 case TIPC_SRC_DROPPABLE:
2396 return res; 2361 case TIPC_DEST_DROPPABLE:
2362 case TIPC_CONN_TIMEOUT:
2363 if (ol < sizeof(value))
2364 return -EINVAL;
2365 res = get_user(value, (u32 __user *)ov);
2366 if (res)
2367 return res;
2368 break;
2369 default:
2370 if (ov || ol)
2371 return -EINVAL;
2372 }
2397 2373
2398 lock_sock(sk); 2374 lock_sock(sk);
2399 2375
@@ -2412,7 +2388,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2412 break; 2388 break;
2413 case TIPC_CONN_TIMEOUT: 2389 case TIPC_CONN_TIMEOUT:
2414 tipc_sk(sk)->conn_timeout = value; 2390 tipc_sk(sk)->conn_timeout = value;
2415 /* no need to set "res", since already 0 at this point */ 2391 break;
2392 case TIPC_MCAST_BROADCAST:
2393 tsk->mc_method.rcast = false;
2394 tsk->mc_method.mandatory = true;
2395 break;
2396 case TIPC_MCAST_REPLICAST:
2397 tsk->mc_method.rcast = true;
2398 tsk->mc_method.mandatory = true;
2416 break; 2399 break;
2417 default: 2400 default:
2418 res = -EINVAL; 2401 res = -EINVAL;
@@ -2575,7 +2558,7 @@ static const struct proto_ops stream_ops = {
2575 .shutdown = tipc_shutdown, 2558 .shutdown = tipc_shutdown,
2576 .setsockopt = tipc_setsockopt, 2559 .setsockopt = tipc_setsockopt,
2577 .getsockopt = tipc_getsockopt, 2560 .getsockopt = tipc_getsockopt,
2578 .sendmsg = tipc_send_stream, 2561 .sendmsg = tipc_sendstream,
2579 .recvmsg = tipc_recv_stream, 2562 .recvmsg = tipc_recv_stream,
2580 .mmap = sock_no_mmap, 2563 .mmap = sock_no_mmap,
2581 .sendpage = sock_no_sendpage 2564 .sendpage = sock_no_sendpage
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index b58dc95f3d35..46061cf48cd1 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -113,7 +113,7 @@ static void tipc_udp_media_addr_set(struct tipc_media_addr *addr,
113 memcpy(addr->value, ua, sizeof(struct udp_media_addr)); 113 memcpy(addr->value, ua, sizeof(struct udp_media_addr));
114 114
115 if (tipc_udp_is_mcast_addr(ua)) 115 if (tipc_udp_is_mcast_addr(ua))
116 addr->broadcast = 1; 116 addr->broadcast = TIPC_BROADCAST_SUPPORT;
117} 117}
118 118
119/* tipc_udp_addr2str - convert ip/udp address to string */ 119/* tipc_udp_addr2str - convert ip/udp address to string */
@@ -229,7 +229,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
229 goto out; 229 goto out;
230 } 230 }
231 231
232 if (!addr->broadcast || list_empty(&ub->rcast.list)) 232 if (addr->broadcast != TIPC_REPLICAST_SUPPORT)
233 return tipc_udp_xmit(net, skb, ub, src, dst); 233 return tipc_udp_xmit(net, skb, ub, src, dst);
234 234
235 /* Replicast, send an skb to each configured IP address */ 235 /* Replicast, send an skb to each configured IP address */
@@ -296,7 +296,7 @@ static int tipc_udp_rcast_add(struct tipc_bearer *b,
296 else if (ntohs(addr->proto) == ETH_P_IPV6) 296 else if (ntohs(addr->proto) == ETH_P_IPV6)
297 pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6); 297 pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6);
298#endif 298#endif
299 299 b->bcast_addr.broadcast = TIPC_REPLICAST_SUPPORT;
300 list_add_rcu(&rcast->list, &ub->rcast.list); 300 list_add_rcu(&rcast->list, &ub->rcast.list);
301 return 0; 301 return 0;
302} 302}
@@ -681,7 +681,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b,
681 goto err; 681 goto err;
682 682
683 b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP; 683 b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP;
684 b->bcast_addr.broadcast = 1; 684 b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
685 rcu_assign_pointer(b->media_ptr, ub); 685 rcu_assign_pointer(b->media_ptr, ub);
686 rcu_assign_pointer(ub->bearer, b); 686 rcu_assign_pointer(ub->bearer, b);
687 tipc_udp_media_addr_set(&b->addr, &local); 687 tipc_udp_media_addr_set(&b->addr, &local);