Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/Makefile                                          |   4
-rw-r--r--  net/tipc/bcast.c                                           | 230
-rw-r--r--  net/tipc/bcast.h                                           |   6
-rw-r--r--  net/tipc/bearer.c                                          | 447
-rw-r--r--  net/tipc/bearer.h                                          |  16
-rw-r--r--  net/tipc/core.h                                            |   2
-rw-r--r--  net/tipc/link.c                                            | 981
-rw-r--r--  net/tipc/link.h                                            |  55
-rw-r--r--  net/tipc/msg.c                                             | 133
-rw-r--r--  net/tipc/msg.h                                             |  16
-rw-r--r--  net/tipc/name_distr.c                                      | 181
-rw-r--r--  net/tipc/name_distr.h                                      |   1
-rw-r--r--  net/tipc/name_table.c                                      | 373
-rw-r--r--  net/tipc/name_table.h                                      |  30
-rw-r--r--  net/tipc/net.c                                             | 106
-rw-r--r--  net/tipc/net.h                                             |   8
-rw-r--r--  net/tipc/netlink.c                                         | 133
-rw-r--r--  net/tipc/netlink.h (renamed from net/tipc/node_subscr.h)   |  35
-rw-r--r--  net/tipc/node.c                                            | 108
-rw-r--r--  net/tipc/node.h                                            |  16
-rw-r--r--  net/tipc/node_subscr.c                                     |  96
-rw-r--r--  net/tipc/socket.c                                          | 419
-rw-r--r--  net/tipc/socket.h                                          |   3
-rw-r--r--  net/tipc/subscr.c                                          |   1
24 files changed, 2486 insertions, 914 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index b8a13caad59a..333e4592772c 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -7,8 +7,8 @@ obj-$(CONFIG_TIPC) := tipc.o
 tipc-y	+= addr.o bcast.o bearer.o config.o \
 	   core.o link.o discover.o msg.o \
 	   name_distr.o subscr.o name_table.o net.o \
-	   netlink.o node.o node_subscr.o \
-	   socket.o log.o eth_media.o server.o
+	   netlink.o node.o socket.o log.o eth_media.o \
+	   server.o
 
 tipc-$(CONFIG_TIPC_MEDIA_IB)	+= ib_media.o
 tipc-$(CONFIG_SYSCTL)		+= sysctl.o
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index b8670bf262e2..96ceefeb9daf 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
  */
 static void bclink_retransmit_pkt(u32 after, u32 to)
 {
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 
-	buf = bcl->first_out;
-	while (buf && less_eq(buf_seqno(buf), after))
-		buf = buf->next;
-	tipc_link_retransmit(bcl, buf, mod(to - after));
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), after))
+			break;
+	}
+	tipc_link_retransmit(bcl, skb, mod(to - after));
 }
 
 /**
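
The hunk above is the template for most of this patch: an open-coded first_out/next chain walk becomes an iteration over a generic sk_buff_head. As a minimal sketch of the pattern (illustration only, not part of the commit; more() and buf_seqno() are TIPC's own sequence-number helpers, skb_queue_walk() is the standard <linux/skbuff.h> iterator):

/* Sketch: find the first queued packet whose sequence number lies
 * beyond 'after'. skb_queue_walk() visits entries in order and must
 * not remove them while walking.
 */
static struct sk_buff *first_unacked(struct sk_buff_head *q, u32 after)
{
	struct sk_buff *skb;

	skb_queue_walk(q, skb) {
		if (more(buf_seqno(skb), after))
			return skb;
	}
	return NULL;	/* every queued packet is <= 'after' */
}
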
@@ -232,8 +233,11 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
  */
 void tipc_bclink_wakeup_users(void)
 {
-	while (skb_queue_len(&bclink->link.waiting_sks))
-		tipc_sk_rcv(skb_dequeue(&bclink->link.waiting_sks));
+	struct sk_buff *skb;
+
+	while ((skb = skb_dequeue(&bclink->link.waiting_sks)))
+		tipc_sk_rcv(skb);
+
 }
 
 /**
@@ -245,14 +249,14 @@ void tipc_bclink_wakeup_users(void)
  */
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
-	struct sk_buff *crs;
+	struct sk_buff *skb, *tmp;
 	struct sk_buff *next;
 	unsigned int released = 0;
 
 	tipc_bclink_lock();
 	/* Bail out if tx queue is empty (no clean up is required) */
-	crs = bcl->first_out;
-	if (!crs)
+	skb = skb_peek(&bcl->outqueue);
+	if (!skb)
 		goto exit;
 
 	/* Determine which messages need to be acknowledged */
@@ -271,43 +275,43 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	 * Bail out if specified sequence number does not correspond
 	 * to a message that has been sent and not yet acknowledged
 	 */
-	if (less(acked, buf_seqno(crs)) ||
+	if (less(acked, buf_seqno(skb)) ||
 	    less(bcl->fsm_msg_cnt, acked) ||
 	    less_eq(acked, n_ptr->bclink.acked))
 		goto exit;
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
-		crs = crs->next;
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), n_ptr->bclink.acked))
+			break;
+	}
 
 	/* Update packets that node is now acknowledging */
+	skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
+		if (more(buf_seqno(skb), acked))
+			break;
 
-	while (crs && less_eq(buf_seqno(crs), acked)) {
-		next = crs->next;
-
-		if (crs != bcl->next_out)
-			bcbuf_decr_acks(crs);
-		else {
-			bcbuf_set_acks(crs, 0);
+		next = tipc_skb_queue_next(&bcl->outqueue, skb);
+		if (skb != bcl->next_out) {
+			bcbuf_decr_acks(skb);
+		} else {
+			bcbuf_set_acks(skb, 0);
 			bcl->next_out = next;
 			bclink_set_last_sent();
 		}
 
-		if (bcbuf_acks(crs) == 0) {
-			bcl->first_out = next;
-			bcl->out_queue_size--;
-			kfree_skb(crs);
+		if (bcbuf_acks(skb) == 0) {
+			__skb_unlink(skb, &bcl->outqueue);
+			kfree_skb(skb);
 			released = 1;
 		}
-		crs = next;
 	}
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-
 	if (unlikely(bcl->next_out)) {
-		tipc_link_push_queue(bcl);
+		tipc_link_push_packets(bcl);
 		bclink_set_last_sent();
 	}
 	if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
@@ -327,19 +331,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	struct sk_buff *buf;
 
 	/* Ignore "stale" link state info */
-
 	if (less_eq(last_sent, n_ptr->bclink.last_in))
 		return;
 
 	/* Update link synchronization state; quit if in sync */
-
 	bclink_update_last_sent(n_ptr, last_sent);
 
 	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
 		return;
 
 	/* Update out-of-sync state; quit if loss is still unconfirmed */
-
 	if ((++n_ptr->bclink.oos_state) == 1) {
 		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
 			return;
@@ -347,15 +348,15 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	}
 
 	/* Don't NACK if one has been recently sent (or seen) */
-
 	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
 	/* Send NACK */
-
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
@@ -363,9 +364,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 		msg_set_mc_netid(msg, tipc_net_id);
 		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
 		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
-				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
-				 : n_ptr->bclink.last_sent);
+		msg_set_bcgap_to(msg, to);
 
 		tipc_bclink_lock();
 		tipc_bearer_send(MAX_BEARERS, buf, NULL);
@@ -402,20 +401,20 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
  *                    and to identified node local sockets
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff_head *list)
 {
 	int rc = 0;
 	int bc = 0;
-	struct sk_buff *clbuf;
+	struct sk_buff *skb;
 
 	/* Prepare clone of message for local node */
-	clbuf = tipc_msg_reassemble(buf);
-	if (unlikely(!clbuf)) {
-		kfree_skb_list(buf);
+	skb = tipc_msg_reassemble(list);
+	if (unlikely(!skb)) {
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -423,11 +422,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
 	if (likely(bclink)) {
 		tipc_bclink_lock();
 		if (likely(bclink->bcast_nodes.count)) {
-			rc = __tipc_link_xmit(bcl, buf);
+			rc = __tipc_link_xmit(bcl, list);
 			if (likely(!rc)) {
+				u32 len = skb_queue_len(&bcl->outqueue);
+
 				bclink_set_last_sent();
 				bcl->stats.queue_sz_counts++;
-				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+				bcl->stats.accu_queue_sz += len;
 			}
 			bc = 1;
 		}
@@ -435,13 +436,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
 	}
 
 	if (unlikely(!bc))
-		kfree_skb_list(buf);
+		__skb_queue_purge(list);
 
 	/* Deliver message clone */
 	if (likely(!rc))
-		tipc_sk_mcast_rcv(clbuf);
+		tipc_sk_mcast_rcv(skb);
 	else
-		kfree_skb(clbuf);
+		kfree_skb(skb);
 
 	return rc;
 }
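
As the kernel-doc above states, tipc_bclink_xmit() consumes the chain on every path except -ELINKCONG, where ownership stays with the caller. A hedged caller-side sketch of that contract (hypothetical function, not in the patch; real senders block until the link wakeup rather than dropping):

/* Hypothetical caller: only -ELINKCONG leaves 'list' owned by us;
 * on any other return, success included, the buffers are gone.
 */
static void bclink_send_or_drop(struct sk_buff_head *list)
{
	int rc = tipc_bclink_xmit(list);

	if (rc == -ELINKCONG)
		__skb_queue_purge(list);	/* we still own the chain */
	else if (rc)
		pr_debug("bclink xmit failed: %d\n", rc);
}
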
@@ -462,7 +463,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
 	 * Unicast an ACK periodically, ensuring that
 	 * all nodes in the cluster don't ACK at the same time
 	 */
-
 	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
 		tipc_link_proto_xmit(node->active_links[node->addr & 1],
 				     STATE_MSG, 0, 0, 0, 0, 0);
@@ -484,7 +484,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
-
 	if (msg_mc_netid(msg) != tipc_net_id)
 		goto exit;
 
@@ -497,7 +496,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 		goto unlock;
 
 	/* Handle broadcast protocol message */
-
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
 		if (msg_type(msg) != STATE_MSG)
 			goto unlock;
@@ -518,14 +516,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	}
 
 	/* Handle in-sequence broadcast message */
-
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
 receive:
 		/* Deliver message to destination */
-
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock();
 			bclink_accept_pkt(node, seqno);
@@ -574,7 +570,6 @@ receive:
 	buf = NULL;
 
 	/* Determine new synchronization state */
-
 	tipc_node_lock(node);
 	if (unlikely(!tipc_node_is_up(node)))
 		goto unlock;
@@ -582,33 +577,26 @@ receive:
 	if (node->bclink.last_in == node->bclink.last_sent)
 		goto unlock;
 
-	if (!node->bclink.deferred_head) {
+	if (skb_queue_empty(&node->bclink.deferred_queue)) {
 		node->bclink.oos_state = 1;
 		goto unlock;
 	}
 
-	msg = buf_msg(node->bclink.deferred_head);
+	msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
 	seqno = msg_seqno(msg);
 	next_in = mod(next_in + 1);
 	if (seqno != next_in)
 		goto unlock;
 
 	/* Take in-sequence message from deferred queue & deliver it */
-
-	buf = node->bclink.deferred_head;
-	node->bclink.deferred_head = buf->next;
-	buf->next = NULL;
-	node->bclink.deferred_size--;
+	buf = __skb_dequeue(&node->bclink.deferred_queue);
 	goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
-
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
-					       &node->bclink.deferred_tail,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
 					       buf);
-		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
 	}
@@ -767,6 +755,118 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
 	tipc_bclink_unlock();
 }
 
+static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
+				      struct tipc_stats *stats)
+{
+	int i;
+	struct nlattr *nest;
+
+	struct nla_map {
+		__u32 key;
+		__u32 val;
+	};
+
+	struct nla_map map[] = {
+		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
+		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
+		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
+		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
+		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
+		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
+		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
+		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
+		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
+		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
+		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
+		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
+		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
+		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
+		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
+		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
+		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
+		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
+		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
+			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
+	};
+
+	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
+	if (!nest)
+		return -EMSGSIZE;
+
+	for (i = 0; i < ARRAY_SIZE(map); i++)
+		if (nla_put_u32(skb, map[i].key, map[i].val))
+			goto msg_full;
+
+	nla_nest_end(skb, nest);
+
+	return 0;
+msg_full:
+	nla_nest_cancel(skb, nest);
+
+	return -EMSGSIZE;
+}
+
+int tipc_nl_add_bc_link(struct tipc_nl_msg *msg)
+{
+	int err;
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+
+	if (!bcl)
+		return 0;
+
+	tipc_bclink_lock();
+
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+			  NLM_F_MULTI, TIPC_NL_LINK_GET);
+	if (!hdr)
+		return -EMSGSIZE;
+
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
+	if (!attrs)
+		goto msg_full;
+
+	/* The broadcast link is always up */
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
+		goto attr_msg_full;
+
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
+		goto attr_msg_full;
+	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no))
+		goto attr_msg_full;
+
+	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
+	if (!prop)
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
+		goto prop_msg_full;
+	nla_nest_end(msg->skb, prop);
+
+	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
+	if (err)
+		goto attr_msg_full;
+
+	tipc_bclink_unlock();
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
+
+	return 0;
+
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	tipc_bclink_unlock();
+	genlmsg_cancel(msg->skb, hdr);
+
+	return -EMSGSIZE;
+}
 
 int tipc_bclink_stats(char *buf, const u32 buf_size)
 {
@@ -851,7 +951,9 @@ int tipc_bclink_init(void)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->waiting_sks);
+	__skb_queue_head_init(&bcl->outqueue);
+	__skb_queue_head_init(&bcl->deferred_queue);
+	skb_queue_head_init(&bcl->waiting_sks);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
 	__skb_queue_head_init(&bclink->node.waiting_sks);
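
tipc_nl_add_bc_link() above follows the build-or-cancel discipline used throughout the new netlink code: every genlmsg_put() or nla_nest_start() is paired with either *_end() on success or *_cancel() on the error path, so a half-built message never reaches userspace. Stripped to a skeleton (a sketch; the single u32 attribute stands in for the real attribute set):

static int nl_fill_skeleton(struct tipc_nl_msg *msg, u32 val)
{
	void *hdr;
	struct nlattr *attrs;

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
			  &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, val))
		goto attr_msg_full;

	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);
	return 0;

attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);	/* roll back the open nest */
msg_full:
	genlmsg_cancel(msg->skb, hdr);		/* discard the whole message */
	return -EMSGSIZE;
}
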
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index e7b0f85a82bc..644d79129fba 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -37,6 +37,8 @@
 #ifndef _TIPC_BCAST_H
 #define _TIPC_BCAST_H
 
+#include "netlink.h"
+
 #define MAX_NODES 4096
 #define WSIZE 32
 #define TIPC_BCLINK_RESET 1
@@ -98,6 +100,8 @@ int tipc_bclink_reset_stats(void);
 int tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
 uint tipc_bclink_get_mtu(void);
-int tipc_bclink_xmit(struct sk_buff *buf);
+int tipc_bclink_xmit(struct sk_buff_head *list);
 void tipc_bclink_wakeup_users(void);
+int tipc_nl_add_bc_link(struct tipc_nl_msg *msg);
+
 #endif
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 264474394f9f..463db5b15b8b 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bearer.c: TIPC bearer code
  *
- * Copyright (c) 1996-2006, 2013, Ericsson AB
+ * Copyright (c) 1996-2006, 2013-2014, Ericsson AB
  * Copyright (c) 2004-2006, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -37,6 +37,7 @@
 #include "core.h"
 #include "config.h"
 #include "bearer.h"
+#include "link.h"
 #include "discover.h"
 
 #define MAX_ADDR_STR 60
@@ -49,6 +50,23 @@ static struct tipc_media * const media_info_array[] = {
 	NULL
 };
 
+static const struct nla_policy
+tipc_nl_bearer_policy[TIPC_NLA_BEARER_MAX + 1]	= {
+	[TIPC_NLA_BEARER_UNSPEC] = { .type = NLA_UNSPEC },
+	[TIPC_NLA_BEARER_NAME] = {
+		.type = NLA_STRING,
+		.len = TIPC_MAX_BEARER_NAME
+	},
+	[TIPC_NLA_BEARER_PROP] = { .type = NLA_NESTED },
+	[TIPC_NLA_BEARER_DOMAIN] = { .type = NLA_U32 }
+};
+
+static const struct nla_policy tipc_nl_media_policy[TIPC_NLA_MEDIA_MAX + 1] = {
+	[TIPC_NLA_MEDIA_UNSPEC] = { .type = NLA_UNSPEC },
+	[TIPC_NLA_MEDIA_NAME] = { .type = NLA_STRING },
+	[TIPC_NLA_MEDIA_PROP] = { .type = NLA_NESTED }
+};
+
 struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];
 
 static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down);
@@ -627,3 +645,430 @@ void tipc_bearer_stop(void)
 		}
 	}
 }
+
+/* Caller should hold rtnl_lock to protect the bearer */
+static int __tipc_nl_add_bearer(struct tipc_nl_msg *msg,
+				struct tipc_bearer *bearer)
+{
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+			  NLM_F_MULTI, TIPC_NL_BEARER_GET);
+	if (!hdr)
+		return -EMSGSIZE;
+
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_BEARER);
+	if (!attrs)
+		goto msg_full;
+
+	if (nla_put_string(msg->skb, TIPC_NLA_BEARER_NAME, bearer->name))
+		goto attr_msg_full;
+
+	prop = nla_nest_start(msg->skb, TIPC_NLA_BEARER_PROP);
+	if (!prop)
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, bearer->priority))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, bearer->tolerance))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bearer->window))
+		goto prop_msg_full;
+
+	nla_nest_end(msg->skb, prop);
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
+
+	return 0;
+
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	genlmsg_cancel(msg->skb, hdr);
+
+	return -EMSGSIZE;
+}
+
+int tipc_nl_bearer_dump(struct sk_buff *skb, struct netlink_callback *cb)
+{
+	int err;
+	int i = cb->args[0];
+	struct tipc_bearer *bearer;
+	struct tipc_nl_msg msg;
+
+	if (i == MAX_BEARERS)
+		return 0;
+
+	msg.skb = skb;
+	msg.portid = NETLINK_CB(cb->skb).portid;
+	msg.seq = cb->nlh->nlmsg_seq;
+
+	rtnl_lock();
+	for (i = 0; i < MAX_BEARERS; i++) {
+		bearer = rtnl_dereference(bearer_list[i]);
+		if (!bearer)
+			continue;
+
+		err = __tipc_nl_add_bearer(&msg, bearer);
+		if (err)
+			break;
+	}
+	rtnl_unlock();
+
+	cb->args[0] = i;
+	return skb->len;
+}
+
+int tipc_nl_bearer_get(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *name;
+	struct sk_buff *rep;
+	struct tipc_bearer *bearer;
+	struct tipc_nl_msg msg;
+	struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_BEARER])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX,
+			       info->attrs[TIPC_NLA_BEARER],
+			       tipc_nl_bearer_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_BEARER_NAME])
+		return -EINVAL;
+	name = nla_data(attrs[TIPC_NLA_BEARER_NAME]);
+
+	rep = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
+	if (!rep)
+		return -ENOMEM;
+
+	msg.skb = rep;
+	msg.portid = info->snd_portid;
+	msg.seq = info->snd_seq;
+
+	rtnl_lock();
+	bearer = tipc_bearer_find(name);
+	if (!bearer) {
+		err = -EINVAL;
+		goto err_out;
+	}
+
+	err = __tipc_nl_add_bearer(&msg, bearer);
+	if (err)
+		goto err_out;
+	rtnl_unlock();
+
+	return genlmsg_reply(rep, info);
+err_out:
+	rtnl_unlock();
+	nlmsg_free(rep);
+
+	return err;
+}
+
+int tipc_nl_bearer_disable(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *name;
+	struct tipc_bearer *bearer;
+	struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_BEARER])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX,
+			       info->attrs[TIPC_NLA_BEARER],
+			       tipc_nl_bearer_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_BEARER_NAME])
+		return -EINVAL;
+
+	name = nla_data(attrs[TIPC_NLA_BEARER_NAME]);
+
+	rtnl_lock();
+	bearer = tipc_bearer_find(name);
+	if (!bearer) {
+		rtnl_unlock();
+		return -EINVAL;
+	}
+
+	bearer_disable(bearer, false);
+	rtnl_unlock();
+
+	return 0;
+}
+
+int tipc_nl_bearer_enable(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *bearer;
+	struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1];
+	u32 domain;
+	u32 prio;
+
+	prio = TIPC_MEDIA_LINK_PRI;
+	domain = tipc_own_addr & TIPC_CLUSTER_MASK;
+
+	if (!info->attrs[TIPC_NLA_BEARER])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX,
+			       info->attrs[TIPC_NLA_BEARER],
+			       tipc_nl_bearer_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_BEARER_NAME])
+		return -EINVAL;
+
+	bearer = nla_data(attrs[TIPC_NLA_BEARER_NAME]);
+
+	if (attrs[TIPC_NLA_BEARER_DOMAIN])
+		domain = nla_get_u32(attrs[TIPC_NLA_BEARER_DOMAIN]);
+
+	if (attrs[TIPC_NLA_BEARER_PROP]) {
+		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
+
+		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_BEARER_PROP],
+					      props);
+		if (err)
+			return err;
+
+		if (props[TIPC_NLA_PROP_PRIO])
+			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+	}
+
+	rtnl_lock();
+	err = tipc_enable_bearer(bearer, domain, prio);
+	if (err) {
+		rtnl_unlock();
+		return err;
+	}
+	rtnl_unlock();
+
+	return 0;
+}
+
+int tipc_nl_bearer_set(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *name;
+	struct tipc_bearer *b;
+	struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_BEARER])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_BEARER_MAX,
+			       info->attrs[TIPC_NLA_BEARER],
+			       tipc_nl_bearer_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_BEARER_NAME])
+		return -EINVAL;
+	name = nla_data(attrs[TIPC_NLA_BEARER_NAME]);
+
+	rtnl_lock();
+	b = tipc_bearer_find(name);
+	if (!b) {
+		rtnl_unlock();
+		return -EINVAL;
+	}
+
+	if (attrs[TIPC_NLA_BEARER_PROP]) {
+		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
+
+		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_BEARER_PROP],
+					      props);
+		if (err) {
+			rtnl_unlock();
+			return err;
+		}
+
+		if (props[TIPC_NLA_PROP_TOL])
+			b->tolerance = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+		if (props[TIPC_NLA_PROP_PRIO])
+			b->priority = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+		if (props[TIPC_NLA_PROP_WIN])
+			b->window = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+	}
+	rtnl_unlock();
+
+	return 0;
+}
+
+static int __tipc_nl_add_media(struct tipc_nl_msg *msg,
+			       struct tipc_media *media)
+{
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+			  NLM_F_MULTI, TIPC_NL_MEDIA_GET);
+	if (!hdr)
+		return -EMSGSIZE;
+
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_MEDIA);
+	if (!attrs)
+		goto msg_full;
+
+	if (nla_put_string(msg->skb, TIPC_NLA_MEDIA_NAME, media->name))
+		goto attr_msg_full;
+
+	prop = nla_nest_start(msg->skb, TIPC_NLA_MEDIA_PROP);
+	if (!prop)
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, media->priority))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, media->tolerance))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, media->window))
+		goto prop_msg_full;
+
+	nla_nest_end(msg->skb, prop);
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
+
+	return 0;
+
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	genlmsg_cancel(msg->skb, hdr);
+
+	return -EMSGSIZE;
+}
+
+int tipc_nl_media_dump(struct sk_buff *skb, struct netlink_callback *cb)
+{
+	int err;
+	int i = cb->args[0];
+	struct tipc_nl_msg msg;
+
+	if (i == MAX_MEDIA)
+		return 0;
+
+	msg.skb = skb;
+	msg.portid = NETLINK_CB(cb->skb).portid;
+	msg.seq = cb->nlh->nlmsg_seq;
+
+	rtnl_lock();
+	for (; media_info_array[i] != NULL; i++) {
+		err = __tipc_nl_add_media(&msg, media_info_array[i]);
+		if (err)
+			break;
+	}
+	rtnl_unlock();
+
+	cb->args[0] = i;
+	return skb->len;
+}
+
+int tipc_nl_media_get(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *name;
+	struct tipc_nl_msg msg;
+	struct tipc_media *media;
+	struct sk_buff *rep;
+	struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_MEDIA])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_MEDIA_MAX,
+			       info->attrs[TIPC_NLA_MEDIA],
+			       tipc_nl_media_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_MEDIA_NAME])
+		return -EINVAL;
+	name = nla_data(attrs[TIPC_NLA_MEDIA_NAME]);
+
+	rep = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
+	if (!rep)
+		return -ENOMEM;
+
+	msg.skb = rep;
+	msg.portid = info->snd_portid;
+	msg.seq = info->snd_seq;
+
+	rtnl_lock();
+	media = tipc_media_find(name);
+	if (!media) {
+		err = -EINVAL;
+		goto err_out;
+	}
+
+	err = __tipc_nl_add_media(&msg, media);
+	if (err)
+		goto err_out;
+	rtnl_unlock();
+
+	return genlmsg_reply(rep, info);
+err_out:
+	rtnl_unlock();
+	nlmsg_free(rep);
+
+	return err;
+}
+
+int tipc_nl_media_set(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *name;
+	struct tipc_media *m;
+	struct nlattr *attrs[TIPC_NLA_BEARER_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_MEDIA])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_MEDIA_MAX,
+			       info->attrs[TIPC_NLA_MEDIA],
+			       tipc_nl_media_policy);
+
+	if (!attrs[TIPC_NLA_MEDIA_NAME])
+		return -EINVAL;
+	name = nla_data(attrs[TIPC_NLA_MEDIA_NAME]);
+
+	rtnl_lock();
+	m = tipc_media_find(name);
+	if (!m) {
+		rtnl_unlock();
+		return -EINVAL;
+	}
+
+	if (attrs[TIPC_NLA_MEDIA_PROP]) {
+		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
+
+		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_MEDIA_PROP],
+					      props);
+		if (err) {
+			rtnl_unlock();
+			return err;
+		}
+
+		if (props[TIPC_NLA_PROP_TOL])
+			m->tolerance = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+		if (props[TIPC_NLA_PROP_PRIO])
+			m->priority = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+		if (props[TIPC_NLA_PROP_WIN])
+			m->window = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+	}
+	rtnl_unlock();
+
+	return 0;
+}
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 78fccc49de23..2c1230ac5dfe 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bearer.h: Include file for TIPC bearer code
  *
- * Copyright (c) 1996-2006, 2013, Ericsson AB
+ * Copyright (c) 1996-2006, 2013-2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -38,6 +38,8 @@
 #define _TIPC_BEARER_H
 
 #include "bcast.h"
+#include "netlink.h"
+#include <net/genetlink.h>
 
 #define MAX_BEARERS	2
 #define MAX_MEDIA	2
@@ -163,7 +165,7 @@ extern struct tipc_bearer __rcu *bearer_list[];
  * TIPC routines available to supported media types
  */
 
-void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
+void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *tb_ptr);
 int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
 int tipc_disable_bearer(const char *name);
 
@@ -176,6 +178,16 @@ extern struct tipc_media eth_media_info;
 extern struct tipc_media ib_media_info;
 #endif
 
+int tipc_nl_bearer_disable(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_bearer_enable(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_bearer_dump(struct sk_buff *skb, struct netlink_callback *cb);
+int tipc_nl_bearer_get(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_bearer_set(struct sk_buff *skb, struct genl_info *info);
+
+int tipc_nl_media_dump(struct sk_buff *skb, struct netlink_callback *cb);
+int tipc_nl_media_get(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_media_set(struct sk_buff *skb, struct genl_info *info);
+
 int tipc_media_set_priority(const char *name, u32 new_value);
 int tipc_media_set_window(const char *name, u32 new_value);
 void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a);
diff --git a/net/tipc/core.h b/net/tipc/core.h
index f773b148722f..84602137ce20 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -41,6 +41,7 @@
 
 #include <linux/tipc.h>
 #include <linux/tipc_config.h>
+#include <linux/tipc_netlink.h>
 #include <linux/types.h>
 #include <linux/kernel.h>
 #include <linux/errno.h>
@@ -191,6 +192,7 @@ struct tipc_skb_cb {
 	struct sk_buff *tail;
 	bool deferred;
 	bool wakeup_pending;
+	bool bundling;
 	u16 chain_sz;
 	u16 chain_imp;
 };
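
The new bundling flag rides in tipc_skb_cb, the per-protocol control block that TIPC overlays on the skb->cb scratch area through its TIPC_SKB_CB() accessor, so per-packet link state travels inside the buffer itself. A sketch of the accessor pattern (EXAMPLE_SKB_CB mirrors the real macro; the struct must fit in sizeof(skb->cb), 48 bytes):

#define EXAMPLE_SKB_CB(skb) ((struct tipc_skb_cb *)&((skb)->cb[0]))

static void mark_bundling(struct sk_buff *skb, bool on)
{
	EXAMPLE_SKB_CB(skb)->bundling = on;	/* flag added by this patch */
}
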
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1db162aa64a5..23bcc1132365 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -36,10 +36,12 @@
 
 #include "core.h"
 #include "link.h"
+#include "bcast.h"
 #include "socket.h"
 #include "name_distr.h"
 #include "discover.h"
 #include "config.h"
+#include "netlink.h"
 
 #include <linux/pkt_sched.h>
 
@@ -50,6 +52,30 @@ static const char *link_co_err = "Link changeover error, ";
 static const char *link_rst_msg = "Resetting link ";
 static const char *link_unk_evt = "Unknown link event ";
 
+static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
+	[TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
+	[TIPC_NLA_LINK_NAME] = {
+		.type = NLA_STRING,
+		.len = TIPC_MAX_LINK_NAME
+	},
+	[TIPC_NLA_LINK_MTU] = { .type = NLA_U32 },
+	[TIPC_NLA_LINK_BROADCAST] = { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_UP] = { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_ACTIVE] = { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_PROP] = { .type = NLA_NESTED },
+	[TIPC_NLA_LINK_STATS] = { .type = NLA_NESTED },
+	[TIPC_NLA_LINK_RX] = { .type = NLA_U32 },
+	[TIPC_NLA_LINK_TX] = { .type = NLA_U32 }
+};
+
+/* Properties valid for media, bearer and link */
+static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
+	[TIPC_NLA_PROP_UNSPEC] = { .type = NLA_UNSPEC },
+	[TIPC_NLA_PROP_PRIO] = { .type = NLA_U32 },
+	[TIPC_NLA_PROP_TOL] = { .type = NLA_U32 },
+	[TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
+};
+
 /*
  * Out-of-range value for link session numbers
  */
@@ -123,18 +149,6 @@ static void link_init_max_pkt(struct tipc_link *l_ptr)
 	l_ptr->max_pkt_probes = 0;
 }
 
-static u32 link_next_sent(struct tipc_link *l_ptr)
-{
-	if (l_ptr->next_out)
-		return buf_seqno(l_ptr->next_out);
-	return mod(l_ptr->next_out_no);
-}
-
-static u32 link_last_sent(struct tipc_link *l_ptr)
-{
-	return mod(link_next_sent(l_ptr) - 1);
-}
-
 /*
  * Simple non-static link routines (i.e. referenced outside this file)
  */
@@ -157,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
  */
 static void link_timeout(struct tipc_link *l_ptr)
 {
+	struct sk_buff *skb;
+
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
 	l_ptr->stats.queue_sz_counts++;
 
-	if (l_ptr->first_out) {
-		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
+	skb = skb_peek(&l_ptr->outqueue);
+	if (skb) {
+		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
 
 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
@@ -192,11 +209,10 @@ static void link_timeout(struct tipc_link *l_ptr)
 	}
 
 	/* do all other link processing performed on a periodic basis */
-
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
 	if (l_ptr->next_out)
-		tipc_link_push_queue(l_ptr);
+		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
 }
@@ -224,9 +240,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	char addr_string[16];
 	u32 peer = n_ptr->addr;
 
-	if (n_ptr->link_cnt >= 2) {
+	if (n_ptr->link_cnt >= MAX_BEARERS) {
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
-		pr_err("Attempt to establish third link to %s\n", addr_string);
+		pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n",
+		       n_ptr->link_cnt, addr_string, MAX_BEARERS);
 		return NULL;
 	}
 
@@ -274,7 +291,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->waiting_sks);
+	__skb_queue_head_init(&l_ptr->outqueue);
+	__skb_queue_head_init(&l_ptr->deferred_queue);
+	skb_queue_head_init(&l_ptr->waiting_sks);
 
 	link_reset_statistics(l_ptr);
 
@@ -339,7 +358,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
 		return false;
 	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
 	TIPC_SKB_CB(buf)->chain_imp = imp;
-	__skb_queue_tail(&link->waiting_sks, buf);
+	skb_queue_tail(&link->waiting_sks, buf);
 	link->stats.link_congs++;
 	return true;
 }
@@ -352,30 +371,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 static void link_prepare_wakeup(struct tipc_link *link)
 {
-	struct sk_buff_head *wq = &link->waiting_sks;
-	struct sk_buff *buf;
-	uint pend_qsz = link->out_queue_size;
+	uint pend_qsz = skb_queue_len(&link->outqueue);
+	struct sk_buff *skb, *tmp;
 
-	for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
-		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
+	skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
+		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
 			break;
-		pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
-		__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
+		pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
+		skb_unlink(skb, &link->waiting_sks);
+		skb_queue_tail(&link->owner->waiting_sks, skb);
 	}
 }
 
 /**
- * link_release_outqueue - purge link's outbound message queue
- * @l_ptr: pointer to link
- */
-static void link_release_outqueue(struct tipc_link *l_ptr)
-{
-	kfree_skb_list(l_ptr->first_out);
-	l_ptr->first_out = NULL;
-	l_ptr->out_queue_size = 0;
-}
-
-/**
  * tipc_link_reset_fragments - purge link's inbound message fragments queue
  * @l_ptr: pointer to link
  */
@@ -391,11 +399,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	kfree_skb_list(l_ptr->oldest_deferred_in);
-	kfree_skb_list(l_ptr->first_out);
+	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->outqueue);
 	tipc_link_reset_fragments(l_ptr);
-	kfree_skb(l_ptr->proto_msg_queue);
-	l_ptr->proto_msg_queue = NULL;
 }
 
 void tipc_link_reset(struct tipc_link *l_ptr)
@@ -427,25 +433,16 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues: */
-	link_release_outqueue(l_ptr);
-	kfree_skb(l_ptr->proto_msg_queue);
-	l_ptr->proto_msg_queue = NULL;
-	kfree_skb_list(l_ptr->oldest_deferred_in);
+	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferred_queue);
 	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
 		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
 		owner->action_flags |= TIPC_WAKEUP_USERS;
 	}
-	l_ptr->retransm_queue_head = 0;
-	l_ptr->retransm_queue_size = 0;
-	l_ptr->last_out = NULL;
-	l_ptr->first_out = NULL;
 	l_ptr->next_out = NULL;
 	l_ptr->unacked_window = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
-	l_ptr->deferred_inqueue_sz = 0;
-	l_ptr->oldest_deferred_in = NULL;
-	l_ptr->newest_deferred_in = NULL;
 	l_ptr->fsm_msg_cnt = 0;
 	l_ptr->stale_count = 0;
 	link_reset_statistics(l_ptr);
@@ -667,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
  * - For all other messages we discard the buffer and return -EHOSTUNREACH
  * - For TIPC internal messages we also reset the link
  */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct sk_buff *skb = skb_peek(list);
+	struct tipc_msg *msg = buf_msg(skb);
 	uint imp = tipc_msg_tot_importance(msg);
 	u32 oport = msg_tot_origport(msg);
 
@@ -682,30 +680,30 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 		goto drop;
 	if (unlikely(msg_reroute_cnt(msg)))
 		goto drop;
-	if (TIPC_SKB_CB(buf)->wakeup_pending)
+	if (TIPC_SKB_CB(skb)->wakeup_pending)
 		return -ELINKCONG;
-	if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
 		return -ELINKCONG;
 drop:
-	kfree_skb_list(buf);
+	__skb_queue_purge(list);
 	return -EHOSTUNREACH;
 }
 
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
+ *
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
  * user data messages) or -EHOSTUNREACH (all other messages/senders)
  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
  * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_msg *msg = buf_msg(skb_peek(list));
 	uint psz = msg_size(msg);
-	uint qsz = link->out_queue_size;
 	uint sndlim = link->queue_limit[0];
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
@@ -713,71 +711,83 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff *next = buf->next;
+	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff *skb, *tmp;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(qsz >= link->queue_limit[imp]))
-		return tipc_link_cong(link, buf);
+	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
 	if (unlikely(psz > mtu)) {
-		kfree_skb_list(buf);
+		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
 	/* Prepare each packet for sending, and add to outqueue: */
-	while (buf) {
-		next = buf->next;
-		msg = buf_msg(buf);
+	skb_queue_walk_safe(list, skb, tmp) {
+		__skb_unlink(skb, list);
+		msg = buf_msg(skb);
 		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (!link->first_out) {
-			link->first_out = buf;
-		} else if (qsz < sndlim) {
-			link->last_out->next = buf;
-		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+		if (skb_queue_len(outqueue) < sndlim) {
+			__skb_queue_tail(outqueue, skb);
+			tipc_bearer_send(link->bearer_id, skb, addr);
+			link->next_out = NULL;
+			link->unacked_window = 0;
+		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
 			link->stats.sent_bundled++;
-			buf = next;
-			next = buf->next;
 			continue;
-		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
+						link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			link->last_out->next = buf;
 			if (!link->next_out)
-				link->next_out = buf;
+				link->next_out = skb_peek_tail(outqueue);
 		} else {
-			link->last_out->next = buf;
+			__skb_queue_tail(outqueue, skb);
 			if (!link->next_out)
-				link->next_out = buf;
-		}
-
-		/* Send packet if possible: */
-		if (likely(++qsz <= sndlim)) {
-			tipc_bearer_send(link->bearer_id, buf, addr);
-			link->next_out = next;
-			link->unacked_window = 0;
+				link->next_out = skb;
 		}
 		seqno++;
-		link->last_out = buf;
-		buf = next;
 	}
 	link->next_out_no = seqno;
-	link->out_queue_size = qsz;
 	return 0;
 }
 
+static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
+{
+	__skb_queue_head_init(list);
+	__skb_queue_tail(list, skb);
+}
+
+static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
+{
+	struct sk_buff_head head;
+
+	skb2list(skb, &head);
+	return __tipc_link_xmit(link, &head);
+}
+
+int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
+{
+	struct sk_buff_head head;
+
+	skb2list(skb, &head);
+	return tipc_link_xmit(&head, dnode, selector);
+}
+
 /**
  * tipc_link_xmit() is the general link level function for message sending
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * @dsz: amount of user data to be sent
  * @dnode: address of destination node
  * @selector: a number used for deterministic link selection
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
+int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
 {
 	struct tipc_link *link = NULL;
 	struct tipc_node *node;
@@ -788,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 		tipc_node_lock(node);
 		link = node->active_links[selector & 1];
 		if (link)
-			rc = __tipc_link_xmit(link, buf);
+			rc = __tipc_link_xmit(link, list);
 		tipc_node_unlock(node);
 	}
 
 	if (link)
 		return rc;
 
-	if (likely(in_own_node(dnode)))
-		return tipc_sk_rcv(buf);
+	if (likely(in_own_node(dnode))) {
+		/* As a node local message chain never contains more than one
+		 * buffer, we just need to dequeue one SKB buffer from the
+		 * head list.
+		 */
+		return tipc_sk_rcv(__skb_dequeue(list));
+	}
+	__skb_queue_purge(list);
 
-	kfree_skb_list(buf);
 	return rc;
 }
 
@@ -812,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
  */
 static void tipc_link_sync_xmit(struct tipc_link *link)
 {
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
 
-	buf = tipc_buf_acquire(INT_H_SIZE);
-	if (!buf)
+	skb = tipc_buf_acquire(INT_H_SIZE);
+	if (!skb)
 		return;
 
-	msg = buf_msg(buf);
+	msg = buf_msg(skb);
 	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
 	msg_set_last_bcast(msg, link->owner->bclink.acked);
-	__tipc_link_xmit(link, buf);
+	__tipc_link_xmit_skb(link, skb);
 }
 
 /*
@@ -842,85 +857,46 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
842 kfree_skb(buf); 857 kfree_skb(buf);
843} 858}
844 859
860struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
861 const struct sk_buff *skb)
862{
863 if (skb_queue_is_last(list, skb))
864 return NULL;
865 return skb->next;
866}
867
845/* 868/*
846 * tipc_link_push_packet: Push one unsent packet to the media 869 * tipc_link_push_packets - push unsent packets to bearer
870 *
871 * Push out the unsent messages of a link where congestion
872 * has abated. Node is locked.
873 *
874 * Called with node locked
847 */ 875 */
848static u32 tipc_link_push_packet(struct tipc_link *l_ptr) 876void tipc_link_push_packets(struct tipc_link *l_ptr)
849{ 877{
850 struct sk_buff *buf = l_ptr->first_out; 878 struct sk_buff_head *outqueue = &l_ptr->outqueue;
851 u32 r_q_size = l_ptr->retransm_queue_size; 879 struct sk_buff *skb = l_ptr->next_out;
852 u32 r_q_head = l_ptr->retransm_queue_head; 880 struct tipc_msg *msg;
853 881 u32 next, first;
854 /* Step to position where retransmission failed, if any, */
855 /* consider that buffers may have been released in meantime */
856 if (r_q_size && buf) {
857 u32 last = lesser(mod(r_q_head + r_q_size),
858 link_last_sent(l_ptr));
859 u32 first = buf_seqno(buf);
860
861 while (buf && less(first, r_q_head)) {
862 first = mod(first + 1);
863 buf = buf->next;
864 }
865 l_ptr->retransm_queue_head = r_q_head = first;
866 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
867 }
868
869 /* Continue retransmission now, if there is anything: */
870 if (r_q_size && buf) {
871 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
872 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
873 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
874 l_ptr->retransm_queue_head = mod(++r_q_head);
875 l_ptr->retransm_queue_size = --r_q_size;
876 l_ptr->stats.retransmitted++;
877 return 0;
878 }
879
880 /* Send deferred protocol message, if any: */
881 buf = l_ptr->proto_msg_queue;
882 if (buf) {
883 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
884 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
885 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
886 l_ptr->unacked_window = 0;
887 kfree_skb(buf);
888 l_ptr->proto_msg_queue = NULL;
889 return 0;
890 }
891 882
892 /* Send one deferred data message, if send window not full: */ 883 skb_queue_walk_from(outqueue, skb) {
893 buf = l_ptr->next_out; 884 msg = buf_msg(skb);
894 if (buf) { 885 next = msg_seqno(msg);
895 struct tipc_msg *msg = buf_msg(buf); 886 first = buf_seqno(skb_peek(outqueue));
896 u32 next = msg_seqno(msg);
897 u32 first = buf_seqno(l_ptr->first_out);
898 887
899 if (mod(next - first) < l_ptr->queue_limit[0]) { 888 if (mod(next - first) < l_ptr->queue_limit[0]) {
900 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 889 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
901 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 890 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
902 tipc_bearer_send(l_ptr->bearer_id, buf,
903 &l_ptr->media_addr);
904 if (msg_user(msg) == MSG_BUNDLER) 891 if (msg_user(msg) == MSG_BUNDLER)
905 msg_set_type(msg, BUNDLE_CLOSED); 892 TIPC_SKB_CB(skb)->bundling = false;
906 l_ptr->next_out = buf->next; 893 tipc_bearer_send(l_ptr->bearer_id, skb,
907 return 0; 894 &l_ptr->media_addr);
895 l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
896 } else {
897 break;
908 } 898 }
909 } 899 }
910 return 1;
911}
912
913/*
914 * push_queue(): push out the unsent messages of a link where
915 * congestion has abated. Node is locked
916 */
917void tipc_link_push_queue(struct tipc_link *l_ptr)
918{
919 u32 res;
920
921 do {
922 res = tipc_link_push_packet(l_ptr);
923 } while (!res);
924} 900}
925 901
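The send-window test above, mod(next - first) < l_ptr->queue_limit[0], stays correct across 16-bit sequence wrap-around thanks to the mod() arithmetic. A self-contained userland demonstration with assumed values:

#include <assert.h>
#include <stdint.h>

static uint32_t mod(uint32_t x) { return x & 0xffffu; }

int main(void)
{
        uint32_t first = 0xfffe;  /* seqno of oldest buffer in outqueue */
        uint32_t next  = 0x0003;  /* seqno of candidate at next_out */

        assert(mod(next - first) == 5);  /* five in flight despite wrap */
        return 0;
}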
926void tipc_link_reset_all(struct tipc_node *node) 902void tipc_link_reset_all(struct tipc_node *node)
@@ -984,20 +960,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
984 } 960 }
985} 961}
986 962
987void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, 963void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
988 u32 retransmits) 964 u32 retransmits)
989{ 965{
990 struct tipc_msg *msg; 966 struct tipc_msg *msg;
991 967
992 if (!buf) 968 if (!skb)
993 return; 969 return;
994 970
995 msg = buf_msg(buf); 971 msg = buf_msg(skb);
996 972
997 /* Detect repeated retransmit failures */ 973 /* Detect repeated retransmit failures */
998 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 974 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
999 if (++l_ptr->stale_count > 100) { 975 if (++l_ptr->stale_count > 100) {
1000 link_retransmit_failure(l_ptr, buf); 976 link_retransmit_failure(l_ptr, skb);
1001 return; 977 return;
1002 } 978 }
1003 } else { 979 } else {
@@ -1005,38 +981,29 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1005 l_ptr->stale_count = 1; 981 l_ptr->stale_count = 1;
1006 } 982 }
1007 983
1008 while (retransmits && (buf != l_ptr->next_out) && buf) { 984 skb_queue_walk_from(&l_ptr->outqueue, skb) {
1009 msg = buf_msg(buf); 985 if (!retransmits || skb == l_ptr->next_out)
986 break;
987 msg = buf_msg(skb);
1010 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 988 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1011 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 989 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1012 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); 990 tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
1013 buf = buf->next;
1014 retransmits--; 991 retransmits--;
1015 l_ptr->stats.retransmitted++; 992 l_ptr->stats.retransmitted++;
1016 } 993 }
1017
1018 l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1019} 994}
1020 995
1021/** 996static void link_retrieve_defq(struct tipc_link *link,
1022 * link_insert_deferred_queue - insert deferred messages back into receive chain 997 struct sk_buff_head *list)
1023 */
1024static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
1025 struct sk_buff *buf)
1026{ 998{
1027 u32 seq_no; 999 u32 seq_no;
1028 1000
1029 if (l_ptr->oldest_deferred_in == NULL) 1001 if (skb_queue_empty(&link->deferred_queue))
1030 return buf; 1002 return;
1031 1003
1032 seq_no = buf_seqno(l_ptr->oldest_deferred_in); 1004 seq_no = buf_seqno(skb_peek(&link->deferred_queue));
1033 if (seq_no == mod(l_ptr->next_in_no)) { 1005 if (seq_no == mod(link->next_in_no))
1034 l_ptr->newest_deferred_in->next = buf; 1006 skb_queue_splice_tail_init(&link->deferred_queue, list);
1035 buf = l_ptr->oldest_deferred_in;
1036 l_ptr->oldest_deferred_in = NULL;
1037 l_ptr->deferred_inqueue_sz = 0;
1038 }
1039 return buf;
1040} 1007}
1041 1008
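link_retrieve_defq() leans on skb_queue_splice_tail_init(), which moves the entire deferred queue onto the processing list in O(1) and reinitializes the source queue. The per-buffer loop it stands in for would look like this (sketch only):

struct sk_buff *skb;

while ((skb = __skb_dequeue(&link->deferred_queue)))
        __skb_queue_tail(list, skb);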
1042/** 1009/**
@@ -1096,43 +1063,42 @@ static int link_recv_buf_validate(struct sk_buff *buf)
1096 1063
1097/** 1064/**
1098 * tipc_rcv - process TIPC packets/messages arriving from off-node 1065 * tipc_rcv - process TIPC packets/messages arriving from off-node
1099 * @head: pointer to message buffer chain 1066 * @skb: TIPC packet
1100 * @b_ptr: pointer to bearer the message arrived on 1067 * @b_ptr: pointer to bearer the message arrived on
1101 * 1068 *
1102 * Invoked with no locks held. Bearer pointer must point to a valid bearer 1069 * Invoked with no locks held. Bearer pointer must point to a valid bearer
1103 * structure (i.e. cannot be NULL), but bearer can be inactive. 1070 * structure (i.e. cannot be NULL), but bearer can be inactive.
1104 */ 1071 */
1105void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) 1072void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
1106{ 1073{
1107 while (head) { 1074 struct sk_buff_head head;
1108 struct tipc_node *n_ptr; 1075 struct tipc_node *n_ptr;
1109 struct tipc_link *l_ptr; 1076 struct tipc_link *l_ptr;
1110 struct sk_buff *crs; 1077 struct sk_buff *skb1, *tmp;
1111 struct sk_buff *buf = head; 1078 struct tipc_msg *msg;
1112 struct tipc_msg *msg; 1079 u32 seq_no;
1113 u32 seq_no; 1080 u32 ackd;
1114 u32 ackd; 1081 u32 released;
1115 u32 released = 0;
1116 1082
1117 head = head->next; 1083 skb2list(skb, &head);
1118 buf->next = NULL;
1119 1084
1085 while ((skb = __skb_dequeue(&head))) {
1120 /* Ensure message is well-formed */ 1086 /* Ensure message is well-formed */
1121 if (unlikely(!link_recv_buf_validate(buf))) 1087 if (unlikely(!link_recv_buf_validate(skb)))
1122 goto discard; 1088 goto discard;
1123 1089
1124 /* Ensure message data is a single contiguous unit */ 1090 /* Ensure message data is a single contiguous unit */
1125 if (unlikely(skb_linearize(buf))) 1091 if (unlikely(skb_linearize(skb)))
1126 goto discard; 1092 goto discard;
1127 1093
1128 /* Handle arrival of a non-unicast link message */ 1094 /* Handle arrival of a non-unicast link message */
1129 msg = buf_msg(buf); 1095 msg = buf_msg(skb);
1130 1096
1131 if (unlikely(msg_non_seq(msg))) { 1097 if (unlikely(msg_non_seq(msg))) {
1132 if (msg_user(msg) == LINK_CONFIG) 1098 if (msg_user(msg) == LINK_CONFIG)
1133 tipc_disc_rcv(buf, b_ptr); 1099 tipc_disc_rcv(skb, b_ptr);
1134 else 1100 else
1135 tipc_bclink_rcv(buf); 1101 tipc_bclink_rcv(skb);
1136 continue; 1102 continue;
1137 } 1103 }
1138 1104
@@ -1171,22 +1137,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1171 if (n_ptr->bclink.recv_permitted) 1137 if (n_ptr->bclink.recv_permitted)
1172 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1138 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1173 1139
1174 crs = l_ptr->first_out; 1140 released = 0;
1175 while ((crs != l_ptr->next_out) && 1141 skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
1176 less_eq(buf_seqno(crs), ackd)) { 1142 if (skb1 == l_ptr->next_out ||
1177 struct sk_buff *next = crs->next; 1143 more(buf_seqno(skb1), ackd))
1178 kfree_skb(crs); 1144 break;
1179 crs = next; 1145 __skb_unlink(skb1, &l_ptr->outqueue);
1180 released++; 1146 kfree_skb(skb1);
1181 } 1147 released = 1;
1182 if (released) {
1183 l_ptr->first_out = crs;
1184 l_ptr->out_queue_size -= released;
1185 } 1148 }
1186 1149
1187 /* Try sending any messages link endpoint has pending */ 1150 /* Try sending any messages link endpoint has pending */
1188 if (unlikely(l_ptr->next_out)) 1151 if (unlikely(l_ptr->next_out))
1189 tipc_link_push_queue(l_ptr); 1152 tipc_link_push_packets(l_ptr);
1190 1153
1191 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { 1154 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
1192 link_prepare_wakeup(l_ptr); 1155 link_prepare_wakeup(l_ptr);
@@ -1196,8 +1159,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1196 /* Process the incoming packet */ 1159 /* Process the incoming packet */
1197 if (unlikely(!link_working_working(l_ptr))) { 1160 if (unlikely(!link_working_working(l_ptr))) {
1198 if (msg_user(msg) == LINK_PROTOCOL) { 1161 if (msg_user(msg) == LINK_PROTOCOL) {
1199 tipc_link_proto_rcv(l_ptr, buf); 1162 tipc_link_proto_rcv(l_ptr, skb);
1200 head = link_insert_deferred_queue(l_ptr, head); 1163 link_retrieve_defq(l_ptr, &head);
1201 tipc_node_unlock(n_ptr); 1164 tipc_node_unlock(n_ptr);
1202 continue; 1165 continue;
1203 } 1166 }
@@ -1207,8 +1170,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1207 1170
1208 if (link_working_working(l_ptr)) { 1171 if (link_working_working(l_ptr)) {
1209 /* Re-insert buffer in front of queue */ 1172 /* Re-insert buffer in front of queue */
1210 buf->next = head; 1173 __skb_queue_head(&head, skb);
1211 head = buf;
1212 tipc_node_unlock(n_ptr); 1174 tipc_node_unlock(n_ptr);
1213 continue; 1175 continue;
1214 } 1176 }
@@ -1217,33 +1179,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1217 1179
1218 /* Link is now in state WORKING_WORKING */ 1180 /* Link is now in state WORKING_WORKING */
1219 if (unlikely(seq_no != mod(l_ptr->next_in_no))) { 1181 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1220 link_handle_out_of_seq_msg(l_ptr, buf); 1182 link_handle_out_of_seq_msg(l_ptr, skb);
1221 head = link_insert_deferred_queue(l_ptr, head); 1183 link_retrieve_defq(l_ptr, &head);
1222 tipc_node_unlock(n_ptr); 1184 tipc_node_unlock(n_ptr);
1223 continue; 1185 continue;
1224 } 1186 }
1225 l_ptr->next_in_no++; 1187 l_ptr->next_in_no++;
1226 if (unlikely(l_ptr->oldest_deferred_in)) 1188 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
1227 head = link_insert_deferred_queue(l_ptr, head); 1189 link_retrieve_defq(l_ptr, &head);
1228 1190
1229 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { 1191 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1230 l_ptr->stats.sent_acks++; 1192 l_ptr->stats.sent_acks++;
1231 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1193 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1232 } 1194 }
1233 1195
1234 if (tipc_link_prepare_input(l_ptr, &buf)) { 1196 if (tipc_link_prepare_input(l_ptr, &skb)) {
1235 tipc_node_unlock(n_ptr); 1197 tipc_node_unlock(n_ptr);
1236 continue; 1198 continue;
1237 } 1199 }
1238 tipc_node_unlock(n_ptr); 1200 tipc_node_unlock(n_ptr);
1239 msg = buf_msg(buf); 1201
1240 if (tipc_link_input(l_ptr, buf) != 0) 1202 if (tipc_link_input(l_ptr, skb) != 0)
1241 goto discard; 1203 goto discard;
1242 continue; 1204 continue;
1243unlock_discard: 1205unlock_discard:
1244 tipc_node_unlock(n_ptr); 1206 tipc_node_unlock(n_ptr);
1245discard: 1207discard:
1246 kfree_skb(buf); 1208 kfree_skb(skb);
1247 } 1209 }
1248} 1210}
1249 1211
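The reworked tipc_rcv() replaces hand-rolled ->next chaining with a single work list: the arriving buffer is wrapped into a local sk_buff_head, and both re-inserted buffers and newly in-sequence deferred buffers are fed back onto that same list. The loop shape, reduced to a hedged skeleton:

struct sk_buff_head head;

skb2list(skb, &head);
while ((skb = __skb_dequeue(&head))) {
        /* validate and classify skb; deliver or discard it.
         * link_retrieve_defq(l_ptr, &head) may splice deferred
         * buffers back onto the work list, and __skb_queue_head()
         * can re-insert a buffer for a later retry.
         */
}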
@@ -1326,48 +1288,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
1326 * 1288 *
1327 * Returns increase in queue length (i.e. 0 or 1) 1289 * Returns increase in queue length (i.e. 0 or 1)
1328 */ 1290 */
1329u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, 1291u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1330 struct sk_buff *buf)
1331{ 1292{
1332 struct sk_buff *queue_buf; 1293 struct sk_buff *skb1;
1333 struct sk_buff **prev; 1294 u32 seq_no = buf_seqno(skb);
1334 u32 seq_no = buf_seqno(buf);
1335
1336 buf->next = NULL;
1337 1295
1338 /* Empty queue ? */ 1296 /* Empty queue ? */
1339 if (*head == NULL) { 1297 if (skb_queue_empty(list)) {
1340 *head = *tail = buf; 1298 __skb_queue_tail(list, skb);
1341 return 1; 1299 return 1;
1342 } 1300 }
1343 1301
1344 /* Last ? */ 1302 /* Last ? */
1345 if (less(buf_seqno(*tail), seq_no)) { 1303 if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1346 (*tail)->next = buf; 1304 __skb_queue_tail(list, skb);
1347 *tail = buf;
1348 return 1; 1305 return 1;
1349 } 1306 }
1350 1307
1351 /* Locate insertion point in queue, then insert; discard if duplicate */ 1308 /* Locate insertion point in queue, then insert; discard if duplicate */
1352 prev = head; 1309 skb_queue_walk(list, skb1) {
1353 queue_buf = *head; 1310 u32 curr_seqno = buf_seqno(skb1);
1354 for (;;) {
1355 u32 curr_seqno = buf_seqno(queue_buf);
1356 1311
1357 if (seq_no == curr_seqno) { 1312 if (seq_no == curr_seqno) {
1358 kfree_skb(buf); 1313 kfree_skb(skb);
1359 return 0; 1314 return 0;
1360 } 1315 }
1361 1316
1362 if (less(seq_no, curr_seqno)) 1317 if (less(seq_no, curr_seqno))
1363 break; 1318 break;
1364
1365 prev = &queue_buf->next;
1366 queue_buf = queue_buf->next;
1367 } 1319 }
1368 1320
1369 buf->next = queue_buf; 1321 __skb_queue_before(list, skb1, skb);
1370 *prev = buf;
1371 return 1; 1322 return 1;
1372} 1323}
1373 1324
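A usage sketch of the sorted insert above (buffer names hypothetical): arrivals with seqnos 5, 3 and a duplicate 5 leave the queue as [3, 5], with the duplicate freed inside the function:

u32 n = 0;

n += tipc_link_defer_pkt(&l_ptr->deferred_queue, skb_a); /* seqno 5 -> 1 */
n += tipc_link_defer_pkt(&l_ptr->deferred_queue, skb_b); /* seqno 3 -> 1 */
n += tipc_link_defer_pkt(&l_ptr->deferred_queue, skb_c); /* dup 5  -> 0 */
/* deferred_queue is now [3, 5]; n == 2 */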
@@ -1397,15 +1348,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1397 return; 1348 return;
1398 } 1349 }
1399 1350
1400 if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in, 1351 if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
1401 &l_ptr->newest_deferred_in, buf)) {
1402 l_ptr->deferred_inqueue_sz++;
1403 l_ptr->stats.deferred_recv++; 1352 l_ptr->stats.deferred_recv++;
1404 TIPC_SKB_CB(buf)->deferred = true; 1353 TIPC_SKB_CB(buf)->deferred = true;
1405 if ((l_ptr->deferred_inqueue_sz % 16) == 1) 1354 if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
1406 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1355 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1407 } else 1356 } else {
1408 l_ptr->stats.duplicates++; 1357 l_ptr->stats.duplicates++;
1358 }
1409} 1359}
1410 1360
1411/* 1361/*
@@ -1419,12 +1369,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1419 u32 msg_size = sizeof(l_ptr->proto_msg); 1369 u32 msg_size = sizeof(l_ptr->proto_msg);
1420 int r_flag; 1370 int r_flag;
1421 1371
1422 /* Discard any previous message that was deferred due to congestion */
1423 if (l_ptr->proto_msg_queue) {
1424 kfree_skb(l_ptr->proto_msg_queue);
1425 l_ptr->proto_msg_queue = NULL;
1426 }
1427
1428 /* Don't send protocol message during link changeover */ 1372 /* Don't send protocol message during link changeover */
1429 if (l_ptr->exp_msg_count) 1373 if (l_ptr->exp_msg_count)
1430 return; 1374 return;
@@ -1447,8 +1391,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1447 if (l_ptr->next_out) 1391 if (l_ptr->next_out)
1448 next_sent = buf_seqno(l_ptr->next_out); 1392 next_sent = buf_seqno(l_ptr->next_out);
1449 msg_set_next_sent(msg, next_sent); 1393 msg_set_next_sent(msg, next_sent);
1450 if (l_ptr->oldest_deferred_in) { 1394 if (!skb_queue_empty(&l_ptr->deferred_queue)) {
1451 u32 rec = buf_seqno(l_ptr->oldest_deferred_in); 1395 u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
1452 gap = mod(rec - mod(l_ptr->next_in_no)); 1396 gap = mod(rec - mod(l_ptr->next_in_no));
1453 } 1397 }
1454 msg_set_seq_gap(msg, gap); 1398 msg_set_seq_gap(msg, gap);
@@ -1636,7 +1580,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1636 } 1580 }
1637 if (msg_seq_gap(msg)) { 1581 if (msg_seq_gap(msg)) {
1638 l_ptr->stats.recv_nacks++; 1582 l_ptr->stats.recv_nacks++;
1639 tipc_link_retransmit(l_ptr, l_ptr->first_out, 1583 tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
1640 msg_seq_gap(msg)); 1584 msg_seq_gap(msg));
1641 } 1585 }
1642 break; 1586 break;
@@ -1655,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1655 u32 selector) 1599 u32 selector)
1656{ 1600{
1657 struct tipc_link *tunnel; 1601 struct tipc_link *tunnel;
1658 struct sk_buff *buf; 1602 struct sk_buff *skb;
1659 u32 length = msg_size(msg); 1603 u32 length = msg_size(msg);
1660 1604
1661 tunnel = l_ptr->owner->active_links[selector & 1]; 1605 tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1664,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1664 return; 1608 return;
1665 } 1609 }
1666 msg_set_size(tunnel_hdr, length + INT_H_SIZE); 1610 msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1667 buf = tipc_buf_acquire(length + INT_H_SIZE); 1611 skb = tipc_buf_acquire(length + INT_H_SIZE);
1668 if (!buf) { 1612 if (!skb) {
1669 pr_warn("%sunable to send tunnel msg\n", link_co_err); 1613 pr_warn("%sunable to send tunnel msg\n", link_co_err);
1670 return; 1614 return;
1671 } 1615 }
1672 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 1616 skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1673 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 1617 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1674 __tipc_link_xmit(tunnel, buf); 1618 __tipc_link_xmit_skb(tunnel, skb);
1675} 1619}
1676 1620
1677 1621
@@ -1683,10 +1627,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1683 */ 1627 */
1684void tipc_link_failover_send_queue(struct tipc_link *l_ptr) 1628void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1685{ 1629{
1686 u32 msgcount = l_ptr->out_queue_size; 1630 u32 msgcount = skb_queue_len(&l_ptr->outqueue);
1687 struct sk_buff *crs = l_ptr->first_out;
1688 struct tipc_link *tunnel = l_ptr->owner->active_links[0]; 1631 struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1689 struct tipc_msg tunnel_hdr; 1632 struct tipc_msg tunnel_hdr;
1633 struct sk_buff *skb;
1690 int split_bundles; 1634 int split_bundles;
1691 1635
1692 if (!tunnel) 1636 if (!tunnel)
@@ -1697,14 +1641,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1697 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 1641 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1698 msg_set_msgcnt(&tunnel_hdr, msgcount); 1642 msg_set_msgcnt(&tunnel_hdr, msgcount);
1699 1643
1700 if (!l_ptr->first_out) { 1644 if (skb_queue_empty(&l_ptr->outqueue)) {
1701 struct sk_buff *buf; 1645 skb = tipc_buf_acquire(INT_H_SIZE);
1702 1646 if (skb) {
1703 buf = tipc_buf_acquire(INT_H_SIZE); 1647 skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1704 if (buf) {
1705 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
1706 msg_set_size(&tunnel_hdr, INT_H_SIZE); 1648 msg_set_size(&tunnel_hdr, INT_H_SIZE);
1707 __tipc_link_xmit(tunnel, buf); 1649 __tipc_link_xmit_skb(tunnel, skb);
1708 } else { 1650 } else {
1709 pr_warn("%sunable to send changeover msg\n", 1651 pr_warn("%sunable to send changeover msg\n",
1710 link_co_err); 1652 link_co_err);
@@ -1715,8 +1657,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1715 split_bundles = (l_ptr->owner->active_links[0] != 1657 split_bundles = (l_ptr->owner->active_links[0] !=
1716 l_ptr->owner->active_links[1]); 1658 l_ptr->owner->active_links[1]);
1717 1659
1718 while (crs) { 1660 skb_queue_walk(&l_ptr->outqueue, skb) {
1719 struct tipc_msg *msg = buf_msg(crs); 1661 struct tipc_msg *msg = buf_msg(skb);
1720 1662
1721 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { 1663 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1722 struct tipc_msg *m = msg_get_wrapped(msg); 1664 struct tipc_msg *m = msg_get_wrapped(msg);
@@ -1734,7 +1676,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1734 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, 1676 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1735 msg_link_selector(msg)); 1677 msg_link_selector(msg));
1736 } 1678 }
1737 crs = crs->next;
1738 } 1679 }
1739} 1680}
1740 1681
@@ -1750,17 +1691,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1750void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, 1691void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1751 struct tipc_link *tunnel) 1692 struct tipc_link *tunnel)
1752{ 1693{
1753 struct sk_buff *iter; 1694 struct sk_buff *skb;
1754 struct tipc_msg tunnel_hdr; 1695 struct tipc_msg tunnel_hdr;
1755 1696
1756 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 1697 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1757 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); 1698 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
1758 msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); 1699 msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
1759 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 1700 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1760 iter = l_ptr->first_out; 1701 skb_queue_walk(&l_ptr->outqueue, skb) {
1761 while (iter) { 1702 struct sk_buff *outskb;
1762 struct sk_buff *outbuf; 1703 struct tipc_msg *msg = buf_msg(skb);
1763 struct tipc_msg *msg = buf_msg(iter);
1764 u32 length = msg_size(msg); 1704 u32 length = msg_size(msg);
1765 1705
1766 if (msg_user(msg) == MSG_BUNDLER) 1706 if (msg_user(msg) == MSG_BUNDLER)
@@ -1768,19 +1708,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1768 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ 1708 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */
1769 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1709 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1770 msg_set_size(&tunnel_hdr, length + INT_H_SIZE); 1710 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1771 outbuf = tipc_buf_acquire(length + INT_H_SIZE); 1711 outskb = tipc_buf_acquire(length + INT_H_SIZE);
1772 if (outbuf == NULL) { 1712 if (outskb == NULL) {
1773 pr_warn("%sunable to send duplicate msg\n", 1713 pr_warn("%sunable to send duplicate msg\n",
1774 link_co_err); 1714 link_co_err);
1775 return; 1715 return;
1776 } 1716 }
1777 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 1717 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1778 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 1718 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1779 length); 1719 length);
1780 __tipc_link_xmit(tunnel, outbuf); 1720 __tipc_link_xmit_skb(tunnel, outskb);
1781 if (!tipc_link_is_up(l_ptr)) 1721 if (!tipc_link_is_up(l_ptr))
1782 return; 1722 return;
1783 iter = iter->next;
1784 } 1723 }
1785} 1724}
1786 1725
@@ -2375,3 +2314,435 @@ static void link_print(struct tipc_link *l_ptr, const char *str)
2375 else 2314 else
2376 pr_cont("\n"); 2315 pr_cont("\n");
2377} 2316}
2317
2318/* Parse and validate nested (link) properties valid for media, bearer and link
2319 */
2320int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
2321{
2322 int err;
2323
2324 err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
2325 tipc_nl_prop_policy);
2326 if (err)
2327 return err;
2328
2329 if (props[TIPC_NLA_PROP_PRIO]) {
2330 u32 prio;
2331
2332 prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2333 if (prio > TIPC_MAX_LINK_PRI)
2334 return -EINVAL;
2335 }
2336
2337 if (props[TIPC_NLA_PROP_TOL]) {
2338 u32 tol;
2339
2340 tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2341 if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
2342 return -EINVAL;
2343 }
2344
2345 if (props[TIPC_NLA_PROP_WIN]) {
2346 u32 win;
2347
2348 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2349 if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
2350 return -EINVAL;
2351 }
2352
2353 return 0;
2354}
2355
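tipc_nl_prop_policy is defined elsewhere in this patch set; its plausible shape is shown here only for orientation (assumed, not verbatim from this diff):

static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
        [TIPC_NLA_PROP_UNSPEC] = { .type = NLA_UNSPEC },
        [TIPC_NLA_PROP_PRIO]   = { .type = NLA_U32 },
        [TIPC_NLA_PROP_TOL]    = { .type = NLA_U32 },
        [TIPC_NLA_PROP_WIN]    = { .type = NLA_U32 }
};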
2356int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
2357{
2358 int err;
2359 int res = 0;
2360 int bearer_id;
2361 char *name;
2362 struct tipc_link *link;
2363 struct tipc_node *node;
2364 struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2365
2366 if (!info->attrs[TIPC_NLA_LINK])
2367 return -EINVAL;
2368
2369 err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2370 info->attrs[TIPC_NLA_LINK],
2371 tipc_nl_link_policy);
2372 if (err)
2373 return err;
2374
2375 if (!attrs[TIPC_NLA_LINK_NAME])
2376 return -EINVAL;
2377
2378 name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2379
2380 node = tipc_link_find_owner(name, &bearer_id);
2381 if (!node)
2382 return -EINVAL;
2383
2384 tipc_node_lock(node);
2385
2386 link = node->links[bearer_id];
2387 if (!link) {
2388 res = -EINVAL;
2389 goto out;
2390 }
2391
2392 if (attrs[TIPC_NLA_LINK_PROP]) {
2393 struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
2394
2395 err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
2396 props);
2397 if (err) {
2398 res = err;
2399 goto out;
2400 }
2401
2402 if (props[TIPC_NLA_PROP_TOL]) {
2403 u32 tol;
2404
2405 tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2406 link_set_supervision_props(link, tol);
2407 tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0);
2408 }
2409 if (props[TIPC_NLA_PROP_PRIO]) {
2410 u32 prio;
2411
2412 prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2413 link->priority = prio;
2414 tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0);
2415 }
2416 if (props[TIPC_NLA_PROP_WIN]) {
2417 u32 win;
2418
2419 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2420 tipc_link_set_queue_limits(link, win);
2421 }
2422 }
2423
2424out:
2425 tipc_node_unlock(node);
2426
2427 return res;
2428}
2429
2430static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
2431{
2432 int i;
2433 struct nlattr *stats;
2434
2435 struct nla_map {
2436 u32 key;
2437 u32 val;
2438 };
2439
2440 struct nla_map map[] = {
2441 {TIPC_NLA_STATS_RX_INFO, s->recv_info},
2442 {TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
2443 {TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
2444 {TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
2445 {TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
2446 {TIPC_NLA_STATS_TX_INFO, s->sent_info},
2447 {TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
2448 {TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
2449 {TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
2450 {TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
2451 {TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
2452 s->msg_length_counts : 1},
2453 {TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
2454 {TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
2455 {TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
2456 {TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
2457 {TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
2458 {TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
2459 {TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
2460 {TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
2461 {TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
2462 {TIPC_NLA_STATS_RX_STATES, s->recv_states},
2463 {TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
2464 {TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
2465 {TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
2466 {TIPC_NLA_STATS_TX_STATES, s->sent_states},
2467 {TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
2468 {TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
2469 {TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
2470 {TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
2471 {TIPC_NLA_STATS_DUPLICATES, s->duplicates},
2472 {TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
2473 {TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
2474 {TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
2475 (s->accu_queue_sz / s->queue_sz_counts) : 0}
2476 };
2477
2478 stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
2479 if (!stats)
2480 return -EMSGSIZE;
2481
2482 for (i = 0; i < ARRAY_SIZE(map); i++)
2483 if (nla_put_u32(skb, map[i].key, map[i].val))
2484 goto msg_full;
2485
2486 nla_nest_end(skb, stats);
2487
2488 return 0;
2489msg_full:
2490 nla_nest_cancel(skb, stats);
2491
2492 return -EMSGSIZE;
2493}
2494
2495/* Caller should hold appropriate locks to protect the link */
2496static int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link)
2497{
2498 int err;
2499 void *hdr;
2500 struct nlattr *attrs;
2501 struct nlattr *prop;
2502
2503 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
2504 NLM_F_MULTI, TIPC_NL_LINK_GET);
2505 if (!hdr)
2506 return -EMSGSIZE;
2507
2508 attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
2509 if (!attrs)
2510 goto msg_full;
2511
2512 if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
2513 goto attr_msg_full;
2514 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
2515 tipc_cluster_mask(tipc_own_addr)))
2516 goto attr_msg_full;
2517 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt))
2518 goto attr_msg_full;
2519 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))
2520 goto attr_msg_full;
2521 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no))
2522 goto attr_msg_full;
2523
2524 if (tipc_link_is_up(link))
2525 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
2526 goto attr_msg_full;
2527 if (tipc_link_is_active(link))
2528 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
2529 goto attr_msg_full;
2530
2531 prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
2532 if (!prop)
2533 goto attr_msg_full;
2534 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2535 goto prop_msg_full;
2536 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2537 goto prop_msg_full;
2538 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
2539 link->queue_limit[TIPC_LOW_IMPORTANCE]))
2540 goto prop_msg_full;
2543 nla_nest_end(msg->skb, prop);
2544
2545 err = __tipc_nl_add_stats(msg->skb, &link->stats);
2546 if (err)
2547 goto attr_msg_full;
2548
2549 nla_nest_end(msg->skb, attrs);
2550 genlmsg_end(msg->skb, hdr);
2551
2552 return 0;
2553
2554prop_msg_full:
2555 nla_nest_cancel(msg->skb, prop);
2556attr_msg_full:
2557 nla_nest_cancel(msg->skb, attrs);
2558msg_full:
2559 genlmsg_cancel(msg->skb, hdr);
2560
2561 return -EMSGSIZE;
2562}
2563
2564/* Caller should hold node lock */
2565static int __tipc_nl_add_node_links(struct tipc_nl_msg *msg,
2566 struct tipc_node *node,
2567 u32 *prev_link)
2568{
2569 u32 i;
2570 int err;
2571
2572 for (i = *prev_link; i < MAX_BEARERS; i++) {
2573 *prev_link = i;
2574
2575 if (!node->links[i])
2576 continue;
2577
2578 err = __tipc_nl_add_link(msg, node->links[i]);
2579 if (err)
2580 return err;
2581 }
2582 *prev_link = 0;
2583
2584 return 0;
2585}
2586
2587int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
2588{
2589 struct tipc_node *node;
2590 struct tipc_nl_msg msg;
2591 u32 prev_node = cb->args[0];
2592 u32 prev_link = cb->args[1];
2593 int done = cb->args[2];
2594 int err;
2595
2596 if (done)
2597 return 0;
2598
2599 msg.skb = skb;
2600 msg.portid = NETLINK_CB(cb->skb).portid;
2601 msg.seq = cb->nlh->nlmsg_seq;
2602
2603 rcu_read_lock();
2604
2605 if (prev_node) {
2606 node = tipc_node_find(prev_node);
2607 if (!node) {
 2608 /* We never set seq or call nl_dump_check_consistent(),
 2609 * which means that setting prev_seq here will cause the
 2610 * consistency check to fail in the netlink callback
 2611 * handler, resulting in the last NLMSG_DONE message
 2612 * having the NLM_F_DUMP_INTR flag set.
2613 */
2614 cb->prev_seq = 1;
2615 goto out;
2616 }
2617
2618 list_for_each_entry_continue_rcu(node, &tipc_node_list, list) {
2619 tipc_node_lock(node);
2620 err = __tipc_nl_add_node_links(&msg, node, &prev_link);
2621 tipc_node_unlock(node);
2622 if (err)
2623 goto out;
2624
2625 prev_node = node->addr;
2626 }
2627 } else {
2628 err = tipc_nl_add_bc_link(&msg);
2629 if (err)
2630 goto out;
2631
2632 list_for_each_entry_rcu(node, &tipc_node_list, list) {
2633 tipc_node_lock(node);
2634 err = __tipc_nl_add_node_links(&msg, node, &prev_link);
2635 tipc_node_unlock(node);
2636 if (err)
2637 goto out;
2638
2639 prev_node = node->addr;
2640 }
2641 }
2642 done = 1;
2643out:
2644 rcu_read_unlock();
2645
2646 cb->args[0] = prev_node;
2647 cb->args[1] = prev_link;
2648 cb->args[2] = done;
2649
2650 return skb->len;
2651}
2652
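The dump stores its resume state (prev_node, prev_link, done) in cb->args, so a large cluster is delivered across several netlink callbacks without holding locks in between. A hypothetical userspace sketch using libnl-genl to trigger this dump (error handling elided; family name per TIPC_GENL_V2_NAME):

#include <linux/tipc_netlink.h>
#include <netlink/genl/genl.h>
#include <netlink/genl/ctrl.h>

int dump_links(void)
{
        struct nl_sock *sk = nl_socket_alloc();
        int family;

        genl_connect(sk);
        family = genl_ctrl_resolve(sk, "TIPCv2");
        genl_send_simple(sk, family, TIPC_NL_LINK_GET, 1, NLM_F_DUMP);
        nl_recvmsgs_default(sk);        /* attach a callback to parse */
        nl_socket_free(sk);
        return 0;
}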
2653int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
2654{
2655 struct sk_buff *ans_skb;
2656 struct tipc_nl_msg msg;
2657 struct tipc_link *link;
2658 struct tipc_node *node;
2659 char *name;
2660 int bearer_id;
2661 int err;
2662
2663 if (!info->attrs[TIPC_NLA_LINK_NAME])
2664 return -EINVAL;
2665
2666 name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
2667 node = tipc_link_find_owner(name, &bearer_id);
2668 if (!node)
2669 return -EINVAL;
2670
2671 ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2672 if (!ans_skb)
2673 return -ENOMEM;
2674
2675 msg.skb = ans_skb;
2676 msg.portid = info->snd_portid;
2677 msg.seq = info->snd_seq;
2678
2679 tipc_node_lock(node);
2680 link = node->links[bearer_id];
2681 if (!link) {
2682 err = -EINVAL;
2683 goto err_out;
2684 }
2685
2686 err = __tipc_nl_add_link(&msg, link);
2687 if (err)
2688 goto err_out;
2689
2690 tipc_node_unlock(node);
2691
2692 return genlmsg_reply(ans_skb, info);
2693
2694err_out:
2695 tipc_node_unlock(node);
2696 nlmsg_free(ans_skb);
2697
2698 return err;
2699}
2700
2701int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2702{
2703 int err;
2704 char *link_name;
2705 unsigned int bearer_id;
2706 struct tipc_link *link;
2707 struct tipc_node *node;
2708 struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2709
2710 if (!info->attrs[TIPC_NLA_LINK])
2711 return -EINVAL;
2712
2713 err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2714 info->attrs[TIPC_NLA_LINK],
2715 tipc_nl_link_policy);
2716 if (err)
2717 return err;
2718
2719 if (!attrs[TIPC_NLA_LINK_NAME])
2720 return -EINVAL;
2721
2722 link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2723
2724 if (strcmp(link_name, tipc_bclink_name) == 0) {
2725 err = tipc_bclink_reset_stats();
2726 if (err)
2727 return err;
2728 return 0;
2729 }
2730
2731 node = tipc_link_find_owner(link_name, &bearer_id);
2732 if (!node)
2733 return -EINVAL;
2734
2735 tipc_node_lock(node);
2736
2737 link = node->links[bearer_id];
2738 if (!link) {
2739 tipc_node_unlock(node);
2740 return -EINVAL;
2741 }
2742
2743 link_reset_statistics(link);
2744
2745 tipc_node_unlock(node);
2746
2747 return 0;
2748}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index b567a3427fda..55812e87ca1e 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -37,6 +37,7 @@
37#ifndef _TIPC_LINK_H 37#ifndef _TIPC_LINK_H
38#define _TIPC_LINK_H 38#define _TIPC_LINK_H
39 39
40#include <net/genetlink.h>
40#include "msg.h" 41#include "msg.h"
41#include "node.h" 42#include "node.h"
42 43
@@ -118,20 +119,13 @@ struct tipc_stats {
118 * @max_pkt: current maximum packet size for this link 119 * @max_pkt: current maximum packet size for this link
119 * @max_pkt_target: desired maximum packet size for this link 120 * @max_pkt_target: desired maximum packet size for this link
120 * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) 121 * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
121 * @out_queue_size: # of messages in outbound message queue 122 * @outqueue: outbound message queue
122 * @first_out: ptr to first outbound message in queue
123 * @last_out: ptr to last outbound message in queue
124 * @next_out_no: next sequence number to use for outbound messages 123 * @next_out_no: next sequence number to use for outbound messages
125 * @last_retransmitted: sequence number of most recently retransmitted message 124 * @last_retransmitted: sequence number of most recently retransmitted message
126 * @stale_count: # of identical retransmit requests made by peer 125 * @stale_count: # of identical retransmit requests made by peer
127 * @next_in_no: next sequence number to expect for inbound messages 126 * @next_in_no: next sequence number to expect for inbound messages
128 * @deferred_inqueue_sz: # of messages in inbound message queue 127 * @deferred_queue: queue of out-of-sequence (OOS) messages received from node
129 * @oldest_deferred_in: ptr to first inbound message in queue
130 * @newest_deferred_in: ptr to last inbound message in queue
131 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer 128 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
132 * @proto_msg_queue: ptr to (single) outbound control message
133 * @retransm_queue_size: number of messages to retransmit
134 * @retransm_queue_head: sequence number of first message to retransmit
135 * @next_out: ptr to first unsent outbound message in queue 129 * @next_out: ptr to first unsent outbound message in queue
136 * @waiting_sks: linked list of sockets waiting for link congestion to abate 130 * @waiting_sks: linked list of sockets waiting for link congestion to abate
137 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 131 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
@@ -175,24 +169,17 @@ struct tipc_link {
175 u32 max_pkt_probes; 169 u32 max_pkt_probes;
176 170
177 /* Sending */ 171 /* Sending */
178 u32 out_queue_size; 172 struct sk_buff_head outqueue;
179 struct sk_buff *first_out;
180 struct sk_buff *last_out;
181 u32 next_out_no; 173 u32 next_out_no;
182 u32 last_retransmitted; 174 u32 last_retransmitted;
183 u32 stale_count; 175 u32 stale_count;
184 176
185 /* Reception */ 177 /* Reception */
186 u32 next_in_no; 178 u32 next_in_no;
187 u32 deferred_inqueue_sz; 179 struct sk_buff_head deferred_queue;
188 struct sk_buff *oldest_deferred_in;
189 struct sk_buff *newest_deferred_in;
190 u32 unacked_window; 180 u32 unacked_window;
191 181
192 /* Congestion handling */ 182 /* Congestion handling */
193 struct sk_buff *proto_msg_queue;
194 u32 retransm_queue_size;
195 u32 retransm_queue_head;
196 struct sk_buff *next_out; 183 struct sk_buff *next_out;
197 struct sk_buff_head waiting_sks; 184 struct sk_buff_head waiting_sks;
198 185
@@ -226,18 +213,26 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
226void tipc_link_reset_all(struct tipc_node *node); 213void tipc_link_reset_all(struct tipc_node *node);
227void tipc_link_reset(struct tipc_link *l_ptr); 214void tipc_link_reset(struct tipc_link *l_ptr);
228void tipc_link_reset_list(unsigned int bearer_id); 215void tipc_link_reset_list(unsigned int bearer_id);
229int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); 216int tipc_link_xmit_skb(struct sk_buff *skb, u32 dest, u32 selector);
230int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); 217int tipc_link_xmit(struct sk_buff_head *list, u32 dest, u32 selector);
218int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list);
231u32 tipc_link_get_max_pkt(u32 dest, u32 selector); 219u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
232void tipc_link_bundle_rcv(struct sk_buff *buf); 220void tipc_link_bundle_rcv(struct sk_buff *buf);
233void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 221void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
234 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); 222 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
235void tipc_link_push_queue(struct tipc_link *l_ptr); 223void tipc_link_push_packets(struct tipc_link *l_ptr);
236u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, 224u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
237 struct sk_buff *buf);
238void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); 225void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
239void tipc_link_retransmit(struct tipc_link *l_ptr, 226void tipc_link_retransmit(struct tipc_link *l_ptr,
240 struct sk_buff *start, u32 retransmits); 227 struct sk_buff *start, u32 retransmits);
228struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
229 const struct sk_buff *skb);
230
231int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
232int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
233int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
234int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
235int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
241 236
242/* 237/*
243 * Link sequence number manipulation routines (uses modulo 2**16 arithmetic) 238 * Link sequence number manipulation routines (uses modulo 2**16 arithmetic)
@@ -252,18 +247,14 @@ static inline u32 mod(u32 x)
252 return x & 0xffffu; 247 return x & 0xffffu;
253} 248}
254 249
255static inline int between(u32 lower, u32 upper, u32 n) 250static inline int less_eq(u32 left, u32 right)
256{ 251{
257 if ((lower < n) && (n < upper)) 252 return mod(right - left) < 32768u;
258 return 1;
259 if ((upper < lower) && ((n > lower) || (n < upper)))
260 return 1;
261 return 0;
262} 253}
263 254
264static inline int less_eq(u32 left, u32 right) 255static inline int more(u32 left, u32 right)
265{ 256{
266 return mod(right - left) < 32768u; 257 return !less_eq(left, right);
267} 258}
268 259
269static inline int less(u32 left, u32 right) 260static inline int less(u32 left, u32 right)
@@ -302,7 +293,7 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
302 293
303static inline int link_congested(struct tipc_link *l_ptr) 294static inline int link_congested(struct tipc_link *l_ptr)
304{ 295{
305 return l_ptr->out_queue_size >= l_ptr->queue_limit[0]; 296 return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
306} 297}
307 298
308#endif 299#endif
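The reworked sequence helpers in the hunk above (between() drops out as unused, and more() becomes the negation of less_eq()) keep comparisons correct across 16-bit wrap-around. A self-contained userland demo of the semantics, assuming the definitions above:

#include <assert.h>
#include <stdint.h>
typedef uint32_t u32;

static u32 mod(u32 x)            { return x & 0xffffu; }
static int less_eq(u32 l, u32 r) { return mod(r - l) < 32768u; }
static int more(u32 l, u32 r)    { return !less_eq(l, r); }

int main(void)
{
        assert(less_eq(0xffff, 0x0000)); /* 0xffff immediately precedes 0 */
        assert(more(0x0000, 0xffff));    /* ... and 0 follows it */
        return 0;
}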
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 74745a47d72a..a687b30a699c 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -91,7 +91,7 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
91 * @*headbuf: in: NULL for first frag, otherwise value returned from prev call 91 * @*headbuf: in: NULL for first frag, otherwise value returned from prev call
92 * out: set when successful non-complete reassembly, otherwise NULL 92 * out: set when successful non-complete reassembly, otherwise NULL
93 * @*buf: in: the buffer to append. Always defined 93 * @*buf: in: the buffer to append. Always defined
94 * out: head buf after sucessful complete reassembly, otherwise NULL 94 * out: head buf after successful complete reassembly, otherwise NULL
95 * Returns 1 when reassembly complete, otherwise 0 95 * Returns 1 when reassembly complete, otherwise 0
96 */ 96 */
97int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) 97int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
@@ -162,15 +162,16 @@ err:
162/** 162/**
163 * tipc_msg_build - create buffer chain containing specified header and data 163 * tipc_msg_build - create buffer chain containing specified header and data
164 * @mhdr: Message header, to be prepended to data 164 * @mhdr: Message header, to be prepended to data
165 * @iov: User data 165 * @m: User message
166 * @offset: Position in iov to start copying from 166 * @offset: Position in iov to start copying from
167 * @dsz: Total length of user data 167 * @dsz: Total length of user data
168 * @pktmax: Max packet size that can be used 168 * @pktmax: Max packet size that can be used
169 * @chain: Buffer or chain of buffers to be returned to caller 169 * @list: Buffer or chain of buffers to be returned to caller
170 *
170 * Returns message data size or errno: -ENOMEM, -EFAULT 171 * Returns message data size or errno: -ENOMEM, -EFAULT
171 */ 172 */
172int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, 173int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
173 int offset, int dsz, int pktmax , struct sk_buff **chain) 174 int dsz, int pktmax, struct sk_buff_head *list)
174{ 175{
175 int mhsz = msg_hdr_sz(mhdr); 176 int mhsz = msg_hdr_sz(mhdr);
176 int msz = mhsz + dsz; 177 int msz = mhsz + dsz;
@@ -179,22 +180,22 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
179 int pktrem = pktmax; 180 int pktrem = pktmax;
180 int drem = dsz; 181 int drem = dsz;
181 struct tipc_msg pkthdr; 182 struct tipc_msg pkthdr;
182 struct sk_buff *buf, *prev; 183 struct sk_buff *skb;
183 char *pktpos; 184 char *pktpos;
184 int rc; 185 int rc;
185 uint chain_sz = 0; 186
186 msg_set_size(mhdr, msz); 187 msg_set_size(mhdr, msz);
187 188
188 /* No fragmentation needed? */ 189 /* No fragmentation needed? */
189 if (likely(msz <= pktmax)) { 190 if (likely(msz <= pktmax)) {
190 buf = tipc_buf_acquire(msz); 191 skb = tipc_buf_acquire(msz);
191 *chain = buf; 192 if (unlikely(!skb))
192 if (unlikely(!buf))
193 return -ENOMEM; 193 return -ENOMEM;
194 skb_copy_to_linear_data(buf, mhdr, mhsz); 194 __skb_queue_tail(list, skb);
195 pktpos = buf->data + mhsz; 195 skb_copy_to_linear_data(skb, mhdr, mhsz);
196 TIPC_SKB_CB(buf)->chain_sz = 1; 196 pktpos = skb->data + mhsz;
197 if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) 197 if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iter.iov, offset,
198 dsz))
198 return dsz; 199 return dsz;
199 rc = -EFAULT; 200 rc = -EFAULT;
200 goto error; 201 goto error;
@@ -207,15 +208,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
207 msg_set_fragm_no(&pkthdr, pktno); 208 msg_set_fragm_no(&pkthdr, pktno);
208 209
209 /* Prepare first fragment */ 210 /* Prepare first fragment */
210 *chain = buf = tipc_buf_acquire(pktmax); 211 skb = tipc_buf_acquire(pktmax);
211 if (!buf) 212 if (!skb)
212 return -ENOMEM; 213 return -ENOMEM;
213 chain_sz = 1; 214 __skb_queue_tail(list, skb);
214 pktpos = buf->data; 215 pktpos = skb->data;
215 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 216 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
216 pktpos += INT_H_SIZE; 217 pktpos += INT_H_SIZE;
217 pktrem -= INT_H_SIZE; 218 pktrem -= INT_H_SIZE;
218 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); 219 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
219 pktpos += mhsz; 220 pktpos += mhsz;
220 pktrem -= mhsz; 221 pktrem -= mhsz;
221 222
@@ -223,7 +224,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
223 if (drem < pktrem) 224 if (drem < pktrem)
224 pktrem = drem; 225 pktrem = drem;
225 226
226 if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) { 227 if (memcpy_fromiovecend(pktpos, m->msg_iter.iov, offset, pktrem)) {
227 rc = -EFAULT; 228 rc = -EFAULT;
228 goto error; 229 goto error;
229 } 230 }
@@ -238,43 +239,41 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
238 pktsz = drem + INT_H_SIZE; 239 pktsz = drem + INT_H_SIZE;
239 else 240 else
240 pktsz = pktmax; 241 pktsz = pktmax;
241 prev = buf; 242 skb = tipc_buf_acquire(pktsz);
242 buf = tipc_buf_acquire(pktsz); 243 if (!skb) {
243 if (!buf) {
244 rc = -ENOMEM; 244 rc = -ENOMEM;
245 goto error; 245 goto error;
246 } 246 }
247 chain_sz++; 247 __skb_queue_tail(list, skb);
248 prev->next = buf;
249 msg_set_type(&pkthdr, FRAGMENT); 248 msg_set_type(&pkthdr, FRAGMENT);
250 msg_set_size(&pkthdr, pktsz); 249 msg_set_size(&pkthdr, pktsz);
251 msg_set_fragm_no(&pkthdr, ++pktno); 250 msg_set_fragm_no(&pkthdr, ++pktno);
252 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 251 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
253 pktpos = buf->data + INT_H_SIZE; 252 pktpos = skb->data + INT_H_SIZE;
254 pktrem = pktsz - INT_H_SIZE; 253 pktrem = pktsz - INT_H_SIZE;
255 254
256 } while (1); 255 } while (1);
257 TIPC_SKB_CB(*chain)->chain_sz = chain_sz; 256 msg_set_type(buf_msg(skb), LAST_FRAGMENT);
258 msg_set_type(buf_msg(buf), LAST_FRAGMENT);
259 return dsz; 257 return dsz;
260error: 258error:
261 kfree_skb_list(*chain); 259 __skb_queue_purge(list);
262 *chain = NULL; 260 __skb_queue_head_init(list);
263 return rc; 261 return rc;
264} 262}
265 263
266/** 264/**
267 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one 265 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
268 * @bbuf: the existing buffer ("bundle") 266 * @list: the buffer chain of the existing buffer ("bundle")
269 * @buf: buffer to be appended 267 * @skb: buffer to be appended
270 * @mtu: max allowable size for the bundle buffer 268 * @mtu: max allowable size for the bundle buffer
271 * Consumes buffer if successful 269 * Consumes buffer if successful
272 * Returns true if bundling could be performed, otherwise false 270 * Returns true if bundling could be performed, otherwise false
273 */ 271 */
274bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) 272bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
275{ 273{
276 struct tipc_msg *bmsg = buf_msg(bbuf); 274 struct sk_buff *bskb = skb_peek_tail(list);
277 struct tipc_msg *msg = buf_msg(buf); 275 struct tipc_msg *bmsg = buf_msg(bskb);
276 struct tipc_msg *msg = buf_msg(skb);
278 unsigned int bsz = msg_size(bmsg); 277 unsigned int bsz = msg_size(bmsg);
279 unsigned int msz = msg_size(msg); 278 unsigned int msz = msg_size(msg);
280 u32 start = align(bsz); 279 u32 start = align(bsz);
@@ -289,35 +288,36 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
289 return false; 288 return false;
290 if (likely(msg_user(bmsg) != MSG_BUNDLER)) 289 if (likely(msg_user(bmsg) != MSG_BUNDLER))
291 return false; 290 return false;
292 if (likely(msg_type(bmsg) != BUNDLE_OPEN)) 291 if (likely(!TIPC_SKB_CB(bskb)->bundling))
293 return false; 292 return false;
294 if (unlikely(skb_tailroom(bbuf) < (pad + msz))) 293 if (unlikely(skb_tailroom(bskb) < (pad + msz)))
295 return false; 294 return false;
296 if (unlikely(max < (start + msz))) 295 if (unlikely(max < (start + msz)))
297 return false; 296 return false;
298 297
299 skb_put(bbuf, pad + msz); 298 skb_put(bskb, pad + msz);
300 skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); 299 skb_copy_to_linear_data_offset(bskb, start, skb->data, msz);
301 msg_set_size(bmsg, start + msz); 300 msg_set_size(bmsg, start + msz);
302 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); 301 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
303 bbuf->next = buf->next; 302 kfree_skb(skb);
304 kfree_skb(buf);
305 return true; 303 return true;
306} 304}
307 305
308/** 306/**
309 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail 307 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
310 * @buf: buffer to be appended and replaced 308 * @list: the buffer chain
311 * @mtu: max allowable size for the bundle buffer, inclusive header 309 * @skb: buffer to be appended and replaced
310 * @mtu: max allowable size for the bundle buffer, inclusive header
312 * @dnode: destination node for message. (Not always present in header) 311 * @dnode: destination node for message. (Not always present in header)
313 * Replaces buffer if successful 312 * Replaces buffer if successful
314 * Returns true if sucess, otherwise false 313 * Returns true if success, otherwise false
315 */ 314 */
316bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) 315bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
316 u32 mtu, u32 dnode)
317{ 317{
318 struct sk_buff *bbuf; 318 struct sk_buff *bskb;
319 struct tipc_msg *bmsg; 319 struct tipc_msg *bmsg;
320 struct tipc_msg *msg = buf_msg(*buf); 320 struct tipc_msg *msg = buf_msg(skb);
321 u32 msz = msg_size(msg); 321 u32 msz = msg_size(msg);
322 u32 max = mtu - INT_H_SIZE; 322 u32 max = mtu - INT_H_SIZE;
323 323
@@ -330,20 +330,19 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
330 if (msz > (max / 2)) 330 if (msz > (max / 2))
331 return false; 331 return false;
332 332
333 bbuf = tipc_buf_acquire(max); 333 bskb = tipc_buf_acquire(max);
334 if (!bbuf) 334 if (!bskb)
335 return false; 335 return false;
336 336
337 skb_trim(bbuf, INT_H_SIZE); 337 skb_trim(bskb, INT_H_SIZE);
338 bmsg = buf_msg(bbuf); 338 bmsg = buf_msg(bskb);
339 tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode); 339 tipc_msg_init(bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode);
340 msg_set_seqno(bmsg, msg_seqno(msg)); 340 msg_set_seqno(bmsg, msg_seqno(msg));
341 msg_set_ack(bmsg, msg_ack(msg)); 341 msg_set_ack(bmsg, msg_ack(msg));
342 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); 342 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
343 bbuf->next = (*buf)->next; 343 TIPC_SKB_CB(bskb)->bundling = true;
344 tipc_msg_bundle(bbuf, *buf, mtu); 344 __skb_queue_tail(list, bskb);
345 *buf = bbuf; 345 return tipc_msg_bundle(list, skb, mtu);
346 return true;
347} 346}
348 347
349/** 348/**
@@ -429,22 +428,23 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
429/* tipc_msg_reassemble() - clone a buffer chain of fragments and 428/* tipc_msg_reassemble() - clone a buffer chain of fragments and
430 * reassemble the clones into one message 429 * reassemble the clones into one message
431 */ 430 */
432struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) 431struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
433{ 432{
434 struct sk_buff *buf = chain; 433 struct sk_buff *skb;
435 struct sk_buff *frag = buf; 434 struct sk_buff *frag = NULL;
436 struct sk_buff *head = NULL; 435 struct sk_buff *head = NULL;
437 int hdr_sz; 436 int hdr_sz;
438 437
439 /* Copy header if single buffer */ 438 /* Copy header if single buffer */
440 if (!buf->next) { 439 if (skb_queue_len(list) == 1) {
441 hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); 440 skb = skb_peek(list);
442 return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); 441 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
442 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC);
443 } 443 }
444 444
445 /* Clone all fragments and reassemble */ 445 /* Clone all fragments and reassemble */
446 while (buf) { 446 skb_queue_walk(list, skb) {
447 frag = skb_clone(buf, GFP_ATOMIC); 447 frag = skb_clone(skb, GFP_ATOMIC);
448 if (!frag) 448 if (!frag)
449 goto error; 449 goto error;
450 frag->next = NULL; 450 frag->next = NULL;
@@ -452,7 +452,6 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
452 break; 452 break;
453 if (!head) 453 if (!head)
454 goto error; 454 goto error;
455 buf = buf->next;
456 } 455 }
457 return frag; 456 return frag;
458error: 457error:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 0ea7b695ac4d..d5c83d7ecb47 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -464,11 +464,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
464#define FRAGMENT 1 464#define FRAGMENT 1
465#define LAST_FRAGMENT 2 465#define LAST_FRAGMENT 2
466 466
467/* Bundling protocol message types
468 */
469#define BUNDLE_OPEN 0
470#define BUNDLE_CLOSED 1
471
472/* 467/*
473 * Link management protocol message types 468 * Link management protocol message types
474 */ 469 */
@@ -739,13 +734,14 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
739 734
740int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); 735int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
741 736
742bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); 737bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
743 738
744bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); 739bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
740 u32 mtu, u32 dnode);
745 741
746int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, 742int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
747 int offset, int dsz, int mtu , struct sk_buff **chain); 743 int dsz, int mtu, struct sk_buff_head *list);
748 744
749struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); 745struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
750 746
751#endif 747#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 376d2bb51d8d..ba6083dca95b 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -38,39 +38,6 @@
38#include "link.h" 38#include "link.h"
39#include "name_distr.h" 39#include "name_distr.h"
40 40
41/**
42 * struct publ_list - list of publications made by this node
43 * @list: circular list of publications
44 * @list_size: number of entries in list
45 */
46struct publ_list {
47 struct list_head list;
48 u32 size;
49};
50
51static struct publ_list publ_zone = {
52 .list = LIST_HEAD_INIT(publ_zone.list),
53 .size = 0,
54};
55
56static struct publ_list publ_cluster = {
57 .list = LIST_HEAD_INIT(publ_cluster.list),
58 .size = 0,
59};
60
61static struct publ_list publ_node = {
62 .list = LIST_HEAD_INIT(publ_node.list),
63 .size = 0,
64};
65
66static struct publ_list *publ_lists[] = {
67 NULL,
68 &publ_zone, /* publ_lists[TIPC_ZONE_SCOPE] */
69 &publ_cluster, /* publ_lists[TIPC_CLUSTER_SCOPE] */
70 &publ_node /* publ_lists[TIPC_NODE_SCOPE] */
71};
72
73
74int sysctl_tipc_named_timeout __read_mostly = 2000; 41int sysctl_tipc_named_timeout __read_mostly = 2000;
75 42
76/** 43/**
@@ -114,9 +81,9 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
114 return buf; 81 return buf;
115} 82}
116 83
117void named_cluster_distribute(struct sk_buff *buf) 84void named_cluster_distribute(struct sk_buff *skb)
118{ 85{
119 struct sk_buff *obuf; 86 struct sk_buff *oskb;
120 struct tipc_node *node; 87 struct tipc_node *node;
121 u32 dnode; 88 u32 dnode;
122 89
@@ -127,15 +94,15 @@ void named_cluster_distribute(struct sk_buff *buf)
127 continue; 94 continue;
128 if (!tipc_node_active_links(node)) 95 if (!tipc_node_active_links(node))
129 continue; 96 continue;
130 obuf = skb_copy(buf, GFP_ATOMIC); 97 oskb = skb_copy(skb, GFP_ATOMIC);
131 if (!obuf) 98 if (!oskb)
132 break; 99 break;
133 msg_set_destnode(buf_msg(obuf), dnode); 100 msg_set_destnode(buf_msg(oskb), dnode);
134 tipc_link_xmit(obuf, dnode, dnode); 101 tipc_link_xmit_skb(oskb, dnode, dnode);
135 } 102 }
136 rcu_read_unlock(); 103 rcu_read_unlock();
137 104
138 kfree_skb(buf); 105 kfree_skb(skb);
139} 106}
140 107
141/** 108/**
@@ -146,8 +113,8 @@ struct sk_buff *tipc_named_publish(struct publication *publ)
146 struct sk_buff *buf; 113 struct sk_buff *buf;
147 struct distr_item *item; 114 struct distr_item *item;
148 115
149 list_add_tail(&publ->local_list, &publ_lists[publ->scope]->list); 116 list_add_tail_rcu(&publ->local_list,
150 publ_lists[publ->scope]->size++; 117 &tipc_nametbl->publ_list[publ->scope]);
151 118
152 if (publ->scope == TIPC_NODE_SCOPE) 119 if (publ->scope == TIPC_NODE_SCOPE)
153 return NULL; 120 return NULL;
@@ -172,7 +139,6 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
172 struct distr_item *item; 139 struct distr_item *item;
173 140
174 list_del(&publ->local_list); 141 list_del(&publ->local_list);
175 publ_lists[publ->scope]->size--;
176 142
177 if (publ->scope == TIPC_NODE_SCOPE) 143 if (publ->scope == TIPC_NODE_SCOPE)
178 return NULL; 144 return NULL;
@@ -190,32 +156,28 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
190 156
191/** 157/**
192 * named_distribute - prepare name info for bulk distribution to another node 158 * named_distribute - prepare name info for bulk distribution to another node
193 * @msg_list: list of messages (buffers) to be returned from this function 159 * @list: list of messages (buffers) to be returned from this function
194 * @dnode: node to be updated 160 * @dnode: node to be updated
195 * @pls: linked list of publication items to be packed into buffer chain 161 * @pls: linked list of publication items to be packed into buffer chain
196 */ 162 */
197static void named_distribute(struct list_head *msg_list, u32 dnode, 163static void named_distribute(struct sk_buff_head *list, u32 dnode,
198 struct publ_list *pls) 164 struct list_head *pls)
199{ 165{
200 struct publication *publ; 166 struct publication *publ;
201 struct sk_buff *buf = NULL; 167 struct sk_buff *skb = NULL;
202 struct distr_item *item = NULL; 168 struct distr_item *item = NULL;
203 uint dsz = pls->size * ITEM_SIZE;
204 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; 169 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
205 uint rem = dsz; 170 uint msg_rem = msg_dsz;
206 uint msg_rem = 0;
207 171
208 list_for_each_entry(publ, &pls->list, local_list) { 172 list_for_each_entry(publ, pls, local_list) {
209 /* Prepare next buffer: */ 173 /* Prepare next buffer: */
210 if (!buf) { 174 if (!skb) {
211 msg_rem = min_t(uint, rem, msg_dsz); 175 skb = named_prepare_buf(PUBLICATION, msg_rem, dnode);
212 rem -= msg_rem; 176 if (!skb) {
213 buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
214 if (!buf) {
215 pr_warn("Bulk publication failure\n"); 177 pr_warn("Bulk publication failure\n");
216 return; 178 return;
217 } 179 }
218 item = (struct distr_item *)msg_data(buf_msg(buf)); 180 item = (struct distr_item *)msg_data(buf_msg(skb));
219 } 181 }
220 182
221 /* Pack publication into message: */ 183 /* Pack publication into message: */
@@ -225,10 +187,16 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
225 187
226 /* Append full buffer to list: */ 188 /* Append full buffer to list: */
227 if (!msg_rem) { 189 if (!msg_rem) {
228 list_add_tail((struct list_head *)buf, msg_list); 190 __skb_queue_tail(list, skb);
229 buf = NULL; 191 skb = NULL;
192 msg_rem = msg_dsz;
230 } 193 }
231 } 194 }
195 if (skb) {
196 msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
197 skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
198 __skb_queue_tail(list, skb);
199 }
232} 200}
233 201
234/** 202/**
@@ -236,36 +204,68 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
236 */ 204 */
237void tipc_named_node_up(u32 dnode) 205void tipc_named_node_up(u32 dnode)
238{ 206{
239 LIST_HEAD(msg_list); 207 struct sk_buff_head head;
240 struct sk_buff *buf_chain; 208
241 209 __skb_queue_head_init(&head);
242 read_lock_bh(&tipc_nametbl_lock); 210
243 named_distribute(&msg_list, dnode, &publ_cluster); 211 rcu_read_lock();
244 named_distribute(&msg_list, dnode, &publ_zone); 212 named_distribute(&head, dnode,
245 read_unlock_bh(&tipc_nametbl_lock); 213 &tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
246 214 named_distribute(&head, dnode,
247 /* Convert circular list to linear list and send: */ 215 &tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]);
248 buf_chain = (struct sk_buff *)msg_list.next; 216 rcu_read_unlock();
249 ((struct sk_buff *)msg_list.prev)->next = NULL; 217
250 tipc_link_xmit(buf_chain, dnode, dnode); 218 tipc_link_xmit(&head, dnode, dnode);
219}
220
221static void tipc_publ_subscribe(struct publication *publ, u32 addr)
222{
223 struct tipc_node *node;
224
225 if (in_own_node(addr))
226 return;
227
228 node = tipc_node_find(addr);
229 if (!node) {
230 pr_warn("Node subscription rejected, unknown node 0x%x\n",
231 addr);
232 return;
233 }
234
235 tipc_node_lock(node);
236 list_add_tail(&publ->nodesub_list, &node->publ_list);
237 tipc_node_unlock(node);
238}
239
240static void tipc_publ_unsubscribe(struct publication *publ, u32 addr)
241{
242 struct tipc_node *node;
243
244 node = tipc_node_find(addr);
245 if (!node)
246 return;
247
248 tipc_node_lock(node);
249 list_del_init(&publ->nodesub_list);
250 tipc_node_unlock(node);
251} 251}
252 252
253/** 253/**
254 * named_purge_publ - remove publication associated with a failed node 254 * tipc_publ_purge - remove publication associated with a failed node
255 * 255 *
256 * Invoked for each publication issued by a newly failed node. 256 * Invoked for each publication issued by a newly failed node.
257 * Removes publication structure from name table & deletes it. 257 * Removes publication structure from name table & deletes it.
258 */ 258 */
259static void named_purge_publ(struct publication *publ) 259static void tipc_publ_purge(struct publication *publ, u32 addr)
260{ 260{
261 struct publication *p; 261 struct publication *p;
262 262
263 write_lock_bh(&tipc_nametbl_lock); 263 spin_lock_bh(&tipc_nametbl_lock);
264 p = tipc_nametbl_remove_publ(publ->type, publ->lower, 264 p = tipc_nametbl_remove_publ(publ->type, publ->lower,
265 publ->node, publ->ref, publ->key); 265 publ->node, publ->ref, publ->key);
266 if (p) 266 if (p)
267 tipc_nodesub_unsubscribe(&p->subscr); 267 tipc_publ_unsubscribe(p, addr);
268 write_unlock_bh(&tipc_nametbl_lock); 268 spin_unlock_bh(&tipc_nametbl_lock);
269 269
270 if (p != publ) { 270 if (p != publ) {
271 pr_err("Unable to remove publication from failed node\n" 271 pr_err("Unable to remove publication from failed node\n"
@@ -274,7 +274,15 @@ static void named_purge_publ(struct publication *publ)
274 publ->key); 274 publ->key);
275 } 275 }
276 276
277 kfree(p); 277 kfree_rcu(p, rcu);
278}
279
280void tipc_publ_notify(struct list_head *nsub_list, u32 addr)
281{
282 struct publication *publ, *tmp;
283
284 list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list)
285 tipc_publ_purge(publ, addr);
278} 286}
279 287
280/** 288/**
@@ -294,9 +302,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype)
294 TIPC_CLUSTER_SCOPE, node, 302 TIPC_CLUSTER_SCOPE, node,
295 ntohl(i->ref), ntohl(i->key)); 303 ntohl(i->ref), ntohl(i->key));
296 if (publ) { 304 if (publ) {
297 tipc_nodesub_subscribe(&publ->subscr, node, publ, 305 tipc_publ_subscribe(publ, node);
298 (net_ev_handler)
299 named_purge_publ);
300 return true; 306 return true;
301 } 307 }
302 } else if (dtype == WITHDRAWAL) { 308 } else if (dtype == WITHDRAWAL) {
@@ -304,8 +310,8 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype)
304 node, ntohl(i->ref), 310 node, ntohl(i->ref),
305 ntohl(i->key)); 311 ntohl(i->key));
306 if (publ) { 312 if (publ) {
307 tipc_nodesub_unsubscribe(&publ->subscr); 313 tipc_publ_unsubscribe(publ, node);
308 kfree(publ); 314 kfree_rcu(publ, rcu);
309 return true; 315 return true;
310 } 316 }
311 } else { 317 } else {
@@ -370,14 +376,14 @@ void tipc_named_rcv(struct sk_buff *buf)
370 u32 count = msg_data_sz(msg) / ITEM_SIZE; 376 u32 count = msg_data_sz(msg) / ITEM_SIZE;
371 u32 node = msg_orignode(msg); 377 u32 node = msg_orignode(msg);
372 378
373 write_lock_bh(&tipc_nametbl_lock); 379 spin_lock_bh(&tipc_nametbl_lock);
374 while (count--) { 380 while (count--) {
375 if (!tipc_update_nametbl(item, node, msg_type(msg))) 381 if (!tipc_update_nametbl(item, node, msg_type(msg)))
376 tipc_named_add_backlog(item, msg_type(msg), node); 382 tipc_named_add_backlog(item, msg_type(msg), node);
377 item++; 383 item++;
378 } 384 }
379 tipc_named_process_backlog(); 385 tipc_named_process_backlog();
380 write_unlock_bh(&tipc_nametbl_lock); 386 spin_unlock_bh(&tipc_nametbl_lock);
381 kfree_skb(buf); 387 kfree_skb(buf);
382} 388}
383 389
@@ -393,11 +399,12 @@ void tipc_named_reinit(void)
393 struct publication *publ; 399 struct publication *publ;
394 int scope; 400 int scope;
395 401
396 write_lock_bh(&tipc_nametbl_lock); 402 spin_lock_bh(&tipc_nametbl_lock);
397 403
398 for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++) 404 for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++)
399 list_for_each_entry(publ, &publ_lists[scope]->list, local_list) 405 list_for_each_entry_rcu(publ, &tipc_nametbl->publ_list[scope],
406 local_list)
400 publ->node = tipc_own_addr; 407 publ->node = tipc_own_addr;
401 408
402 write_unlock_bh(&tipc_nametbl_lock); 409 spin_unlock_bh(&tipc_nametbl_lock);
403} 410}
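
named_distribute() above now sizes each buffer to a whole number of ITEM_SIZE records up front and trims only the final, partly filled buffer, instead of precomputing the total payload from a list counter. A hypothetical helper, not in the patch, that captures the resulting arithmetic:

#include <linux/kernel.h>

static uint publ_bufs_needed(uint n_publ, uint mtu)
{
	uint items_per_buf = mtu / ITEM_SIZE;	/* whole records per buffer */

	return DIV_ROUND_UP(n_publ, items_per_buf);	/* last one trimmed */
}
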
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index b9e75feb3434..cef55cedcfb2 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -74,5 +74,6 @@ void tipc_named_node_up(u32 dnode);
74void tipc_named_rcv(struct sk_buff *buf); 74void tipc_named_rcv(struct sk_buff *buf);
75void tipc_named_reinit(void); 75void tipc_named_reinit(void);
76void tipc_named_process_backlog(void); 76void tipc_named_process_backlog(void);
77void tipc_publ_notify(struct list_head *nsub_list, u32 addr);
77 78
78#endif 79#endif
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 3a6a0a7c0759..c8df0223371a 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -1,8 +1,8 @@
1/* 1/*
2 * net/tipc/name_table.c: TIPC name table code 2 * net/tipc/name_table.c: TIPC name table code
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, 2014, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2014, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,12 @@
42 42
43#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ 43#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
44 44
45static const struct nla_policy
46tipc_nl_name_table_policy[TIPC_NLA_NAME_TABLE_MAX + 1] = {
47 [TIPC_NLA_NAME_TABLE_UNSPEC] = { .type = NLA_UNSPEC },
48 [TIPC_NLA_NAME_TABLE_PUBL] = { .type = NLA_NESTED }
49};
50
45/** 51/**
46 * struct name_info - name sequence publication info 52 * struct name_info - name sequence publication info
47 * @node_list: circular list of publications made by own node 53 * @node_list: circular list of publications made by own node
@@ -86,6 +92,7 @@ struct sub_seq {
86 * @ns_list: links to adjacent name sequences in hash chain 92 * @ns_list: links to adjacent name sequences in hash chain
87 * @subscriptions: list of subscriptions for this 'type' 93 * @subscriptions: list of subscriptions for this 'type'
88 * @lock: spinlock controlling access to publication lists of all sub-sequences 94 * @lock: spinlock controlling access to publication lists of all sub-sequences
95 * @rcu: RCU callback head used for deferred freeing
89 */ 96 */
90struct name_seq { 97struct name_seq {
91 u32 type; 98 u32 type;
@@ -95,21 +102,11 @@ struct name_seq {
95 struct hlist_node ns_list; 102 struct hlist_node ns_list;
96 struct list_head subscriptions; 103 struct list_head subscriptions;
97 spinlock_t lock; 104 spinlock_t lock;
105 struct rcu_head rcu;
98}; 106};
99 107
100/** 108struct name_table *tipc_nametbl;
101 * struct name_table - table containing all existing port name publications 109DEFINE_SPINLOCK(tipc_nametbl_lock);
102 * @types: pointer to fixed-sized array of name sequence lists,
103 * accessed via hashing on 'type'; name sequence lists are *not* sorted
104 * @local_publ_count: number of publications issued by this node
105 */
106struct name_table {
107 struct hlist_head *types;
108 u32 local_publ_count;
109};
110
111static struct name_table table;
112DEFINE_RWLOCK(tipc_nametbl_lock);
113 110
114static int hash(int x) 111static int hash(int x)
115{ 112{
@@ -136,9 +133,7 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
136 publ->node = node; 133 publ->node = node;
137 publ->ref = port_ref; 134 publ->ref = port_ref;
138 publ->key = key; 135 publ->key = key;
139 INIT_LIST_HEAD(&publ->local_list);
140 INIT_LIST_HEAD(&publ->pport_list); 136 INIT_LIST_HEAD(&publ->pport_list);
141 INIT_LIST_HEAD(&publ->subscr.nodesub_list);
142 return publ; 137 return publ;
143} 138}
144 139
@@ -173,22 +168,10 @@ static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_hea
173 nseq->alloc = 1; 168 nseq->alloc = 1;
174 INIT_HLIST_NODE(&nseq->ns_list); 169 INIT_HLIST_NODE(&nseq->ns_list);
175 INIT_LIST_HEAD(&nseq->subscriptions); 170 INIT_LIST_HEAD(&nseq->subscriptions);
176 hlist_add_head(&nseq->ns_list, seq_head); 171 hlist_add_head_rcu(&nseq->ns_list, seq_head);
177 return nseq; 172 return nseq;
178} 173}
179 174
180/*
181 * nameseq_delete_empty - deletes a name sequence structure if now unused
182 */
183static void nameseq_delete_empty(struct name_seq *seq)
184{
185 if (!seq->first_free && list_empty(&seq->subscriptions)) {
186 hlist_del_init(&seq->ns_list);
187 kfree(seq->sseqs);
188 kfree(seq);
189 }
190}
191
192/** 175/**
193 * nameseq_find_subseq - find sub-sequence (if any) matching a name instance 176 * nameseq_find_subseq - find sub-sequence (if any) matching a name instance
194 * 177 *
@@ -469,8 +452,8 @@ static struct name_seq *nametbl_find_seq(u32 type)
469 struct hlist_head *seq_head; 452 struct hlist_head *seq_head;
470 struct name_seq *ns; 453 struct name_seq *ns;
471 454
472 seq_head = &table.types[hash(type)]; 455 seq_head = &tipc_nametbl->seq_hlist[hash(type)];
473 hlist_for_each_entry(ns, seq_head, ns_list) { 456 hlist_for_each_entry_rcu(ns, seq_head, ns_list) {
474 if (ns->type == type) 457 if (ns->type == type)
475 return ns; 458 return ns;
476 } 459 }
@@ -481,7 +464,9 @@ static struct name_seq *nametbl_find_seq(u32 type)
481struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper, 464struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
482 u32 scope, u32 node, u32 port, u32 key) 465 u32 scope, u32 node, u32 port, u32 key)
483{ 466{
467 struct publication *publ;
484 struct name_seq *seq = nametbl_find_seq(type); 468 struct name_seq *seq = nametbl_find_seq(type);
469 int index = hash(type);
485 470
486 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE) || 471 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE) ||
487 (lower > upper)) { 472 (lower > upper)) {
@@ -491,12 +476,16 @@ struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
491 } 476 }
492 477
493 if (!seq) 478 if (!seq)
494 seq = tipc_nameseq_create(type, &table.types[hash(type)]); 479 seq = tipc_nameseq_create(type,
480 &tipc_nametbl->seq_hlist[index]);
495 if (!seq) 481 if (!seq)
496 return NULL; 482 return NULL;
497 483
498 return tipc_nameseq_insert_publ(seq, type, lower, upper, 484 spin_lock_bh(&seq->lock);
485 publ = tipc_nameseq_insert_publ(seq, type, lower, upper,
499 scope, node, port, key); 486 scope, node, port, key);
487 spin_unlock_bh(&seq->lock);
488 return publ;
500} 489}
501 490
502struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, 491struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
@@ -508,8 +497,16 @@ struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
508 if (!seq) 497 if (!seq)
509 return NULL; 498 return NULL;
510 499
500 spin_lock_bh(&seq->lock);
511 publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key); 501 publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key);
512 nameseq_delete_empty(seq); 502 if (!seq->first_free && list_empty(&seq->subscriptions)) {
503 hlist_del_init_rcu(&seq->ns_list);
504 kfree(seq->sseqs);
505 spin_unlock_bh(&seq->lock);
506 kfree_rcu(seq, rcu);
507 return publ;
508 }
509 spin_unlock_bh(&seq->lock);
513 return publ; 510 return publ;
514} 511}
515 512
@@ -538,14 +535,14 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
538 if (!tipc_in_scope(*destnode, tipc_own_addr)) 535 if (!tipc_in_scope(*destnode, tipc_own_addr))
539 return 0; 536 return 0;
540 537
541 read_lock_bh(&tipc_nametbl_lock); 538 rcu_read_lock();
542 seq = nametbl_find_seq(type); 539 seq = nametbl_find_seq(type);
543 if (unlikely(!seq)) 540 if (unlikely(!seq))
544 goto not_found; 541 goto not_found;
542 spin_lock_bh(&seq->lock);
545 sseq = nameseq_find_subseq(seq, instance); 543 sseq = nameseq_find_subseq(seq, instance);
546 if (unlikely(!sseq)) 544 if (unlikely(!sseq))
547 goto not_found; 545 goto no_match;
548 spin_lock_bh(&seq->lock);
549 info = sseq->info; 546 info = sseq->info;
550 547
551 /* Closest-First Algorithm */ 548 /* Closest-First Algorithm */
@@ -595,7 +592,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
595no_match: 592no_match:
596 spin_unlock_bh(&seq->lock); 593 spin_unlock_bh(&seq->lock);
597not_found: 594not_found:
598 read_unlock_bh(&tipc_nametbl_lock); 595 rcu_read_unlock();
599 *destnode = node; 596 *destnode = node;
600 return ref; 597 return ref;
601} 598}
@@ -621,13 +618,12 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
621 struct name_info *info; 618 struct name_info *info;
622 int res = 0; 619 int res = 0;
623 620
624 read_lock_bh(&tipc_nametbl_lock); 621 rcu_read_lock();
625 seq = nametbl_find_seq(type); 622 seq = nametbl_find_seq(type);
626 if (!seq) 623 if (!seq)
627 goto exit; 624 goto exit;
628 625
629 spin_lock_bh(&seq->lock); 626 spin_lock_bh(&seq->lock);
630
631 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 627 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
632 sseq_stop = seq->sseqs + seq->first_free; 628 sseq_stop = seq->sseqs + seq->first_free;
633 for (; sseq != sseq_stop; sseq++) { 629 for (; sseq != sseq_stop; sseq++) {
@@ -645,10 +641,9 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
645 if (info->cluster_list_size != info->node_list_size) 641 if (info->cluster_list_size != info->node_list_size)
646 res = 1; 642 res = 1;
647 } 643 }
648
649 spin_unlock_bh(&seq->lock); 644 spin_unlock_bh(&seq->lock);
650exit: 645exit:
651 read_unlock_bh(&tipc_nametbl_lock); 646 rcu_read_unlock();
652 return res; 647 return res;
653} 648}
654 649
@@ -661,22 +656,23 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
661 struct publication *publ; 656 struct publication *publ;
662 struct sk_buff *buf = NULL; 657 struct sk_buff *buf = NULL;
663 658
664 if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) { 659 spin_lock_bh(&tipc_nametbl_lock);
660 if (tipc_nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) {
665 pr_warn("Publication failed, local publication limit reached (%u)\n", 661 pr_warn("Publication failed, local publication limit reached (%u)\n",
666 TIPC_MAX_PUBLICATIONS); 662 TIPC_MAX_PUBLICATIONS);
663 spin_unlock_bh(&tipc_nametbl_lock);
667 return NULL; 664 return NULL;
668 } 665 }
669 666
670 write_lock_bh(&tipc_nametbl_lock);
671 publ = tipc_nametbl_insert_publ(type, lower, upper, scope, 667 publ = tipc_nametbl_insert_publ(type, lower, upper, scope,
672 tipc_own_addr, port_ref, key); 668 tipc_own_addr, port_ref, key);
673 if (likely(publ)) { 669 if (likely(publ)) {
674 table.local_publ_count++; 670 tipc_nametbl->local_publ_count++;
675 buf = tipc_named_publish(publ); 671 buf = tipc_named_publish(publ);
676 /* Any pending external events? */ 672 /* Any pending external events? */
677 tipc_named_process_backlog(); 673 tipc_named_process_backlog();
678 } 674 }
679 write_unlock_bh(&tipc_nametbl_lock); 675 spin_unlock_bh(&tipc_nametbl_lock);
680 676
681 if (buf) 677 if (buf)
682 named_cluster_distribute(buf); 678 named_cluster_distribute(buf);
@@ -689,27 +685,28 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
689int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key) 685int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
690{ 686{
691 struct publication *publ; 687 struct publication *publ;
692 struct sk_buff *buf; 688 struct sk_buff *skb = NULL;
693 689
694 write_lock_bh(&tipc_nametbl_lock); 690 spin_lock_bh(&tipc_nametbl_lock);
695 publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key); 691 publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
696 if (likely(publ)) { 692 if (likely(publ)) {
697 table.local_publ_count--; 693 tipc_nametbl->local_publ_count--;
698 buf = tipc_named_withdraw(publ); 694 skb = tipc_named_withdraw(publ);
699 /* Any pending external events? */ 695 /* Any pending external events? */
700 tipc_named_process_backlog(); 696 tipc_named_process_backlog();
701 write_unlock_bh(&tipc_nametbl_lock);
702 list_del_init(&publ->pport_list); 697 list_del_init(&publ->pport_list);
703 kfree(publ); 698 kfree_rcu(publ, rcu);
699 } else {
700 pr_err("Unable to remove local publication\n"
701 "(type=%u, lower=%u, ref=%u, key=%u)\n",
702 type, lower, ref, key);
703 }
704 spin_unlock_bh(&tipc_nametbl_lock);
704 705
705 if (buf) 706 if (skb) {
706 named_cluster_distribute(buf); 707 named_cluster_distribute(skb);
707 return 1; 708 return 1;
708 } 709 }
709 write_unlock_bh(&tipc_nametbl_lock);
710 pr_err("Unable to remove local publication\n"
711 "(type=%u, lower=%u, ref=%u, key=%u)\n",
712 type, lower, ref, key);
713 return 0; 710 return 0;
714} 711}
715 712
@@ -719,12 +716,14 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
719void tipc_nametbl_subscribe(struct tipc_subscription *s) 716void tipc_nametbl_subscribe(struct tipc_subscription *s)
720{ 717{
721 u32 type = s->seq.type; 718 u32 type = s->seq.type;
719 int index = hash(type);
722 struct name_seq *seq; 720 struct name_seq *seq;
723 721
724 write_lock_bh(&tipc_nametbl_lock); 722 spin_lock_bh(&tipc_nametbl_lock);
725 seq = nametbl_find_seq(type); 723 seq = nametbl_find_seq(type);
726 if (!seq) 724 if (!seq)
727 seq = tipc_nameseq_create(type, &table.types[hash(type)]); 725 seq = tipc_nameseq_create(type,
726 &tipc_nametbl->seq_hlist[index]);
728 if (seq) { 727 if (seq) {
729 spin_lock_bh(&seq->lock); 728 spin_lock_bh(&seq->lock);
730 tipc_nameseq_subscribe(seq, s); 729 tipc_nameseq_subscribe(seq, s);
@@ -733,7 +732,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
733 pr_warn("Failed to create subscription for {%u,%u,%u}\n", 732 pr_warn("Failed to create subscription for {%u,%u,%u}\n",
734 s->seq.type, s->seq.lower, s->seq.upper); 733 s->seq.type, s->seq.lower, s->seq.upper);
735 } 734 }
736 write_unlock_bh(&tipc_nametbl_lock); 735 spin_unlock_bh(&tipc_nametbl_lock);
737} 736}
738 737
739/** 738/**
@@ -743,18 +742,23 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
743{ 742{
744 struct name_seq *seq; 743 struct name_seq *seq;
745 744
746 write_lock_bh(&tipc_nametbl_lock); 745 spin_lock_bh(&tipc_nametbl_lock);
747 seq = nametbl_find_seq(s->seq.type); 746 seq = nametbl_find_seq(s->seq.type);
748 if (seq != NULL) { 747 if (seq != NULL) {
749 spin_lock_bh(&seq->lock); 748 spin_lock_bh(&seq->lock);
750 list_del_init(&s->nameseq_list); 749 list_del_init(&s->nameseq_list);
751 spin_unlock_bh(&seq->lock); 750 if (!seq->first_free && list_empty(&seq->subscriptions)) {
752 nameseq_delete_empty(seq); 751 hlist_del_init_rcu(&seq->ns_list);
752 kfree(seq->sseqs);
753 spin_unlock_bh(&seq->lock);
754 kfree_rcu(seq, rcu);
755 } else {
756 spin_unlock_bh(&seq->lock);
757 }
753 } 758 }
754 write_unlock_bh(&tipc_nametbl_lock); 759 spin_unlock_bh(&tipc_nametbl_lock);
755} 760}
756 761
757
758/** 762/**
759 * subseq_list - print specified sub-sequence contents into the given buffer 763 * subseq_list - print specified sub-sequence contents into the given buffer
760 */ 764 */
@@ -876,8 +880,8 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
876 lowbound = 0; 880 lowbound = 0;
877 upbound = ~0; 881 upbound = ~0;
878 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { 882 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
879 seq_head = &table.types[i]; 883 seq_head = &tipc_nametbl->seq_hlist[i];
880 hlist_for_each_entry(seq, seq_head, ns_list) { 884 hlist_for_each_entry_rcu(seq, seq_head, ns_list) {
881 ret += nameseq_list(seq, buf + ret, len - ret, 885 ret += nameseq_list(seq, buf + ret, len - ret,
882 depth, seq->type, 886 depth, seq->type,
883 lowbound, upbound, i); 887 lowbound, upbound, i);
@@ -892,8 +896,8 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
892 } 896 }
893 ret += nametbl_header(buf + ret, len - ret, depth); 897 ret += nametbl_header(buf + ret, len - ret, depth);
894 i = hash(type); 898 i = hash(type);
895 seq_head = &table.types[i]; 899 seq_head = &tipc_nametbl->seq_hlist[i];
896 hlist_for_each_entry(seq, seq_head, ns_list) { 900 hlist_for_each_entry_rcu(seq, seq_head, ns_list) {
897 if (seq->type == type) { 901 if (seq->type == type) {
898 ret += nameseq_list(seq, buf + ret, len - ret, 902 ret += nameseq_list(seq, buf + ret, len - ret,
899 depth, type, 903 depth, type,
@@ -925,11 +929,11 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space)
925 pb = TLV_DATA(rep_tlv); 929 pb = TLV_DATA(rep_tlv);
926 pb_len = ULTRA_STRING_MAX_LEN; 930 pb_len = ULTRA_STRING_MAX_LEN;
927 argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area); 931 argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area);
928 read_lock_bh(&tipc_nametbl_lock); 932 rcu_read_lock();
929 str_len = nametbl_list(pb, pb_len, ntohl(argv->depth), 933 str_len = nametbl_list(pb, pb_len, ntohl(argv->depth),
930 ntohl(argv->type), 934 ntohl(argv->type),
931 ntohl(argv->lowbound), ntohl(argv->upbound)); 935 ntohl(argv->lowbound), ntohl(argv->upbound));
932 read_unlock_bh(&tipc_nametbl_lock); 936 rcu_read_unlock();
933 str_len += 1; /* for "\0" */ 937 str_len += 1; /* for "\0" */
934 skb_put(buf, TLV_SPACE(str_len)); 938 skb_put(buf, TLV_SPACE(str_len));
935 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 939 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
@@ -939,12 +943,18 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space)
939 943
940int tipc_nametbl_init(void) 944int tipc_nametbl_init(void)
941{ 945{
942 table.types = kcalloc(TIPC_NAMETBL_SIZE, sizeof(struct hlist_head), 946 int i;
943 GFP_ATOMIC); 947
944 if (!table.types) 948 tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC);
949 if (!tipc_nametbl)
945 return -ENOMEM; 950 return -ENOMEM;
946 951
947 table.local_publ_count = 0; 952 for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
953 INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]);
954
955 INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]);
956 INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
957 INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_NODE_SCOPE]);
948 return 0; 958 return 0;
949} 959}
950 960
@@ -959,17 +969,19 @@ static void tipc_purge_publications(struct name_seq *seq)
959 struct sub_seq *sseq; 969 struct sub_seq *sseq;
960 struct name_info *info; 970 struct name_info *info;
961 971
962 if (!seq->sseqs) { 972 spin_lock_bh(&seq->lock);
963 nameseq_delete_empty(seq);
964 return;
965 }
966 sseq = seq->sseqs; 973 sseq = seq->sseqs;
967 info = sseq->info; 974 info = sseq->info;
968 list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) { 975 list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) {
969 tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, 976 tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node,
970 publ->ref, publ->key); 977 publ->ref, publ->key);
971 kfree(publ); 978 kfree_rcu(publ, rcu);
972 } 979 }
980 hlist_del_init_rcu(&seq->ns_list);
981 kfree(seq->sseqs);
982 spin_unlock_bh(&seq->lock);
983
984 kfree_rcu(seq, rcu);
973} 985}
974 986
975void tipc_nametbl_stop(void) 987void tipc_nametbl_stop(void)
@@ -977,21 +989,202 @@ void tipc_nametbl_stop(void)
977 u32 i; 989 u32 i;
978 struct name_seq *seq; 990 struct name_seq *seq;
979 struct hlist_head *seq_head; 991 struct hlist_head *seq_head;
980 struct hlist_node *safe;
981 992
982 /* Verify name table is empty and purge any lingering 993 /* Verify name table is empty and purge any lingering
983 * publications, then release the name table 994 * publications, then release the name table
984 */ 995 */
985 write_lock_bh(&tipc_nametbl_lock); 996 spin_lock_bh(&tipc_nametbl_lock);
986 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { 997 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
987 if (hlist_empty(&table.types[i])) 998 if (hlist_empty(&tipc_nametbl->seq_hlist[i]))
988 continue; 999 continue;
989 seq_head = &table.types[i]; 1000 seq_head = &tipc_nametbl->seq_hlist[i];
990 hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) { 1001 hlist_for_each_entry_rcu(seq, seq_head, ns_list) {
991 tipc_purge_publications(seq); 1002 tipc_purge_publications(seq);
992 } 1003 }
993 } 1004 }
994 kfree(table.types); 1005 spin_unlock_bh(&tipc_nametbl_lock);
995 table.types = NULL; 1006
996 write_unlock_bh(&tipc_nametbl_lock); 1007 synchronize_net();
1008 kfree(tipc_nametbl);
1009
1010}
1011
1012static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
1013 struct name_seq *seq,
1014 struct sub_seq *sseq, u32 *last_publ)
1015{
1016 void *hdr;
1017 struct nlattr *attrs;
1018 struct nlattr *publ;
1019 struct publication *p;
1020
1021 if (*last_publ) {
1022 list_for_each_entry(p, &sseq->info->zone_list, zone_list)
1023 if (p->key == *last_publ)
1024 break;
1025 if (p->key != *last_publ)
1026 return -EPIPE;
1027 } else {
1028 p = list_first_entry(&sseq->info->zone_list, struct publication,
1029 zone_list);
1030 }
1031
1032 list_for_each_entry_from(p, &sseq->info->zone_list, zone_list) {
1033 *last_publ = p->key;
1034
1035 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
1036 &tipc_genl_v2_family, NLM_F_MULTI,
1037 TIPC_NL_NAME_TABLE_GET);
1038 if (!hdr)
1039 return -EMSGSIZE;
1040
1041 attrs = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE);
1042 if (!attrs)
1043 goto msg_full;
1044
1045 publ = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL);
1046 if (!publ)
1047 goto attr_msg_full;
1048
1049 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, seq->type))
1050 goto publ_msg_full;
1051 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sseq->lower))
1052 goto publ_msg_full;
1053 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sseq->upper))
1054 goto publ_msg_full;
1055 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope))
1056 goto publ_msg_full;
1057 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_NODE, p->node))
1058 goto publ_msg_full;
1059 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_REF, p->ref))
1060 goto publ_msg_full;
1061 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key))
1062 goto publ_msg_full;
1063
1064 nla_nest_end(msg->skb, publ);
1065 nla_nest_end(msg->skb, attrs);
1066 genlmsg_end(msg->skb, hdr);
1067 }
1068 *last_publ = 0;
1069
1070 return 0;
1071
1072publ_msg_full:
1073 nla_nest_cancel(msg->skb, publ);
1074attr_msg_full:
1075 nla_nest_cancel(msg->skb, attrs);
1076msg_full:
1077 genlmsg_cancel(msg->skb, hdr);
1078
1079 return -EMSGSIZE;
1080}
1081
1082static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq,
1083 u32 *last_lower, u32 *last_publ)
1084{
1085 struct sub_seq *sseq;
1086 struct sub_seq *sseq_start;
1087 int err;
1088
1089 if (*last_lower) {
1090 sseq_start = nameseq_find_subseq(seq, *last_lower);
1091 if (!sseq_start)
1092 return -EPIPE;
1093 } else {
1094 sseq_start = seq->sseqs;
1095 }
1096
1097 for (sseq = sseq_start; sseq != &seq->sseqs[seq->first_free]; sseq++) {
1098 err = __tipc_nl_add_nametable_publ(msg, seq, sseq, last_publ);
1099 if (err) {
1100 *last_lower = sseq->lower;
1101 return err;
1102 }
1103 }
1104 *last_lower = 0;
1105
1106 return 0;
1107}
1108
1109static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type,
1110 u32 *last_lower, u32 *last_publ)
1111{
1112 struct hlist_head *seq_head;
1113 struct name_seq *seq = NULL;
1114 int err;
1115 int i;
1116
1117 if (*last_type)
1118 i = hash(*last_type);
1119 else
1120 i = 0;
1121
1122 for (; i < TIPC_NAMETBL_SIZE; i++) {
1123 seq_head = &tipc_nametbl->seq_hlist[i];
1124
1125 if (*last_type) {
1126 seq = nametbl_find_seq(*last_type);
1127 if (!seq)
1128 return -EPIPE;
1129 } else {
1130 hlist_for_each_entry_rcu(seq, seq_head, ns_list)
1131 break;
1132 if (!seq)
1133 continue;
1134 }
1135
1136 hlist_for_each_entry_from_rcu(seq, ns_list) {
1137 spin_lock_bh(&seq->lock);
1138 err = __tipc_nl_subseq_list(msg, seq, last_lower,
1139 last_publ);
1140
1141 if (err) {
1142 *last_type = seq->type;
1143 spin_unlock_bh(&seq->lock);
1144 return err;
1145 }
1146 spin_unlock_bh(&seq->lock);
1147 }
1148 *last_type = 0;
1149 }
1150 return 0;
1151}
1152
1153int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1154{
1155 int err;
1156 int done = cb->args[3];
1157 u32 last_type = cb->args[0];
1158 u32 last_lower = cb->args[1];
1159 u32 last_publ = cb->args[2];
1160 struct tipc_nl_msg msg;
1161
1162 if (done)
1163 return 0;
1164
1165 msg.skb = skb;
1166 msg.portid = NETLINK_CB(cb->skb).portid;
1167 msg.seq = cb->nlh->nlmsg_seq;
1168
1169 rcu_read_lock();
1170 err = __tipc_nl_seq_list(&msg, &last_type, &last_lower, &last_publ);
1171 if (!err) {
1172 done = 1;
1173 } else if (err != -EMSGSIZE) {
 1174 /* We never set seq or call nl_dump_check_consistent(), so
 1175 * setting prev_seq here will cause the consistency check to
 1176 * fail in the netlink callback handler, resulting in the
 1177 * NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if
 1178 * we got an error.
 1179 */
1180 cb->prev_seq = 1;
1181 }
1182 rcu_read_unlock();
1183
1184 cb->args[0] = last_type;
1185 cb->args[1] = last_lower;
1186 cb->args[2] = last_publ;
1187 cb->args[3] = done;
1188
1189 return skb->len;
997} 1190}
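
tipc_nl_name_table_dump() above, like the other dumpit callbacks in this series, parks its position in cb->args[] between netlink messages and returns skb->len so the core keeps calling back until a pass emits nothing. A self-contained sketch of that resume protocol over a hypothetical table (sketch_emit() is made up):

#include <linux/netlink.h>
#include <linux/skbuff.h>

static u32 sketch_tbl[64];		/* hypothetical objects to dump */

static int sketch_emit(struct sk_buff *skb, u32 val)
{
	if (skb_tailroom(skb) < sizeof(val))
		return -EMSGSIZE;	/* full: resume in the next pass */
	*(u32 *)skb_put(skb, sizeof(val)) = val;
	return 0;
}

static int sketch_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	u32 i = cb->args[0];		/* index saved by the last pass */

	for (; i < ARRAY_SIZE(sketch_tbl); i++)
		if (sketch_emit(skb, sketch_tbl[i]))
			break;		/* stop; continue here next call */
	cb->args[0] = i;
	return skb->len;		/* 0 ends the dump */
}
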
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index f02f48b9a216..5f0dee92010d 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/name_table.h: Include file for TIPC name table code 2 * net/tipc/name_table.h: Include file for TIPC name table code
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, 2014, Ericsson AB
5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -37,15 +37,15 @@
37#ifndef _TIPC_NAME_TABLE_H 37#ifndef _TIPC_NAME_TABLE_H
38#define _TIPC_NAME_TABLE_H 38#define _TIPC_NAME_TABLE_H
39 39
40#include "node_subscr.h"
41
42struct tipc_subscription; 40struct tipc_subscription;
43struct tipc_port_list; 41struct tipc_port_list;
44 42
45/* 43/*
46 * TIPC name types reserved for internal TIPC use (both current and planned) 44 * TIPC name types reserved for internal TIPC use (both current and planned)
47 */ 45 */
48#define TIPC_ZM_SRV 3 /* zone master service name type */ 46#define TIPC_ZM_SRV 3 /* zone master service name type */
47#define TIPC_PUBL_SCOPE_NUM (TIPC_NODE_SCOPE + 1)
48#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
49 49
50/** 50/**
51 * struct publication - info about a published (name or) name sequence 51 * struct publication - info about a published (name or) name sequence
@@ -56,12 +56,13 @@ struct tipc_port_list;
56 * @node: network address of publishing port's node 56 * @node: network address of publishing port's node
57 * @ref: publishing port 57 * @ref: publishing port
58 * @key: publication key 58 * @key: publication key
59 * @subscr: subscription to "node down" event (for off-node publications only) 59 * @nodesub_list: subscription to "node down" event (off-node publication only)
60 * @local_list: adjacent entries in list of publications made by this node 60 * @local_list: adjacent entries in list of publications made by this node
61 * @pport_list: adjacent entries in list of publications made by this port 61 * @pport_list: adjacent entries in list of publications made by this port
62 * @node_list: adjacent matching name seq publications with >= node scope 62 * @node_list: adjacent matching name seq publications with >= node scope
63 * @cluster_list: adjacent matching name seq publications with >= cluster scope 63 * @cluster_list: adjacent matching name seq publications with >= cluster scope
64 * @zone_list: adjacent matching name seq publications with >= zone scope 64 * @zone_list: adjacent matching name seq publications with >= zone scope
65 * @rcu: RCU callback head used for deferred freeing
65 * 66 *
66 * Note that the node list, cluster list, and zone list are circular lists. 67 * Note that the node list, cluster list, and zone list are circular lists.
67 */ 68 */
@@ -73,16 +74,31 @@ struct publication {
73 u32 node; 74 u32 node;
74 u32 ref; 75 u32 ref;
75 u32 key; 76 u32 key;
76 struct tipc_node_subscr subscr; 77 struct list_head nodesub_list;
77 struct list_head local_list; 78 struct list_head local_list;
78 struct list_head pport_list; 79 struct list_head pport_list;
79 struct list_head node_list; 80 struct list_head node_list;
80 struct list_head cluster_list; 81 struct list_head cluster_list;
81 struct list_head zone_list; 82 struct list_head zone_list;
83 struct rcu_head rcu;
84};
85
86/**
87 * struct name_table - table containing all existing port name publications
88 * @seq_hlist: name sequence hash lists
 89 * @publ_list: publication lists
90 * @local_publ_count: number of publications issued by this node
91 */
92struct name_table {
93 struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE];
94 struct list_head publ_list[TIPC_PUBL_SCOPE_NUM];
95 u32 local_publ_count;
82}; 96};
83 97
98extern spinlock_t tipc_nametbl_lock;
99extern struct name_table *tipc_nametbl;
84 100
85extern rwlock_t tipc_nametbl_lock; 101int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
86 102
87struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space); 103struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space);
88u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node); 104u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node);
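
The locking layout declared here is three-tiered: RCU protects walks of seq_hlist, tipc_nametbl_lock serializes writers, and each name_seq keeps a spinlock for its own sub-sequence data. A reader sketch in the shape of tipc_nametbl_translate() above, assuming it lives in name_table.c beside the static helpers it calls:

static bool sketch_instance_published(u32 type, u32 instance)
{
	struct name_seq *seq;
	bool found = false;

	rcu_read_lock();			/* guards the hash chain */
	seq = nametbl_find_seq(type);
	if (seq) {
		spin_lock_bh(&seq->lock);	/* guards seq->sseqs */
		found = nameseq_find_subseq(seq, instance) != NULL;
		spin_unlock_bh(&seq->lock);
	}
	rcu_read_unlock();
	return found;
}
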
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 93b9944a6a8b..cf13df3cde8f 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -42,6 +42,11 @@
42#include "node.h" 42#include "node.h"
43#include "config.h" 43#include "config.h"
44 44
45static const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX + 1] = {
46 [TIPC_NLA_NET_UNSPEC] = { .type = NLA_UNSPEC },
47 [TIPC_NLA_NET_ID] = { .type = NLA_U32 }
48};
49
45/* 50/*
46 * The TIPC locking policy is designed to ensure a very fine locking 51 * The TIPC locking policy is designed to ensure a very fine locking
47 * granularity, permitting complete parallel access to individual 52 * granularity, permitting complete parallel access to individual
@@ -138,3 +143,104 @@ void tipc_net_stop(void)
138 143
139 pr_info("Left network mode\n"); 144 pr_info("Left network mode\n");
140} 145}
146
147static int __tipc_nl_add_net(struct tipc_nl_msg *msg)
148{
149 void *hdr;
150 struct nlattr *attrs;
151
152 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
153 NLM_F_MULTI, TIPC_NL_NET_GET);
154 if (!hdr)
155 return -EMSGSIZE;
156
157 attrs = nla_nest_start(msg->skb, TIPC_NLA_NET);
158 if (!attrs)
159 goto msg_full;
160
161 if (nla_put_u32(msg->skb, TIPC_NLA_NET_ID, tipc_net_id))
162 goto attr_msg_full;
163
164 nla_nest_end(msg->skb, attrs);
165 genlmsg_end(msg->skb, hdr);
166
167 return 0;
168
169attr_msg_full:
170 nla_nest_cancel(msg->skb, attrs);
171msg_full:
172 genlmsg_cancel(msg->skb, hdr);
173
174 return -EMSGSIZE;
175}
176
177int tipc_nl_net_dump(struct sk_buff *skb, struct netlink_callback *cb)
178{
179 int err;
180 int done = cb->args[0];
181 struct tipc_nl_msg msg;
182
183 if (done)
184 return 0;
185
186 msg.skb = skb;
187 msg.portid = NETLINK_CB(cb->skb).portid;
188 msg.seq = cb->nlh->nlmsg_seq;
189
190 err = __tipc_nl_add_net(&msg);
191 if (err)
192 goto out;
193
194 done = 1;
195out:
196 cb->args[0] = done;
197
198 return skb->len;
199}
200
201int tipc_nl_net_set(struct sk_buff *skb, struct genl_info *info)
202{
203 int err;
204 struct nlattr *attrs[TIPC_NLA_NET_MAX + 1];
205
206 if (!info->attrs[TIPC_NLA_NET])
207 return -EINVAL;
208
209 err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX,
210 info->attrs[TIPC_NLA_NET],
211 tipc_nl_net_policy);
212 if (err)
213 return err;
214
215 if (attrs[TIPC_NLA_NET_ID]) {
216 u32 val;
217
218 /* Can't change net id once TIPC has joined a network */
219 if (tipc_own_addr)
220 return -EPERM;
221
222 val = nla_get_u32(attrs[TIPC_NLA_NET_ID]);
223 if (val < 1 || val > 9999)
224 return -EINVAL;
225
226 tipc_net_id = val;
227 }
228
229 if (attrs[TIPC_NLA_NET_ADDR]) {
230 u32 addr;
231
232 /* Can't change net addr once TIPC has joined a network */
233 if (tipc_own_addr)
234 return -EPERM;
235
236 addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]);
237 if (!tipc_addr_node_valid(addr))
238 return -EINVAL;
239
240 rtnl_lock();
241 tipc_net_start(addr);
242 rtnl_unlock();
243 }
244
245 return 0;
246}
diff --git a/net/tipc/net.h b/net/tipc/net.h
index 59ef3388be2c..a81c1b9eb150 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/net.h: Include file for TIPC network routing code 2 * net/tipc/net.h: Include file for TIPC network routing code
3 * 3 *
4 * Copyright (c) 1995-2006, Ericsson AB 4 * Copyright (c) 1995-2006, 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -37,7 +37,13 @@
37#ifndef _TIPC_NET_H 37#ifndef _TIPC_NET_H
38#define _TIPC_NET_H 38#define _TIPC_NET_H
39 39
40#include <net/genetlink.h>
41
40int tipc_net_start(u32 addr); 42int tipc_net_start(u32 addr);
43
41void tipc_net_stop(void); 44void tipc_net_stop(void);
42 45
46int tipc_nl_net_dump(struct sk_buff *skb, struct netlink_callback *cb);
47int tipc_nl_net_set(struct sk_buff *skb, struct genl_info *info);
48
43#endif 49#endif
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index ad844d365340..b891e3905bc4 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/netlink.c: TIPC configuration handling 2 * net/tipc/netlink.c: TIPC configuration handling
3 * 3 *
4 * Copyright (c) 2005-2006, Ericsson AB 4 * Copyright (c) 2005-2006, 2014, Ericsson AB
5 * Copyright (c) 2005-2007, Wind River Systems 5 * Copyright (c) 2005-2007, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -36,6 +36,12 @@
36 36
37#include "core.h" 37#include "core.h"
38#include "config.h" 38#include "config.h"
39#include "socket.h"
40#include "name_table.h"
41#include "bearer.h"
42#include "link.h"
43#include "node.h"
44#include "net.h"
39#include <net/genetlink.h> 45#include <net/genetlink.h>
40 46
41static int handle_cmd(struct sk_buff *skb, struct genl_info *info) 47static int handle_cmd(struct sk_buff *skb, struct genl_info *info)
@@ -68,6 +74,19 @@ static int handle_cmd(struct sk_buff *skb, struct genl_info *info)
68 return 0; 74 return 0;
69} 75}
70 76
77static const struct nla_policy tipc_nl_policy[TIPC_NLA_MAX + 1] = {
78 [TIPC_NLA_UNSPEC] = { .type = NLA_UNSPEC, },
79 [TIPC_NLA_BEARER] = { .type = NLA_NESTED, },
80 [TIPC_NLA_SOCK] = { .type = NLA_NESTED, },
81 [TIPC_NLA_PUBL] = { .type = NLA_NESTED, },
82 [TIPC_NLA_LINK] = { .type = NLA_NESTED, },
83 [TIPC_NLA_MEDIA] = { .type = NLA_NESTED, },
84 [TIPC_NLA_NODE] = { .type = NLA_NESTED, },
85 [TIPC_NLA_NET] = { .type = NLA_NESTED, },
86 [TIPC_NLA_NAME_TABLE] = { .type = NLA_NESTED, }
87};
88
89/* Legacy ASCII API */
71static struct genl_family tipc_genl_family = { 90static struct genl_family tipc_genl_family = {
72 .id = GENL_ID_GENERATE, 91 .id = GENL_ID_GENERATE,
73 .name = TIPC_GENL_NAME, 92 .name = TIPC_GENL_NAME,
@@ -76,6 +95,7 @@ static struct genl_family tipc_genl_family = {
76 .maxattr = 0, 95 .maxattr = 0,
77}; 96};
78 97
98/* Legacy ASCII API */
79static struct genl_ops tipc_genl_ops[] = { 99static struct genl_ops tipc_genl_ops[] = {
80 { 100 {
81 .cmd = TIPC_GENL_CMD, 101 .cmd = TIPC_GENL_CMD,
@@ -83,12 +103,122 @@ static struct genl_ops tipc_genl_ops[] = {
83 }, 103 },
84}; 104};
85 105
 106/* Users of the legacy API (tipc-config) can't handle new operations,
 107 * so the new API is served by a separate genl family.
 108 */
109struct genl_family tipc_genl_v2_family = {
110 .id = GENL_ID_GENERATE,
111 .name = TIPC_GENL_V2_NAME,
112 .version = TIPC_GENL_V2_VERSION,
113 .hdrsize = 0,
114 .maxattr = TIPC_NLA_MAX,
115};
116
117static const struct genl_ops tipc_genl_v2_ops[] = {
118 {
119 .cmd = TIPC_NL_BEARER_DISABLE,
120 .doit = tipc_nl_bearer_disable,
121 .policy = tipc_nl_policy,
122 },
123 {
124 .cmd = TIPC_NL_BEARER_ENABLE,
125 .doit = tipc_nl_bearer_enable,
126 .policy = tipc_nl_policy,
127 },
128 {
129 .cmd = TIPC_NL_BEARER_GET,
130 .doit = tipc_nl_bearer_get,
131 .dumpit = tipc_nl_bearer_dump,
132 .policy = tipc_nl_policy,
133 },
134 {
135 .cmd = TIPC_NL_BEARER_SET,
136 .doit = tipc_nl_bearer_set,
137 .policy = tipc_nl_policy,
138 },
139 {
140 .cmd = TIPC_NL_SOCK_GET,
141 .dumpit = tipc_nl_sk_dump,
142 .policy = tipc_nl_policy,
143 },
144 {
145 .cmd = TIPC_NL_PUBL_GET,
146 .dumpit = tipc_nl_publ_dump,
147 .policy = tipc_nl_policy,
148 },
149 {
150 .cmd = TIPC_NL_LINK_GET,
151 .doit = tipc_nl_link_get,
152 .dumpit = tipc_nl_link_dump,
153 .policy = tipc_nl_policy,
154 },
155 {
156 .cmd = TIPC_NL_LINK_SET,
157 .doit = tipc_nl_link_set,
158 .policy = tipc_nl_policy,
159 },
160 {
161 .cmd = TIPC_NL_LINK_RESET_STATS,
162 .doit = tipc_nl_link_reset_stats,
163 .policy = tipc_nl_policy,
164 },
165 {
166 .cmd = TIPC_NL_MEDIA_GET,
167 .doit = tipc_nl_media_get,
168 .dumpit = tipc_nl_media_dump,
169 .policy = tipc_nl_policy,
170 },
171 {
172 .cmd = TIPC_NL_MEDIA_SET,
173 .doit = tipc_nl_media_set,
174 .policy = tipc_nl_policy,
175 },
176 {
177 .cmd = TIPC_NL_NODE_GET,
178 .dumpit = tipc_nl_node_dump,
179 .policy = tipc_nl_policy,
180 },
181 {
182 .cmd = TIPC_NL_NET_GET,
183 .dumpit = tipc_nl_net_dump,
184 .policy = tipc_nl_policy,
185 },
186 {
187 .cmd = TIPC_NL_NET_SET,
188 .doit = tipc_nl_net_set,
189 .policy = tipc_nl_policy,
190 },
191 {
192 .cmd = TIPC_NL_NAME_TABLE_GET,
193 .dumpit = tipc_nl_name_table_dump,
194 .policy = tipc_nl_policy,
195 }
196};
197
198int tipc_nlmsg_parse(const struct nlmsghdr *nlh, struct nlattr ***attr)
199{
200 u32 maxattr = tipc_genl_v2_family.maxattr;
201
202 *attr = tipc_genl_v2_family.attrbuf;
203 if (!*attr)
204 return -EOPNOTSUPP;
205
206 return nlmsg_parse(nlh, GENL_HDRLEN, *attr, maxattr, tipc_nl_policy);
207}
208
86int tipc_netlink_start(void) 209int tipc_netlink_start(void)
87{ 210{
88 int res; 211 int res;
89 212
90 res = genl_register_family_with_ops(&tipc_genl_family, tipc_genl_ops); 213 res = genl_register_family_with_ops(&tipc_genl_family, tipc_genl_ops);
91 if (res) { 214 if (res) {
215 pr_err("Failed to register legacy interface\n");
216 return res;
217 }
218
219 res = genl_register_family_with_ops(&tipc_genl_v2_family,
220 tipc_genl_v2_ops);
221 if (res) {
92 pr_err("Failed to register netlink interface\n"); 222 pr_err("Failed to register netlink interface\n");
93 return res; 223 return res;
94 } 224 }
@@ -98,4 +228,5 @@ int tipc_netlink_start(void)
98void tipc_netlink_stop(void) 228void tipc_netlink_stop(void)
99{ 229{
100 genl_unregister_family(&tipc_genl_family); 230 genl_unregister_family(&tipc_genl_family);
231 genl_unregister_family(&tipc_genl_v2_family);
101} 232}
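
tipc_nlmsg_parse() above lets entry points that bypass genl's own argument parsing reuse the v2 family's attrbuf and policy. A hedged usage sketch; the handler and the attribute it tests are hypothetical:

static int sketch_handler(struct netlink_callback *cb)
{
	struct nlattr **attrs;
	int err;

	err = tipc_nlmsg_parse(cb->nlh, &attrs);	/* fills family attrbuf */
	if (err)
		return err;
	if (!attrs[TIPC_NLA_NET])			/* hypothetical check */
		return -EINVAL;
	return 0;
}
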
diff --git a/net/tipc/node_subscr.h b/net/tipc/netlink.h
index d91b8cc81e3d..1425c6869de0 100644
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/netlink.h
@@ -1,8 +1,7 @@
1/* 1/*
2 * net/tipc/node_subscr.h: Include file for TIPC "node down" subscription handling 2 * net/tipc/netlink.h: Include file for TIPC netlink code
3 * 3 *
4 * Copyright (c) 1995-2006, Ericsson AB 4 * Copyright (c) 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved. 5 * All rights reserved.
7 * 6 *
8 * Redistribution and use in source and binary forms, with or without 7 * Redistribution and use in source and binary forms, with or without
@@ -34,30 +33,16 @@
34 * POSSIBILITY OF SUCH DAMAGE. 33 * POSSIBILITY OF SUCH DAMAGE.
35 */ 34 */
36 35
37#ifndef _TIPC_NODE_SUBSCR_H 36#ifndef _TIPC_NETLINK_H
38#define _TIPC_NODE_SUBSCR_H 37#define _TIPC_NETLINK_H
39 38
40#include "addr.h" 39extern struct genl_family tipc_genl_v2_family;
40int tipc_nlmsg_parse(const struct nlmsghdr *nlh, struct nlattr ***buf);
41 41
42typedef void (*net_ev_handler) (void *usr_handle); 42struct tipc_nl_msg {
43 43 struct sk_buff *skb;
44/** 44 u32 portid;
45 * struct tipc_node_subscr - "node down" subscription entry 45 u32 seq;
46 * @node: ptr to node structure of interest (or NULL, if none)
47 * @handle_node_down: routine to invoke when node fails
48 * @usr_handle: argument to pass to routine when node fails
49 * @nodesub_list: adjacent entries in list of subscriptions for the node
50 */
51struct tipc_node_subscr {
52 struct tipc_node *node;
53 net_ev_handler handle_node_down;
54 void *usr_handle;
55 struct list_head nodesub_list;
56}; 46};
57 47
58void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
59 void *usr_handle, net_ev_handler handle_down);
60void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
61void tipc_nodesub_notify(struct list_head *nsub_list);
62
63#endif 48#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 5781634e957d..8d353ec77a66 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -58,6 +58,12 @@ struct tipc_sock_conn {
58 struct list_head list; 58 struct list_head list;
59}; 59};
60 60
61static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = {
62 [TIPC_NLA_NODE_UNSPEC] = { .type = NLA_UNSPEC },
63 [TIPC_NLA_NODE_ADDR] = { .type = NLA_U32 },
64 [TIPC_NLA_NODE_UP] = { .type = NLA_FLAG }
65};
66
61/* 67/*
62 * A trivial power-of-two bitmask technique is used for speed, since this 68 * A trivial power-of-two bitmask technique is used for speed, since this
63 * operation is done for every incoming TIPC packet. The number of hash table 69 * operation is done for every incoming TIPC packet. The number of hash table
@@ -107,9 +113,10 @@ struct tipc_node *tipc_node_create(u32 addr)
107 spin_lock_init(&n_ptr->lock); 113 spin_lock_init(&n_ptr->lock);
108 INIT_HLIST_NODE(&n_ptr->hash); 114 INIT_HLIST_NODE(&n_ptr->hash);
109 INIT_LIST_HEAD(&n_ptr->list); 115 INIT_LIST_HEAD(&n_ptr->list);
110 INIT_LIST_HEAD(&n_ptr->nsub); 116 INIT_LIST_HEAD(&n_ptr->publ_list);
111 INIT_LIST_HEAD(&n_ptr->conn_sks); 117 INIT_LIST_HEAD(&n_ptr->conn_sks);
112 __skb_queue_head_init(&n_ptr->waiting_sks); 118 skb_queue_head_init(&n_ptr->waiting_sks);
119 __skb_queue_head_init(&n_ptr->bclink.deferred_queue);
113 120
114 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); 121 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
115 122
@@ -375,8 +382,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
375 382
376 /* Flush broadcast link info associated with lost node */ 383 /* Flush broadcast link info associated with lost node */
377 if (n_ptr->bclink.recv_permitted) { 384 if (n_ptr->bclink.recv_permitted) {
378 kfree_skb_list(n_ptr->bclink.deferred_head); 385 __skb_queue_purge(&n_ptr->bclink.deferred_queue);
379 n_ptr->bclink.deferred_size = 0;
380 386
381 if (n_ptr->bclink.reasm_buf) { 387 if (n_ptr->bclink.reasm_buf) {
382 kfree_skb(n_ptr->bclink.reasm_buf); 388 kfree_skb(n_ptr->bclink.reasm_buf);
@@ -568,7 +574,7 @@ void tipc_node_unlock(struct tipc_node *node)
568 skb_queue_splice_init(&node->waiting_sks, &waiting_sks); 574 skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
569 575
570 if (flags & TIPC_NOTIFY_NODE_DOWN) { 576 if (flags & TIPC_NOTIFY_NODE_DOWN) {
571 list_replace_init(&node->nsub, &nsub_list); 577 list_replace_init(&node->publ_list, &nsub_list);
572 list_replace_init(&node->conn_sks, &conn_sks); 578 list_replace_init(&node->conn_sks, &conn_sks);
573 } 579 }
574 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | 580 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN |
@@ -585,7 +591,7 @@ void tipc_node_unlock(struct tipc_node *node)
585 tipc_node_abort_sock_conns(&conn_sks); 591 tipc_node_abort_sock_conns(&conn_sks);
586 592
587 if (!list_empty(&nsub_list)) 593 if (!list_empty(&nsub_list))
588 tipc_nodesub_notify(&nsub_list); 594 tipc_publ_notify(&nsub_list, addr);
589 595
590 if (flags & TIPC_WAKEUP_BCAST_USERS) 596 if (flags & TIPC_WAKEUP_BCAST_USERS)
591 tipc_bclink_wakeup_users(); 597 tipc_bclink_wakeup_users();
@@ -601,3 +607,93 @@ void tipc_node_unlock(struct tipc_node *node)
601 tipc_nametbl_withdraw(TIPC_LINK_STATE, addr, 607 tipc_nametbl_withdraw(TIPC_LINK_STATE, addr,
602 link_id, addr); 608 link_id, addr);
603} 609}
610
611/* Caller should hold node lock for the passed node */
612static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
613{
614 void *hdr;
615 struct nlattr *attrs;
616
617 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
618 NLM_F_MULTI, TIPC_NL_NODE_GET);
619 if (!hdr)
620 return -EMSGSIZE;
621
622 attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE);
623 if (!attrs)
624 goto msg_full;
625
626 if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
627 goto attr_msg_full;
628 if (tipc_node_is_up(node))
629 if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
630 goto attr_msg_full;
631
632 nla_nest_end(msg->skb, attrs);
633 genlmsg_end(msg->skb, hdr);
634
635 return 0;
636
637attr_msg_full:
638 nla_nest_cancel(msg->skb, attrs);
639msg_full:
640 genlmsg_cancel(msg->skb, hdr);
641
642 return -EMSGSIZE;
643}
644
645int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
646{
647 int err;
648 int done = cb->args[0];
649 int last_addr = cb->args[1];
650 struct tipc_node *node;
651 struct tipc_nl_msg msg;
652
653 if (done)
654 return 0;
655
656 msg.skb = skb;
657 msg.portid = NETLINK_CB(cb->skb).portid;
658 msg.seq = cb->nlh->nlmsg_seq;
659
660 rcu_read_lock();
661
662 if (last_addr && !tipc_node_find(last_addr)) {
663 rcu_read_unlock();
 664 /* We never set seq or call nl_dump_check_consistent(), so
 665 * setting prev_seq here will cause the consistency check to
 666 * fail in the netlink callback handler, resulting in the
 667 * NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if
 668 * the node state changed while we released the lock.
 669 */
670 cb->prev_seq = 1;
671 return -EPIPE;
672 }
673
674 list_for_each_entry_rcu(node, &tipc_node_list, list) {
675 if (last_addr) {
676 if (node->addr == last_addr)
677 last_addr = 0;
678 else
679 continue;
680 }
681
682 tipc_node_lock(node);
683 err = __tipc_nl_add_node(&msg, node);
684 if (err) {
685 last_addr = node->addr;
686 tipc_node_unlock(node);
687 goto out;
688 }
689
690 tipc_node_unlock(node);
691 }
692 done = 1;
693out:
694 cb->args[0] = done;
695 cb->args[1] = last_addr;
696 rcu_read_unlock();
697
698 return skb->len;
699}
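
The tipc_node_unlock() hunk above detaches publ_list while holding the node lock and walks it only afterwards, so tipc_publ_purge() can take tipc_nametbl_lock without nesting it inside the node lock. The pattern in isolation, simplified from the code above:

static void sketch_node_down(struct tipc_node *node, u32 addr)
{
	LIST_HEAD(nsub_list);

	tipc_node_lock(node);
	list_replace_init(&node->publ_list, &nsub_list);
	tipc_node_unlock(node);

	if (!list_empty(&nsub_list))
		tipc_publ_notify(&nsub_list, addr);	/* takes nametbl lock */
}
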
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 04e91458bb29..cbe0e950f1cc 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/node.h: Include file for TIPC node management routines 2 * net/tipc/node.h: Include file for TIPC node management routines
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2014, Wind River Systems 5 * Copyright (c) 2005, 2010-2014, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -37,7 +37,6 @@
37#ifndef _TIPC_NODE_H 37#ifndef _TIPC_NODE_H
38#define _TIPC_NODE_H 38#define _TIPC_NODE_H
39 39
40#include "node_subscr.h"
41#include "addr.h" 40#include "addr.h"
42#include "net.h" 41#include "net.h"
43#include "bearer.h" 42#include "bearer.h"
@@ -72,9 +71,7 @@ enum {
72 * @last_in: sequence # of last in-sequence b'cast message received from node 71 * @last_in: sequence # of last in-sequence b'cast message received from node
73 * @last_sent: sequence # of last b'cast message sent by node 72 * @last_sent: sequence # of last b'cast message sent by node
74 * @oos_state: state tracker for handling OOS b'cast messages 73 * @oos_state: state tracker for handling OOS b'cast messages
75 * @deferred_size: number of OOS b'cast messages in deferred queue 74 * @deferred_queue: deferred queue of saved OOS b'cast messages received from node
76 * @deferred_head: oldest OOS b'cast message received from node
77 * @deferred_tail: newest OOS b'cast message received from node
78 * @reasm_buf: broadcast reassembly queue head from node 75 * @reasm_buf: broadcast reassembly queue head from node
79 * @recv_permitted: true if node is allowed to receive b'cast messages 76 * @recv_permitted: true if node is allowed to receive b'cast messages
80 */ 77 */
@@ -84,8 +81,7 @@ struct tipc_node_bclink {
84 u32 last_sent; 81 u32 last_sent;
85 u32 oos_state; 82 u32 oos_state;
86 u32 deferred_size; 83 u32 deferred_size;
87 struct sk_buff *deferred_head; 84 struct sk_buff_head deferred_queue;
88 struct sk_buff *deferred_tail;
89 struct sk_buff *reasm_buf; 85 struct sk_buff *reasm_buf;
90 bool recv_permitted; 86 bool recv_permitted;
91}; 87};
@@ -104,7 +100,7 @@ struct tipc_node_bclink {
104 * @link_cnt: number of links to node 100 * @link_cnt: number of links to node
105 * @signature: node instance identifier 101 * @signature: node instance identifier
106 * @link_id: local and remote bearer ids of changing link, if any 102 * @link_id: local and remote bearer ids of changing link, if any
107 * @nsub: list of "node down" subscriptions monitoring node 103 * @publ_list: list of publications
108 * @rcu: rcu struct for tipc_node 104 * @rcu: rcu struct for tipc_node
109 */ 105 */
110struct tipc_node { 106struct tipc_node {
@@ -121,7 +117,7 @@ struct tipc_node {
121 int working_links; 117 int working_links;
122 u32 signature; 118 u32 signature;
123 u32 link_id; 119 u32 link_id;
124 struct list_head nsub; 120 struct list_head publ_list;
125 struct sk_buff_head waiting_sks; 121 struct sk_buff_head waiting_sks;
126 struct list_head conn_sks; 122 struct list_head conn_sks;
127 struct rcu_head rcu; 123 struct rcu_head rcu;
@@ -145,6 +141,8 @@ void tipc_node_unlock(struct tipc_node *node);
145int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port); 141int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port);
146void tipc_node_remove_conn(u32 dnode, u32 port); 142void tipc_node_remove_conn(u32 dnode, u32 port);
147 143
144int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
145
148static inline void tipc_node_lock(struct tipc_node *node) 146static inline void tipc_node_lock(struct tipc_node *node)
149{ 147{
150 spin_lock_bh(&node->lock); 148 spin_lock_bh(&node->lock);
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
deleted file mode 100644
index 2d13eea8574a..000000000000
--- a/net/tipc/node_subscr.c
+++ /dev/null
@@ -1,96 +0,0 @@
1/*
2 * net/tipc/node_subscr.c: TIPC "node down" subscription handling
3 *
4 * Copyright (c) 1995-2006, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "core.h"
38#include "node_subscr.h"
39#include "node.h"
40
41/**
42 * tipc_nodesub_subscribe - create "node down" subscription for specified node
43 */
44void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
45 void *usr_handle, net_ev_handler handle_down)
46{
47 if (in_own_node(addr)) {
48 node_sub->node = NULL;
49 return;
50 }
51
52 node_sub->node = tipc_node_find(addr);
53 if (!node_sub->node) {
54 pr_warn("Node subscription rejected, unknown node 0x%x\n",
55 addr);
56 return;
57 }
58 node_sub->handle_node_down = handle_down;
59 node_sub->usr_handle = usr_handle;
60
61 tipc_node_lock(node_sub->node);
62 list_add_tail(&node_sub->nodesub_list, &node_sub->node->nsub);
63 tipc_node_unlock(node_sub->node);
64}
65
66/**
67 * tipc_nodesub_unsubscribe - cancel "node down" subscription (if any)
68 */
69void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
70{
71 if (!node_sub->node)
72 return;
73
74 tipc_node_lock(node_sub->node);
75 list_del_init(&node_sub->nodesub_list);
76 tipc_node_unlock(node_sub->node);
77}
78
79/**
80 * tipc_nodesub_notify - notify subscribers that a node is unreachable
81 *
82 * Note: node is locked by caller
83 */
84void tipc_nodesub_notify(struct list_head *nsub_list)
85{
86 struct tipc_node_subscr *ns, *safe;
87 net_ev_handler handle_node_down;
88
89 list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
90 handle_node_down = ns->handle_node_down;
91 if (handle_node_down) {
92 ns->handle_node_down = NULL;
93 handle_node_down(ns->usr_handle);
94 }
95 }
96}
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 51bddc236a15..4731cad99d1c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -121,6 +121,14 @@ static const struct proto_ops msg_ops;
121static struct proto tipc_proto; 121static struct proto tipc_proto;
122static struct proto tipc_proto_kern; 122static struct proto tipc_proto_kern;
123 123
124static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
125 [TIPC_NLA_SOCK_UNSPEC] = { .type = NLA_UNSPEC },
126 [TIPC_NLA_SOCK_ADDR] = { .type = NLA_U32 },
127 [TIPC_NLA_SOCK_REF] = { .type = NLA_U32 },
128 [TIPC_NLA_SOCK_CON] = { .type = NLA_NESTED },
129 [TIPC_NLA_SOCK_HAS_PUBL] = { .type = NLA_FLAG }
130};
131
124/* 132/*
125 * Revised TIPC socket locking policy: 133 * Revised TIPC socket locking policy:
126 * 134 *
@@ -236,12 +244,12 @@ static void tsk_advance_rx_queue(struct sock *sk)
236 */ 244 */
237static void tsk_rej_rx_queue(struct sock *sk) 245static void tsk_rej_rx_queue(struct sock *sk)
238{ 246{
239 struct sk_buff *buf; 247 struct sk_buff *skb;
240 u32 dnode; 248 u32 dnode;
241 249
242 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 250 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
243 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) 251 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
244 tipc_link_xmit(buf, dnode, 0); 252 tipc_link_xmit_skb(skb, dnode, 0);
245 } 253 }
246} 254}
247 255
@@ -454,7 +462,7 @@ static int tipc_release(struct socket *sock)
454{ 462{
455 struct sock *sk = sock->sk; 463 struct sock *sk = sock->sk;
456 struct tipc_sock *tsk; 464 struct tipc_sock *tsk;
457 struct sk_buff *buf; 465 struct sk_buff *skb;
458 u32 dnode; 466 u32 dnode;
459 467
460 /* 468 /*
@@ -473,11 +481,11 @@ static int tipc_release(struct socket *sock)
473 */ 481 */
474 dnode = tsk_peer_node(tsk); 482 dnode = tsk_peer_node(tsk);
475 while (sock->state != SS_DISCONNECTING) { 483 while (sock->state != SS_DISCONNECTING) {
476 buf = __skb_dequeue(&sk->sk_receive_queue); 484 skb = __skb_dequeue(&sk->sk_receive_queue);
477 if (buf == NULL) 485 if (skb == NULL)
478 break; 486 break;
479 if (TIPC_SKB_CB(buf)->handle != NULL) 487 if (TIPC_SKB_CB(skb)->handle != NULL)
480 kfree_skb(buf); 488 kfree_skb(skb);
481 else { 489 else {
482 if ((sock->state == SS_CONNECTING) || 490 if ((sock->state == SS_CONNECTING) ||
483 (sock->state == SS_CONNECTED)) { 491 (sock->state == SS_CONNECTED)) {
@@ -485,8 +493,8 @@ static int tipc_release(struct socket *sock)
485 tsk->connected = 0; 493 tsk->connected = 0;
486 tipc_node_remove_conn(dnode, tsk->ref); 494 tipc_node_remove_conn(dnode, tsk->ref);
487 } 495 }
488 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) 496 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
489 tipc_link_xmit(buf, dnode, 0); 497 tipc_link_xmit_skb(skb, dnode, 0);
490 } 498 }
491 } 499 }
492 500
@@ -494,12 +502,12 @@ static int tipc_release(struct socket *sock)
494 tipc_sk_ref_discard(tsk->ref); 502 tipc_sk_ref_discard(tsk->ref);
495 k_cancel_timer(&tsk->timer); 503 k_cancel_timer(&tsk->timer);
496 if (tsk->connected) { 504 if (tsk->connected) {
497 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 505 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
498 SHORT_H_SIZE, 0, dnode, tipc_own_addr, 506 SHORT_H_SIZE, 0, dnode, tipc_own_addr,
499 tsk_peer_port(tsk), 507 tsk_peer_port(tsk),
500 tsk->ref, TIPC_ERR_NO_PORT); 508 tsk->ref, TIPC_ERR_NO_PORT);
501 if (buf) 509 if (skb)
502 tipc_link_xmit(buf, dnode, tsk->ref); 510 tipc_link_xmit_skb(skb, dnode, tsk->ref);
503 tipc_node_remove_conn(dnode, tsk->ref); 511 tipc_node_remove_conn(dnode, tsk->ref);
504 } 512 }
505 k_term_timer(&tsk->timer); 513 k_term_timer(&tsk->timer);
@@ -692,7 +700,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
692 * tipc_sendmcast - send multicast message 700 * tipc_sendmcast - send multicast message
693 * @sock: socket structure 701 * @sock: socket structure
694 * @seq: destination address 702 * @seq: destination address
695 * @iov: message data to send 703 * @msg: message to send
696 * @dsz: total length of message data 704 * @dsz: total length of message data
697 * @timeo: timeout to wait for wakeup 705 * @timeo: timeout to wait for wakeup
698 * 706 *
@@ -700,11 +708,11 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
700 * Returns the number of bytes sent on success, or errno 708 * Returns the number of bytes sent on success, or errno
701 */ 709 */
702static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, 710static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
703 struct iovec *iov, size_t dsz, long timeo) 711 struct msghdr *msg, size_t dsz, long timeo)
704{ 712{
705 struct sock *sk = sock->sk; 713 struct sock *sk = sock->sk;
706 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; 714 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
707 struct sk_buff *buf; 715 struct sk_buff_head head;
708 uint mtu; 716 uint mtu;
709 int rc; 717 int rc;
710 718
@@ -719,12 +727,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
719 727
720new_mtu: 728new_mtu:
721 mtu = tipc_bclink_get_mtu(); 729 mtu = tipc_bclink_get_mtu();
722 rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); 730 __skb_queue_head_init(&head);
731 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
723 if (unlikely(rc < 0)) 732 if (unlikely(rc < 0))
724 return rc; 733 return rc;
725 734
726 do { 735 do {
727 rc = tipc_bclink_xmit(buf); 736 rc = tipc_bclink_xmit(&head);
728 if (likely(rc >= 0)) { 737 if (likely(rc >= 0)) {
729 rc = dsz; 738 rc = dsz;
730 break; 739 break;
@@ -736,7 +745,7 @@ new_mtu:
736 tipc_sk(sk)->link_cong = 1; 745 tipc_sk(sk)->link_cong = 1;
737 rc = tipc_wait_for_sndmsg(sock, &timeo); 746 rc = tipc_wait_for_sndmsg(sock, &timeo);
738 if (rc) 747 if (rc)
739 kfree_skb_list(buf); 748 __skb_queue_purge(&head);
740 } while (!rc); 749 } while (!rc);
741 return rc; 750 return rc;
742} 751}
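Throughout this file the send paths now build a complete fragment queue before touching the link layer. The builder's shape below is an assumption inferred from the call sites, since the msg.c hunks are not shown in this excerpt:

    /* Assumed signature, inferred from the call sites in this file:
     * fragments up to dsz bytes of user data from m (starting at offset)
     * into packets no larger than pktmax, appending them to list.
     */
    int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
                       int dsz, int pktmax, struct sk_buff_head *list);

The caller owns the queue: a successful xmit consumes it, while a failed congestion wait purges it with __skb_queue_purge(), as the hunks above show.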
@@ -818,39 +827,6 @@ exit:
818 return TIPC_OK; 827 return TIPC_OK;
819} 828}
820 829
821/**
822 * dest_name_check - verify user is permitted to send to specified port name
823 * @dest: destination address
824 * @m: descriptor for message to be sent
825 *
826 * Prevents restricted configuration commands from being issued by
827 * unauthorized users.
828 *
829 * Returns 0 if permission is granted, otherwise errno
830 */
831static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
832{
833 struct tipc_cfg_msg_hdr hdr;
834
835 if (unlikely(dest->addrtype == TIPC_ADDR_ID))
836 return 0;
837 if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
838 return 0;
839 if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
840 return 0;
841 if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
842 return -EACCES;
843
844 if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
845 return -EMSGSIZE;
846 if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
847 return -EFAULT;
848 if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
849 return -EACCES;
850
851 return 0;
852}
853
854static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) 830static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
855{ 831{
856 struct sock *sk = sock->sk; 832 struct sock *sk = sock->sk;
@@ -897,13 +873,13 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
897 struct sock *sk = sock->sk; 873 struct sock *sk = sock->sk;
898 struct tipc_sock *tsk = tipc_sk(sk); 874 struct tipc_sock *tsk = tipc_sk(sk);
899 struct tipc_msg *mhdr = &tsk->phdr; 875 struct tipc_msg *mhdr = &tsk->phdr;
900 struct iovec *iov = m->msg_iov;
901 u32 dnode, dport; 876 u32 dnode, dport;
902 struct sk_buff *buf; 877 struct sk_buff_head head;
878 struct sk_buff *skb;
903 struct tipc_name_seq *seq = &dest->addr.nameseq; 879 struct tipc_name_seq *seq = &dest->addr.nameseq;
904 u32 mtu; 880 u32 mtu;
905 long timeo; 881 long timeo;
906 int rc = -EINVAL; 882 int rc;
907 883
908 if (unlikely(!dest)) 884 if (unlikely(!dest))
909 return -EDESTADDRREQ; 885 return -EDESTADDRREQ;
@@ -936,14 +912,11 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
936 tsk->conn_instance = dest->addr.name.name.instance; 912 tsk->conn_instance = dest->addr.name.name.instance;
937 } 913 }
938 } 914 }
939 rc = dest_name_check(dest, m);
940 if (rc)
941 goto exit;
942 915
943 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 916 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
944 917
945 if (dest->addrtype == TIPC_ADDR_MCAST) { 918 if (dest->addrtype == TIPC_ADDR_MCAST) {
946 rc = tipc_sendmcast(sock, seq, iov, dsz, timeo); 919 rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
947 goto exit; 920 goto exit;
948 } else if (dest->addrtype == TIPC_ADDR_NAME) { 921 } else if (dest->addrtype == TIPC_ADDR_NAME) {
949 u32 type = dest->addr.name.name.type; 922 u32 type = dest->addr.name.name.type;
@@ -974,13 +947,15 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
974 947
975new_mtu: 948new_mtu:
976 mtu = tipc_node_get_mtu(dnode, tsk->ref); 949 mtu = tipc_node_get_mtu(dnode, tsk->ref);
977 rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); 950 __skb_queue_head_init(&head);
951 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
978 if (rc < 0) 952 if (rc < 0)
979 goto exit; 953 goto exit;
980 954
981 do { 955 do {
982 TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong; 956 skb = skb_peek(&head);
983 rc = tipc_link_xmit(buf, dnode, tsk->ref); 957 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
958 rc = tipc_link_xmit(&head, dnode, tsk->ref);
984 if (likely(rc >= 0)) { 959 if (likely(rc >= 0)) {
985 if (sock->state != SS_READY) 960 if (sock->state != SS_READY)
986 sock->state = SS_CONNECTING; 961 sock->state = SS_CONNECTING;
@@ -994,7 +969,7 @@ new_mtu:
994 tsk->link_cong = 1; 969 tsk->link_cong = 1;
995 rc = tipc_wait_for_sndmsg(sock, &timeo); 970 rc = tipc_wait_for_sndmsg(sock, &timeo);
996 if (rc) 971 if (rc)
997 kfree_skb_list(buf); 972 __skb_queue_purge(&head);
998 } while (!rc); 973 } while (!rc);
999exit: 974exit:
1000 if (iocb) 975 if (iocb)
@@ -1051,7 +1026,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1051 struct sock *sk = sock->sk; 1026 struct sock *sk = sock->sk;
1052 struct tipc_sock *tsk = tipc_sk(sk); 1027 struct tipc_sock *tsk = tipc_sk(sk);
1053 struct tipc_msg *mhdr = &tsk->phdr; 1028 struct tipc_msg *mhdr = &tsk->phdr;
1054 struct sk_buff *buf; 1029 struct sk_buff_head head;
1055 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1030 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1056 u32 ref = tsk->ref; 1031 u32 ref = tsk->ref;
1057 int rc = -EINVAL; 1032 int rc = -EINVAL;
@@ -1086,12 +1061,13 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1086next: 1061next:
1087 mtu = tsk->max_pkt; 1062 mtu = tsk->max_pkt;
1088 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1063 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1089 rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf); 1064 __skb_queue_head_init(&head);
1065 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1090 if (unlikely(rc < 0)) 1066 if (unlikely(rc < 0))
1091 goto exit; 1067 goto exit;
1092 do { 1068 do {
1093 if (likely(!tsk_conn_cong(tsk))) { 1069 if (likely(!tsk_conn_cong(tsk))) {
1094 rc = tipc_link_xmit(buf, dnode, ref); 1070 rc = tipc_link_xmit(&head, dnode, ref);
1095 if (likely(!rc)) { 1071 if (likely(!rc)) {
1096 tsk->sent_unacked++; 1072 tsk->sent_unacked++;
1097 sent += send; 1073 sent += send;
@@ -1109,7 +1085,7 @@ next:
1109 } 1085 }
1110 rc = tipc_wait_for_sndpkt(sock, &timeo); 1086 rc = tipc_wait_for_sndpkt(sock, &timeo);
1111 if (rc) 1087 if (rc)
1112 kfree_skb_list(buf); 1088 __skb_queue_purge(&head);
1113 } while (!rc); 1089 } while (!rc);
1114exit: 1090exit:
1115 if (iocb) 1091 if (iocb)
@@ -1254,20 +1230,20 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1254 1230
1255static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) 1231static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1256{ 1232{
1257 struct sk_buff *buf = NULL; 1233 struct sk_buff *skb = NULL;
1258 struct tipc_msg *msg; 1234 struct tipc_msg *msg;
1259 u32 peer_port = tsk_peer_port(tsk); 1235 u32 peer_port = tsk_peer_port(tsk);
1260 u32 dnode = tsk_peer_node(tsk); 1236 u32 dnode = tsk_peer_node(tsk);
1261 1237
1262 if (!tsk->connected) 1238 if (!tsk->connected)
1263 return; 1239 return;
1264 buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, 1240 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
1265 tipc_own_addr, peer_port, tsk->ref, TIPC_OK); 1241 tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
1266 if (!buf) 1242 if (!skb)
1267 return; 1243 return;
1268 msg = buf_msg(buf); 1244 msg = buf_msg(skb);
1269 msg_set_msgcnt(msg, ack); 1245 msg_set_msgcnt(msg, ack);
1270 tipc_link_xmit(buf, dnode, msg_link_selector(msg)); 1246 tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
1271} 1247}
1272 1248
1273static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) 1249static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1372,8 +1348,7 @@ restart:
1372 sz = buf_len; 1348 sz = buf_len;
1373 m->msg_flags |= MSG_TRUNC; 1349 m->msg_flags |= MSG_TRUNC;
1374 } 1350 }
1375 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg), 1351 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1376 m->msg_iov, sz);
1377 if (res) 1352 if (res)
1378 goto exit; 1353 goto exit;
1379 res = sz; 1354 res = sz;
@@ -1473,8 +1448,8 @@ restart:
1473 needed = (buf_len - sz_copied); 1448 needed = (buf_len - sz_copied);
1474 sz_to_copy = (sz <= needed) ? sz : needed; 1449 sz_to_copy = (sz <= needed) ? sz : needed;
1475 1450
1476 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset, 1451 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1477 m->msg_iov, sz_to_copy); 1452 m, sz_to_copy);
1478 if (res) 1453 if (res)
1479 goto exit; 1454 goto exit;
1480 1455
@@ -1556,7 +1531,7 @@ static void tipc_data_ready(struct sock *sk)
1556 * @tsk: TIPC socket 1531 * @tsk: TIPC socket
1557 * @msg: message 1532 * @msg: message
1558 * 1533 *
1559 * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise 1534 * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1560 */ 1535 */
1561static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) 1536static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1562{ 1537{
@@ -1723,20 +1698,20 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1723/** 1698/**
1724 * tipc_backlog_rcv - handle incoming message from backlog queue 1699 * tipc_backlog_rcv - handle incoming message from backlog queue
1725 * @sk: socket 1700 * @sk: socket
1726 * @buf: message 1701 * @skb: message
1727 * 1702 *
1728 * Caller must hold socket lock, but not port lock. 1703 * Caller must hold socket lock, but not port lock.
1729 * 1704 *
1730 * Returns 0 1705 * Returns 0
1731 */ 1706 */
1732static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) 1707static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1733{ 1708{
1734 int rc; 1709 int rc;
1735 u32 onode; 1710 u32 onode;
1736 struct tipc_sock *tsk = tipc_sk(sk); 1711 struct tipc_sock *tsk = tipc_sk(sk);
1737 uint truesize = buf->truesize; 1712 uint truesize = skb->truesize;
1738 1713
1739 rc = filter_rcv(sk, buf); 1714 rc = filter_rcv(sk, skb);
1740 1715
1741 if (likely(!rc)) { 1716 if (likely(!rc)) {
1742 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) 1717 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
@@ -1744,25 +1719,25 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1744 return 0; 1719 return 0;
1745 } 1720 }
1746 1721
1747 if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) 1722 if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
1748 return 0; 1723 return 0;
1749 1724
1750 tipc_link_xmit(buf, onode, 0); 1725 tipc_link_xmit_skb(skb, onode, 0);
1751 1726
1752 return 0; 1727 return 0;
1753} 1728}
1754 1729
1755/** 1730/**
1756 * tipc_sk_rcv - handle incoming message 1731 * tipc_sk_rcv - handle incoming message
1757 * @buf: buffer containing arriving message 1732 * @skb: buffer containing arriving message
1758 * Consumes buffer 1733 * Consumes buffer
1759 * Returns 0 if success, or errno: -EHOSTUNREACH 1734 * Returns 0 if success, or errno: -EHOSTUNREACH
1760 */ 1735 */
1761int tipc_sk_rcv(struct sk_buff *buf) 1736int tipc_sk_rcv(struct sk_buff *skb)
1762{ 1737{
1763 struct tipc_sock *tsk; 1738 struct tipc_sock *tsk;
1764 struct sock *sk; 1739 struct sock *sk;
1765 u32 dport = msg_destport(buf_msg(buf)); 1740 u32 dport = msg_destport(buf_msg(skb));
1766 int rc = TIPC_OK; 1741 int rc = TIPC_OK;
1767 uint limit; 1742 uint limit;
1768 u32 dnode; 1743 u32 dnode;
@@ -1770,7 +1745,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
1770 /* Validate destination and message */ 1745 /* Validate destination and message */
1771 tsk = tipc_sk_get(dport); 1746 tsk = tipc_sk_get(dport);
1772 if (unlikely(!tsk)) { 1747 if (unlikely(!tsk)) {
1773 rc = tipc_msg_eval(buf, &dnode); 1748 rc = tipc_msg_eval(skb, &dnode);
1774 goto exit; 1749 goto exit;
1775 } 1750 }
1776 sk = &tsk->sk; 1751 sk = &tsk->sk;
@@ -1779,12 +1754,12 @@ int tipc_sk_rcv(struct sk_buff *buf)
1779 spin_lock_bh(&sk->sk_lock.slock); 1754 spin_lock_bh(&sk->sk_lock.slock);
1780 1755
1781 if (!sock_owned_by_user(sk)) { 1756 if (!sock_owned_by_user(sk)) {
1782 rc = filter_rcv(sk, buf); 1757 rc = filter_rcv(sk, skb);
1783 } else { 1758 } else {
1784 if (sk->sk_backlog.len == 0) 1759 if (sk->sk_backlog.len == 0)
1785 atomic_set(&tsk->dupl_rcvcnt, 0); 1760 atomic_set(&tsk->dupl_rcvcnt, 0);
1786 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); 1761 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1787 if (sk_add_backlog(sk, buf, limit)) 1762 if (sk_add_backlog(sk, skb, limit))
1788 rc = -TIPC_ERR_OVERLOAD; 1763 rc = -TIPC_ERR_OVERLOAD;
1789 } 1764 }
1790 spin_unlock_bh(&sk->sk_lock.slock); 1765 spin_unlock_bh(&sk->sk_lock.slock);
@@ -1792,10 +1767,10 @@ int tipc_sk_rcv(struct sk_buff *buf)
1792 if (likely(!rc)) 1767 if (likely(!rc))
1793 return 0; 1768 return 0;
1794exit: 1769exit:
1795 if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) 1770 if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
1796 return -EHOSTUNREACH; 1771 return -EHOSTUNREACH;
1797 1772
1798 tipc_link_xmit(buf, dnode, 0); 1773 tipc_link_xmit_skb(skb, dnode, 0);
1799 return (rc < 0) ? -EHOSTUNREACH : 0; 1774 return (rc < 0) ? -EHOSTUNREACH : 0;
1800} 1775}
1801 1776
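The receive-side hunks share one rejection idiom: a message that cannot be delivered is reversed in place (source and destination swapped, an error code stamped in) and bounced to its originator as a single skb. Condensed from the code above:

    /* Reject pattern used in the filter/backlog/rcv paths above */
    if ((rc < 0) && tipc_msg_reverse(skb, &dnode, -rc))
            tipc_link_xmit_skb(skb, dnode, 0); /* bounce to originator */

When tipc_msg_reverse() returns false the buffer has already been consumed, so the caller has nothing left to send.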
@@ -2053,7 +2028,7 @@ static int tipc_shutdown(struct socket *sock, int how)
2053{ 2028{
2054 struct sock *sk = sock->sk; 2029 struct sock *sk = sock->sk;
2055 struct tipc_sock *tsk = tipc_sk(sk); 2030 struct tipc_sock *tsk = tipc_sk(sk);
2056 struct sk_buff *buf; 2031 struct sk_buff *skb;
2057 u32 dnode; 2032 u32 dnode;
2058 int res; 2033 int res;
2059 2034
@@ -2068,23 +2043,23 @@ static int tipc_shutdown(struct socket *sock, int how)
2068 2043
2069restart: 2044restart:
2070 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ 2045 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2071 buf = __skb_dequeue(&sk->sk_receive_queue); 2046 skb = __skb_dequeue(&sk->sk_receive_queue);
2072 if (buf) { 2047 if (skb) {
2073 if (TIPC_SKB_CB(buf)->handle != NULL) { 2048 if (TIPC_SKB_CB(skb)->handle != NULL) {
2074 kfree_skb(buf); 2049 kfree_skb(skb);
2075 goto restart; 2050 goto restart;
2076 } 2051 }
2077 if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN)) 2052 if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
2078 tipc_link_xmit(buf, dnode, tsk->ref); 2053 tipc_link_xmit_skb(skb, dnode, tsk->ref);
2079 tipc_node_remove_conn(dnode, tsk->ref); 2054 tipc_node_remove_conn(dnode, tsk->ref);
2080 } else { 2055 } else {
2081 dnode = tsk_peer_node(tsk); 2056 dnode = tsk_peer_node(tsk);
2082 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 2057 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2083 TIPC_CONN_MSG, SHORT_H_SIZE, 2058 TIPC_CONN_MSG, SHORT_H_SIZE,
2084 0, dnode, tipc_own_addr, 2059 0, dnode, tipc_own_addr,
2085 tsk_peer_port(tsk), 2060 tsk_peer_port(tsk),
2086 tsk->ref, TIPC_CONN_SHUTDOWN); 2061 tsk->ref, TIPC_CONN_SHUTDOWN);
2087 tipc_link_xmit(buf, dnode, tsk->ref); 2062 tipc_link_xmit_skb(skb, dnode, tsk->ref);
2088 } 2063 }
2089 tsk->connected = 0; 2064 tsk->connected = 0;
2090 sock->state = SS_DISCONNECTING; 2065 sock->state = SS_DISCONNECTING;
@@ -2113,7 +2088,7 @@ static void tipc_sk_timeout(unsigned long ref)
2113{ 2088{
2114 struct tipc_sock *tsk; 2089 struct tipc_sock *tsk;
2115 struct sock *sk; 2090 struct sock *sk;
2116 struct sk_buff *buf = NULL; 2091 struct sk_buff *skb = NULL;
2117 u32 peer_port, peer_node; 2092 u32 peer_port, peer_node;
2118 2093
2119 tsk = tipc_sk_get(ref); 2094 tsk = tipc_sk_get(ref);
@@ -2131,20 +2106,20 @@ static void tipc_sk_timeout(unsigned long ref)
2131 2106
2132 if (tsk->probing_state == TIPC_CONN_PROBING) { 2107 if (tsk->probing_state == TIPC_CONN_PROBING) {
2133 /* Previous probe not answered -> self abort */ 2108 /* Previous probe not answered -> self abort */
2134 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 2109 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
2135 SHORT_H_SIZE, 0, tipc_own_addr, 2110 SHORT_H_SIZE, 0, tipc_own_addr,
2136 peer_node, ref, peer_port, 2111 peer_node, ref, peer_port,
2137 TIPC_ERR_NO_PORT); 2112 TIPC_ERR_NO_PORT);
2138 } else { 2113 } else {
2139 buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 2114 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2140 0, peer_node, tipc_own_addr, 2115 0, peer_node, tipc_own_addr,
2141 peer_port, ref, TIPC_OK); 2116 peer_port, ref, TIPC_OK);
2142 tsk->probing_state = TIPC_CONN_PROBING; 2117 tsk->probing_state = TIPC_CONN_PROBING;
2143 k_start_timer(&tsk->timer, tsk->probing_interval); 2118 k_start_timer(&tsk->timer, tsk->probing_interval);
2144 } 2119 }
2145 bh_unlock_sock(sk); 2120 bh_unlock_sock(sk);
2146 if (buf) 2121 if (skb)
2147 tipc_link_xmit(buf, peer_node, ref); 2122 tipc_link_xmit_skb(skb, peer_node, ref);
2148exit: 2123exit:
2149 tipc_sk_put(tsk); 2124 tipc_sk_put(tsk);
2150} 2125}
@@ -2802,3 +2777,233 @@ void tipc_socket_stop(void)
2802 sock_unregister(tipc_family_ops.family); 2777 sock_unregister(tipc_family_ops.family);
2803 proto_unregister(&tipc_proto); 2778 proto_unregister(&tipc_proto);
2804} 2779}
2780
2781/* Caller should hold socket lock for the passed tipc socket. */
2782static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2783{
2784 u32 peer_node;
2785 u32 peer_port;
2786 struct nlattr *nest;
2787
2788 peer_node = tsk_peer_node(tsk);
2789 peer_port = tsk_peer_port(tsk);
2790
2791 nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2792
2793 if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2794 goto msg_full;
2795 if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2796 goto msg_full;
2797
2798 if (tsk->conn_type != 0) {
2799 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2800 goto msg_full;
2801 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2802 goto msg_full;
2803 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2804 goto msg_full;
2805 }
2806 nla_nest_end(skb, nest);
2807
2808 return 0;
2809
2810msg_full:
2811 nla_nest_cancel(skb, nest);
2812
2813 return -EMSGSIZE;
2814}
2815
2816/* Caller should hold socket lock for the passed tipc socket. */
2817static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2818 struct tipc_sock *tsk)
2819{
2820 int err;
2821 void *hdr;
2822 struct nlattr *attrs;
2823
2824 hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2825 &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2826 if (!hdr)
2827 goto msg_cancel;
2828
2829 attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2830 if (!attrs)
2831 goto genlmsg_cancel;
2832 if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref))
2833 goto attr_msg_cancel;
2834 if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
2835 goto attr_msg_cancel;
2836
2837 if (tsk->connected) {
2838 err = __tipc_nl_add_sk_con(skb, tsk);
2839 if (err)
2840 goto attr_msg_cancel;
2841 } else if (!list_empty(&tsk->publications)) {
2842 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2843 goto attr_msg_cancel;
2844 }
2845 nla_nest_end(skb, attrs);
2846 genlmsg_end(skb, hdr);
2847
2848 return 0;
2849
2850attr_msg_cancel:
2851 nla_nest_cancel(skb, attrs);
2852genlmsg_cancel:
2853 genlmsg_cancel(skb, hdr);
2854msg_cancel:
2855 return -EMSGSIZE;
2856}
2857
2858int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2859{
2860 int err;
2861 struct tipc_sock *tsk;
2862 u32 prev_ref = cb->args[0];
2863 u32 ref = prev_ref;
2864
2865 tsk = tipc_sk_get_next(&ref);
2866 for (; tsk; tsk = tipc_sk_get_next(&ref)) {
2867 lock_sock(&tsk->sk);
2868 err = __tipc_nl_add_sk(skb, cb, tsk);
2869 release_sock(&tsk->sk);
2870 tipc_sk_put(tsk);
2871 if (err)
2872 break;
2873
2874 prev_ref = ref;
2875 }
2876
2877 cb->args[0] = prev_ref;
2878
2879 return skb->len;
2880}
2881
2882/* Caller should hold socket lock for the passed tipc socket. */
2883static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2884 struct netlink_callback *cb,
2885 struct publication *publ)
2886{
2887 void *hdr;
2888 struct nlattr *attrs;
2889
2890 hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2891 &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2892 if (!hdr)
2893 goto msg_cancel;
2894
2895 attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2896 if (!attrs)
2897 goto genlmsg_cancel;
2898
2899 if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2900 goto attr_msg_cancel;
2901 if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2902 goto attr_msg_cancel;
2903 if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2904 goto attr_msg_cancel;
2905 if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2906 goto attr_msg_cancel;
2907
2908 nla_nest_end(skb, attrs);
2909 genlmsg_end(skb, hdr);
2910
2911 return 0;
2912
2913attr_msg_cancel:
2914 nla_nest_cancel(skb, attrs);
2915genlmsg_cancel:
2916 genlmsg_cancel(skb, hdr);
2917msg_cancel:
2918 return -EMSGSIZE;
2919}
2920
2921/* Caller should hold socket lock for the passed tipc socket. */
2922static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2923 struct netlink_callback *cb,
2924 struct tipc_sock *tsk, u32 *last_publ)
2925{
2926 int err;
2927 struct publication *p;
2928
2929 if (*last_publ) {
2930 list_for_each_entry(p, &tsk->publications, pport_list) {
2931 if (p->key == *last_publ)
2932 break;
2933 }
2934 if (p->key != *last_publ) {
2935 /* We never set seq or call nl_dump_check_consistent(),
2936 * which means that setting prev_seq here will cause the
2937 * consistency check to fail in the netlink callback
2938 * handler, resulting in the last NLMSG_DONE message
2939 * having the NLM_F_DUMP_INTR flag set.
2940 */
2941 cb->prev_seq = 1;
2942 *last_publ = 0;
2943 return -EPIPE;
2944 }
2945 } else {
2946 p = list_first_entry(&tsk->publications, struct publication,
2947 pport_list);
2948 }
2949
2950 list_for_each_entry_from(p, &tsk->publications, pport_list) {
2951 err = __tipc_nl_add_sk_publ(skb, cb, p);
2952 if (err) {
2953 *last_publ = p->key;
2954 return err;
2955 }
2956 }
2957 *last_publ = 0;
2958
2959 return 0;
2960}
2961
2962int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2963{
2964 int err;
2965 u32 tsk_ref = cb->args[0];
2966 u32 last_publ = cb->args[1];
2967 u32 done = cb->args[2];
2968 struct tipc_sock *tsk;
2969
2970 if (!tsk_ref) {
2971 struct nlattr **attrs;
2972 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2973
2974 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2975 if (err)
2976 return err;
2977
2978 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2979 attrs[TIPC_NLA_SOCK],
2980 tipc_nl_sock_policy);
2981 if (err)
2982 return err;
2983
2984 if (!sock[TIPC_NLA_SOCK_REF])
2985 return -EINVAL;
2986
2987 tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2988 }
2989
2990 if (done)
2991 return 0;
2992
2993 tsk = tipc_sk_get(tsk_ref);
2994 if (!tsk)
2995 return -EINVAL;
2996
2997 lock_sock(&tsk->sk);
2998 err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2999 if (!err)
3000 done = 1;
3001 release_sock(&tsk->sk);
3002 tipc_sk_put(tsk);
3003
3004 cb->args[0] = tsk_ref;
3005 cb->args[1] = last_publ;
3006 cb->args[2] = done;
3007
3008 return skb->len;
3009}
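Both -EPIPE paths above surface to userspace as NLM_F_DUMP_INTR on the terminating NLMSG_DONE message. A minimal userspace-side check using only uapi definitions (sketch; socket setup and the receive loop are omitted):

    #include <linux/netlink.h>

    /* Nonzero if the kernel flagged this dump as interrupted; the caller
     * should restart the dump from scratch to get a consistent snapshot.
     */
    static int dump_interrupted(const struct nlmsghdr *nlh)
    {
            return nlh->nlmsg_type == NLMSG_DONE &&
                   (nlh->nlmsg_flags & NLM_F_DUMP_INTR);
    }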
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index baa43d03901e..d34089387006 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -36,6 +36,7 @@
36#define _TIPC_SOCK_H 36#define _TIPC_SOCK_H
37 37
38#include <net/sock.h> 38#include <net/sock.h>
39#include <net/genetlink.h>
39 40
40#define TIPC_CONNACK_INTV 256 41#define TIPC_CONNACK_INTV 256
41#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) 42#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
@@ -47,5 +48,7 @@ void tipc_sk_mcast_rcv(struct sk_buff *buf);
47void tipc_sk_reinit(void); 48void tipc_sk_reinit(void);
48int tipc_sk_ref_table_init(u32 requested_size, u32 start); 49int tipc_sk_ref_table_init(u32 requested_size, u32 start);
49void tipc_sk_ref_table_stop(void); 50void tipc_sk_ref_table_stop(void);
51int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb);
52int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb);
50 53
51#endif 54#endif
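The dump entry points exported here (plus tipc_nl_node_dump() in node.h) are plain genetlink dump callbacks. A plausible wiring into the v2 family, assumed for illustration since the netlink.c hunks are not part of this excerpt:

    /* Assumed ops table shape; the command values match the
     * genlmsg_put() calls in the dump functions above.
     */
    static const struct genl_ops tipc_genl_v2_ops[] = {
            {
                    .cmd    = TIPC_NL_SOCK_GET,
                    .dumpit = tipc_nl_sk_dump,
            },
            {
                    .cmd    = TIPC_NL_PUBL_GET,
                    .dumpit = tipc_nl_publ_dump,
            },
            {
                    .cmd    = TIPC_NL_NODE_GET,
                    .dumpit = tipc_nl_node_dump,
            },
    };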
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 31b5cb232a43..0344206b984f 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -305,7 +305,6 @@ static int subscr_subscribe(struct tipc_subscr *s,
305 kfree(sub); 305 kfree(sub);
306 return -EINVAL; 306 return -EINVAL;
307 } 307 }
308 INIT_LIST_HEAD(&sub->nameseq_list);
309 list_add(&sub->subscription_list, &subscriber->subscription_list); 308 list_add(&sub->subscription_list, &subscriber->subscription_list);
310 sub->subscriber = subscriber; 309 sub->subscriber = subscriber;
311 sub->swap = swap; 310 sub->swap = swap;