aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2017-01-18 13:50:53 -0500
committerDavid S. Miller <davem@davemloft.net>2017-01-20 12:10:17 -0500
commit01fd12bb189a0772301dd37e9b31e53761269a1b (patch)
tree6e865477ba51e3dd0216c00362e7c3421de1e57b /net/tipc
parenta853e4c6d0843729e1f25a7a7beff168e1dd7420 (diff)
tipc: make replicast a user selectable option
If the bearer carrying multicast messages supports broadcast, those messages will be sent to all cluster nodes, irrespective of whether these nodes host any actual destination sockets or not. This is clearly wasteful if the cluster is large and there are only a few real destinations for the message being sent. In this commit we extend the eligibility of the newly introduced "replicast" transmit option. We now make it possible for a user to select which method should be used, either as a mandatory setting via setsockopt(), or as a relative setting where we let the broadcast layer decide which method to use based on the ratio between cluster size and the message's actual number of destination nodes. In the latter case, a sending socket must stick to a previously selected method until it enters an idle period of at least 5 seconds. This eliminates the risk of message reordering caused by method change, i.e., when changes to cluster size or number of destinations would otherwise mandate a new method to be used. Reviewed-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c62
-rw-r--r--net/tipc/bcast.h17
-rw-r--r--net/tipc/link.c4
-rw-r--r--net/tipc/node.h4
-rw-r--r--net/tipc/socket.c36
5 files changed, 108 insertions, 15 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 672e6ef93cab..7d99029df342 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -54,6 +54,9 @@ const char tipc_bclink_name[] = "broadcast-link";
54 * @dests: array keeping number of reachable destinations per bearer 54 * @dests: array keeping number of reachable destinations per bearer
55 * @primary_bearer: a bearer having links to all broadcast destinations, if any 55 * @primary_bearer: a bearer having links to all broadcast destinations, if any
56 * @bcast_support: indicates if primary bearer, if any, supports broadcast 56 * @bcast_support: indicates if primary bearer, if any, supports broadcast
57 * @rcast_support: indicates if all peer nodes support replicast
58 * @rc_ratio: dest count as percentage of cluster size where send method changes
59 * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
57 */ 60 */
58struct tipc_bc_base { 61struct tipc_bc_base {
59 struct tipc_link *link; 62 struct tipc_link *link;
@@ -61,6 +64,9 @@ struct tipc_bc_base {
61 int dests[MAX_BEARERS]; 64 int dests[MAX_BEARERS];
62 int primary_bearer; 65 int primary_bearer;
63 bool bcast_support; 66 bool bcast_support;
67 bool rcast_support;
68 int rc_ratio;
69 int bc_threshold;
64}; 70};
65 71
66static struct tipc_bc_base *tipc_bc_base(struct net *net) 72static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -73,6 +79,19 @@ int tipc_bcast_get_mtu(struct net *net)
73 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; 79 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
74} 80}
75 81
82void tipc_bcast_disable_rcast(struct net *net)
83{
84 tipc_bc_base(net)->rcast_support = false;
85}
86
87static void tipc_bcbase_calc_bc_threshold(struct net *net)
88{
89 struct tipc_bc_base *bb = tipc_bc_base(net);
90 int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
91
92 bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
93}
94
76/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 95/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
77 * if any, and make it primary bearer 96 * if any, and make it primary bearer
78 */ 97 */
@@ -175,6 +194,31 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
175 __skb_queue_purge(&_xmitq); 194 __skb_queue_purge(&_xmitq);
176} 195}
177 196
197static void tipc_bcast_select_xmit_method(struct net *net, int dests,
198 struct tipc_mc_method *method)
199{
200 struct tipc_bc_base *bb = tipc_bc_base(net);
201 unsigned long exp = method->expires;
202
203 /* Broadcast supported by used bearer/bearers? */
204 if (!bb->bcast_support) {
205 method->rcast = true;
206 return;
207 }
208 /* Any destinations which don't support replicast ? */
209 if (!bb->rcast_support) {
210 method->rcast = false;
211 return;
212 }
213 /* Can current method be changed ? */
214 method->expires = jiffies + TIPC_METHOD_EXPIRE;
215 if (method->mandatory || time_before(jiffies, exp))
216 return;
217
218 /* Determine method to use now */
219 method->rcast = dests <= bb->bc_threshold;
220}
221
178/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes 222/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
179 * @net: the applicable net namespace 223 * @net: the applicable net namespace
180 * @pkts: chain of buffers containing message 224 * @pkts: chain of buffers containing message
@@ -237,16 +281,16 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
237 * and to identified node local sockets 281 * and to identified node local sockets
238 * @net: the applicable net namespace 282 * @net: the applicable net namespace
239 * @pkts: chain of buffers containing message 283 * @pkts: chain of buffers containing message
240 * @dests: destination nodes for message. Not consumed. 284 * @method: send method to be used
285 * @dests: destination nodes for message.
241 * @cong_link_cnt: returns number of encountered congested destination links 286 * @cong_link_cnt: returns number of encountered congested destination links
242 * @cong_links: returns identities of congested links
243 * Consumes buffer chain. 287 * Consumes buffer chain.
244 * Returns 0 if success, otherwise errno 288 * Returns 0 if success, otherwise errno
245 */ 289 */
246int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, 290int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
247 struct tipc_nlist *dests, u16 *cong_link_cnt) 291 struct tipc_mc_method *method, struct tipc_nlist *dests,
292 u16 *cong_link_cnt)
248{ 293{
249 struct tipc_bc_base *bb = tipc_bc_base(net);
250 struct sk_buff_head inputq, localq; 294 struct sk_buff_head inputq, localq;
251 int rc = 0; 295 int rc = 0;
252 296
@@ -258,9 +302,10 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
258 rc = -ENOMEM; 302 rc = -ENOMEM;
259 goto exit; 303 goto exit;
260 } 304 }
261 305 /* Send according to determined transmit method */
262 if (dests->remote) { 306 if (dests->remote) {
263 if (!bb->bcast_support) 307 tipc_bcast_select_xmit_method(net, dests->remote, method);
308 if (method->rcast)
264 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); 309 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
265 else 310 else
266 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); 311 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
@@ -269,6 +314,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
269 if (dests->local) 314 if (dests->local)
270 tipc_sk_mcast_rcv(net, &localq, &inputq); 315 tipc_sk_mcast_rcv(net, &localq, &inputq);
271exit: 316exit:
317 /* This queue should normally be empty by now */
272 __skb_queue_purge(pkts); 318 __skb_queue_purge(pkts);
273 return rc; 319 return rc;
274} 320}
@@ -377,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
377 tipc_bcast_lock(net); 423 tipc_bcast_lock(net);
378 tipc_link_add_bc_peer(snd_l, uc_l, xmitq); 424 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
379 tipc_bcbase_select_primary(net); 425 tipc_bcbase_select_primary(net);
426 tipc_bcbase_calc_bc_threshold(net);
380 tipc_bcast_unlock(net); 427 tipc_bcast_unlock(net);
381} 428}
382 429
@@ -395,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
395 tipc_bcast_lock(net); 442 tipc_bcast_lock(net);
396 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); 443 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
397 tipc_bcbase_select_primary(net); 444 tipc_bcbase_select_primary(net);
445 tipc_bcbase_calc_bc_threshold(net);
398 tipc_bcast_unlock(net); 446 tipc_bcast_unlock(net);
399 447
400 tipc_bcbase_xmit(net, &xmitq); 448 tipc_bcbase_xmit(net, &xmitq);
@@ -477,6 +525,8 @@ int tipc_bcast_init(struct net *net)
477 goto enomem; 525 goto enomem;
478 bb->link = l; 526 bb->link = l;
479 tn->bcl = l; 527 tn->bcl = l;
528 bb->rc_ratio = 25;
529 bb->rcast_support = true;
480 return 0; 530 return 0;
481enomem: 531enomem:
482 kfree(bb); 532 kfree(bb);
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index dd772e6f6fa4..751530ab0c49 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -46,6 +46,8 @@ struct tipc_nlist;
46struct tipc_nitem; 46struct tipc_nitem;
47extern const char tipc_bclink_name[]; 47extern const char tipc_bclink_name[];
48 48
49#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
50
49struct tipc_nlist { 51struct tipc_nlist {
50 struct list_head list; 52 struct list_head list;
51 u32 self; 53 u32 self;
@@ -58,6 +60,17 @@ void tipc_nlist_purge(struct tipc_nlist *nl);
58void tipc_nlist_add(struct tipc_nlist *nl, u32 node); 60void tipc_nlist_add(struct tipc_nlist *nl, u32 node);
59void tipc_nlist_del(struct tipc_nlist *nl, u32 node); 61void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
60 62
63/* Cookie to be used between socket and broadcast layer
64 * @rcast: replicast (instead of broadcast) was used at previous xmit
65 * @mandatory: broadcast/replicast indication was set by user
66 * @expires: re-evaluate non-mandatory transmit method if we are past this
67 */
68struct tipc_mc_method {
69 bool rcast;
70 bool mandatory;
71 unsigned long expires;
72};
73
61int tipc_bcast_init(struct net *net); 74int tipc_bcast_init(struct net *net);
62void tipc_bcast_stop(struct net *net); 75void tipc_bcast_stop(struct net *net);
63void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, 76void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
@@ -66,8 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
66void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); 79void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
67void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); 80void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
68int tipc_bcast_get_mtu(struct net *net); 81int tipc_bcast_get_mtu(struct net *net);
82void tipc_bcast_disable_rcast(struct net *net);
69int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, 83int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
70 struct tipc_nlist *dests, u16 *cong_link_cnt); 84 struct tipc_mc_method *method, struct tipc_nlist *dests,
85 u16 *cong_link_cnt);
71int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); 86int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
72void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, 87void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
73 struct tipc_msg *hdr); 88 struct tipc_msg *hdr);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b17b9e155469..ddd2dd6f77aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
515 if (link_is_bc_sndlink(l)) 515 if (link_is_bc_sndlink(l))
516 l->state = LINK_ESTABLISHED; 516 l->state = LINK_ESTABLISHED;
517 517
518 /* Disable replicast if even a single peer doesn't support it */
519 if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
520 tipc_bcast_disable_rcast(net);
521
518 return true; 522 return true;
519} 523}
520 524
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 39ef54c1f2ad..898c22916984 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,11 +47,13 @@
47enum { 47enum {
48 TIPC_BCAST_SYNCH = (1 << 1), 48 TIPC_BCAST_SYNCH = (1 << 1),
49 TIPC_BCAST_STATE_NACK = (1 << 2), 49 TIPC_BCAST_STATE_NACK = (1 << 2),
50 TIPC_BLOCK_FLOWCTL = (1 << 3) 50 TIPC_BLOCK_FLOWCTL = (1 << 3),
51 TIPC_BCAST_RCAST = (1 << 4)
51}; 52};
52 53
53#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ 54#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
54 TIPC_BCAST_STATE_NACK | \ 55 TIPC_BCAST_STATE_NACK | \
56 TIPC_BCAST_RCAST | \
55 TIPC_BLOCK_FLOWCTL) 57 TIPC_BLOCK_FLOWCTL)
56#define INVALID_BEARER_ID -1 58#define INVALID_BEARER_ID -1
57 59
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 93b6ae3154c9..5bec8aac5008 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -79,6 +79,7 @@ enum {
79 * @rcv_unacked: # messages read by user, but not yet acked back to peer 79 * @rcv_unacked: # messages read by user, but not yet acked back to peer
80 * @peer: 'connected' peer for dgram/rdm 80 * @peer: 'connected' peer for dgram/rdm
81 * @node: hash table node 81 * @node: hash table node
82 * @mc_method: cookie for use between socket and broadcast layer
82 * @rcu: rcu struct for tipc_sock 83 * @rcu: rcu struct for tipc_sock
83 */ 84 */
84struct tipc_sock { 85struct tipc_sock {
@@ -103,6 +104,7 @@ struct tipc_sock {
103 u16 rcv_win; 104 u16 rcv_win;
104 struct sockaddr_tipc peer; 105 struct sockaddr_tipc peer;
105 struct rhash_head node; 106 struct rhash_head node;
107 struct tipc_mc_method mc_method;
106 struct rcu_head rcu; 108 struct rcu_head rcu;
107}; 109};
108 110
@@ -740,6 +742,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
740 struct tipc_msg *hdr = &tsk->phdr; 742 struct tipc_msg *hdr = &tsk->phdr;
741 struct net *net = sock_net(sk); 743 struct net *net = sock_net(sk);
742 int mtu = tipc_bcast_get_mtu(net); 744 int mtu = tipc_bcast_get_mtu(net);
745 struct tipc_mc_method *method = &tsk->mc_method;
743 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); 746 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
744 struct sk_buff_head pkts; 747 struct sk_buff_head pkts;
745 struct tipc_nlist dsts; 748 struct tipc_nlist dsts;
@@ -773,7 +776,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
773 776
774 /* Send message if build was successful */ 777 /* Send message if build was successful */
775 if (unlikely(rc == dlen)) 778 if (unlikely(rc == dlen))
776 rc = tipc_mcast_xmit(net, &pkts, &dsts, 779 rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
777 &tsk->cong_link_cnt); 780 &tsk->cong_link_cnt);
778 781
779 tipc_nlist_purge(&dsts); 782 tipc_nlist_purge(&dsts);
@@ -2344,18 +2347,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2344{ 2347{
2345 struct sock *sk = sock->sk; 2348 struct sock *sk = sock->sk;
2346 struct tipc_sock *tsk = tipc_sk(sk); 2349 struct tipc_sock *tsk = tipc_sk(sk);
2347 u32 value; 2350 u32 value = 0;
2348 int res; 2351 int res;
2349 2352
2350 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) 2353 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2351 return 0; 2354 return 0;
2352 if (lvl != SOL_TIPC) 2355 if (lvl != SOL_TIPC)
2353 return -ENOPROTOOPT; 2356 return -ENOPROTOOPT;
2354 if (ol < sizeof(value)) 2357
2355 return -EINVAL; 2358 switch (opt) {
2356 res = get_user(value, (u32 __user *)ov); 2359 case TIPC_IMPORTANCE:
2357 if (res) 2360 case TIPC_SRC_DROPPABLE:
2358 return res; 2361 case TIPC_DEST_DROPPABLE:
2362 case TIPC_CONN_TIMEOUT:
2363 if (ol < sizeof(value))
2364 return -EINVAL;
2365 res = get_user(value, (u32 __user *)ov);
2366 if (res)
2367 return res;
2368 break;
2369 default:
2370 if (ov || ol)
2371 return -EINVAL;
2372 }
2359 2373
2360 lock_sock(sk); 2374 lock_sock(sk);
2361 2375
@@ -2376,6 +2390,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2376 tipc_sk(sk)->conn_timeout = value; 2390 tipc_sk(sk)->conn_timeout = value;
2377 /* no need to set "res", since already 0 at this point */ 2391 /* no need to set "res", since already 0 at this point */
2378 break; 2392 break;
2393 case TIPC_MCAST_BROADCAST:
2394 tsk->mc_method.rcast = false;
2395 tsk->mc_method.mandatory = true;
2396 break;
2397 case TIPC_MCAST_REPLICAST:
2398 tsk->mc_method.rcast = true;
2399 tsk->mc_method.mandatory = true;
2400 break;
2379 default: 2401 default:
2380 res = -EINVAL; 2402 res = -EINVAL;
2381 } 2403 }