aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/bcast.c105
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/link.c8
-rw-r--r--net/tipc/msg.c17
-rw-r--r--net/tipc/msg.h9
-rw-r--r--net/tipc/node.c27
-rw-r--r--net/tipc/socket.c27
7 files changed, 149 insertions, 47 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 412d3351abb7..672e6ef93cab 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
70 70
71int tipc_bcast_get_mtu(struct net *net) 71int tipc_bcast_get_mtu(struct net *net)
72{ 72{
73 return tipc_link_mtu(tipc_bc_sndlink(net)); 73 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
74} 74}
75 75
76/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 76/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
175 __skb_queue_purge(&_xmitq); 175 __skb_queue_purge(&_xmitq);
176} 176}
177 177
178/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster 178/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
179 * and to identified node local sockets
180 * @net: the applicable net namespace 179 * @net: the applicable net namespace
181 * @list: chain of buffers containing message 180 * @pkts: chain of buffers containing message
181 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
182 * Consumes the buffer chain. 182 * Consumes the buffer chain.
183 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 183 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
184 */ 184 */
185int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 185static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
186 u16 *cong_link_cnt)
186{ 187{
187 struct tipc_link *l = tipc_bc_sndlink(net); 188 struct tipc_link *l = tipc_bc_sndlink(net);
188 struct sk_buff_head xmitq, inputq, rcvq; 189 struct sk_buff_head xmitq;
189 int rc = 0; 190 int rc = 0;
190 191
191 __skb_queue_head_init(&rcvq);
192 __skb_queue_head_init(&xmitq); 192 __skb_queue_head_init(&xmitq);
193 skb_queue_head_init(&inputq);
194
195 /* Prepare message clone for local node */
196 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
197 return -EHOSTUNREACH;
198
199 tipc_bcast_lock(net); 193 tipc_bcast_lock(net);
200 if (tipc_link_bc_peers(l)) 194 if (tipc_link_bc_peers(l))
201 rc = tipc_link_xmit(l, list, &xmitq); 195 rc = tipc_link_xmit(l, pkts, &xmitq);
202 tipc_bcast_unlock(net); 196 tipc_bcast_unlock(net);
197 tipc_bcbase_xmit(net, &xmitq);
198 __skb_queue_purge(pkts);
199 if (rc == -ELINKCONG) {
200 *cong_link_cnt = 1;
201 rc = 0;
202 }
203 return rc;
204}
203 205
204 /* Don't send to local node if adding to link failed */ 206/* tipc_rcast_xmit - replicate and send a message to given destination nodes
205 if (unlikely(rc && (rc != -ELINKCONG))) { 207 * @net: the applicable net namespace
206 __skb_queue_purge(&rcvq); 208 * @pkts: chain of buffers containing message
207 return rc; 209 * @dests: list of destination nodes
210 * @cong_link_cnt: returns number of congested links
211 * @cong_links: returns identities of congested links
212 * Returns 0 if success, otherwise errno
213 */
214static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
215 struct tipc_nlist *dests, u16 *cong_link_cnt)
216{
217 struct sk_buff_head _pkts;
218 struct u32_item *n, *tmp;
219 u32 dst, selector;
220
221 selector = msg_link_selector(buf_msg(skb_peek(pkts)));
222 __skb_queue_head_init(&_pkts);
223
224 list_for_each_entry_safe(n, tmp, &dests->list, list) {
225 dst = n->value;
226 if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
227 return -ENOMEM;
228
229 /* Any other return value than -ELINKCONG is ignored */
230 if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
231 (*cong_link_cnt)++;
208 } 232 }
233 return 0;
234}
209 235
210 /* Broadcast to all nodes, inluding local node */ 236/* tipc_mcast_xmit - deliver message to indicated destination nodes
211 tipc_bcbase_xmit(net, &xmitq); 237 * and to identified node local sockets
212 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 238 * @net: the applicable net namespace
213 __skb_queue_purge(list); 239 * @pkts: chain of buffers containing message
240 * @dests: destination nodes for message. Not consumed.
241 * @cong_link_cnt: returns number of encountered congested destination links
242 * @cong_links: returns identities of congested links
243 * Consumes buffer chain.
244 * Returns 0 if success, otherwise errno
245 */
246int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
247 struct tipc_nlist *dests, u16 *cong_link_cnt)
248{
249 struct tipc_bc_base *bb = tipc_bc_base(net);
250 struct sk_buff_head inputq, localq;
251 int rc = 0;
252
253 skb_queue_head_init(&inputq);
254 skb_queue_head_init(&localq);
255
256 /* Clone packets before they are consumed by next call */
257 if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
258 rc = -ENOMEM;
259 goto exit;
260 }
261
262 if (dests->remote) {
263 if (!bb->bcast_support)
264 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
265 else
266 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
267 }
268
269 if (dests->local)
270 tipc_sk_mcast_rcv(net, &localq, &inputq);
271exit:
272 __skb_queue_purge(pkts);
214 return rc; 273 return rc;
215} 274}
216 275
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 18f379198f8f..dd772e6f6fa4 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
66void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); 66void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
67void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); 67void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
68int tipc_bcast_get_mtu(struct net *net); 68int tipc_bcast_get_mtu(struct net *net);
69int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); 69int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
70 struct tipc_nlist *dests, u16 *cong_link_cnt);
70int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); 71int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
71void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, 72void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
72 struct tipc_msg *hdr); 73 struct tipc_msg *hdr);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b0f8646e0631..b17b9e155469 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1032static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, 1032static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1033 struct sk_buff_head *inputq) 1033 struct sk_buff_head *inputq)
1034{ 1034{
1035 switch (msg_user(buf_msg(skb))) { 1035 struct tipc_msg *hdr = buf_msg(skb);
1036
1037 switch (msg_user(hdr)) {
1036 case TIPC_LOW_IMPORTANCE: 1038 case TIPC_LOW_IMPORTANCE:
1037 case TIPC_MEDIUM_IMPORTANCE: 1039 case TIPC_MEDIUM_IMPORTANCE:
1038 case TIPC_HIGH_IMPORTANCE: 1040 case TIPC_HIGH_IMPORTANCE:
1039 case TIPC_CRITICAL_IMPORTANCE: 1041 case TIPC_CRITICAL_IMPORTANCE:
1042 if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
1043 skb_queue_tail(l->bc_rcvlink->inputq, skb);
1044 return true;
1045 }
1040 case CONN_MANAGER: 1046 case CONN_MANAGER:
1041 skb_queue_tail(inputq, skb); 1047 skb_queue_tail(inputq, skb);
1042 return true; 1048 return true;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ab02d0742476..312ef7de57d7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -607,6 +607,23 @@ error:
607 return false; 607 return false;
608} 608}
609 609
610bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
611 struct sk_buff_head *cpy)
612{
613 struct sk_buff *skb, *_skb;
614
615 skb_queue_walk(msg, skb) {
616 _skb = pskb_copy(skb, GFP_ATOMIC);
617 if (!_skb) {
618 __skb_queue_purge(cpy);
619 return false;
620 }
621 msg_set_destnode(buf_msg(_skb), dst);
622 __skb_queue_tail(cpy, _skb);
623 }
624 return true;
625}
626
610/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 627/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
611 * @list: list to be appended to 628 * @list: list to be appended to
612 * @seqno: sequence number of buffer to add 629 * @seqno: sequence number of buffer to add
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index f07b51e3f6f1..c843fd2bc48d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
631 631
632static inline u32 msg_link_selector(struct tipc_msg *m) 632static inline u32 msg_link_selector(struct tipc_msg *m)
633{ 633{
634 if (msg_user(m) == MSG_FRAGMENTER)
635 m = (void *)msg_data(m);
634 return msg_bits(m, 4, 0, 1); 636 return msg_bits(m, 4, 0, 1);
635} 637}
636 638
637static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
638{
639 msg_set_bits(m, 4, 0, 1, n);
640}
641
642/* 639/*
643 * Word 5 640 * Word 5
644 */ 641 */
@@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
835 int offset, int dsz, int mtu, struct sk_buff_head *list); 832 int offset, int dsz, int mtu, struct sk_buff_head *list);
836bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 833bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
837bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); 834bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
835bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
836 struct sk_buff_head *cpy);
838void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 837void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
839 struct sk_buff *skb); 838 struct sk_buff *skb);
840 839
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 2883f6a0ed98..f96dacf173ab 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1257,6 +1257,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1257 kfree_skb(skb); 1257 kfree_skb(skb);
1258} 1258}
1259 1259
1260static void tipc_node_mcast_rcv(struct tipc_node *n)
1261{
1262 struct tipc_bclink_entry *be = &n->bc_entry;
1263
1264 /* 'arrvq' is under inputq2's lock protection */
1265 spin_lock_bh(&be->inputq2.lock);
1266 spin_lock_bh(&be->inputq1.lock);
1267 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1268 spin_unlock_bh(&be->inputq1.lock);
1269 spin_unlock_bh(&be->inputq2.lock);
1270 tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
1271}
1272
1260static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr, 1273static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
1261 int bearer_id, struct sk_buff_head *xmitq) 1274 int bearer_id, struct sk_buff_head *xmitq)
1262{ 1275{
@@ -1330,15 +1343,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
1330 if (!skb_queue_empty(&xmitq)) 1343 if (!skb_queue_empty(&xmitq))
1331 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); 1344 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1332 1345
1333 /* Deliver. 'arrvq' is under inputq2's lock protection */ 1346 if (!skb_queue_empty(&be->inputq1))
1334 if (!skb_queue_empty(&be->inputq1)) { 1347 tipc_node_mcast_rcv(n);
1335 spin_lock_bh(&be->inputq2.lock);
1336 spin_lock_bh(&be->inputq1.lock);
1337 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1338 spin_unlock_bh(&be->inputq1.lock);
1339 spin_unlock_bh(&be->inputq2.lock);
1340 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1341 }
1342 1348
1343 if (rc & TIPC_LINK_DOWN_EVT) { 1349 if (rc & TIPC_LINK_DOWN_EVT) {
1344 /* Reception reassembly failure => reset all links to peer */ 1350 /* Reception reassembly failure => reset all links to peer */
@@ -1565,6 +1571,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1565 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) 1571 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1566 tipc_named_rcv(net, &n->bc_entry.namedq); 1572 tipc_named_rcv(net, &n->bc_entry.namedq);
1567 1573
1574 if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
1575 tipc_node_mcast_rcv(n);
1576
1568 if (!skb_queue_empty(&le->inputq)) 1577 if (!skb_queue_empty(&le->inputq))
1569 tipc_sk_rcv(net, &le->inputq); 1578 tipc_sk_rcv(net, &le->inputq);
1570 1579
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d2f353934f82..93b6ae3154c9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -740,32 +740,43 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
740 struct tipc_msg *hdr = &tsk->phdr; 740 struct tipc_msg *hdr = &tsk->phdr;
741 struct net *net = sock_net(sk); 741 struct net *net = sock_net(sk);
742 int mtu = tipc_bcast_get_mtu(net); 742 int mtu = tipc_bcast_get_mtu(net);
743 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
743 struct sk_buff_head pkts; 744 struct sk_buff_head pkts;
745 struct tipc_nlist dsts;
744 int rc; 746 int rc;
745 747
748 /* Block or return if any destination link is congested */
746 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); 749 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
747 if (unlikely(rc)) 750 if (unlikely(rc))
748 return rc; 751 return rc;
749 752
753 /* Lookup destination nodes */
754 tipc_nlist_init(&dsts, tipc_own_addr(net));
755 tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
756 seq->upper, domain, &dsts);
757 if (!dsts.local && !dsts.remote)
758 return -EHOSTUNREACH;
759
760 /* Build message header */
750 msg_set_type(hdr, TIPC_MCAST_MSG); 761 msg_set_type(hdr, TIPC_MCAST_MSG);
762 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
751 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); 763 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
752 msg_set_destport(hdr, 0); 764 msg_set_destport(hdr, 0);
753 msg_set_destnode(hdr, 0); 765 msg_set_destnode(hdr, 0);
754 msg_set_nametype(hdr, seq->type); 766 msg_set_nametype(hdr, seq->type);
755 msg_set_namelower(hdr, seq->lower); 767 msg_set_namelower(hdr, seq->lower);
756 msg_set_nameupper(hdr, seq->upper); 768 msg_set_nameupper(hdr, seq->upper);
757 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
758 769
770 /* Build message as chain of buffers */
759 skb_queue_head_init(&pkts); 771 skb_queue_head_init(&pkts);
760 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); 772 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
761 if (unlikely(rc != dlen))
762 return rc;
763 773
764 rc = tipc_bcast_xmit(net, &pkts); 774 /* Send message if build was successful */
765 if (unlikely(rc == -ELINKCONG)) { 775 if (unlikely(rc == dlen))
766 tsk->cong_link_cnt = 1; 776 rc = tipc_mcast_xmit(net, &pkts, &dsts,
767 rc = 0; 777 &tsk->cong_link_cnt);
768 } 778
779 tipc_nlist_purge(&dsts);
769 780
770 return rc ? rc : dlen; 781 return rc ? rc : dlen;
771} 782}