aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c204
-rw-r--r--net/tipc/bcast.h33
-rw-r--r--net/tipc/bearer.c15
-rw-r--r--net/tipc/bearer.h8
-rw-r--r--net/tipc/link.c87
-rw-r--r--net/tipc/msg.c17
-rw-r--r--net/tipc/msg.h11
-rw-r--r--net/tipc/name_table.c128
-rw-r--r--net/tipc/name_table.h24
-rw-r--r--net/tipc/net.c4
-rw-r--r--net/tipc/node.c54
-rw-r--r--net/tipc/node.h4
-rw-r--r--net/tipc/socket.c527
-rw-r--r--net/tipc/udp_media.c8
14 files changed, 704 insertions, 420 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index aa1babbea385..7d99029df342 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/bcast.c: TIPC broadcast code 2 * net/tipc/bcast.c: TIPC broadcast code
3 * 3 *
4 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB 4 * Copyright (c) 2004-2006, 2014-2016, Ericsson AB
5 * Copyright (c) 2004, Intel Corporation. 5 * Copyright (c) 2004, Intel Corporation.
6 * Copyright (c) 2005, 2010-2011, Wind River Systems 6 * Copyright (c) 2005, 2010-2011, Wind River Systems
7 * All rights reserved. 7 * All rights reserved.
@@ -39,9 +39,8 @@
39#include "socket.h" 39#include "socket.h"
40#include "msg.h" 40#include "msg.h"
41#include "bcast.h" 41#include "bcast.h"
42#include "name_distr.h"
43#include "link.h" 42#include "link.h"
44#include "node.h" 43#include "name_table.h"
45 44
46#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 45#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
47#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 46#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
@@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";
54 * @inputq: data input queue; will only carry SOCK_WAKEUP messages 53 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
55 * @dest: array keeping number of reachable destinations per bearer 54 * @dest: array keeping number of reachable destinations per bearer
56 * @primary_bearer: a bearer having links to all broadcast destinations, if any 55 * @primary_bearer: a bearer having links to all broadcast destinations, if any
56 * @bcast_support: indicates if primary bearer, if any, supports broadcast
57 * @rcast_support: indicates if all peer nodes support replicast
58 * @rc_ratio: dest count as percentage of cluster size where send method changes
59 * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast
57 */ 60 */
58struct tipc_bc_base { 61struct tipc_bc_base {
59 struct tipc_link *link; 62 struct tipc_link *link;
60 struct sk_buff_head inputq; 63 struct sk_buff_head inputq;
61 int dests[MAX_BEARERS]; 64 int dests[MAX_BEARERS];
62 int primary_bearer; 65 int primary_bearer;
66 bool bcast_support;
67 bool rcast_support;
68 int rc_ratio;
69 int bc_threshold;
63}; 70};
64 71
65static struct tipc_bc_base *tipc_bc_base(struct net *net) 72static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
69 76
70int tipc_bcast_get_mtu(struct net *net) 77int tipc_bcast_get_mtu(struct net *net)
71{ 78{
72 return tipc_link_mtu(tipc_bc_sndlink(net)); 79 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
80}
81
82void tipc_bcast_disable_rcast(struct net *net)
83{
84 tipc_bc_base(net)->rcast_support = false;
85}
86
87static void tipc_bcbase_calc_bc_threshold(struct net *net)
88{
89 struct tipc_bc_base *bb = tipc_bc_base(net);
90 int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
91
92 bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
73} 93}
74 94
75/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 95/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)
79{ 99{
80 struct tipc_bc_base *bb = tipc_bc_base(net); 100 struct tipc_bc_base *bb = tipc_bc_base(net);
81 int all_dests = tipc_link_bc_peers(bb->link); 101 int all_dests = tipc_link_bc_peers(bb->link);
82 int i, mtu; 102 int i, mtu, prim;
83 103
84 bb->primary_bearer = INVALID_BEARER_ID; 104 bb->primary_bearer = INVALID_BEARER_ID;
105 bb->bcast_support = true;
85 106
86 if (!all_dests) 107 if (!all_dests)
87 return; 108 return;
@@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)
93 mtu = tipc_bearer_mtu(net, i); 114 mtu = tipc_bearer_mtu(net, i);
94 if (mtu < tipc_link_mtu(bb->link)) 115 if (mtu < tipc_link_mtu(bb->link))
95 tipc_link_set_mtu(bb->link, mtu); 116 tipc_link_set_mtu(bb->link, mtu);
96 117 bb->bcast_support &= tipc_bearer_bcast_support(net, i);
97 if (bb->dests[i] < all_dests) 118 if (bb->dests[i] < all_dests)
98 continue; 119 continue;
99 120
@@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)
103 if ((i ^ tipc_own_addr(net)) & 1) 124 if ((i ^ tipc_own_addr(net)) & 1)
104 break; 125 break;
105 } 126 }
127 prim = bb->primary_bearer;
128 if (prim != INVALID_BEARER_ID)
129 bb->bcast_support = tipc_bearer_bcast_support(net, prim);
106} 130}
107 131
108void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) 132void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
@@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
170 __skb_queue_purge(&_xmitq); 194 __skb_queue_purge(&_xmitq);
171} 195}
172 196
173/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster 197static void tipc_bcast_select_xmit_method(struct net *net, int dests,
174 * and to identified node local sockets 198 struct tipc_mc_method *method)
199{
200 struct tipc_bc_base *bb = tipc_bc_base(net);
201 unsigned long exp = method->expires;
202
203 /* Broadcast supported by used bearer/bearers? */
204 if (!bb->bcast_support) {
205 method->rcast = true;
206 return;
207 }
208 /* Any destinations which don't support replicast ? */
209 if (!bb->rcast_support) {
210 method->rcast = false;
211 return;
212 }
213 /* Can current method be changed ? */
214 method->expires = jiffies + TIPC_METHOD_EXPIRE;
215 if (method->mandatory || time_before(jiffies, exp))
216 return;
217
218 /* Determine method to use now */
219 method->rcast = dests <= bb->bc_threshold;
220}
221
222/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
175 * @net: the applicable net namespace 223 * @net: the applicable net namespace
176 * @list: chain of buffers containing message 224 * @pkts: chain of buffers containing message
177 * Consumes the buffer chain, except when returning -ELINKCONG 225 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
178 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 226 * Consumes the buffer chain.
227 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
179 */ 228 */
180int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 229static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
230 u16 *cong_link_cnt)
181{ 231{
182 struct tipc_link *l = tipc_bc_sndlink(net); 232 struct tipc_link *l = tipc_bc_sndlink(net);
183 struct sk_buff_head xmitq, inputq, rcvq; 233 struct sk_buff_head xmitq;
184 int rc = 0; 234 int rc = 0;
185 235
186 __skb_queue_head_init(&rcvq);
187 __skb_queue_head_init(&xmitq); 236 __skb_queue_head_init(&xmitq);
188 skb_queue_head_init(&inputq);
189
190 /* Prepare message clone for local node */
191 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
192 return -EHOSTUNREACH;
193
194 tipc_bcast_lock(net); 237 tipc_bcast_lock(net);
195 if (tipc_link_bc_peers(l)) 238 if (tipc_link_bc_peers(l))
196 rc = tipc_link_xmit(l, list, &xmitq); 239 rc = tipc_link_xmit(l, pkts, &xmitq);
197 tipc_bcast_unlock(net); 240 tipc_bcast_unlock(net);
198 241 tipc_bcbase_xmit(net, &xmitq);
199 /* Don't send to local node if adding to link failed */ 242 __skb_queue_purge(pkts);
200 if (unlikely(rc)) { 243 if (rc == -ELINKCONG) {
201 __skb_queue_purge(&rcvq); 244 *cong_link_cnt = 1;
202 return rc; 245 rc = 0;
203 } 246 }
247 return rc;
248}
204 249
205 /* Broadcast to all nodes, inluding local node */ 250/* tipc_rcast_xmit - replicate and send a message to given destination nodes
206 tipc_bcbase_xmit(net, &xmitq); 251 * @net: the applicable net namespace
207 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 252 * @pkts: chain of buffers containing message
208 __skb_queue_purge(list); 253 * @dests: list of destination nodes
254 * @cong_link_cnt: returns number of congested links
255 * @cong_links: returns identities of congested links
256 * Returns 0 if success, otherwise errno
257 */
258static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
259 struct tipc_nlist *dests, u16 *cong_link_cnt)
260{
261 struct sk_buff_head _pkts;
262 struct u32_item *n, *tmp;
263 u32 dst, selector;
264
265 selector = msg_link_selector(buf_msg(skb_peek(pkts)));
266 __skb_queue_head_init(&_pkts);
267
268 list_for_each_entry_safe(n, tmp, &dests->list, list) {
269 dst = n->value;
270 if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
271 return -ENOMEM;
272
273 /* Any other return value than -ELINKCONG is ignored */
274 if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
275 (*cong_link_cnt)++;
276 }
209 return 0; 277 return 0;
210} 278}
211 279
280/* tipc_mcast_xmit - deliver message to indicated destination nodes
281 * and to identified node local sockets
282 * @net: the applicable net namespace
283 * @pkts: chain of buffers containing message
284 * @method: send method to be used
285 * @dests: destination nodes for message.
286 * @cong_link_cnt: returns number of encountered congested destination links
287 * Consumes buffer chain.
288 * Returns 0 if success, otherwise errno
289 */
290int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
291 struct tipc_mc_method *method, struct tipc_nlist *dests,
292 u16 *cong_link_cnt)
293{
294 struct sk_buff_head inputq, localq;
295 int rc = 0;
296
297 skb_queue_head_init(&inputq);
298 skb_queue_head_init(&localq);
299
300 /* Clone packets before they are consumed by next call */
301 if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
302 rc = -ENOMEM;
303 goto exit;
304 }
305 /* Send according to determined transmit method */
306 if (dests->remote) {
307 tipc_bcast_select_xmit_method(net, dests->remote, method);
308 if (method->rcast)
309 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
310 else
311 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
312 }
313
314 if (dests->local)
315 tipc_sk_mcast_rcv(net, &localq, &inputq);
316exit:
317 /* This queue should normally be empty by now */
318 __skb_queue_purge(pkts);
319 return rc;
320}
321
212/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link 322/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
213 * 323 *
214 * RCU is locked, no other locks set 324 * RCU is locked, no other locks set
@@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
313 tipc_bcast_lock(net); 423 tipc_bcast_lock(net);
314 tipc_link_add_bc_peer(snd_l, uc_l, xmitq); 424 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
315 tipc_bcbase_select_primary(net); 425 tipc_bcbase_select_primary(net);
426 tipc_bcbase_calc_bc_threshold(net);
316 tipc_bcast_unlock(net); 427 tipc_bcast_unlock(net);
317} 428}
318 429
@@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
331 tipc_bcast_lock(net); 442 tipc_bcast_lock(net);
332 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); 443 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
333 tipc_bcbase_select_primary(net); 444 tipc_bcbase_select_primary(net);
445 tipc_bcbase_calc_bc_threshold(net);
334 tipc_bcast_unlock(net); 446 tipc_bcast_unlock(net);
335 447
336 tipc_bcbase_xmit(net, &xmitq); 448 tipc_bcbase_xmit(net, &xmitq);
@@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)
413 goto enomem; 525 goto enomem;
414 bb->link = l; 526 bb->link = l;
415 tn->bcl = l; 527 tn->bcl = l;
528 bb->rc_ratio = 25;
529 bb->rcast_support = true;
416 return 0; 530 return 0;
417enomem: 531enomem:
418 kfree(bb); 532 kfree(bb);
@@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)
428 kfree(tn->bcbase); 542 kfree(tn->bcbase);
429 kfree(tn->bcl); 543 kfree(tn->bcl);
430} 544}
545
546void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
547{
548 memset(nl, 0, sizeof(*nl));
549 INIT_LIST_HEAD(&nl->list);
550 nl->self = self;
551}
552
553void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
554{
555 if (node == nl->self)
556 nl->local = true;
557 else if (u32_push(&nl->list, node))
558 nl->remote++;
559}
560
561void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
562{
563 if (node == nl->self)
564 nl->local = false;
565 else if (u32_del(&nl->list, node))
566 nl->remote--;
567}
568
569void tipc_nlist_purge(struct tipc_nlist *nl)
570{
571 u32_list_purge(&nl->list);
572 nl->remote = 0;
573 nl->local = 0;
574}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 855d53c64ab3..751530ab0c49 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -42,9 +42,35 @@
42struct tipc_node; 42struct tipc_node;
43struct tipc_msg; 43struct tipc_msg;
44struct tipc_nl_msg; 44struct tipc_nl_msg;
45struct tipc_node_map; 45struct tipc_nlist;
46struct tipc_nitem;
46extern const char tipc_bclink_name[]; 47extern const char tipc_bclink_name[];
47 48
49#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
50
51struct tipc_nlist {
52 struct list_head list;
53 u32 self;
54 u16 remote;
55 bool local;
56};
57
58void tipc_nlist_init(struct tipc_nlist *nl, u32 self);
59void tipc_nlist_purge(struct tipc_nlist *nl);
60void tipc_nlist_add(struct tipc_nlist *nl, u32 node);
61void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
62
63/* Cookie to be used between socket and broadcast layer
64 * @rcast: replicast (instead of broadcast) was used at previous xmit
65 * @mandatory: broadcast/replicast indication was set by user
66 * @expires: re-evaluate non-mandatory transmit method if we are past this
67 */
68struct tipc_mc_method {
69 bool rcast;
70 bool mandatory;
71 unsigned long expires;
72};
73
48int tipc_bcast_init(struct net *net); 74int tipc_bcast_init(struct net *net);
49void tipc_bcast_stop(struct net *net); 75void tipc_bcast_stop(struct net *net);
50void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, 76void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
@@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
53void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); 79void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
54void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); 80void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
55int tipc_bcast_get_mtu(struct net *net); 81int tipc_bcast_get_mtu(struct net *net);
56int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); 82void tipc_bcast_disable_rcast(struct net *net);
83int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
84 struct tipc_mc_method *method, struct tipc_nlist *dests,
85 u16 *cong_link_cnt);
57int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); 86int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
58void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, 87void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
59 struct tipc_msg *hdr); 88 struct tipc_msg *hdr);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 52d74760fb68..33a5bdfbef76 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
431 memset(&b->bcast_addr, 0, sizeof(b->bcast_addr)); 431 memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));
432 memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); 432 memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
433 b->bcast_addr.media_id = b->media->type_id; 433 b->bcast_addr.media_id = b->media->type_id;
434 b->bcast_addr.broadcast = 1; 434 b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
435 b->mtu = dev->mtu; 435 b->mtu = dev->mtu;
436 b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr); 436 b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);
437 rcu_assign_pointer(dev->tipc_ptr, b); 437 rcu_assign_pointer(dev->tipc_ptr, b);
@@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
482 return 0; 482 return 0;
483} 483}
484 484
485bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id)
486{
487 bool supp = false;
488 struct tipc_bearer *b;
489
490 rcu_read_lock();
491 b = bearer_get(net, bearer_id);
492 if (b)
493 supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT);
494 rcu_read_unlock();
495 return supp;
496}
497
485int tipc_bearer_mtu(struct net *net, u32 bearer_id) 498int tipc_bearer_mtu(struct net *net, u32 bearer_id)
486{ 499{
487 int mtu = 0; 500 int mtu = 0;
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 278ff7f616f9..635c9086e19a 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -60,9 +60,14 @@
60#define TIPC_MEDIA_TYPE_IB 2 60#define TIPC_MEDIA_TYPE_IB 2
61#define TIPC_MEDIA_TYPE_UDP 3 61#define TIPC_MEDIA_TYPE_UDP 3
62 62
63/* minimum bearer MTU */ 63/* Minimum bearer MTU */
64#define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE) 64#define TIPC_MIN_BEARER_MTU (MAX_H_SIZE + INT_H_SIZE)
65 65
66/* Identifiers for distinguishing between broadcast/multicast and replicast
67 */
68#define TIPC_BROADCAST_SUPPORT 1
69#define TIPC_REPLICAST_SUPPORT 2
70
66/** 71/**
67 * struct tipc_media_addr - destination address used by TIPC bearers 72 * struct tipc_media_addr - destination address used by TIPC bearers
68 * @value: address info (format defined by media) 73 * @value: address info (format defined by media)
@@ -210,6 +215,7 @@ int tipc_bearer_setup(void);
210void tipc_bearer_cleanup(void); 215void tipc_bearer_cleanup(void);
211void tipc_bearer_stop(struct net *net); 216void tipc_bearer_stop(struct net *net);
212int tipc_bearer_mtu(struct net *net, u32 bearer_id); 217int tipc_bearer_mtu(struct net *net, u32 bearer_id);
218bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id);
213void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, 219void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
214 struct sk_buff *skb, 220 struct sk_buff *skb,
215 struct tipc_media_addr *dest); 221 struct tipc_media_addr *dest);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 4e8647aef01c..ddd2dd6f77aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
515 if (link_is_bc_sndlink(l)) 515 if (link_is_bc_sndlink(l))
516 l->state = LINK_ESTABLISHED; 516 l->state = LINK_ESTABLISHED;
517 517
518 /* Disable replicast if even a single peer doesn't support it */
519 if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
520 tipc_bcast_disable_rcast(net);
521
518 return true; 522 return true;
519} 523}
520 524
@@ -776,60 +780,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
776 780
777/** 781/**
778 * link_schedule_user - schedule a message sender for wakeup after congestion 782 * link_schedule_user - schedule a message sender for wakeup after congestion
779 * @link: congested link 783 * @l: congested link
780 * @list: message that was attempted sent 784 * @hdr: header of message that is being sent
781 * Create pseudo msg to send back to user when congestion abates 785 * Create pseudo msg to send back to user when congestion abates
782 * Does not consume buffer list
783 */ 786 */
784static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) 787static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
785{ 788{
786 struct tipc_msg *msg = buf_msg(skb_peek(list)); 789 u32 dnode = tipc_own_addr(l->net);
787 int imp = msg_importance(msg); 790 u32 dport = msg_origport(hdr);
788 u32 oport = msg_origport(msg);
789 u32 addr = tipc_own_addr(link->net);
790 struct sk_buff *skb; 791 struct sk_buff *skb;
791 792
792 /* This really cannot happen... */
793 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
794 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
795 return -ENOBUFS;
796 }
797 /* Non-blocking sender: */
798 if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
799 return -ELINKCONG;
800
801 /* Create and schedule wakeup pseudo message */ 793 /* Create and schedule wakeup pseudo message */
802 skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, 794 skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
803 addr, addr, oport, 0, 0); 795 dnode, l->addr, dport, 0, 0);
804 if (!skb) 796 if (!skb)
805 return -ENOBUFS; 797 return -ENOBUFS;
806 TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); 798 msg_set_dest_droppable(buf_msg(skb), true);
807 TIPC_SKB_CB(skb)->chain_imp = imp; 799 TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
808 skb_queue_tail(&link->wakeupq, skb); 800 skb_queue_tail(&l->wakeupq, skb);
809 link->stats.link_congs++; 801 l->stats.link_congs++;
810 return -ELINKCONG; 802 return -ELINKCONG;
811} 803}
812 804
813/** 805/**
814 * link_prepare_wakeup - prepare users for wakeup after congestion 806 * link_prepare_wakeup - prepare users for wakeup after congestion
815 * @link: congested link 807 * @l: congested link
816 * Move a number of waiting users, as permitted by available space in 808 * Wake up a number of waiting users, as permitted by available space
817 * the send queue, from link wait queue to node wait queue for wakeup 809 * in the send queue
818 */ 810 */
819void link_prepare_wakeup(struct tipc_link *l) 811void link_prepare_wakeup(struct tipc_link *l)
820{ 812{
821 int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
822 int imp, lim;
823 struct sk_buff *skb, *tmp; 813 struct sk_buff *skb, *tmp;
814 int imp, i = 0;
824 815
825 skb_queue_walk_safe(&l->wakeupq, skb, tmp) { 816 skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
826 imp = TIPC_SKB_CB(skb)->chain_imp; 817 imp = TIPC_SKB_CB(skb)->chain_imp;
827 lim = l->backlog[imp].limit; 818 if (l->backlog[imp].len < l->backlog[imp].limit) {
828 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; 819 skb_unlink(skb, &l->wakeupq);
829 if ((pnd[imp] + l->backlog[imp].len) >= lim) 820 skb_queue_tail(l->inputq, skb);
821 } else if (i++ > 10) {
830 break; 822 break;
831 skb_unlink(skb, &l->wakeupq); 823 }
832 skb_queue_tail(l->inputq, skb);
833 } 824 }
834} 825}
835 826
@@ -869,8 +860,7 @@ void tipc_link_reset(struct tipc_link *l)
869 * @list: chain of buffers containing message 860 * @list: chain of buffers containing message
870 * @xmitq: returned list of packets to be sent by caller 861 * @xmitq: returned list of packets to be sent by caller
871 * 862 *
872 * Consumes the buffer chain, except when returning -ELINKCONG, 863 * Consumes the buffer chain.
873 * since the caller then may want to make more send attempts.
874 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS 864 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
875 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted 865 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
876 */ 866 */
@@ -879,7 +869,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
879{ 869{
880 struct tipc_msg *hdr = buf_msg(skb_peek(list)); 870 struct tipc_msg *hdr = buf_msg(skb_peek(list));
881 unsigned int maxwin = l->window; 871 unsigned int maxwin = l->window;
882 unsigned int i, imp = msg_importance(hdr); 872 int imp = msg_importance(hdr);
883 unsigned int mtu = l->mtu; 873 unsigned int mtu = l->mtu;
884 u16 ack = l->rcv_nxt - 1; 874 u16 ack = l->rcv_nxt - 1;
885 u16 seqno = l->snd_nxt; 875 u16 seqno = l->snd_nxt;
@@ -888,19 +878,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
888 struct sk_buff_head *backlogq = &l->backlogq; 878 struct sk_buff_head *backlogq = &l->backlogq;
889 struct sk_buff *skb, *_skb, *bskb; 879 struct sk_buff *skb, *_skb, *bskb;
890 int pkt_cnt = skb_queue_len(list); 880 int pkt_cnt = skb_queue_len(list);
881 int rc = 0;
891 882
892 /* Match msg importance against this and all higher backlog limits: */
893 if (!skb_queue_empty(backlogq)) {
894 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
895 if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
896 return link_schedule_user(l, list);
897 }
898 }
899 if (unlikely(msg_size(hdr) > mtu)) { 883 if (unlikely(msg_size(hdr) > mtu)) {
900 skb_queue_purge(list); 884 skb_queue_purge(list);
901 return -EMSGSIZE; 885 return -EMSGSIZE;
902 } 886 }
903 887
888 /* Allow oversubscription of one data msg per source at congestion */
889 if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
890 if (imp == TIPC_SYSTEM_IMPORTANCE) {
891 pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
892 return -ENOBUFS;
893 }
894 rc = link_schedule_user(l, hdr);
895 }
896
904 if (pkt_cnt > 1) { 897 if (pkt_cnt > 1) {
905 l->stats.sent_fragmented++; 898 l->stats.sent_fragmented++;
906 l->stats.sent_fragments += pkt_cnt; 899 l->stats.sent_fragments += pkt_cnt;
@@ -946,7 +939,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
946 skb_queue_splice_tail_init(list, backlogq); 939 skb_queue_splice_tail_init(list, backlogq);
947 } 940 }
948 l->snd_nxt = seqno; 941 l->snd_nxt = seqno;
949 return 0; 942 return rc;
950} 943}
951 944
952void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 945void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
@@ -1043,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1043static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, 1036static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1044 struct sk_buff_head *inputq) 1037 struct sk_buff_head *inputq)
1045{ 1038{
1046 switch (msg_user(buf_msg(skb))) { 1039 struct tipc_msg *hdr = buf_msg(skb);
1040
1041 switch (msg_user(hdr)) {
1047 case TIPC_LOW_IMPORTANCE: 1042 case TIPC_LOW_IMPORTANCE:
1048 case TIPC_MEDIUM_IMPORTANCE: 1043 case TIPC_MEDIUM_IMPORTANCE:
1049 case TIPC_HIGH_IMPORTANCE: 1044 case TIPC_HIGH_IMPORTANCE:
1050 case TIPC_CRITICAL_IMPORTANCE: 1045 case TIPC_CRITICAL_IMPORTANCE:
1046 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
1047 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1048 return true;
1049 }
1051 case CONN_MANAGER: 1050 case CONN_MANAGER:
1052 skb_queue_tail(inputq, skb); 1051 skb_queue_tail(inputq, skb);
1053 return true; 1052 return true;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ab02d0742476..312ef7de57d7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -607,6 +607,23 @@ error:
607 return false; 607 return false;
608} 608}
609 609
610bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
611 struct sk_buff_head *cpy)
612{
613 struct sk_buff *skb, *_skb;
614
615 skb_queue_walk(msg, skb) {
616 _skb = pskb_copy(skb, GFP_ATOMIC);
617 if (!_skb) {
618 __skb_queue_purge(cpy);
619 return false;
620 }
621 msg_set_destnode(buf_msg(_skb), dst);
622 __skb_queue_tail(cpy, _skb);
623 }
624 return true;
625}
626
610/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 627/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
611 * @list: list to be appended to 628 * @list: list to be appended to
612 * @seqno: sequence number of buffer to add 629 * @seqno: sequence number of buffer to add
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 2c3dc38abf9c..c843fd2bc48d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -98,8 +98,6 @@ struct tipc_skb_cb {
98 u32 bytes_read; 98 u32 bytes_read;
99 struct sk_buff *tail; 99 struct sk_buff *tail;
100 bool validated; 100 bool validated;
101 bool wakeup_pending;
102 u16 chain_sz;
103 u16 chain_imp; 101 u16 chain_imp;
104 u16 ackers; 102 u16 ackers;
105}; 103};
@@ -633,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
633 631
634static inline u32 msg_link_selector(struct tipc_msg *m) 632static inline u32 msg_link_selector(struct tipc_msg *m)
635{ 633{
634 if (msg_user(m) == MSG_FRAGMENTER)
635 m = (void *)msg_data(m);
636 return msg_bits(m, 4, 0, 1); 636 return msg_bits(m, 4, 0, 1);
637} 637}
638 638
639static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
640{
641 msg_set_bits(m, 4, 0, 1, n);
642}
643
644/* 639/*
645 * Word 5 640 * Word 5
646 */ 641 */
@@ -837,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
837 int offset, int dsz, int mtu, struct sk_buff_head *list); 832 int offset, int dsz, int mtu, struct sk_buff_head *list);
838bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 833bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
839bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); 834bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
835bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
836 struct sk_buff_head *cpy);
840void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 837void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
841 struct sk_buff *skb); 838 struct sk_buff *skb);
842 839
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index e190460fe0d3..9be6592e4a6f 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -608,7 +608,7 @@ not_found:
608 * Returns non-zero if any off-node ports overlap 608 * Returns non-zero if any off-node ports overlap
609 */ 609 */
610int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 610int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
611 u32 limit, struct tipc_plist *dports) 611 u32 limit, struct list_head *dports)
612{ 612{
613 struct name_seq *seq; 613 struct name_seq *seq;
614 struct sub_seq *sseq; 614 struct sub_seq *sseq;
@@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
633 info = sseq->info; 633 info = sseq->info;
634 list_for_each_entry(publ, &info->node_list, node_list) { 634 list_for_each_entry(publ, &info->node_list, node_list) {
635 if (publ->scope <= limit) 635 if (publ->scope <= limit)
636 tipc_plist_push(dports, publ->ref); 636 u32_push(dports, publ->ref);
637 } 637 }
638 638
639 if (info->cluster_list_size != info->node_list_size) 639 if (info->cluster_list_size != info->node_list_size)
@@ -645,6 +645,39 @@ exit:
645 return res; 645 return res;
646} 646}
647 647
648/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes
649 * - Creates list of nodes that overlap the given multicast address
650 * - Determines if any node local ports overlap
651 */
652void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
653 u32 upper, u32 domain,
654 struct tipc_nlist *nodes)
655{
656 struct sub_seq *sseq, *stop;
657 struct publication *publ;
658 struct name_info *info;
659 struct name_seq *seq;
660
661 rcu_read_lock();
662 seq = nametbl_find_seq(net, type);
663 if (!seq)
664 goto exit;
665
666 spin_lock_bh(&seq->lock);
667 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
668 stop = seq->sseqs + seq->first_free;
669 for (; sseq->lower <= upper && sseq != stop; sseq++) {
670 info = sseq->info;
671 list_for_each_entry(publ, &info->zone_list, zone_list) {
672 if (tipc_in_scope(domain, publ->node))
673 tipc_nlist_add(nodes, publ->node);
674 }
675 }
676 spin_unlock_bh(&seq->lock);
677exit:
678 rcu_read_unlock();
679}
680
648/* 681/*
649 * tipc_nametbl_publish - add name publication to network name tables 682 * tipc_nametbl_publish - add name publication to network name tables
650 */ 683 */
@@ -1022,40 +1055,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1022 return skb->len; 1055 return skb->len;
1023} 1056}
1024 1057
1025void tipc_plist_push(struct tipc_plist *pl, u32 port) 1058bool u32_find(struct list_head *l, u32 value)
1026{ 1059{
1027 struct tipc_plist *nl; 1060 struct u32_item *item;
1028 1061
1029 if (likely(!pl->port)) { 1062 list_for_each_entry(item, l, list) {
1030 pl->port = port; 1063 if (item->value == value)
1031 return; 1064 return true;
1032 } 1065 }
1033 if (pl->port == port) 1066 return false;
1034 return; 1067}
1035 list_for_each_entry(nl, &pl->list, list) { 1068
1036 if (nl->port == port) 1069bool u32_push(struct list_head *l, u32 value)
1037 return; 1070{
1071 struct u32_item *item;
1072
1073 list_for_each_entry(item, l, list) {
1074 if (item->value == value)
1075 return false;
1076 }
1077 item = kmalloc(sizeof(*item), GFP_ATOMIC);
1078 if (unlikely(!item))
1079 return false;
1080
1081 item->value = value;
1082 list_add(&item->list, l);
1083 return true;
1084}
1085
1086u32 u32_pop(struct list_head *l)
1087{
1088 struct u32_item *item;
1089 u32 value = 0;
1090
1091 if (list_empty(l))
1092 return 0;
1093 item = list_first_entry(l, typeof(*item), list);
1094 value = item->value;
1095 list_del(&item->list);
1096 kfree(item);
1097 return value;
1098}
1099
1100bool u32_del(struct list_head *l, u32 value)
1101{
1102 struct u32_item *item, *tmp;
1103
1104 list_for_each_entry_safe(item, tmp, l, list) {
1105 if (item->value != value)
1106 continue;
1107 list_del(&item->list);
1108 kfree(item);
1109 return true;
1038 } 1110 }
1039 nl = kmalloc(sizeof(*nl), GFP_ATOMIC); 1111 return false;
1040 if (nl) { 1112}
1041 nl->port = port; 1113
1042 list_add(&nl->list, &pl->list); 1114void u32_list_purge(struct list_head *l)
1115{
1116 struct u32_item *item, *tmp;
1117
1118 list_for_each_entry_safe(item, tmp, l, list) {
1119 list_del(&item->list);
1120 kfree(item);
1043 } 1121 }
1044} 1122}
1045 1123
1046u32 tipc_plist_pop(struct tipc_plist *pl) 1124int u32_list_len(struct list_head *l)
1047{ 1125{
1048 struct tipc_plist *nl; 1126 struct u32_item *item;
1049 u32 port = 0; 1127 int i = 0;
1050 1128
1051 if (likely(list_empty(&pl->list))) { 1129 list_for_each_entry(item, l, list) {
1052 port = pl->port; 1130 i++;
1053 pl->port = 0;
1054 return port;
1055 } 1131 }
1056 nl = list_first_entry(&pl->list, typeof(*nl), list); 1132 return i;
1057 port = nl->port;
1058 list_del(&nl->list);
1059 kfree(nl);
1060 return port;
1061} 1133}
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 1524a73830f7..6ebdeb1d84a5 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -39,6 +39,7 @@
39 39
40struct tipc_subscription; 40struct tipc_subscription;
41struct tipc_plist; 41struct tipc_plist;
42struct tipc_nlist;
42 43
43/* 44/*
44 * TIPC name types reserved for internal TIPC use (both current and planned) 45 * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -99,7 +100,10 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
99 100
100u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 101u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
101int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 102int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
102 u32 limit, struct tipc_plist *dports); 103 u32 limit, struct list_head *dports);
104void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
105 u32 upper, u32 domain,
106 struct tipc_nlist *nodes);
103struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, 107struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
104 u32 upper, u32 scope, u32 port_ref, 108 u32 upper, u32 scope, u32 port_ref,
105 u32 key); 109 u32 key);
@@ -116,18 +120,16 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
116int tipc_nametbl_init(struct net *net); 120int tipc_nametbl_init(struct net *net);
117void tipc_nametbl_stop(struct net *net); 121void tipc_nametbl_stop(struct net *net);
118 122
119struct tipc_plist { 123struct u32_item {
120 struct list_head list; 124 struct list_head list;
121 u32 port; 125 u32 value;
122}; 126};
123 127
124static inline void tipc_plist_init(struct tipc_plist *pl) 128bool u32_push(struct list_head *l, u32 value);
125{ 129u32 u32_pop(struct list_head *l);
126 INIT_LIST_HEAD(&pl->list); 130bool u32_find(struct list_head *l, u32 value);
127 pl->port = 0; 131bool u32_del(struct list_head *l, u32 value);
128} 132void u32_list_purge(struct list_head *l);
129 133int u32_list_len(struct list_head *l);
130void tipc_plist_push(struct tipc_plist *pl, u32 port);
131u32 tipc_plist_pop(struct tipc_plist *pl);
132 134
133#endif 135#endif
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 28bf4feeb81c..ab8a2d5d1e32 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -110,6 +110,10 @@ int tipc_net_start(struct net *net, u32 addr)
110 char addr_string[16]; 110 char addr_string[16];
111 111
112 tn->own_addr = addr; 112 tn->own_addr = addr;
113
114 /* Ensure that the new address is visible before we reinit. */
115 smp_mb();
116
113 tipc_named_reinit(net); 117 tipc_named_reinit(net);
114 tipc_sk_reinit(net); 118 tipc_sk_reinit(net);
115 119
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 27753325e06e..4512e83652b1 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1172,7 +1172,7 @@ msg_full:
1172 * @list: chain of buffers containing message 1172 * @list: chain of buffers containing message
1173 * @dnode: address of destination node 1173 * @dnode: address of destination node
1174 * @selector: a number used for deterministic link selection 1174 * @selector: a number used for deterministic link selection
1175 * Consumes the buffer chain, except when returning -ELINKCONG 1175 * Consumes the buffer chain.
1176 * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF 1176 * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
1177 */ 1177 */
1178int tipc_node_xmit(struct net *net, struct sk_buff_head *list, 1178int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
@@ -1211,10 +1211,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1211 spin_unlock_bh(&le->lock); 1211 spin_unlock_bh(&le->lock);
1212 tipc_node_read_unlock(n); 1212 tipc_node_read_unlock(n);
1213 1213
1214 if (likely(rc == 0)) 1214 if (unlikely(rc == -ENOBUFS))
1215 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1216 else if (rc == -ENOBUFS)
1217 tipc_node_link_down(n, bearer_id, false); 1215 tipc_node_link_down(n, bearer_id, false);
1216 else
1217 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1218 1218
1219 tipc_node_put(n); 1219 tipc_node_put(n);
1220 1220
@@ -1226,20 +1226,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1226 * messages, which will not be rejected 1226 * messages, which will not be rejected
1227 * The only exception is datagram messages rerouted after secondary 1227 * The only exception is datagram messages rerouted after secondary
1228 * lookup, which are rare and safe to dispose of anyway. 1228 * lookup, which are rare and safe to dispose of anyway.
1229 * TODO: Return real return value, and let callers use
1230 * tipc_wait_for_sendpkt() where applicable
1231 */ 1229 */
1232int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, 1230int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1233 u32 selector) 1231 u32 selector)
1234{ 1232{
1235 struct sk_buff_head head; 1233 struct sk_buff_head head;
1236 int rc;
1237 1234
1238 skb_queue_head_init(&head); 1235 skb_queue_head_init(&head);
1239 __skb_queue_tail(&head, skb); 1236 __skb_queue_tail(&head, skb);
1240 rc = tipc_node_xmit(net, &head, dnode, selector); 1237 tipc_node_xmit(net, &head, dnode, selector);
1241 if (rc == -ELINKCONG)
1242 kfree_skb(skb);
1243 return 0; 1238 return 0;
1244} 1239}
1245 1240
@@ -1267,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1267 kfree_skb(skb); 1262 kfree_skb(skb);
1268} 1263}
1269 1264
1265static void tipc_node_mcast_rcv(struct tipc_node *n)
1266{
1267 struct tipc_bclink_entry *be = &n->bc_entry;
1268
1269 /* 'arrvq' is under inputq2's lock protection */
1270 spin_lock_bh(&be->inputq2.lock);
1271 spin_lock_bh(&be->inputq1.lock);
1272 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1273 spin_unlock_bh(&be->inputq1.lock);
1274 spin_unlock_bh(&be->inputq2.lock);
1275 tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
1276}
1277
1270static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr, 1278static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
1271 int bearer_id, struct sk_buff_head *xmitq) 1279 int bearer_id, struct sk_buff_head *xmitq)
1272{ 1280{
@@ -1340,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
1340 if (!skb_queue_empty(&xmitq)) 1348 if (!skb_queue_empty(&xmitq))
1341 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); 1349 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1342 1350
1343 /* Deliver. 'arrvq' is under inputq2's lock protection */ 1351 if (!skb_queue_empty(&be->inputq1))
1344 if (!skb_queue_empty(&be->inputq1)) { 1352 tipc_node_mcast_rcv(n);
1345 spin_lock_bh(&be->inputq2.lock);
1346 spin_lock_bh(&be->inputq1.lock);
1347 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1348 spin_unlock_bh(&be->inputq1.lock);
1349 spin_unlock_bh(&be->inputq2.lock);
1350 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1351 }
1352 1353
1353 if (rc & TIPC_LINK_DOWN_EVT) { 1354 if (rc & TIPC_LINK_DOWN_EVT) {
1354 /* Reception reassembly failure => reset all links to peer */ 1355 /* Reception reassembly failure => reset all links to peer */
@@ -1504,19 +1505,21 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1504{ 1505{
1505 struct sk_buff_head xmitq; 1506 struct sk_buff_head xmitq;
1506 struct tipc_node *n; 1507 struct tipc_node *n;
1507 struct tipc_msg *hdr = buf_msg(skb); 1508 struct tipc_msg *hdr;
1508 int usr = msg_user(hdr);
1509 int bearer_id = b->identity; 1509 int bearer_id = b->identity;
1510 struct tipc_link_entry *le; 1510 struct tipc_link_entry *le;
1511 u16 bc_ack = msg_bcast_ack(hdr);
1512 u32 self = tipc_own_addr(net); 1511 u32 self = tipc_own_addr(net);
1513 int rc = 0; 1512 int usr, rc = 0;
1513 u16 bc_ack;
1514 1514
1515 __skb_queue_head_init(&xmitq); 1515 __skb_queue_head_init(&xmitq);
1516 1516
1517 /* Ensure message is well-formed */ 1517 /* Ensure message is well-formed before touching the header */
1518 if (unlikely(!tipc_msg_validate(skb))) 1518 if (unlikely(!tipc_msg_validate(skb)))
1519 goto discard; 1519 goto discard;
1520 hdr = buf_msg(skb);
1521 usr = msg_user(hdr);
1522 bc_ack = msg_bcast_ack(hdr);
1520 1523
1521 /* Handle arrival of discovery or broadcast packet */ 1524 /* Handle arrival of discovery or broadcast packet */
1522 if (unlikely(msg_non_seq(hdr))) { 1525 if (unlikely(msg_non_seq(hdr))) {
@@ -1575,6 +1578,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1575 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) 1578 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1576 tipc_named_rcv(net, &n->bc_entry.namedq); 1579 tipc_named_rcv(net, &n->bc_entry.namedq);
1577 1580
1581 if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
1582 tipc_node_mcast_rcv(n);
1583
1578 if (!skb_queue_empty(&le->inputq)) 1584 if (!skb_queue_empty(&le->inputq))
1579 tipc_sk_rcv(net, &le->inputq); 1585 tipc_sk_rcv(net, &le->inputq);
1580 1586
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 39ef54c1f2ad..898c22916984 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,11 +47,13 @@
47enum { 47enum {
48 TIPC_BCAST_SYNCH = (1 << 1), 48 TIPC_BCAST_SYNCH = (1 << 1),
49 TIPC_BCAST_STATE_NACK = (1 << 2), 49 TIPC_BCAST_STATE_NACK = (1 << 2),
50 TIPC_BLOCK_FLOWCTL = (1 << 3) 50 TIPC_BLOCK_FLOWCTL = (1 << 3),
51 TIPC_BCAST_RCAST = (1 << 4)
51}; 52};
52 53
53#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ 54#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
54 TIPC_BCAST_STATE_NACK | \ 55 TIPC_BCAST_STATE_NACK | \
56 TIPC_BCAST_RCAST | \
55 TIPC_BLOCK_FLOWCTL) 57 TIPC_BLOCK_FLOWCTL)
56#define INVALID_BEARER_ID -1 58#define INVALID_BEARER_ID -1
57 59
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 800caaa699a1..43e4045e72bc 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -35,6 +35,8 @@
35 */ 35 */
36 36
37#include <linux/rhashtable.h> 37#include <linux/rhashtable.h>
38#include <linux/sched/signal.h>
39
38#include "core.h" 40#include "core.h"
39#include "name_table.h" 41#include "name_table.h"
40#include "node.h" 42#include "node.h"
@@ -67,16 +69,19 @@ enum {
67 * @max_pkt: maximum packet size "hint" used when building messages sent by port 69 * @max_pkt: maximum packet size "hint" used when building messages sent by port
68 * @portid: unique port identity in TIPC socket hash table 70 * @portid: unique port identity in TIPC socket hash table
69 * @phdr: preformatted message header used when sending messages 71 * @phdr: preformatted message header used when sending messages
72 * #cong_links: list of congested links
70 * @publications: list of publications for port 73 * @publications: list of publications for port
74 * @blocking_link: address of the congested link we are currently sleeping on
71 * @pub_count: total # of publications port has made during its lifetime 75 * @pub_count: total # of publications port has made during its lifetime
72 * @probing_state: 76 * @probing_state:
73 * @conn_timeout: the time we can wait for an unresponded setup request 77 * @conn_timeout: the time we can wait for an unresponded setup request
74 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 78 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
75 * @link_cong: non-zero if owner must sleep because of link congestion 79 * @cong_link_cnt: number of congested links
76 * @sent_unacked: # messages sent by socket, and not yet acked by peer 80 * @sent_unacked: # messages sent by socket, and not yet acked by peer
77 * @rcv_unacked: # messages read by user, but not yet acked back to peer 81 * @rcv_unacked: # messages read by user, but not yet acked back to peer
78 * @peer: 'connected' peer for dgram/rdm 82 * @peer: 'connected' peer for dgram/rdm
79 * @node: hash table node 83 * @node: hash table node
84 * @mc_method: cookie for use between socket and broadcast layer
80 * @rcu: rcu struct for tipc_sock 85 * @rcu: rcu struct for tipc_sock
81 */ 86 */
82struct tipc_sock { 87struct tipc_sock {
@@ -87,13 +92,13 @@ struct tipc_sock {
87 u32 max_pkt; 92 u32 max_pkt;
88 u32 portid; 93 u32 portid;
89 struct tipc_msg phdr; 94 struct tipc_msg phdr;
90 struct list_head sock_list; 95 struct list_head cong_links;
91 struct list_head publications; 96 struct list_head publications;
92 u32 pub_count; 97 u32 pub_count;
93 uint conn_timeout; 98 uint conn_timeout;
94 atomic_t dupl_rcvcnt; 99 atomic_t dupl_rcvcnt;
95 bool probe_unacked; 100 bool probe_unacked;
96 bool link_cong; 101 u16 cong_link_cnt;
97 u16 snt_unacked; 102 u16 snt_unacked;
98 u16 snd_win; 103 u16 snd_win;
99 u16 peer_caps; 104 u16 peer_caps;
@@ -101,6 +106,7 @@ struct tipc_sock {
101 u16 rcv_win; 106 u16 rcv_win;
102 struct sockaddr_tipc peer; 107 struct sockaddr_tipc peer;
103 struct rhash_head node; 108 struct rhash_head node;
109 struct tipc_mc_method mc_method;
104 struct rcu_head rcu; 110 struct rcu_head rcu;
105}; 111};
106 112
@@ -110,7 +116,6 @@ static void tipc_write_space(struct sock *sk);
110static void tipc_sock_destruct(struct sock *sk); 116static void tipc_sock_destruct(struct sock *sk);
111static int tipc_release(struct socket *sock); 117static int tipc_release(struct socket *sock);
112static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); 118static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
113static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
114static void tipc_sk_timeout(unsigned long data); 119static void tipc_sk_timeout(unsigned long data);
115static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 120static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
116 struct tipc_name_seq const *seq); 121 struct tipc_name_seq const *seq);
@@ -119,8 +124,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
119static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 124static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
120static int tipc_sk_insert(struct tipc_sock *tsk); 125static int tipc_sk_insert(struct tipc_sock *tsk);
121static void tipc_sk_remove(struct tipc_sock *tsk); 126static void tipc_sk_remove(struct tipc_sock *tsk);
122static int __tipc_send_stream(struct socket *sock, struct msghdr *m, 127static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
123 size_t dsz);
124static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); 128static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
125 129
126static const struct proto_ops packet_ops; 130static const struct proto_ops packet_ops;
@@ -334,6 +338,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
334 return res; 338 return res;
335} 339}
336 340
341static int tipc_sk_sock_err(struct socket *sock, long *timeout)
342{
343 struct sock *sk = sock->sk;
344 int err = sock_error(sk);
345 int typ = sock->type;
346
347 if (err)
348 return err;
349 if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
350 if (sk->sk_state == TIPC_DISCONNECTING)
351 return -EPIPE;
352 else if (!tipc_sk_connected(sk))
353 return -ENOTCONN;
354 }
355 if (!*timeout)
356 return -EAGAIN;
357 if (signal_pending(current))
358 return sock_intr_errno(*timeout);
359
360 return 0;
361}
362
363#define tipc_wait_for_cond(sock_, timeout_, condition_) \
364({ \
365 int rc_ = 0; \
366 int done_ = 0; \
367 \
368 while (!(condition_) && !done_) { \
369 struct sock *sk_ = sock->sk; \
370 DEFINE_WAIT_FUNC(wait_, woken_wake_function); \
371 \
372 rc_ = tipc_sk_sock_err(sock_, timeout_); \
373 if (rc_) \
374 break; \
375 prepare_to_wait(sk_sleep(sk_), &wait_, \
376 TASK_INTERRUPTIBLE); \
377 done_ = sk_wait_event(sk_, timeout_, \
378 (condition_), &wait_); \
379 remove_wait_queue(sk_sleep(sk_), &wait_); \
380 } \
381 rc_; \
382})
383
337/** 384/**
338 * tipc_sk_create - create a TIPC socket 385 * tipc_sk_create - create a TIPC socket
339 * @net: network namespace (must be default network) 386 * @net: network namespace (must be default network)
@@ -382,10 +429,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
382 tsk = tipc_sk(sk); 429 tsk = tipc_sk(sk);
383 tsk->max_pkt = MAX_PKT_DEFAULT; 430 tsk->max_pkt = MAX_PKT_DEFAULT;
384 INIT_LIST_HEAD(&tsk->publications); 431 INIT_LIST_HEAD(&tsk->publications);
432 INIT_LIST_HEAD(&tsk->cong_links);
385 msg = &tsk->phdr; 433 msg = &tsk->phdr;
386 tn = net_generic(sock_net(sk), tipc_net_id); 434 tn = net_generic(sock_net(sk), tipc_net_id);
387 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
388 NAMED_H_SIZE, 0);
389 435
390 /* Finish initializing socket data structures */ 436 /* Finish initializing socket data structures */
391 sock->ops = ops; 437 sock->ops = ops;
@@ -395,6 +441,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
395 pr_warn("Socket create failed; port number exhausted\n"); 441 pr_warn("Socket create failed; port number exhausted\n");
396 return -EINVAL; 442 return -EINVAL;
397 } 443 }
444
445 /* Ensure tsk is visible before we read own_addr. */
446 smp_mb();
447
448 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
449 NAMED_H_SIZE, 0);
450
398 msg_set_origport(msg, tsk->portid); 451 msg_set_origport(msg, tsk->portid);
399 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); 452 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
400 sk->sk_shutdown = 0; 453 sk->sk_shutdown = 0;
@@ -432,9 +485,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
432 struct sock *sk = sock->sk; 485 struct sock *sk = sock->sk;
433 struct tipc_sock *tsk = tipc_sk(sk); 486 struct tipc_sock *tsk = tipc_sk(sk);
434 struct net *net = sock_net(sk); 487 struct net *net = sock_net(sk);
488 long timeout = CONN_TIMEOUT_DEFAULT;
435 u32 dnode = tsk_peer_node(tsk); 489 u32 dnode = tsk_peer_node(tsk);
436 struct sk_buff *skb; 490 struct sk_buff *skb;
437 491
492 /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
493 tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
494 !tsk_conn_cong(tsk)));
495
438 /* Reject all unreceived messages, except on an active connection 496 /* Reject all unreceived messages, except on an active connection
439 * (which disconnects locally & sends a 'FIN+' to peer). 497 * (which disconnects locally & sends a 'FIN+' to peer).
440 */ 498 */
@@ -505,7 +563,8 @@ static int tipc_release(struct socket *sock)
505 563
506 /* Reject any messages that accumulated in backlog queue */ 564 /* Reject any messages that accumulated in backlog queue */
507 release_sock(sk); 565 release_sock(sk);
508 566 u32_list_purge(&tsk->cong_links);
567 tsk->cong_link_cnt = 0;
509 call_rcu(&tsk->rcu, tipc_sk_callback); 568 call_rcu(&tsk->rcu, tipc_sk_callback);
510 sock->sk = NULL; 569 sock->sk = NULL;
511 570
@@ -648,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
648 707
649 switch (sk->sk_state) { 708 switch (sk->sk_state) {
650 case TIPC_ESTABLISHED: 709 case TIPC_ESTABLISHED:
651 if (!tsk->link_cong && !tsk_conn_cong(tsk)) 710 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
652 mask |= POLLOUT; 711 mask |= POLLOUT;
653 /* fall thru' */ 712 /* fall thru' */
654 case TIPC_LISTEN: 713 case TIPC_LISTEN:
@@ -657,7 +716,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
657 mask |= (POLLIN | POLLRDNORM); 716 mask |= (POLLIN | POLLRDNORM);
658 break; 717 break;
659 case TIPC_OPEN: 718 case TIPC_OPEN:
660 if (!tsk->link_cong) 719 if (!tsk->cong_link_cnt)
661 mask |= POLLOUT; 720 mask |= POLLOUT;
662 if (tipc_sk_type_connectionless(sk) && 721 if (tipc_sk_type_connectionless(sk) &&
663 (!skb_queue_empty(&sk->sk_receive_queue))) 722 (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -676,63 +735,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
676 * @sock: socket structure 735 * @sock: socket structure
677 * @seq: destination address 736 * @seq: destination address
678 * @msg: message to send 737 * @msg: message to send
679 * @dsz: total length of message data 738 * @dlen: length of data to send
680 * @timeo: timeout to wait for wakeup 739 * @timeout: timeout to wait for wakeup
681 * 740 *
682 * Called from function tipc_sendmsg(), which has done all sanity checks 741 * Called from function tipc_sendmsg(), which has done all sanity checks
683 * Returns the number of bytes sent on success, or errno 742 * Returns the number of bytes sent on success, or errno
684 */ 743 */
685static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, 744static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
686 struct msghdr *msg, size_t dsz, long timeo) 745 struct msghdr *msg, size_t dlen, long timeout)
687{ 746{
688 struct sock *sk = sock->sk; 747 struct sock *sk = sock->sk;
689 struct tipc_sock *tsk = tipc_sk(sk); 748 struct tipc_sock *tsk = tipc_sk(sk);
749 struct tipc_msg *hdr = &tsk->phdr;
690 struct net *net = sock_net(sk); 750 struct net *net = sock_net(sk);
691 struct tipc_msg *mhdr = &tsk->phdr; 751 int mtu = tipc_bcast_get_mtu(net);
692 struct sk_buff_head pktchain; 752 struct tipc_mc_method *method = &tsk->mc_method;
693 struct iov_iter save = msg->msg_iter; 753 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
694 uint mtu; 754 struct sk_buff_head pkts;
755 struct tipc_nlist dsts;
695 int rc; 756 int rc;
696 757
697 if (!timeo && tsk->link_cong) 758 /* Block or return if any destination link is congested */
698 return -ELINKCONG; 759 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
760 if (unlikely(rc))
761 return rc;
699 762
700 msg_set_type(mhdr, TIPC_MCAST_MSG); 763 /* Lookup destination nodes */
701 msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); 764 tipc_nlist_init(&dsts, tipc_own_addr(net));
702 msg_set_destport(mhdr, 0); 765 tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
703 msg_set_destnode(mhdr, 0); 766 seq->upper, domain, &dsts);
704 msg_set_nametype(mhdr, seq->type); 767 if (!dsts.local && !dsts.remote)
705 msg_set_namelower(mhdr, seq->lower); 768 return -EHOSTUNREACH;
706 msg_set_nameupper(mhdr, seq->upper);
707 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
708 769
709 skb_queue_head_init(&pktchain); 770 /* Build message header */
771 msg_set_type(hdr, TIPC_MCAST_MSG);
772 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
773 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
774 msg_set_destport(hdr, 0);
775 msg_set_destnode(hdr, 0);
776 msg_set_nametype(hdr, seq->type);
777 msg_set_namelower(hdr, seq->lower);
778 msg_set_nameupper(hdr, seq->upper);
710 779
711new_mtu: 780 /* Build message as chain of buffers */
712 mtu = tipc_bcast_get_mtu(net); 781 skb_queue_head_init(&pkts);
713 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); 782 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
714 if (unlikely(rc < 0))
715 return rc;
716 783
717 do { 784 /* Send message if build was successful */
718 rc = tipc_bcast_xmit(net, &pktchain); 785 if (unlikely(rc == dlen))
719 if (likely(!rc)) 786 rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
720 return dsz; 787 &tsk->cong_link_cnt);
721 788
722 if (rc == -ELINKCONG) { 789 tipc_nlist_purge(&dsts);
723 tsk->link_cong = 1; 790
724 rc = tipc_wait_for_sndmsg(sock, &timeo); 791 return rc ? rc : dlen;
725 if (!rc)
726 continue;
727 }
728 __skb_queue_purge(&pktchain);
729 if (rc == -EMSGSIZE) {
730 msg->msg_iter = save;
731 goto new_mtu;
732 }
733 break;
734 } while (1);
735 return rc;
736} 792}
737 793
738/** 794/**
@@ -746,7 +802,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
746 struct sk_buff_head *inputq) 802 struct sk_buff_head *inputq)
747{ 803{
748 struct tipc_msg *msg; 804 struct tipc_msg *msg;
749 struct tipc_plist dports; 805 struct list_head dports;
750 u32 portid; 806 u32 portid;
751 u32 scope = TIPC_CLUSTER_SCOPE; 807 u32 scope = TIPC_CLUSTER_SCOPE;
752 struct sk_buff_head tmpq; 808 struct sk_buff_head tmpq;
@@ -754,7 +810,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
754 struct sk_buff *skb, *_skb; 810 struct sk_buff *skb, *_skb;
755 811
756 __skb_queue_head_init(&tmpq); 812 __skb_queue_head_init(&tmpq);
757 tipc_plist_init(&dports); 813 INIT_LIST_HEAD(&dports);
758 814
759 skb = tipc_skb_peek(arrvq, &inputq->lock); 815 skb = tipc_skb_peek(arrvq, &inputq->lock);
760 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 816 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
@@ -768,8 +824,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
768 tipc_nametbl_mc_translate(net, 824 tipc_nametbl_mc_translate(net,
769 msg_nametype(msg), msg_namelower(msg), 825 msg_nametype(msg), msg_namelower(msg),
770 msg_nameupper(msg), scope, &dports); 826 msg_nameupper(msg), scope, &dports);
771 portid = tipc_plist_pop(&dports); 827 portid = u32_pop(&dports);
772 for (; portid; portid = tipc_plist_pop(&dports)) { 828 for (; portid; portid = u32_pop(&dports)) {
773 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 829 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
774 if (_skb) { 830 if (_skb) {
775 msg_set_destport(buf_msg(_skb), portid); 831 msg_set_destport(buf_msg(_skb), portid);
@@ -830,31 +886,6 @@ exit:
830 kfree_skb(skb); 886 kfree_skb(skb);
831} 887}
832 888
833static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
834{
835 DEFINE_WAIT_FUNC(wait, woken_wake_function);
836 struct sock *sk = sock->sk;
837 struct tipc_sock *tsk = tipc_sk(sk);
838 int done;
839
840 do {
841 int err = sock_error(sk);
842 if (err)
843 return err;
844 if (sk->sk_shutdown & SEND_SHUTDOWN)
845 return -EPIPE;
846 if (!*timeo_p)
847 return -EAGAIN;
848 if (signal_pending(current))
849 return sock_intr_errno(*timeo_p);
850
851 add_wait_queue(sk_sleep(sk), &wait);
852 done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
853 remove_wait_queue(sk_sleep(sk), &wait);
854 } while (!done);
855 return 0;
856}
857
858/** 889/**
859 * tipc_sendmsg - send message in connectionless manner 890 * tipc_sendmsg - send message in connectionless manner
860 * @sock: socket structure 891 * @sock: socket structure
@@ -881,35 +912,38 @@ static int tipc_sendmsg(struct socket *sock,
881 return ret; 912 return ret;
882} 913}
883 914
884static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) 915static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
885{ 916{
886 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
887 struct sock *sk = sock->sk; 917 struct sock *sk = sock->sk;
888 struct tipc_sock *tsk = tipc_sk(sk);
889 struct net *net = sock_net(sk); 918 struct net *net = sock_net(sk);
890 struct tipc_msg *mhdr = &tsk->phdr; 919 struct tipc_sock *tsk = tipc_sk(sk);
891 u32 dnode, dport; 920 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
892 struct sk_buff_head pktchain; 921 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
893 bool is_connectionless = tipc_sk_type_connectionless(sk); 922 struct list_head *clinks = &tsk->cong_links;
894 struct sk_buff *skb; 923 bool syn = !tipc_sk_type_connectionless(sk);
924 struct tipc_msg *hdr = &tsk->phdr;
895 struct tipc_name_seq *seq; 925 struct tipc_name_seq *seq;
896 struct iov_iter save; 926 struct sk_buff_head pkts;
897 u32 mtu; 927 u32 type, inst, domain;
898 long timeo; 928 u32 dnode, dport;
899 int rc; 929 int mtu, rc;
900 930
901 if (dsz > TIPC_MAX_USER_MSG_SIZE) 931 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
902 return -EMSGSIZE; 932 return -EMSGSIZE;
933
903 if (unlikely(!dest)) { 934 if (unlikely(!dest)) {
904 if (is_connectionless && tsk->peer.family == AF_TIPC) 935 dest = &tsk->peer;
905 dest = &tsk->peer; 936 if (!syn || dest->family != AF_TIPC)
906 else
907 return -EDESTADDRREQ; 937 return -EDESTADDRREQ;
908 } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
909 dest->family != AF_TIPC) {
910 return -EINVAL;
911 } 938 }
912 if (!is_connectionless) { 939
940 if (unlikely(m->msg_namelen < sizeof(*dest)))
941 return -EINVAL;
942
943 if (unlikely(dest->family != AF_TIPC))
944 return -EINVAL;
945
946 if (unlikely(syn)) {
913 if (sk->sk_state == TIPC_LISTEN) 947 if (sk->sk_state == TIPC_LISTEN)
914 return -EPIPE; 948 return -EPIPE;
915 if (sk->sk_state != TIPC_OPEN) 949 if (sk->sk_state != TIPC_OPEN)
@@ -921,102 +955,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
921 tsk->conn_instance = dest->addr.name.name.instance; 955 tsk->conn_instance = dest->addr.name.name.instance;
922 } 956 }
923 } 957 }
924 seq = &dest->addr.nameseq;
925 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
926 958
927 if (dest->addrtype == TIPC_ADDR_MCAST) { 959 seq = &dest->addr.nameseq;
928 return tipc_sendmcast(sock, seq, m, dsz, timeo); 960 if (dest->addrtype == TIPC_ADDR_MCAST)
929 } else if (dest->addrtype == TIPC_ADDR_NAME) { 961 return tipc_sendmcast(sock, seq, m, dlen, timeout);
930 u32 type = dest->addr.name.name.type;
931 u32 inst = dest->addr.name.name.instance;
932 u32 domain = dest->addr.name.domain;
933 962
963 if (dest->addrtype == TIPC_ADDR_NAME) {
964 type = dest->addr.name.name.type;
965 inst = dest->addr.name.name.instance;
966 domain = dest->addr.name.domain;
934 dnode = domain; 967 dnode = domain;
935 msg_set_type(mhdr, TIPC_NAMED_MSG); 968 msg_set_type(hdr, TIPC_NAMED_MSG);
936 msg_set_hdr_sz(mhdr, NAMED_H_SIZE); 969 msg_set_hdr_sz(hdr, NAMED_H_SIZE);
937 msg_set_nametype(mhdr, type); 970 msg_set_nametype(hdr, type);
938 msg_set_nameinst(mhdr, inst); 971 msg_set_nameinst(hdr, inst);
939 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); 972 msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
940 dport = tipc_nametbl_translate(net, type, inst, &dnode); 973 dport = tipc_nametbl_translate(net, type, inst, &dnode);
941 msg_set_destnode(mhdr, dnode); 974 msg_set_destnode(hdr, dnode);
942 msg_set_destport(mhdr, dport); 975 msg_set_destport(hdr, dport);
943 if (unlikely(!dport && !dnode)) 976 if (unlikely(!dport && !dnode))
944 return -EHOSTUNREACH; 977 return -EHOSTUNREACH;
978
945 } else if (dest->addrtype == TIPC_ADDR_ID) { 979 } else if (dest->addrtype == TIPC_ADDR_ID) {
946 dnode = dest->addr.id.node; 980 dnode = dest->addr.id.node;
947 msg_set_type(mhdr, TIPC_DIRECT_MSG); 981 msg_set_type(hdr, TIPC_DIRECT_MSG);
948 msg_set_lookup_scope(mhdr, 0); 982 msg_set_lookup_scope(hdr, 0);
949 msg_set_destnode(mhdr, dnode); 983 msg_set_destnode(hdr, dnode);
950 msg_set_destport(mhdr, dest->addr.id.ref); 984 msg_set_destport(hdr, dest->addr.id.ref);
951 msg_set_hdr_sz(mhdr, BASIC_H_SIZE); 985 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
952 } 986 }
953 987
954 skb_queue_head_init(&pktchain); 988 /* Block or return if destination link is congested */
955 save = m->msg_iter; 989 rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
956new_mtu: 990 if (unlikely(rc))
957 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
958 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
959 if (rc < 0)
960 return rc; 991 return rc;
961 992
962 do { 993 skb_queue_head_init(&pkts);
963 skb = skb_peek(&pktchain); 994 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
964 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 995 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
965 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); 996 if (unlikely(rc != dlen))
966 if (likely(!rc)) { 997 return rc;
967 if (!is_connectionless)
968 tipc_set_sk_state(sk, TIPC_CONNECTING);
969 return dsz;
970 }
971 if (rc == -ELINKCONG) {
972 tsk->link_cong = 1;
973 rc = tipc_wait_for_sndmsg(sock, &timeo);
974 if (!rc)
975 continue;
976 }
977 __skb_queue_purge(&pktchain);
978 if (rc == -EMSGSIZE) {
979 m->msg_iter = save;
980 goto new_mtu;
981 }
982 break;
983 } while (1);
984
985 return rc;
986}
987 998
988static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) 999 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
989{ 1000 if (unlikely(rc == -ELINKCONG)) {
990 DEFINE_WAIT_FUNC(wait, woken_wake_function); 1001 u32_push(clinks, dnode);
991 struct sock *sk = sock->sk; 1002 tsk->cong_link_cnt++;
992 struct tipc_sock *tsk = tipc_sk(sk); 1003 rc = 0;
993 int done; 1004 }
994 1005
995 do { 1006 if (unlikely(syn && !rc))
996 int err = sock_error(sk); 1007 tipc_set_sk_state(sk, TIPC_CONNECTING);
997 if (err)
998 return err;
999 if (sk->sk_state == TIPC_DISCONNECTING)
1000 return -EPIPE;
1001 else if (!tipc_sk_connected(sk))
1002 return -ENOTCONN;
1003 if (!*timeo_p)
1004 return -EAGAIN;
1005 if (signal_pending(current))
1006 return sock_intr_errno(*timeo_p);
1007 1008
1008 add_wait_queue(sk_sleep(sk), &wait); 1009 return rc ? rc : dlen;
1009 done = sk_wait_event(sk, timeo_p,
1010 (!tsk->link_cong &&
1011 !tsk_conn_cong(tsk)) ||
1012 !tipc_sk_connected(sk), &wait);
1013 remove_wait_queue(sk_sleep(sk), &wait);
1014 } while (!done);
1015 return 0;
1016} 1010}
1017 1011
1018/** 1012/**
1019 * tipc_send_stream - send stream-oriented data 1013 * tipc_sendstream - send stream-oriented data
1020 * @sock: socket structure 1014 * @sock: socket structure
1021 * @m: data to send 1015 * @m: data to send
1022 * @dsz: total length of data to be transmitted 1016 * @dsz: total length of data to be transmitted
@@ -1026,94 +1020,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
1026 * Returns the number of bytes sent on success (or partial success), 1020 * Returns the number of bytes sent on success (or partial success),
1027 * or errno if no data sent 1021 * or errno if no data sent
1028 */ 1022 */
1029static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1023static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1030{ 1024{
1031 struct sock *sk = sock->sk; 1025 struct sock *sk = sock->sk;
1032 int ret; 1026 int ret;
1033 1027
1034 lock_sock(sk); 1028 lock_sock(sk);
1035 ret = __tipc_send_stream(sock, m, dsz); 1029 ret = __tipc_sendstream(sock, m, dsz);
1036 release_sock(sk); 1030 release_sock(sk);
1037 1031
1038 return ret; 1032 return ret;
1039} 1033}
1040 1034
1041static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1035static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1042{ 1036{
1043 struct sock *sk = sock->sk; 1037 struct sock *sk = sock->sk;
1044 struct net *net = sock_net(sk);
1045 struct tipc_sock *tsk = tipc_sk(sk);
1046 struct tipc_msg *mhdr = &tsk->phdr;
1047 struct sk_buff_head pktchain;
1048 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1038 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1049 u32 portid = tsk->portid; 1039 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1050 int rc = -EINVAL; 1040 struct tipc_sock *tsk = tipc_sk(sk);
1051 long timeo; 1041 struct tipc_msg *hdr = &tsk->phdr;
1052 u32 dnode; 1042 struct net *net = sock_net(sk);
1053 uint mtu, send, sent = 0; 1043 struct sk_buff_head pkts;
1054 struct iov_iter save; 1044 u32 dnode = tsk_peer_node(tsk);
1055 int hlen = MIN_H_SIZE; 1045 int send, sent = 0;
1056 1046 int rc = 0;
1057 /* Handle implied connection establishment */
1058 if (unlikely(dest)) {
1059 rc = __tipc_sendmsg(sock, m, dsz);
1060 hlen = msg_hdr_sz(mhdr);
1061 if (dsz && (dsz == rc))
1062 tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1063 return rc;
1064 }
1065 if (dsz > (uint)INT_MAX)
1066 return -EMSGSIZE;
1067
1068 if (unlikely(!tipc_sk_connected(sk))) {
1069 if (sk->sk_state == TIPC_DISCONNECTING)
1070 return -EPIPE;
1071 else
1072 return -ENOTCONN;
1073 }
1074 1047
1075 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1048 skb_queue_head_init(&pkts);
1076 if (!timeo && tsk->link_cong)
1077 return -ELINKCONG;
1078 1049
1079 dnode = tsk_peer_node(tsk); 1050 if (unlikely(dlen > INT_MAX))
1080 skb_queue_head_init(&pktchain); 1051 return -EMSGSIZE;
1081 1052
1082next: 1053 /* Handle implicit connection setup */
1083 save = m->msg_iter; 1054 if (unlikely(dest)) {
1084 mtu = tsk->max_pkt; 1055 rc = __tipc_sendmsg(sock, m, dlen);
1085 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1056 if (dlen && (dlen == rc))
1086 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); 1057 tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1087 if (unlikely(rc < 0))
1088 return rc; 1058 return rc;
1059 }
1089 1060
1090 do { 1061 do {
1091 if (likely(!tsk_conn_cong(tsk))) { 1062 rc = tipc_wait_for_cond(sock, &timeout,
1092 rc = tipc_node_xmit(net, &pktchain, dnode, portid); 1063 (!tsk->cong_link_cnt &&
1093 if (likely(!rc)) { 1064 !tsk_conn_cong(tsk) &&
1094 tsk->snt_unacked += tsk_inc(tsk, send + hlen); 1065 tipc_sk_connected(sk)));
1095 sent += send; 1066 if (unlikely(rc))
1096 if (sent == dsz) 1067 break;
1097 return dsz;
1098 goto next;
1099 }
1100 if (rc == -EMSGSIZE) {
1101 __skb_queue_purge(&pktchain);
1102 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1103 portid);
1104 m->msg_iter = save;
1105 goto next;
1106 }
1107 if (rc != -ELINKCONG)
1108 break;
1109 1068
1110 tsk->link_cong = 1; 1069 send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1070 rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
1071 if (unlikely(rc != send))
1072 break;
1073
1074 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1075 if (unlikely(rc == -ELINKCONG)) {
1076 tsk->cong_link_cnt = 1;
1077 rc = 0;
1111 } 1078 }
1112 rc = tipc_wait_for_sndpkt(sock, &timeo); 1079 if (likely(!rc)) {
1113 } while (!rc); 1080 tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
1081 sent += send;
1082 }
1083 } while (sent < dlen && !rc);
1114 1084
1115 __skb_queue_purge(&pktchain); 1085 return rc ? rc : sent;
1116 return sent ? sent : rc;
1117} 1086}
1118 1087
1119/** 1088/**
@@ -1131,7 +1100,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1131 if (dsz > TIPC_MAX_USER_MSG_SIZE) 1100 if (dsz > TIPC_MAX_USER_MSG_SIZE)
1132 return -EMSGSIZE; 1101 return -EMSGSIZE;
1133 1102
1134 return tipc_send_stream(sock, m, dsz); 1103 return tipc_sendstream(sock, m, dsz);
1135} 1104}
1136 1105
1137/* tipc_sk_finish_conn - complete the setup of a connection 1106/* tipc_sk_finish_conn - complete the setup of a connection
@@ -1698,6 +1667,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1698 unsigned int limit = rcvbuf_limit(sk, skb); 1667 unsigned int limit = rcvbuf_limit(sk, skb);
1699 int err = TIPC_OK; 1668 int err = TIPC_OK;
1700 int usr = msg_user(hdr); 1669 int usr = msg_user(hdr);
1670 u32 onode;
1701 1671
1702 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 1672 if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1703 tipc_sk_proto_rcv(tsk, skb, xmitq); 1673 tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1705,8 +1675,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1705 } 1675 }
1706 1676
1707 if (unlikely(usr == SOCK_WAKEUP)) { 1677 if (unlikely(usr == SOCK_WAKEUP)) {
1678 onode = msg_orignode(hdr);
1708 kfree_skb(skb); 1679 kfree_skb(skb);
1709 tsk->link_cong = 0; 1680 u32_del(&tsk->cong_links, onode);
1681 tsk->cong_link_cnt--;
1710 sk->sk_write_space(sk); 1682 sk->sk_write_space(sk);
1711 return false; 1683 return false;
1712 } 1684 }
@@ -2114,7 +2086,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2114 struct msghdr m = {NULL,}; 2086 struct msghdr m = {NULL,};
2115 2087
2116 tsk_advance_rx_queue(sk); 2088 tsk_advance_rx_queue(sk);
2117 __tipc_send_stream(new_sock, &m, 0); 2089 __tipc_sendstream(new_sock, &m, 0);
2118 } else { 2090 } else {
2119 __skb_dequeue(&sk->sk_receive_queue); 2091 __skb_dequeue(&sk->sk_receive_queue);
2120 __skb_queue_head(&new_sk->sk_receive_queue, buf); 2092 __skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2269,24 +2241,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2269void tipc_sk_reinit(struct net *net) 2241void tipc_sk_reinit(struct net *net)
2270{ 2242{
2271 struct tipc_net *tn = net_generic(net, tipc_net_id); 2243 struct tipc_net *tn = net_generic(net, tipc_net_id);
2272 const struct bucket_table *tbl; 2244 struct rhashtable_iter iter;
2273 struct rhash_head *pos;
2274 struct tipc_sock *tsk; 2245 struct tipc_sock *tsk;
2275 struct tipc_msg *msg; 2246 struct tipc_msg *msg;
2276 int i;
2277 2247
2278 rcu_read_lock(); 2248 rhashtable_walk_enter(&tn->sk_rht, &iter);
2279 tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); 2249
2280 for (i = 0; i < tbl->size; i++) { 2250 do {
2281 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { 2251 tsk = ERR_PTR(rhashtable_walk_start(&iter));
2252 if (tsk)
2253 continue;
2254
2255 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2282 spin_lock_bh(&tsk->sk.sk_lock.slock); 2256 spin_lock_bh(&tsk->sk.sk_lock.slock);
2283 msg = &tsk->phdr; 2257 msg = &tsk->phdr;
2284 msg_set_prevnode(msg, tn->own_addr); 2258 msg_set_prevnode(msg, tn->own_addr);
2285 msg_set_orignode(msg, tn->own_addr); 2259 msg_set_orignode(msg, tn->own_addr);
2286 spin_unlock_bh(&tsk->sk.sk_lock.slock); 2260 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2287 } 2261 }
2288 } 2262
2289 rcu_read_unlock(); 2263 rhashtable_walk_stop(&iter);
2264 } while (tsk == ERR_PTR(-EAGAIN));
2290} 2265}
2291 2266
2292static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid) 2267static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
@@ -2382,18 +2357,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2382{ 2357{
2383 struct sock *sk = sock->sk; 2358 struct sock *sk = sock->sk;
2384 struct tipc_sock *tsk = tipc_sk(sk); 2359 struct tipc_sock *tsk = tipc_sk(sk);
2385 u32 value; 2360 u32 value = 0;
2386 int res; 2361 int res = 0;
2387 2362
2388 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) 2363 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2389 return 0; 2364 return 0;
2390 if (lvl != SOL_TIPC) 2365 if (lvl != SOL_TIPC)
2391 return -ENOPROTOOPT; 2366 return -ENOPROTOOPT;
2392 if (ol < sizeof(value)) 2367
2393 return -EINVAL; 2368 switch (opt) {
2394 res = get_user(value, (u32 __user *)ov); 2369 case TIPC_IMPORTANCE:
2395 if (res) 2370 case TIPC_SRC_DROPPABLE:
2396 return res; 2371 case TIPC_DEST_DROPPABLE:
2372 case TIPC_CONN_TIMEOUT:
2373 if (ol < sizeof(value))
2374 return -EINVAL;
2375 res = get_user(value, (u32 __user *)ov);
2376 if (res)
2377 return res;
2378 break;
2379 default:
2380 if (ov || ol)
2381 return -EINVAL;
2382 }
2397 2383
2398 lock_sock(sk); 2384 lock_sock(sk);
2399 2385
@@ -2412,7 +2398,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2412 break; 2398 break;
2413 case TIPC_CONN_TIMEOUT: 2399 case TIPC_CONN_TIMEOUT:
2414 tipc_sk(sk)->conn_timeout = value; 2400 tipc_sk(sk)->conn_timeout = value;
2415 /* no need to set "res", since already 0 at this point */ 2401 break;
2402 case TIPC_MCAST_BROADCAST:
2403 tsk->mc_method.rcast = false;
2404 tsk->mc_method.mandatory = true;
2405 break;
2406 case TIPC_MCAST_REPLICAST:
2407 tsk->mc_method.rcast = true;
2408 tsk->mc_method.mandatory = true;
2416 break; 2409 break;
2417 default: 2410 default:
2418 res = -EINVAL; 2411 res = -EINVAL;
@@ -2575,7 +2568,7 @@ static const struct proto_ops stream_ops = {
2575 .shutdown = tipc_shutdown, 2568 .shutdown = tipc_shutdown,
2576 .setsockopt = tipc_setsockopt, 2569 .setsockopt = tipc_setsockopt,
2577 .getsockopt = tipc_getsockopt, 2570 .getsockopt = tipc_getsockopt,
2578 .sendmsg = tipc_send_stream, 2571 .sendmsg = tipc_sendstream,
2579 .recvmsg = tipc_recv_stream, 2572 .recvmsg = tipc_recv_stream,
2580 .mmap = sock_no_mmap, 2573 .mmap = sock_no_mmap,
2581 .sendpage = sock_no_sendpage 2574 .sendpage = sock_no_sendpage
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index b58dc95f3d35..46061cf48cd1 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -113,7 +113,7 @@ static void tipc_udp_media_addr_set(struct tipc_media_addr *addr,
113 memcpy(addr->value, ua, sizeof(struct udp_media_addr)); 113 memcpy(addr->value, ua, sizeof(struct udp_media_addr));
114 114
115 if (tipc_udp_is_mcast_addr(ua)) 115 if (tipc_udp_is_mcast_addr(ua))
116 addr->broadcast = 1; 116 addr->broadcast = TIPC_BROADCAST_SUPPORT;
117} 117}
118 118
119/* tipc_udp_addr2str - convert ip/udp address to string */ 119/* tipc_udp_addr2str - convert ip/udp address to string */
@@ -229,7 +229,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
229 goto out; 229 goto out;
230 } 230 }
231 231
232 if (!addr->broadcast || list_empty(&ub->rcast.list)) 232 if (addr->broadcast != TIPC_REPLICAST_SUPPORT)
233 return tipc_udp_xmit(net, skb, ub, src, dst); 233 return tipc_udp_xmit(net, skb, ub, src, dst);
234 234
235 /* Replicast, send an skb to each configured IP address */ 235 /* Replicast, send an skb to each configured IP address */
@@ -296,7 +296,7 @@ static int tipc_udp_rcast_add(struct tipc_bearer *b,
296 else if (ntohs(addr->proto) == ETH_P_IPV6) 296 else if (ntohs(addr->proto) == ETH_P_IPV6)
297 pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6); 297 pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6);
298#endif 298#endif
299 299 b->bcast_addr.broadcast = TIPC_REPLICAST_SUPPORT;
300 list_add_rcu(&rcast->list, &ub->rcast.list); 300 list_add_rcu(&rcast->list, &ub->rcast.list);
301 return 0; 301 return 0;
302} 302}
@@ -681,7 +681,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b,
681 goto err; 681 goto err;
682 682
683 b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP; 683 b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP;
684 b->bcast_addr.broadcast = 1; 684 b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
685 rcu_assign_pointer(b->media_ptr, ub); 685 rcu_assign_pointer(b->media_ptr, ub);
686 rcu_assign_pointer(ub->bearer, b); 686 rcu_assign_pointer(ub->bearer, b);
687 tipc_udp_media_addr_set(&b->addr, &local); 687 tipc_udp_media_addr_set(&b->addr, &local);