summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHoang Le <hoang.h.le@dektech.com.au>2019-03-19 07:49:50 -0400
committerDavid S. Miller <davem@davemloft.net>2019-03-19 16:56:17 -0400
commitc55c8edafa91139419ed011f7d036274ce96be0b (patch)
tree7feab866d748bdb576236c73efa81e7aae9cdb39
parentff2ebbfba6186adf3964eb816f8f255c6e664dc4 (diff)
tipc: smooth change between replicast and broadcast
Currently, a multicast stream may start out using replicast, because there are few destinations, and then it should ideally switch to L2/broadcast IGMP/multicast when the number of destinations grows beyond a certain limit. The opposite should happen when the number decreases below the limit. To eliminate the risk of message reordering caused by a method change, a sending socket must stick to a previously selected method until it enters an idle period of 5 seconds. This means there is a 5-second pause in the traffic from the sender socket. If the sender never makes such a pause, the method will never change, and transmission may become very inefficient as the cluster grows. With this commit, we allow such a switch between replicast and broadcast without any need for a traffic pause. The solution is to send a dummy message containing only the header, also with the SYN bit set, via broadcast or replicast. For the data message, the SYN bit is set and it is sent via replicast or broadcast (the inverse method of the dummy). Then, at the receiving side, any messages that follow the first message carrying the SYN bit (data or dummy) are held in a deferred queue until the other member of the pair (dummy or data message) arrives on the other link. v2: reverse christmas tree declaration Acked-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: Hoang Le <hoang.h.le@dektech.com.au> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/bcast.c165
-rw-r--r--net/tipc/bcast.h5
-rw-r--r--net/tipc/msg.h10
-rw-r--r--net/tipc/socket.c5
4 files changed, 184 insertions, 1 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 12b59268bdd6..5264a8ff6e01 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -220,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
220 } 220 }
221 /* Can current method be changed ? */ 221 /* Can current method be changed ? */
222 method->expires = jiffies + TIPC_METHOD_EXPIRE; 222 method->expires = jiffies + TIPC_METHOD_EXPIRE;
223 if (method->mandatory || time_before(jiffies, exp)) 223 if (method->mandatory)
224 return; 224 return;
225 225
226 if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
227 time_before(jiffies, exp))
228 return;
229
230 /* Configuration as force 'broadcast' method */
231 if (bb->force_bcast) {
232 method->rcast = false;
233 return;
234 }
235 /* Configuration as force 'replicast' method */
236 if (bb->force_rcast) {
237 method->rcast = true;
238 return;
239 }
240 /* Configuration as 'autoselect' or default method */
226 /* Determine method to use now */ 241 /* Determine method to use now */
227 method->rcast = dests <= bb->bc_threshold; 242 method->rcast = dests <= bb->bc_threshold;
228} 243}
@@ -285,6 +300,63 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
285 return 0; 300 return 0;
286} 301}
287 302
303/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
304 * @net: the applicable net namespace
305 * @skb: socket buffer to copy
306 * @method: send method to be used
307 * @dests: destination nodes for message.
308 * @cong_link_cnt: returns number of encountered congested destination links
309 * Returns 0 if success, otherwise errno
310 */
311static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
312 struct tipc_mc_method *method,
313 struct tipc_nlist *dests,
314 u16 *cong_link_cnt)
315{
316 struct tipc_msg *hdr, *_hdr;
317 struct sk_buff_head tmpq;
318 struct sk_buff *_skb;
319
320 /* Is a cluster supporting with new capabilities ? */
321 if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
322 return 0;
323
324 hdr = buf_msg(skb);
325 if (msg_user(hdr) == MSG_FRAGMENTER)
326 hdr = msg_get_wrapped(hdr);
327 if (msg_type(hdr) != TIPC_MCAST_MSG)
328 return 0;
329
330 /* Allocate dummy message */
331 _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
332 if (!skb)
333 return -ENOMEM;
334
335 /* Preparing for 'synching' header */
336 msg_set_syn(hdr, 1);
337
338 /* Copy skb's header into a dummy header */
339 skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
340 skb_orphan(_skb);
341
342 /* Reverse method for dummy message */
343 _hdr = buf_msg(_skb);
344 msg_set_size(_hdr, MCAST_H_SIZE);
345 msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
346
347 skb_queue_head_init(&tmpq);
348 __skb_queue_tail(&tmpq, _skb);
349 if (method->rcast)
350 tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
351 else
352 tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
353
354 /* This queue should normally be empty by now */
355 __skb_queue_purge(&tmpq);
356
357 return 0;
358}
359
288/* tipc_mcast_xmit - deliver message to indicated destination nodes 360/* tipc_mcast_xmit - deliver message to indicated destination nodes
289 * and to identified node local sockets 361 * and to identified node local sockets
290 * @net: the applicable net namespace 362 * @net: the applicable net namespace
@@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
300 u16 *cong_link_cnt) 372 u16 *cong_link_cnt)
301{ 373{
302 struct sk_buff_head inputq, localq; 374 struct sk_buff_head inputq, localq;
375 bool rcast = method->rcast;
376 struct tipc_msg *hdr;
377 struct sk_buff *skb;
303 int rc = 0; 378 int rc = 0;
304 379
305 skb_queue_head_init(&inputq); 380 skb_queue_head_init(&inputq);
@@ -313,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
313 /* Send according to determined transmit method */ 388 /* Send according to determined transmit method */
314 if (dests->remote) { 389 if (dests->remote) {
315 tipc_bcast_select_xmit_method(net, dests->remote, method); 390 tipc_bcast_select_xmit_method(net, dests->remote, method);
391
392 skb = skb_peek(pkts);
393 hdr = buf_msg(skb);
394 if (msg_user(hdr) == MSG_FRAGMENTER)
395 hdr = msg_get_wrapped(hdr);
396 msg_set_is_rcast(hdr, method->rcast);
397
398 /* Switch method ? */
399 if (rcast != method->rcast)
400 tipc_mcast_send_sync(net, skb, method,
401 dests, cong_link_cnt);
402
316 if (method->rcast) 403 if (method->rcast)
317 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); 404 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
318 else 405 else
@@ -672,3 +759,79 @@ u32 tipc_bcast_get_broadcast_ratio(struct net *net)
672 759
673 return bb->rc_ratio; 760 return bb->rc_ratio;
674} 761}
762
763void tipc_mcast_filter_msg(struct sk_buff_head *defq,
764 struct sk_buff_head *inputq)
765{
766 struct sk_buff *skb, *_skb, *tmp;
767 struct tipc_msg *hdr, *_hdr;
768 bool match = false;
769 u32 node, port;
770
771 skb = skb_peek(inputq);
772 hdr = buf_msg(skb);
773
774 if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
775 return;
776
777 node = msg_orignode(hdr);
778 port = msg_origport(hdr);
779
780 /* Has the twin SYN message already arrived ? */
781 skb_queue_walk(defq, _skb) {
782 _hdr = buf_msg(_skb);
783 if (msg_orignode(_hdr) != node)
784 continue;
785 if (msg_origport(_hdr) != port)
786 continue;
787 match = true;
788 break;
789 }
790
791 if (!match) {
792 if (!msg_is_syn(hdr))
793 return;
794 __skb_dequeue(inputq);
795 __skb_queue_tail(defq, skb);
796 return;
797 }
798
799 /* Deliver non-SYN message from other link, otherwise queue it */
800 if (!msg_is_syn(hdr)) {
801 if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
802 return;
803 __skb_dequeue(inputq);
804 __skb_queue_tail(defq, skb);
805 return;
806 }
807
808 /* Queue non-SYN/SYN message from same link */
809 if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
810 __skb_dequeue(inputq);
811 __skb_queue_tail(defq, skb);
812 return;
813 }
814
815 /* Matching SYN messages => return the one with data, if any */
816 __skb_unlink(_skb, defq);
817 if (msg_data_sz(hdr)) {
818 kfree_skb(_skb);
819 } else {
820 __skb_dequeue(inputq);
821 kfree_skb(skb);
822 __skb_queue_tail(inputq, _skb);
823 }
824
825 /* Deliver subsequent non-SYN messages from same peer */
826 skb_queue_walk_safe(defq, _skb, tmp) {
827 _hdr = buf_msg(_skb);
828 if (msg_orignode(_hdr) != node)
829 continue;
830 if (msg_origport(_hdr) != port)
831 continue;
832 if (msg_is_syn(_hdr))
833 break;
834 __skb_unlink(_skb, defq);
835 __skb_queue_tail(inputq, _skb);
836 }
837}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 37c55e7347a5..484bde289d3a 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -67,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
67/* Cookie to be used between socket and broadcast layer 67/* Cookie to be used between socket and broadcast layer
68 * @rcast: replicast (instead of broadcast) was used at previous xmit 68 * @rcast: replicast (instead of broadcast) was used at previous xmit
69 * @mandatory: broadcast/replicast indication was set by user 69 * @mandatory: broadcast/replicast indication was set by user
70 * @deferredq: defer queue to make message in order
70 * @expires: re-evaluate non-mandatory transmit method if we are past this 71 * @expires: re-evaluate non-mandatory transmit method if we are past this
71 */ 72 */
72struct tipc_mc_method { 73struct tipc_mc_method {
73 bool rcast; 74 bool rcast;
74 bool mandatory; 75 bool mandatory;
76 struct sk_buff_head deferredq;
75 unsigned long expires; 77 unsigned long expires;
76}; 78};
77 79
@@ -99,6 +101,9 @@ int tipc_bclink_reset_stats(struct net *net);
99u32 tipc_bcast_get_broadcast_mode(struct net *net); 101u32 tipc_bcast_get_broadcast_mode(struct net *net);
100u32 tipc_bcast_get_broadcast_ratio(struct net *net); 102u32 tipc_bcast_get_broadcast_ratio(struct net *net);
101 103
104void tipc_mcast_filter_msg(struct sk_buff_head *defq,
105 struct sk_buff_head *inputq);
106
102static inline void tipc_bcast_lock(struct net *net) 107static inline void tipc_bcast_lock(struct net *net)
103{ 108{
104 spin_lock_bh(&tipc_net(net)->bclock); 109 spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d7e4b8b93f9d..528ba9241acc 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
257 msg_set_bits(m, 0, 18, 1, d); 257 msg_set_bits(m, 0, 18, 1, d);
258} 258}
259 259
260static inline bool msg_is_rcast(struct tipc_msg *m)
261{
262 return msg_bits(m, 0, 18, 0x1);
263}
264
265static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
266{
267 msg_set_bits(m, 0, 18, 0x1, d);
268}
269
260static inline void msg_set_size(struct tipc_msg *m, u32 sz) 270static inline void msg_set_size(struct tipc_msg *m, u32 sz)
261{ 271{
262 m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); 272 m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index b542f14ed444..922b75ff56d3 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -485,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
485 tsk_set_unreturnable(tsk, true); 485 tsk_set_unreturnable(tsk, true);
486 if (sock->type == SOCK_DGRAM) 486 if (sock->type == SOCK_DGRAM)
487 tsk_set_unreliable(tsk, true); 487 tsk_set_unreliable(tsk, true);
488 __skb_queue_head_init(&tsk->mc_method.deferredq);
488 } 489 }
489 490
490 trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); 491 trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -582,6 +583,7 @@ static int tipc_release(struct socket *sock)
582 sk->sk_shutdown = SHUTDOWN_MASK; 583 sk->sk_shutdown = SHUTDOWN_MASK;
583 tipc_sk_leave(tsk); 584 tipc_sk_leave(tsk);
584 tipc_sk_withdraw(tsk, 0, NULL); 585 tipc_sk_withdraw(tsk, 0, NULL);
586 __skb_queue_purge(&tsk->mc_method.deferredq);
585 sk_stop_timer(sk, &sk->sk_timer); 587 sk_stop_timer(sk, &sk->sk_timer);
586 tipc_sk_remove(tsk); 588 tipc_sk_remove(tsk);
587 589
@@ -2162,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2162 if (unlikely(grp)) 2164 if (unlikely(grp))
2163 tipc_group_filter_msg(grp, &inputq, xmitq); 2165 tipc_group_filter_msg(grp, &inputq, xmitq);
2164 2166
2167 if (msg_type(hdr) == TIPC_MCAST_MSG)
2168 tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
2169
2165 /* Validate and add to receive buffer if there is space */ 2170 /* Validate and add to receive buffer if there is space */
2166 while ((skb = __skb_dequeue(&inputq))) { 2171 while ((skb = __skb_dequeue(&inputq))) {
2167 hdr = buf_msg(skb); 2172 hdr = buf_msg(skb);