aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2014-07-16 20:41:00 -0400
committerDavid S. Miller <davem@davemloft.net>2014-07-17 00:38:18 -0400
commit078bec826f7b73cf2a2397680537bcb7e075b492 (patch)
tree8b30dce59fa8512d15a53cbcc404962c09372047 /net
parent25b660c7e202d533e4985380b24729fd12de2b5e (diff)
tipc: add new functions for multicast and broadcast distribution
We add a new broadcast link transmit function in bclink.c and a new receive function in socket.c. The purpose is to move the branching between external and internal destination down to the link layer, just as we have done with unicast in earlier commits. We also make use of the new link-independent fragmentation support that was introduced in an earlier commit series. This gives a shorter and simpler code path, and makes it possible to obtain copy-free buffer delivery to all node local destination sockets. The new transmission code is added in parallel with the existing one, and will be used by the socket multicast send function in the next commit in this series. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net')
-rw-r--r--net/tipc/bcast.c55
-rw-r--r--net/tipc/bcast.h4
-rw-r--r--net/tipc/msg.c35
-rw-r--r--net/tipc/msg.h2
-rw-r--r--net/tipc/socket.c40
-rw-r--r--net/tipc/socket.h2
6 files changed, 136 insertions, 2 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 55c6c9d3e1ce..ac947251dd37 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/bcast.c: TIPC broadcast code 2 * net/tipc/bcast.c: TIPC broadcast code
3 * 3 *
4 * Copyright (c) 2004-2006, Ericsson AB 4 * Copyright (c) 2004-2006, 2014, Ericsson AB
5 * Copyright (c) 2004, Intel Corporation. 5 * Copyright (c) 2004, Intel Corporation.
6 * Copyright (c) 2005, 2010-2011, Wind River Systems 6 * Copyright (c) 2005, 2010-2011, Wind River Systems
7 * All rights reserved. 7 * All rights reserved.
@@ -38,6 +38,8 @@
38#include "core.h" 38#include "core.h"
39#include "link.h" 39#include "link.h"
40#include "port.h" 40#include "port.h"
41#include "socket.h"
42#include "msg.h"
41#include "bcast.h" 43#include "bcast.h"
42#include "name_distr.h" 44#include "name_distr.h"
43 45
@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
138 tipc_link_reset_all(node); 140 tipc_link_reset_all(node);
139} 141}
140 142
143uint tipc_bclink_get_mtu(void)
144{
145 return MAX_PKT_DEFAULT_MCAST;
146}
147
141void tipc_bclink_set_flags(unsigned int flags) 148void tipc_bclink_set_flags(unsigned int flags)
142{ 149{
143 bclink->flags |= flags; 150 bclink->flags |= flags;
@@ -408,6 +415,52 @@ exit:
408 return res; 415 return res;
409} 416}
410 417
418/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
419 * and to identified node local sockets
420 * @buf: chain of buffers containing message
421 * Consumes the buffer chain, except when returning -ELINKCONG
422 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
423 */
424int tipc_bclink_xmit2(struct sk_buff *buf)
425{
426 int rc = 0;
427 int bc = 0;
428 struct sk_buff *clbuf;
429
430 /* Prepare clone of message for local node */
431 clbuf = tipc_msg_reassemble(buf);
432 if (unlikely(!clbuf)) {
433 kfree_skb_list(buf);
434 return -EHOSTUNREACH;
435 }
436
437 /* Broadcast to all other nodes */
438 if (likely(bclink)) {
439 tipc_bclink_lock();
440 if (likely(bclink->bcast_nodes.count)) {
441 rc = __tipc_link_xmit(bcl, buf);
442 if (likely(!rc)) {
443 bclink_set_last_sent();
444 bcl->stats.queue_sz_counts++;
445 bcl->stats.accu_queue_sz += bcl->out_queue_size;
446 }
447 bc = 1;
448 }
449 tipc_bclink_unlock();
450 }
451
452 if (unlikely(!bc))
453 kfree_skb_list(buf);
454
455 /* Deliver message clone */
456 if (likely(!rc))
457 tipc_sk_mcast_rcv(clbuf);
458 else
459 kfree_skb(clbuf);
460
461 return rc;
462}
463
411/** 464/**
412 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 465 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
413 * 466 *
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 00330c45df3e..d90645f408aa 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/bcast.h: Include file for TIPC broadcast code 2 * net/tipc/bcast.h: Include file for TIPC broadcast code
3 * 3 *
4 * Copyright (c) 2003-2006, Ericsson AB 4 * Copyright (c) 2003-2006, 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -98,5 +98,7 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
98int tipc_bclink_reset_stats(void); 98int tipc_bclink_reset_stats(void);
99int tipc_bclink_set_queue_limits(u32 limit); 99int tipc_bclink_set_queue_limits(u32 limit);
100void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); 100void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
101uint tipc_bclink_get_mtu(void);
102int tipc_bclink_xmit2(struct sk_buff *buf);
101 103
102#endif 104#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ce6d929d66d2..9682296f5e7c 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -417,3 +417,38 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
417 msg_set_destport(msg, dport); 417 msg_set_destport(msg, dport);
418 return TIPC_OK; 418 return TIPC_OK;
419} 419}
420
421/* tipc_msg_reassemble() - clone a buffer chain of fragments and
422 * reassemble the clones into one message
423 */
424struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
425{
426 struct sk_buff *buf = chain;
427 struct sk_buff *frag = buf;
428 struct sk_buff *head = NULL;
429 int hdr_sz;
430
431 /* Copy header if single buffer */
432 if (!buf->next) {
433 hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
434 return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
435 }
436
437 /* Clone all fragments and reassemble */
438 while (buf) {
439 frag = skb_clone(buf, GFP_ATOMIC);
440 if (!frag)
441 goto error;
442 frag->next = NULL;
443 if (tipc_buf_append(&head, &frag))
444 break;
445 if (!head)
446 goto error;
447 buf = buf->next;
448 }
449 return frag;
450error:
451 pr_warn("Failed do clone local mcast rcv buffer\n");
452 kfree_skb(head);
453 return NULL;
454}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7d574346e75e..a15d59601bf9 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -744,4 +744,6 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
744int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov, 744int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
745 int offset, int dsz, int mtu , struct sk_buff **chain); 745 int offset, int dsz, int mtu , struct sk_buff **chain);
746 746
747struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
748
747#endif 749#endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index de01622672b2..8d30995682b1 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -534,6 +534,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
534 return mask; 534 return mask;
535} 535}
536 536
537/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
538 */
539void tipc_sk_mcast_rcv(struct sk_buff *buf)
540{
541 struct tipc_msg *msg = buf_msg(buf);
542 struct tipc_port_list dports = {0, NULL, };
543 struct tipc_port_list *item;
544 struct sk_buff *b;
545 uint i, last, dst = 0;
546 u32 scope = TIPC_CLUSTER_SCOPE;
547
548 if (in_own_node(msg_orignode(msg)))
549 scope = TIPC_NODE_SCOPE;
550
551 /* Create destination port list: */
552 tipc_nametbl_mc_translate(msg_nametype(msg),
553 msg_namelower(msg),
554 msg_nameupper(msg),
555 scope,
556 &dports);
557 last = dports.count;
558 if (!last) {
559 kfree_skb(buf);
560 return;
561 }
562
563 for (item = &dports; item; item = item->next) {
564 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
565 b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
566 if (!b) {
567 pr_warn("Failed do clone mcast rcv buffer\n");
568 continue;
569 }
570 msg_set_destport(msg, item->ports[i]);
571 tipc_sk_rcv(b);
572 }
573 }
574 tipc_port_list_free(&dports);
575}
576
537/** 577/**
538 * tipc_sk_proto_rcv - receive a connection mng protocol message 578 * tipc_sk_proto_rcv - receive a connection mng protocol message
539 * @tsk: receiving socket 579 * @tsk: receiving socket
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 2cdede9eda1b..43b75b3ceced 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
85 85
86int tipc_sk_rcv(struct sk_buff *buf); 86int tipc_sk_rcv(struct sk_buff *buf);
87 87
88void tipc_sk_mcast_rcv(struct sk_buff *buf);
89
88#endif 90#endif