aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c165
-rw-r--r--net/tipc/bcast.h5
-rw-r--r--net/tipc/msg.h10
-rw-r--r--net/tipc/socket.c5
4 files changed, 184 insertions, 1 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 12b59268bdd6..5264a8ff6e01 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -220,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
220 } 220 }
221 /* Can current method be changed ? */ 221 /* Can current method be changed ? */
222 method->expires = jiffies + TIPC_METHOD_EXPIRE; 222 method->expires = jiffies + TIPC_METHOD_EXPIRE;
223 if (method->mandatory || time_before(jiffies, exp)) 223 if (method->mandatory)
224 return; 224 return;
225 225
226 if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
227 time_before(jiffies, exp))
228 return;
229
230 /* Configuration as force 'broadcast' method */
231 if (bb->force_bcast) {
232 method->rcast = false;
233 return;
234 }
235 /* Configuration as force 'replicast' method */
236 if (bb->force_rcast) {
237 method->rcast = true;
238 return;
239 }
240 /* Configuration as 'autoselect' or default method */
226 /* Determine method to use now */ 241 /* Determine method to use now */
227 method->rcast = dests <= bb->bc_threshold; 242 method->rcast = dests <= bb->bc_threshold;
228} 243}
@@ -285,6 +300,63 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
285 return 0; 300 return 0;
286} 301}
287 302
303/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
304 * @net: the applicable net namespace
305 * @skb: socket buffer to copy
306 * @method: send method to be used
307 * @dests: destination nodes for message.
308 * @cong_link_cnt: returns number of encountered congested destination links
309 * Returns 0 if success, otherwise errno
310 */
311static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
312 struct tipc_mc_method *method,
313 struct tipc_nlist *dests,
314 u16 *cong_link_cnt)
315{
316 struct tipc_msg *hdr, *_hdr;
317 struct sk_buff_head tmpq;
318 struct sk_buff *_skb;
319
320 /* Is a cluster supporting with new capabilities ? */
321 if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
322 return 0;
323
324 hdr = buf_msg(skb);
325 if (msg_user(hdr) == MSG_FRAGMENTER)
326 hdr = msg_get_wrapped(hdr);
327 if (msg_type(hdr) != TIPC_MCAST_MSG)
328 return 0;
329
330 /* Allocate dummy message */
331 _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
332 if (!skb)
333 return -ENOMEM;
334
335 /* Preparing for 'synching' header */
336 msg_set_syn(hdr, 1);
337
338 /* Copy skb's header into a dummy header */
339 skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
340 skb_orphan(_skb);
341
342 /* Reverse method for dummy message */
343 _hdr = buf_msg(_skb);
344 msg_set_size(_hdr, MCAST_H_SIZE);
345 msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
346
347 skb_queue_head_init(&tmpq);
348 __skb_queue_tail(&tmpq, _skb);
349 if (method->rcast)
350 tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
351 else
352 tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
353
354 /* This queue should normally be empty by now */
355 __skb_queue_purge(&tmpq);
356
357 return 0;
358}
359
288/* tipc_mcast_xmit - deliver message to indicated destination nodes 360/* tipc_mcast_xmit - deliver message to indicated destination nodes
289 * and to identified node local sockets 361 * and to identified node local sockets
290 * @net: the applicable net namespace 362 * @net: the applicable net namespace
@@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
300 u16 *cong_link_cnt) 372 u16 *cong_link_cnt)
301{ 373{
302 struct sk_buff_head inputq, localq; 374 struct sk_buff_head inputq, localq;
375 bool rcast = method->rcast;
376 struct tipc_msg *hdr;
377 struct sk_buff *skb;
303 int rc = 0; 378 int rc = 0;
304 379
305 skb_queue_head_init(&inputq); 380 skb_queue_head_init(&inputq);
@@ -313,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
313 /* Send according to determined transmit method */ 388 /* Send according to determined transmit method */
314 if (dests->remote) { 389 if (dests->remote) {
315 tipc_bcast_select_xmit_method(net, dests->remote, method); 390 tipc_bcast_select_xmit_method(net, dests->remote, method);
391
392 skb = skb_peek(pkts);
393 hdr = buf_msg(skb);
394 if (msg_user(hdr) == MSG_FRAGMENTER)
395 hdr = msg_get_wrapped(hdr);
396 msg_set_is_rcast(hdr, method->rcast);
397
398 /* Switch method ? */
399 if (rcast != method->rcast)
400 tipc_mcast_send_sync(net, skb, method,
401 dests, cong_link_cnt);
402
316 if (method->rcast) 403 if (method->rcast)
317 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); 404 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
318 else 405 else
@@ -672,3 +759,79 @@ u32 tipc_bcast_get_broadcast_ratio(struct net *net)
672 759
673 return bb->rc_ratio; 760 return bb->rc_ratio;
674} 761}
762
763void tipc_mcast_filter_msg(struct sk_buff_head *defq,
764 struct sk_buff_head *inputq)
765{
766 struct sk_buff *skb, *_skb, *tmp;
767 struct tipc_msg *hdr, *_hdr;
768 bool match = false;
769 u32 node, port;
770
771 skb = skb_peek(inputq);
772 hdr = buf_msg(skb);
773
774 if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
775 return;
776
777 node = msg_orignode(hdr);
778 port = msg_origport(hdr);
779
780 /* Has the twin SYN message already arrived ? */
781 skb_queue_walk(defq, _skb) {
782 _hdr = buf_msg(_skb);
783 if (msg_orignode(_hdr) != node)
784 continue;
785 if (msg_origport(_hdr) != port)
786 continue;
787 match = true;
788 break;
789 }
790
791 if (!match) {
792 if (!msg_is_syn(hdr))
793 return;
794 __skb_dequeue(inputq);
795 __skb_queue_tail(defq, skb);
796 return;
797 }
798
799 /* Deliver non-SYN message from other link, otherwise queue it */
800 if (!msg_is_syn(hdr)) {
801 if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
802 return;
803 __skb_dequeue(inputq);
804 __skb_queue_tail(defq, skb);
805 return;
806 }
807
808 /* Queue non-SYN/SYN message from same link */
809 if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
810 __skb_dequeue(inputq);
811 __skb_queue_tail(defq, skb);
812 return;
813 }
814
815 /* Matching SYN messages => return the one with data, if any */
816 __skb_unlink(_skb, defq);
817 if (msg_data_sz(hdr)) {
818 kfree_skb(_skb);
819 } else {
820 __skb_dequeue(inputq);
821 kfree_skb(skb);
822 __skb_queue_tail(inputq, _skb);
823 }
824
825 /* Deliver subsequent non-SYN messages from same peer */
826 skb_queue_walk_safe(defq, _skb, tmp) {
827 _hdr = buf_msg(_skb);
828 if (msg_orignode(_hdr) != node)
829 continue;
830 if (msg_origport(_hdr) != port)
831 continue;
832 if (msg_is_syn(_hdr))
833 break;
834 __skb_unlink(_skb, defq);
835 __skb_queue_tail(inputq, _skb);
836 }
837}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 37c55e7347a5..484bde289d3a 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -67,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
67/* Cookie to be used between socket and broadcast layer 67/* Cookie to be used between socket and broadcast layer
68 * @rcast: replicast (instead of broadcast) was used at previous xmit 68 * @rcast: replicast (instead of broadcast) was used at previous xmit
69 * @mandatory: broadcast/replicast indication was set by user 69 * @mandatory: broadcast/replicast indication was set by user
70 * @deferredq: defer queue to make message in order
70 * @expires: re-evaluate non-mandatory transmit method if we are past this 71 * @expires: re-evaluate non-mandatory transmit method if we are past this
71 */ 72 */
72struct tipc_mc_method { 73struct tipc_mc_method {
73 bool rcast; 74 bool rcast;
74 bool mandatory; 75 bool mandatory;
76 struct sk_buff_head deferredq;
75 unsigned long expires; 77 unsigned long expires;
76}; 78};
77 79
@@ -99,6 +101,9 @@ int tipc_bclink_reset_stats(struct net *net);
99u32 tipc_bcast_get_broadcast_mode(struct net *net); 101u32 tipc_bcast_get_broadcast_mode(struct net *net);
100u32 tipc_bcast_get_broadcast_ratio(struct net *net); 102u32 tipc_bcast_get_broadcast_ratio(struct net *net);
101 103
104void tipc_mcast_filter_msg(struct sk_buff_head *defq,
105 struct sk_buff_head *inputq);
106
102static inline void tipc_bcast_lock(struct net *net) 107static inline void tipc_bcast_lock(struct net *net)
103{ 108{
104 spin_lock_bh(&tipc_net(net)->bclock); 109 spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d7e4b8b93f9d..528ba9241acc 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
257 msg_set_bits(m, 0, 18, 1, d); 257 msg_set_bits(m, 0, 18, 1, d);
258} 258}
259 259
260static inline bool msg_is_rcast(struct tipc_msg *m)
261{
262 return msg_bits(m, 0, 18, 0x1);
263}
264
265static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
266{
267 msg_set_bits(m, 0, 18, 0x1, d);
268}
269
260static inline void msg_set_size(struct tipc_msg *m, u32 sz) 270static inline void msg_set_size(struct tipc_msg *m, u32 sz)
261{ 271{
262 m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); 272 m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index b542f14ed444..922b75ff56d3 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -485,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
485 tsk_set_unreturnable(tsk, true); 485 tsk_set_unreturnable(tsk, true);
486 if (sock->type == SOCK_DGRAM) 486 if (sock->type == SOCK_DGRAM)
487 tsk_set_unreliable(tsk, true); 487 tsk_set_unreliable(tsk, true);
488 __skb_queue_head_init(&tsk->mc_method.deferredq);
488 } 489 }
489 490
490 trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); 491 trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -582,6 +583,7 @@ static int tipc_release(struct socket *sock)
582 sk->sk_shutdown = SHUTDOWN_MASK; 583 sk->sk_shutdown = SHUTDOWN_MASK;
583 tipc_sk_leave(tsk); 584 tipc_sk_leave(tsk);
584 tipc_sk_withdraw(tsk, 0, NULL); 585 tipc_sk_withdraw(tsk, 0, NULL);
586 __skb_queue_purge(&tsk->mc_method.deferredq);
585 sk_stop_timer(sk, &sk->sk_timer); 587 sk_stop_timer(sk, &sk->sk_timer);
586 tipc_sk_remove(tsk); 588 tipc_sk_remove(tsk);
587 589
@@ -2162,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2162 if (unlikely(grp)) 2164 if (unlikely(grp))
2163 tipc_group_filter_msg(grp, &inputq, xmitq); 2165 tipc_group_filter_msg(grp, &inputq, xmitq);
2164 2166
2167 if (msg_type(hdr) == TIPC_MCAST_MSG)
2168 tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
2169
2165 /* Validate and add to receive buffer if there is space */ 2170 /* Validate and add to receive buffer if there is space */
2166 while ((skb = __skb_dequeue(&inputq))) { 2171 while ((skb = __skb_dequeue(&inputq))) {
2167 hdr = buf_msg(skb); 2172 hdr = buf_msg(skb);