aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c98
-rw-r--r--net/tipc/link.c19
-rw-r--r--net/tipc/link.h4
-rw-r--r--net/tipc/msg.c19
-rw-r--r--net/tipc/msg.h2
5 files changed, 85 insertions, 57 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b7bd2174330..08d64e7527cb 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,6 +112,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
112 return tipc_net(net)->bcbase; 112 return tipc_net(net)->bcbase;
113} 113}
114 114
115static struct tipc_link *tipc_bc_sndlink(struct net *net)
116{
117 return tipc_net(net)->bcl;
118}
119
115/** 120/**
116 * tipc_nmap_equal - test for equality of node maps 121 * tipc_nmap_equal - test for equality of node maps
117 */ 122 */
@@ -121,6 +126,7 @@ static int tipc_nmap_equal(struct tipc_node_map *nm_a,
121 return !memcmp(nm_a, nm_b, sizeof(*nm_a)); 126 return !memcmp(nm_a, nm_b, sizeof(*nm_a));
122} 127}
123 128
129static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
124static void tipc_nmap_diff(struct tipc_node_map *nm_a, 130static void tipc_nmap_diff(struct tipc_node_map *nm_a,
125 struct tipc_node_map *nm_b, 131 struct tipc_node_map *nm_b,
126 struct tipc_node_map *nm_diff); 132 struct tipc_node_map *nm_diff);
@@ -148,14 +154,14 @@ uint tipc_bcast_get_mtu(void)
148 return MAX_PKT_DEFAULT_MCAST; 154 return MAX_PKT_DEFAULT_MCAST;
149} 155}
150 156
151static u32 bcbuf_acks(struct sk_buff *buf) 157static u16 bcbuf_acks(struct sk_buff *skb)
152{ 158{
153 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 159 return TIPC_SKB_CB(skb)->ackers;
154} 160}
155 161
156static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 162static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
157{ 163{
158 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 164 TIPC_SKB_CB(buf)->ackers = ackers;
159} 165}
160 166
161static void bcbuf_decr_acks(struct sk_buff *buf) 167static void bcbuf_decr_acks(struct sk_buff *buf)
@@ -166,9 +172,10 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
166void tipc_bclink_add_node(struct net *net, u32 addr) 172void tipc_bclink_add_node(struct net *net, u32 addr)
167{ 173{
168 struct tipc_net *tn = net_generic(net, tipc_net_id); 174 struct tipc_net *tn = net_generic(net, tipc_net_id);
169 175 struct tipc_link *l = tipc_bc_sndlink(net);
170 tipc_bclink_lock(net); 176 tipc_bclink_lock(net);
171 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr); 177 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
178 tipc_link_add_bc_peer(l);
172 tipc_bclink_unlock(net); 179 tipc_bclink_unlock(net);
173} 180}
174 181
@@ -178,6 +185,7 @@ void tipc_bclink_remove_node(struct net *net, u32 addr)
178 185
179 tipc_bclink_lock(net); 186 tipc_bclink_lock(net);
180 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr); 187 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
188 tn->bcl->ackers--;
181 189
182 /* Last node? => reset backlog queue */ 190 /* Last node? => reset backlog queue */
183 if (!tn->bcbase->bcast_nodes.count) 191 if (!tn->bcbase->bcast_nodes.count)
@@ -295,7 +303,6 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
295 303
296 if (unlikely(!n_ptr->bclink.recv_permitted)) 304 if (unlikely(!n_ptr->bclink.recv_permitted))
297 return; 305 return;
298
299 tipc_bclink_lock(net); 306 tipc_bclink_lock(net);
300 307
301 /* Bail out if tx queue is empty (no clean up is required) */ 308 /* Bail out if tx queue is empty (no clean up is required) */
@@ -324,13 +331,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
324 less_eq(acked, n_ptr->bclink.acked)) 331 less_eq(acked, n_ptr->bclink.acked))
325 goto exit; 332 goto exit;
326 } 333 }
327
328 /* Skip over packets that node has previously acknowledged */ 334 /* Skip over packets that node has previously acknowledged */
329 skb_queue_walk(&tn->bcl->transmq, skb) { 335 skb_queue_walk(&tn->bcl->transmq, skb) {
330 if (more(buf_seqno(skb), n_ptr->bclink.acked)) 336 if (more(buf_seqno(skb), n_ptr->bclink.acked))
331 break; 337 break;
332 } 338 }
333
334 /* Update packets that node is now acknowledging */ 339 /* Update packets that node is now acknowledging */
335 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { 340 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
336 if (more(buf_seqno(skb), acked)) 341 if (more(buf_seqno(skb), acked))
@@ -367,6 +372,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
367 struct sk_buff *buf; 372 struct sk_buff *buf;
368 struct net *net = n_ptr->net; 373 struct net *net = n_ptr->net;
369 struct tipc_net *tn = net_generic(net, tipc_net_id); 374 struct tipc_net *tn = net_generic(net, tipc_net_id);
375 struct tipc_link *bcl = tn->bcl;
370 376
371 /* Ignore "stale" link state info */ 377 /* Ignore "stale" link state info */
372 if (less_eq(last_sent, n_ptr->bclink.last_in)) 378 if (less_eq(last_sent, n_ptr->bclink.last_in))
@@ -375,6 +381,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
375 /* Update link synchronization state; quit if in sync */ 381 /* Update link synchronization state; quit if in sync */
376 bclink_update_last_sent(n_ptr, last_sent); 382 bclink_update_last_sent(n_ptr, last_sent);
377 383
384 /* This is a good location for statistical profiling */
385 bcl->stats.queue_sz_counts++;
386 bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
387
378 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 388 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
379 return; 389 return;
380 390
@@ -468,52 +478,35 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
468 */ 478 */
469int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 479int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
470{ 480{
471 struct tipc_net *tn = net_generic(net, tipc_net_id); 481 struct tipc_link *l = tipc_bc_sndlink(net);
472 struct tipc_link *bcl = tn->bcl; 482 struct sk_buff_head xmitq, inputq, rcvq;
473 struct tipc_bc_base *bclink = tn->bcbase;
474 int rc = 0; 483 int rc = 0;
475 int bc = 0;
476 struct sk_buff *skb;
477 struct sk_buff_head arrvq;
478 struct sk_buff_head inputq;
479 484
480 /* Prepare clone of message for local node */ 485 __skb_queue_head_init(&rcvq);
481 skb = tipc_msg_reassemble(list); 486 __skb_queue_head_init(&xmitq);
482 if (unlikely(!skb)) 487 skb_queue_head_init(&inputq);
483 return -EHOSTUNREACH;
484 488
485 /* Broadcast to all nodes */ 489 /* Prepare message clone for local node */
486 if (likely(bclink)) { 490 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
487 tipc_bclink_lock(net); 491 return -EHOSTUNREACH;
488 if (likely(bclink->bcast_nodes.count)) {
489 rc = __tipc_link_xmit(net, bcl, list);
490 if (likely(!rc)) {
491 u32 len = skb_queue_len(&bcl->transmq);
492
493 bclink_set_last_sent(net);
494 bcl->stats.queue_sz_counts++;
495 bcl->stats.accu_queue_sz += len;
496 }
497 bc = 1;
498 }
499 tipc_bclink_unlock(net);
500 }
501 492
502 if (unlikely(!bc)) 493 tipc_bcast_lock(net);
503 __skb_queue_purge(list); 494 if (tipc_link_bc_peers(l))
495 rc = tipc_link_xmit(l, list, &xmitq);
496 bclink_set_last_sent(net);
497 tipc_bcast_unlock(net);
504 498
499 /* Don't send to local node if adding to link failed */
505 if (unlikely(rc)) { 500 if (unlikely(rc)) {
506 kfree_skb(skb); 501 __skb_queue_purge(&rcvq);
507 return rc; 502 return rc;
508 } 503 }
509 /* Deliver message clone */ 504 /* Broadcast to all nodes, inluding local node */
510 __skb_queue_head_init(&arrvq); 505 tipc_bcbearer_xmit(net, &xmitq);
511 skb_queue_head_init(&inputq); 506 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
512 __skb_queue_tail(&arrvq, skb); 507 __skb_queue_purge(list);
513 tipc_sk_mcast_rcv(net, &arrvq, &inputq); 508 return 0;
514 return rc;
515} 509}
516
517/** 510/**
518 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 511 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
519 * 512 *
@@ -564,7 +557,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
564 node = tipc_node_find(net, msg_prevnode(msg)); 557 node = tipc_node_find(net, msg_prevnode(msg));
565 if (unlikely(!node)) 558 if (unlikely(!node))
566 goto exit; 559 goto exit;
567
568 tipc_node_lock(node); 560 tipc_node_lock(node);
569 if (unlikely(!node->bclink.recv_permitted)) 561 if (unlikely(!node->bclink.recv_permitted))
570 goto unlock; 562 goto unlock;
@@ -589,7 +581,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
589 tipc_node_put(node); 581 tipc_node_put(node);
590 goto exit; 582 goto exit;
591 } 583 }
592
593 /* Handle in-sequence broadcast message */ 584 /* Handle in-sequence broadcast message */
594 seqno = msg_seqno(msg); 585 seqno = msg_seqno(msg);
595 next_in = mod(node->bclink.last_in + 1); 586 next_in = mod(node->bclink.last_in + 1);
@@ -778,6 +769,19 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
778 return 0; 769 return 0;
779} 770}
780 771
772static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
773{
774 struct sk_buff *skb, *tmp;
775
776 skb_queue_walk_safe(xmitq, skb, tmp) {
777 __skb_dequeue(xmitq);
778 tipc_bcbearer_send(net, skb, NULL, NULL);
779
780 /* Until we remove cloning in tipc_l2_send_msg(): */
781 kfree_skb(skb);
782 }
783}
784
781/** 785/**
782 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 786 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
783 */ 787 */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index dfc738e5cff9..363da5f85704 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -158,6 +158,21 @@ int tipc_link_is_active(struct tipc_link *l)
158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
159} 159}
160 160
161void tipc_link_add_bc_peer(struct tipc_link *l)
162{
163 l->ackers++;
164}
165
166void tipc_link_remove_bc_peer(struct tipc_link *l)
167{
168 l->ackers--;
169}
170
171int tipc_link_bc_peers(struct tipc_link *l)
172{
173 return l->ackers;
174}
175
161static u32 link_own_addr(struct tipc_link *l) 176static u32 link_own_addr(struct tipc_link *l)
162{ 177{
163 return msg_prevnode(l->pmsg); 178 return msg_prevnode(l->pmsg);
@@ -258,6 +273,7 @@ bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window,
258 l = *link; 273 l = *link;
259 strcpy(l->name, tipc_bclink_name); 274 strcpy(l->name, tipc_bclink_name);
260 tipc_link_reset(l); 275 tipc_link_reset(l);
276 l->ackers = 0;
261 return true; 277 return true;
262} 278}
263 279
@@ -898,8 +914,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
898 char addr_string[16]; 914 char addr_string[16];
899 915
900 pr_info("Msg seq number: %u, ", msg_seqno(msg)); 916 pr_info("Msg seq number: %u, ", msg_seqno(msg));
901 pr_cont("Outstanding acks: %lu\n", 917 pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers);
902 (unsigned long) TIPC_SKB_CB(buf)->handle);
903 918
904 n_ptr = tipc_bclink_retransmit_to(net); 919 n_ptr = tipc_bclink_retransmit_to(net);
905 920
diff --git a/net/tipc/link.h b/net/tipc/link.h
index be24d1fd5132..9c4acc26b3b1 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -262,5 +262,9 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
262int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); 262int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
263int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 263int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
264 struct sk_buff_head *xmitq); 264 struct sk_buff_head *xmitq);
265void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
266void tipc_link_add_bc_peer(struct tipc_link *l);
267void tipc_link_remove_bc_peer(struct tipc_link *l);
268int tipc_link_bc_peers(struct tipc_link *l);
265 269
266#endif 270#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 26d38b3d8760..5b47468739e7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -565,18 +565,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
565/* tipc_msg_reassemble() - clone a buffer chain of fragments and 565/* tipc_msg_reassemble() - clone a buffer chain of fragments and
566 * reassemble the clones into one message 566 * reassemble the clones into one message
567 */ 567 */
568struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) 568bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
569{ 569{
570 struct sk_buff *skb; 570 struct sk_buff *skb, *_skb;
571 struct sk_buff *frag = NULL; 571 struct sk_buff *frag = NULL;
572 struct sk_buff *head = NULL; 572 struct sk_buff *head = NULL;
573 int hdr_sz; 573 int hdr_len;
574 574
575 /* Copy header if single buffer */ 575 /* Copy header if single buffer */
576 if (skb_queue_len(list) == 1) { 576 if (skb_queue_len(list) == 1) {
577 skb = skb_peek(list); 577 skb = skb_peek(list);
578 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); 578 hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
579 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); 579 _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
580 if (!_skb)
581 return false;
582 __skb_queue_tail(rcvq, _skb);
583 return true;
580 } 584 }
581 585
582 /* Clone all fragments and reassemble */ 586 /* Clone all fragments and reassemble */
@@ -590,11 +594,12 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
590 if (!head) 594 if (!head)
591 goto error; 595 goto error;
592 } 596 }
593 return frag; 597 __skb_queue_tail(rcvq, frag);
598 return true;
594error: 599error:
595 pr_warn("Failed do clone local mcast rcv buffer\n"); 600 pr_warn("Failed do clone local mcast rcv buffer\n");
596 kfree_skb(head); 601 kfree_skb(head);
597 return NULL; 602 return false;
598} 603}
599 604
600/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 605/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 799782c47f6c..fbf51fa1075d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -790,7 +790,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
790int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 790int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
791 int offset, int dsz, int mtu, struct sk_buff_head *list); 791 int offset, int dsz, int mtu, struct sk_buff_head *list);
792bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 792bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
793struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 793bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
794void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 794void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
795 struct sk_buff *skb); 795 struct sk_buff *skb);
796 796