aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-10-22 08:51:39 -0400
committerDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:32 -0400
commit2f566124570625c29c3fd79bac4d9cd97c0c31a1 (patch)
treeeb1330a7984296a6f1eac0da479f6c83322691c8 /net/tipc
parentc1ab3f1dea3df566ad38caf98baf69c656679090 (diff)
tipc: let broadcast transmission use new link transmit function
This commit simplifies the broadcast link transmission function, by leveraging previous changes to the link transmission function and the broadcast transmission link life cycle. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c98
-rw-r--r--net/tipc/link.c19
-rw-r--r--net/tipc/link.h4
-rw-r--r--net/tipc/msg.c19
-rw-r--r--net/tipc/msg.h2
5 files changed, 85 insertions, 57 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b7bd2174330..08d64e7527cb 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,6 +112,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
112 return tipc_net(net)->bcbase; 112 return tipc_net(net)->bcbase;
113} 113}
114 114
115static struct tipc_link *tipc_bc_sndlink(struct net *net)
116{
117 return tipc_net(net)->bcl;
118}
119
115/** 120/**
116 * tipc_nmap_equal - test for equality of node maps 121 * tipc_nmap_equal - test for equality of node maps
117 */ 122 */
@@ -121,6 +126,7 @@ static int tipc_nmap_equal(struct tipc_node_map *nm_a,
121 return !memcmp(nm_a, nm_b, sizeof(*nm_a)); 126 return !memcmp(nm_a, nm_b, sizeof(*nm_a));
122} 127}
123 128
129static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
124static void tipc_nmap_diff(struct tipc_node_map *nm_a, 130static void tipc_nmap_diff(struct tipc_node_map *nm_a,
125 struct tipc_node_map *nm_b, 131 struct tipc_node_map *nm_b,
126 struct tipc_node_map *nm_diff); 132 struct tipc_node_map *nm_diff);
@@ -148,14 +154,14 @@ uint tipc_bcast_get_mtu(void)
148 return MAX_PKT_DEFAULT_MCAST; 154 return MAX_PKT_DEFAULT_MCAST;
149} 155}
150 156
151static u32 bcbuf_acks(struct sk_buff *buf) 157static u16 bcbuf_acks(struct sk_buff *skb)
152{ 158{
153 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 159 return TIPC_SKB_CB(skb)->ackers;
154} 160}
155 161
156static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 162static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
157{ 163{
158 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 164 TIPC_SKB_CB(buf)->ackers = ackers;
159} 165}
160 166
161static void bcbuf_decr_acks(struct sk_buff *buf) 167static void bcbuf_decr_acks(struct sk_buff *buf)
@@ -166,9 +172,10 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
166void tipc_bclink_add_node(struct net *net, u32 addr) 172void tipc_bclink_add_node(struct net *net, u32 addr)
167{ 173{
168 struct tipc_net *tn = net_generic(net, tipc_net_id); 174 struct tipc_net *tn = net_generic(net, tipc_net_id);
169 175 struct tipc_link *l = tipc_bc_sndlink(net);
170 tipc_bclink_lock(net); 176 tipc_bclink_lock(net);
171 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr); 177 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
178 tipc_link_add_bc_peer(l);
172 tipc_bclink_unlock(net); 179 tipc_bclink_unlock(net);
173} 180}
174 181
@@ -178,6 +185,7 @@ void tipc_bclink_remove_node(struct net *net, u32 addr)
178 185
179 tipc_bclink_lock(net); 186 tipc_bclink_lock(net);
180 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr); 187 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
188 tn->bcl->ackers--;
181 189
182 /* Last node? => reset backlog queue */ 190 /* Last node? => reset backlog queue */
183 if (!tn->bcbase->bcast_nodes.count) 191 if (!tn->bcbase->bcast_nodes.count)
@@ -295,7 +303,6 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
295 303
296 if (unlikely(!n_ptr->bclink.recv_permitted)) 304 if (unlikely(!n_ptr->bclink.recv_permitted))
297 return; 305 return;
298
299 tipc_bclink_lock(net); 306 tipc_bclink_lock(net);
300 307
301 /* Bail out if tx queue is empty (no clean up is required) */ 308 /* Bail out if tx queue is empty (no clean up is required) */
@@ -324,13 +331,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
324 less_eq(acked, n_ptr->bclink.acked)) 331 less_eq(acked, n_ptr->bclink.acked))
325 goto exit; 332 goto exit;
326 } 333 }
327
328 /* Skip over packets that node has previously acknowledged */ 334 /* Skip over packets that node has previously acknowledged */
329 skb_queue_walk(&tn->bcl->transmq, skb) { 335 skb_queue_walk(&tn->bcl->transmq, skb) {
330 if (more(buf_seqno(skb), n_ptr->bclink.acked)) 336 if (more(buf_seqno(skb), n_ptr->bclink.acked))
331 break; 337 break;
332 } 338 }
333
334 /* Update packets that node is now acknowledging */ 339 /* Update packets that node is now acknowledging */
335 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { 340 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
336 if (more(buf_seqno(skb), acked)) 341 if (more(buf_seqno(skb), acked))
@@ -367,6 +372,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
367 struct sk_buff *buf; 372 struct sk_buff *buf;
368 struct net *net = n_ptr->net; 373 struct net *net = n_ptr->net;
369 struct tipc_net *tn = net_generic(net, tipc_net_id); 374 struct tipc_net *tn = net_generic(net, tipc_net_id);
375 struct tipc_link *bcl = tn->bcl;
370 376
371 /* Ignore "stale" link state info */ 377 /* Ignore "stale" link state info */
372 if (less_eq(last_sent, n_ptr->bclink.last_in)) 378 if (less_eq(last_sent, n_ptr->bclink.last_in))
@@ -375,6 +381,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
375 /* Update link synchronization state; quit if in sync */ 381 /* Update link synchronization state; quit if in sync */
376 bclink_update_last_sent(n_ptr, last_sent); 382 bclink_update_last_sent(n_ptr, last_sent);
377 383
384 /* This is a good location for statistical profiling */
385 bcl->stats.queue_sz_counts++;
386 bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
387
378 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 388 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
379 return; 389 return;
380 390
@@ -468,52 +478,35 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
468 */ 478 */
469int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) 479int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
470{ 480{
471 struct tipc_net *tn = net_generic(net, tipc_net_id); 481 struct tipc_link *l = tipc_bc_sndlink(net);
472 struct tipc_link *bcl = tn->bcl; 482 struct sk_buff_head xmitq, inputq, rcvq;
473 struct tipc_bc_base *bclink = tn->bcbase;
474 int rc = 0; 483 int rc = 0;
475 int bc = 0;
476 struct sk_buff *skb;
477 struct sk_buff_head arrvq;
478 struct sk_buff_head inputq;
479 484
480 /* Prepare clone of message for local node */ 485 __skb_queue_head_init(&rcvq);
481 skb = tipc_msg_reassemble(list); 486 __skb_queue_head_init(&xmitq);
482 if (unlikely(!skb)) 487 skb_queue_head_init(&inputq);
483 return -EHOSTUNREACH;
484 488
485 /* Broadcast to all nodes */ 489 /* Prepare message clone for local node */
486 if (likely(bclink)) { 490 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
487 tipc_bclink_lock(net); 491 return -EHOSTUNREACH;
488 if (likely(bclink->bcast_nodes.count)) {
489 rc = __tipc_link_xmit(net, bcl, list);
490 if (likely(!rc)) {
491 u32 len = skb_queue_len(&bcl->transmq);
492
493 bclink_set_last_sent(net);
494 bcl->stats.queue_sz_counts++;
495 bcl->stats.accu_queue_sz += len;
496 }
497 bc = 1;
498 }
499 tipc_bclink_unlock(net);
500 }
501 492
502 if (unlikely(!bc)) 493 tipc_bcast_lock(net);
503 __skb_queue_purge(list); 494 if (tipc_link_bc_peers(l))
495 rc = tipc_link_xmit(l, list, &xmitq);
496 bclink_set_last_sent(net);
497 tipc_bcast_unlock(net);
504 498
499 /* Don't send to local node if adding to link failed */
505 if (unlikely(rc)) { 500 if (unlikely(rc)) {
506 kfree_skb(skb); 501 __skb_queue_purge(&rcvq);
507 return rc; 502 return rc;
508 } 503 }
509 /* Deliver message clone */ 504 /* Broadcast to all nodes, inluding local node */
510 __skb_queue_head_init(&arrvq); 505 tipc_bcbearer_xmit(net, &xmitq);
511 skb_queue_head_init(&inputq); 506 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
512 __skb_queue_tail(&arrvq, skb); 507 __skb_queue_purge(list);
513 tipc_sk_mcast_rcv(net, &arrvq, &inputq); 508 return 0;
514 return rc;
515} 509}
516
517/** 510/**
518 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 511 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
519 * 512 *
@@ -564,7 +557,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
564 node = tipc_node_find(net, msg_prevnode(msg)); 557 node = tipc_node_find(net, msg_prevnode(msg));
565 if (unlikely(!node)) 558 if (unlikely(!node))
566 goto exit; 559 goto exit;
567
568 tipc_node_lock(node); 560 tipc_node_lock(node);
569 if (unlikely(!node->bclink.recv_permitted)) 561 if (unlikely(!node->bclink.recv_permitted))
570 goto unlock; 562 goto unlock;
@@ -589,7 +581,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
589 tipc_node_put(node); 581 tipc_node_put(node);
590 goto exit; 582 goto exit;
591 } 583 }
592
593 /* Handle in-sequence broadcast message */ 584 /* Handle in-sequence broadcast message */
594 seqno = msg_seqno(msg); 585 seqno = msg_seqno(msg);
595 next_in = mod(node->bclink.last_in + 1); 586 next_in = mod(node->bclink.last_in + 1);
@@ -778,6 +769,19 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
778 return 0; 769 return 0;
779} 770}
780 771
772static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
773{
774 struct sk_buff *skb, *tmp;
775
776 skb_queue_walk_safe(xmitq, skb, tmp) {
777 __skb_dequeue(xmitq);
778 tipc_bcbearer_send(net, skb, NULL, NULL);
779
780 /* Until we remove cloning in tipc_l2_send_msg(): */
781 kfree_skb(skb);
782 }
783}
784
781/** 785/**
782 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 786 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
783 */ 787 */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index dfc738e5cff9..363da5f85704 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -158,6 +158,21 @@ int tipc_link_is_active(struct tipc_link *l)
158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
159} 159}
160 160
161void tipc_link_add_bc_peer(struct tipc_link *l)
162{
163 l->ackers++;
164}
165
166void tipc_link_remove_bc_peer(struct tipc_link *l)
167{
168 l->ackers--;
169}
170
171int tipc_link_bc_peers(struct tipc_link *l)
172{
173 return l->ackers;
174}
175
161static u32 link_own_addr(struct tipc_link *l) 176static u32 link_own_addr(struct tipc_link *l)
162{ 177{
163 return msg_prevnode(l->pmsg); 178 return msg_prevnode(l->pmsg);
@@ -258,6 +273,7 @@ bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window,
258 l = *link; 273 l = *link;
259 strcpy(l->name, tipc_bclink_name); 274 strcpy(l->name, tipc_bclink_name);
260 tipc_link_reset(l); 275 tipc_link_reset(l);
276 l->ackers = 0;
261 return true; 277 return true;
262} 278}
263 279
@@ -898,8 +914,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
898 char addr_string[16]; 914 char addr_string[16];
899 915
900 pr_info("Msg seq number: %u, ", msg_seqno(msg)); 916 pr_info("Msg seq number: %u, ", msg_seqno(msg));
901 pr_cont("Outstanding acks: %lu\n", 917 pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers);
902 (unsigned long) TIPC_SKB_CB(buf)->handle);
903 918
904 n_ptr = tipc_bclink_retransmit_to(net); 919 n_ptr = tipc_bclink_retransmit_to(net);
905 920
diff --git a/net/tipc/link.h b/net/tipc/link.h
index be24d1fd5132..9c4acc26b3b1 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -262,5 +262,9 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
262int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); 262int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
263int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 263int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
264 struct sk_buff_head *xmitq); 264 struct sk_buff_head *xmitq);
265void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
266void tipc_link_add_bc_peer(struct tipc_link *l);
267void tipc_link_remove_bc_peer(struct tipc_link *l);
268int tipc_link_bc_peers(struct tipc_link *l);
265 269
266#endif 270#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 26d38b3d8760..5b47468739e7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -565,18 +565,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
565/* tipc_msg_reassemble() - clone a buffer chain of fragments and 565/* tipc_msg_reassemble() - clone a buffer chain of fragments and
566 * reassemble the clones into one message 566 * reassemble the clones into one message
567 */ 567 */
568struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) 568bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
569{ 569{
570 struct sk_buff *skb; 570 struct sk_buff *skb, *_skb;
571 struct sk_buff *frag = NULL; 571 struct sk_buff *frag = NULL;
572 struct sk_buff *head = NULL; 572 struct sk_buff *head = NULL;
573 int hdr_sz; 573 int hdr_len;
574 574
575 /* Copy header if single buffer */ 575 /* Copy header if single buffer */
576 if (skb_queue_len(list) == 1) { 576 if (skb_queue_len(list) == 1) {
577 skb = skb_peek(list); 577 skb = skb_peek(list);
578 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); 578 hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
579 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); 579 _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
580 if (!_skb)
581 return false;
582 __skb_queue_tail(rcvq, _skb);
583 return true;
580 } 584 }
581 585
582 /* Clone all fragments and reassemble */ 586 /* Clone all fragments and reassemble */
@@ -590,11 +594,12 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
590 if (!head) 594 if (!head)
591 goto error; 595 goto error;
592 } 596 }
593 return frag; 597 __skb_queue_tail(rcvq, frag);
598 return true;
594error: 599error:
595 pr_warn("Failed do clone local mcast rcv buffer\n"); 600 pr_warn("Failed do clone local mcast rcv buffer\n");
596 kfree_skb(head); 601 kfree_skb(head);
597 return NULL; 602 return false;
598} 603}
599 604
600/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 605/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 799782c47f6c..fbf51fa1075d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -790,7 +790,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
790int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 790int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
791 int offset, int dsz, int mtu, struct sk_buff_head *list); 791 int offset, int dsz, int mtu, struct sk_buff_head *list);
792bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 792bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
793struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 793bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
794void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 794void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
795 struct sk_buff *skb); 795 struct sk_buff *skb);
796 796