commit     f73181c8288fc38747ec4f0f3e8a9052ab785cd5
tree       9523b3ba632e051b3d196f3bc6d8b0258e7c4fcd
parent     749c42b620a9511782bc38d0a88702a42434529e
author     Pablo Neira Ayuso <pablo@netfilter.org>  2012-05-08 13:40:30 -0400
committer  Pablo Neira Ayuso <pablo@netfilter.org>  2012-05-08 13:40:33 -0400
ipvs: add support for sync threads
Allow master and backup servers to use many threads for sync traffic. Add the sysctl var "sync_ports" to define the number of threads. Every thread will use a single UDP port; thread 0 will use the default port 8848, while the last thread will use port 8848+sync_ports-1.

The sync traffic for connections is scheduled to the many master threads based on the cp address, but one connection is always assigned to the same thread to avoid reordering of the sync messages.

Remove ip_vs_sync_switch_mode because this check for sync mode change is still risky. Instead, check for mode change under sync_buff_lock.

Make sure the backup socks do not block on reading.

Special thanks to Aleksey Chudov for helping in all tests.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
Signed-off-by: Simon Horman <horms@verge.net.au>
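As an aside, here is a minimal userspace sketch of the scheduling rule described above. Only the shift-and-mask expression and the base port 8848 are taken from the patch (see select_master_thread_id() and make_send_sock() in the diff below); the connection-struct size, the sample pointer values and the sync_ports setting are assumptions made up for the demo.

#include <stdio.h>

#define IP_VS_SYNC_PORT	8848	/* base sync port, as in the patch */
#define CONN_SIZE	256	/* assumed stand-in for sizeof(struct ip_vs_conn) */

/* Integer log2, standing in for the kernel's ilog2(). */
static int ilog2_int(unsigned long v)
{
	int r = -1;

	while (v) {
		v >>= 1;
		r++;
	}
	return r;
}

int main(void)
{
	/* sync_ports must be a power of 2 (proc_do_sync_ports enforces
	 * this), so threads_mask = sync_ports - 1 works as a bit mask. */
	unsigned long threads_mask = 4 - 1;	/* sync_ports = 4 */
	/* made-up connection addresses; real cp pointers come from the slab */
	unsigned long cps[] = { 0xffff880012345600UL, 0xffff880012345a00UL };
	unsigned int i;

	for (i = 0; i < sizeof(cps) / sizeof(cps[0]); i++) {
		/* same expression as select_master_thread_id(): drop the
		 * low pointer bits that do not vary between slab objects,
		 * then mask down to a thread id */
		int id = (cps[i] >> (1 + ilog2_int(CONN_SIZE))) & threads_mask;

		printf("cp %#lx -> thread %d, UDP port %d\n",
		       cps[i], id, IP_VS_SYNC_PORT + id);
	}
	return 0;
}

Because a given cp always hashes to the same id, all sync messages for one connection leave through the same thread and UDP port, which is what prevents reordering.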
Diffstat (limited to 'net')
 net/netfilter/ipvs/ip_vs_conn.c |   7
 net/netfilter/ipvs/ip_vs_ctl.c  |  29
 net/netfilter/ipvs/ip_vs_sync.c | 401
 3 files changed, 280 insertions, 157 deletions
diff --git a/net/netfilter/ipvs/ip_vs_conn.c b/net/netfilter/ipvs/ip_vs_conn.c
index 4f3205def28f..c7edf2022c3e 100644
--- a/net/netfilter/ipvs/ip_vs_conn.c
+++ b/net/netfilter/ipvs/ip_vs_conn.c
@@ -619,12 +619,19 @@ struct ip_vs_dest *ip_vs_try_bind_dest(struct ip_vs_conn *cp)
 	if (dest) {
 		struct ip_vs_proto_data *pd;
 
+		spin_lock(&cp->lock);
+		if (cp->dest) {
+			spin_unlock(&cp->lock);
+			return dest;
+		}
+
 		/* Applications work depending on the forwarding method
 		 * but better to reassign them always when binding dest */
 		if (cp->app)
 			ip_vs_unbind_app(cp);
 
 		ip_vs_bind_dest(cp, dest);
+		spin_unlock(&cp->lock);
 
 		/* Update its packet transmitter */
 		cp->packet_xmit = NULL;
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index a77b9bd433aa..dd811b8dd97c 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1657,9 +1657,24 @@ proc_do_sync_mode(ctl_table *table, int write,
 		if ((*valp < 0) || (*valp > 1)) {
 			/* Restore the correct value */
 			*valp = val;
-		} else {
-			struct net *net = current->nsproxy->net_ns;
-			ip_vs_sync_switch_mode(net, val);
+		}
+	}
+	return rc;
+}
+
+static int
+proc_do_sync_ports(ctl_table *table, int write,
+		   void __user *buffer, size_t *lenp, loff_t *ppos)
+{
+	int *valp = table->data;
+	int val = *valp;
+	int rc;
+
+	rc = proc_dointvec(table, write, buffer, lenp, ppos);
+	if (write && (*valp != val)) {
+		if (*valp < 1 || !is_power_of_2(*valp)) {
+			/* Restore the correct value */
+			*valp = val;
 		}
 	}
 	return rc;
@@ -1723,6 +1738,12 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler = &proc_do_sync_mode,
 	},
 	{
+		.procname = "sync_ports",
+		.maxlen = sizeof(int),
+		.mode = 0644,
+		.proc_handler = &proc_do_sync_ports,
+	},
+	{
 		.procname = "sync_qlen_max",
 		.maxlen = sizeof(int),
 		.mode = 0644,
@@ -3686,6 +3707,8 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_ports = 1;
+	tbl[idx++].data = &ipvs->sysctl_sync_ports;
 	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
 	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
 	ipvs->sysctl_sync_sock_size = 0;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index 8d6a4219e904..effa10c9e4e3 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -196,6 +196,7 @@ struct ip_vs_sync_thread_data {
 	struct net *net;
 	struct socket *sock;
 	char *buf;
+	int id;
 };
 
 /* Version 0 definition of packet sizes */
@@ -271,13 +272,6 @@ struct ip_vs_sync_buff {
 	unsigned char *end;
 };
 
-/* multicast addr */
-static struct sockaddr_in mcast_addr = {
-	.sin_family = AF_INET,
-	.sin_port = cpu_to_be16(IP_VS_SYNC_PORT),
-	.sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP),
-};
-
 /*
  * Copy of struct ip_vs_seq
  * From unaligned network order to aligned host order
@@ -300,22 +294,22 @@ static void hton_seq(struct ip_vs_seq *ho, struct ip_vs_seq *no)
 	put_unaligned_be32(ho->previous_delta, &no->previous_delta);
 }
 
-static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
+static inline struct ip_vs_sync_buff *
+sb_dequeue(struct netns_ipvs *ipvs, struct ipvs_master_sync_state *ms)
 {
 	struct ip_vs_sync_buff *sb;
 
 	spin_lock_bh(&ipvs->sync_lock);
-	if (list_empty(&ipvs->sync_queue)) {
+	if (list_empty(&ms->sync_queue)) {
 		sb = NULL;
 		__set_current_state(TASK_INTERRUPTIBLE);
 	} else {
-		sb = list_entry(ipvs->sync_queue.next,
-				struct ip_vs_sync_buff,
+		sb = list_entry(ms->sync_queue.next, struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
-		ipvs->sync_queue_len--;
-		if (!ipvs->sync_queue_len)
-			ipvs->sync_queue_delay = 0;
+		ms->sync_queue_len--;
+		if (!ms->sync_queue_len)
+			ms->sync_queue_delay = 0;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -338,7 +332,7 @@ ip_vs_sync_buff_create(struct netns_ipvs *ipvs)
 		kfree(sb);
 		return NULL;
 	}
-	sb->mesg->reserved = 0;	/* old nr_conns i.e. must be zeo now */
+	sb->mesg->reserved = 0;	/* old nr_conns i.e. must be zero now */
 	sb->mesg->version = SYNC_PROTO_VER;
 	sb->mesg->syncid = ipvs->master_syncid;
 	sb->mesg->size = sizeof(struct ip_vs_sync_mesg);
@@ -357,20 +351,21 @@ static inline void ip_vs_sync_buff_release(struct ip_vs_sync_buff *sb)
 	kfree(sb);
 }
 
-static inline void sb_queue_tail(struct netns_ipvs *ipvs)
+static inline void sb_queue_tail(struct netns_ipvs *ipvs,
+				 struct ipvs_master_sync_state *ms)
 {
-	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
+	struct ip_vs_sync_buff *sb = ms->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
 	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
-	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
-		if (!ipvs->sync_queue_len)
-			schedule_delayed_work(&ipvs->master_wakeup_work,
+	    ms->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
+		if (!ms->sync_queue_len)
+			schedule_delayed_work(&ms->master_wakeup_work,
 					      max(IPVS_SYNC_SEND_DELAY, 1));
-		ipvs->sync_queue_len++;
-		list_add_tail(&sb->list, &ipvs->sync_queue);
-		if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
-			wake_up_process(ipvs->master_thread);
+		ms->sync_queue_len++;
+		list_add_tail(&sb->list, &ms->sync_queue);
+		if ((++ms->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+			wake_up_process(ms->master_thread);
 	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
@@ -381,15 +376,15 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
  * than the specified time or the specified time is zero.
  */
 static inline struct ip_vs_sync_buff *
-get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
+get_curr_sync_buff(struct netns_ipvs *ipvs, struct ipvs_master_sync_state *ms,
+		   unsigned long time)
 {
 	struct ip_vs_sync_buff *sb;
 
 	spin_lock_bh(&ipvs->sync_buff_lock);
-	if (ipvs->sync_buff &&
-	    time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) {
-		sb = ipvs->sync_buff;
-		ipvs->sync_buff = NULL;
+	sb = ms->sync_buff;
+	if (sb && time_after_eq(jiffies - sb->firstuse, time)) {
+		ms->sync_buff = NULL;
 		__set_current_state(TASK_RUNNING);
 	} else
 		sb = NULL;
@@ -397,31 +392,10 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
 	return sb;
 }
 
-/*
- * Switch mode from sending version 0 or 1
- *  - must handle sync_buf
- */
-void ip_vs_sync_switch_mode(struct net *net, int mode)
+static inline int
+select_master_thread_id(struct netns_ipvs *ipvs, struct ip_vs_conn *cp)
 {
-	struct netns_ipvs *ipvs = net_ipvs(net);
-	struct ip_vs_sync_buff *sb;
-
-	spin_lock_bh(&ipvs->sync_buff_lock);
-	if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
-		goto unlock;
-	sb = ipvs->sync_buff;
-	if (mode == sysctl_sync_ver(ipvs) || !sb)
-		goto unlock;
-
-	/* Buffer empty ? then let buf_create do the job */
-	if (sb->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
-		ip_vs_sync_buff_release(sb);
-		ipvs->sync_buff = NULL;
-	} else
-		sb_queue_tail(ipvs);
-
-unlock:
-	spin_unlock_bh(&ipvs->sync_buff_lock);
+	return ((long) cp >> (1 + ilog2(sizeof(*cp)))) & ipvs->threads_mask;
 }
 
 /*
@@ -543,6 +517,9 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 	struct netns_ipvs *ipvs = net_ipvs(net);
 	struct ip_vs_sync_mesg_v0 *m;
 	struct ip_vs_sync_conn_v0 *s;
+	struct ip_vs_sync_buff *buff;
+	struct ipvs_master_sync_state *ms;
+	int id;
 	int len;
 
 	if (unlikely(cp->af != AF_INET))
@@ -555,20 +532,37 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 		return;
 
 	spin_lock(&ipvs->sync_buff_lock);
-	if (!ipvs->sync_buff) {
-		ipvs->sync_buff =
-			ip_vs_sync_buff_create_v0(ipvs);
-		if (!ipvs->sync_buff) {
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) {
+		spin_unlock(&ipvs->sync_buff_lock);
+		return;
+	}
+
+	id = select_master_thread_id(ipvs, cp);
+	ms = &ipvs->ms[id];
+	buff = ms->sync_buff;
+	if (buff) {
+		m = (struct ip_vs_sync_mesg_v0 *) buff->mesg;
+		/* Send buffer if it is for v1 */
+		if (!m->nr_conns) {
+			sb_queue_tail(ipvs, ms);
+			ms->sync_buff = NULL;
+			buff = NULL;
+		}
+	}
+	if (!buff) {
+		buff = ip_vs_sync_buff_create_v0(ipvs);
+		if (!buff) {
 			spin_unlock(&ipvs->sync_buff_lock);
 			pr_err("ip_vs_sync_buff_create failed.\n");
 			return;
 		}
+		ms->sync_buff = buff;
 	}
 
 	len = (cp->flags & IP_VS_CONN_F_SEQ_MASK) ? FULL_CONN_SIZE :
 		SIMPLE_CONN_SIZE;
-	m = (struct ip_vs_sync_mesg_v0 *)ipvs->sync_buff->mesg;
-	s = (struct ip_vs_sync_conn_v0 *)ipvs->sync_buff->head;
+	m = (struct ip_vs_sync_mesg_v0 *) buff->mesg;
+	s = (struct ip_vs_sync_conn_v0 *) buff->head;
 
 	/* copy members */
 	s->reserved = 0;
@@ -589,12 +583,12 @@ static void ip_vs_sync_conn_v0(struct net *net, struct ip_vs_conn *cp,
 
 	m->nr_conns++;
 	m->size += len;
-	ipvs->sync_buff->head += len;
+	buff->head += len;
 
 	/* check if there is a space for next one */
-	if (ipvs->sync_buff->head + FULL_CONN_SIZE > ipvs->sync_buff->end) {
-		sb_queue_tail(ipvs);
-		ipvs->sync_buff = NULL;
+	if (buff->head + FULL_CONN_SIZE > buff->end) {
+		sb_queue_tail(ipvs, ms);
+		ms->sync_buff = NULL;
 	}
 	spin_unlock(&ipvs->sync_buff_lock);
 
@@ -619,6 +613,9 @@ void ip_vs_sync_conn(struct net *net, struct ip_vs_conn *cp, int pkts)
 	struct netns_ipvs *ipvs = net_ipvs(net);
 	struct ip_vs_sync_mesg *m;
 	union ip_vs_sync_conn *s;
+	struct ip_vs_sync_buff *buff;
+	struct ipvs_master_sync_state *ms;
+	int id;
 	__u8 *p;
 	unsigned int len, pe_name_len, pad;
 
@@ -645,6 +642,13 @@ sloop:
 	}
 
 	spin_lock(&ipvs->sync_buff_lock);
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) {
+		spin_unlock(&ipvs->sync_buff_lock);
+		return;
+	}
+
+	id = select_master_thread_id(ipvs, cp);
+	ms = &ipvs->ms[id];
 
 #ifdef CONFIG_IP_VS_IPV6
 	if (cp->af == AF_INET6)
@@ -663,27 +667,32 @@ sloop:
 
 	/* check if there is a space for this one */
 	pad = 0;
-	if (ipvs->sync_buff) {
-		pad = (4 - (size_t)ipvs->sync_buff->head) & 3;
-		if (ipvs->sync_buff->head + len + pad > ipvs->sync_buff->end) {
-			sb_queue_tail(ipvs);
-			ipvs->sync_buff = NULL;
+	buff = ms->sync_buff;
+	if (buff) {
+		m = buff->mesg;
+		pad = (4 - (size_t) buff->head) & 3;
+		/* Send buffer if it is for v0 */
+		if (buff->head + len + pad > buff->end || m->reserved) {
+			sb_queue_tail(ipvs, ms);
+			ms->sync_buff = NULL;
+			buff = NULL;
 			pad = 0;
 		}
 	}
 
-	if (!ipvs->sync_buff) {
-		ipvs->sync_buff = ip_vs_sync_buff_create(ipvs);
-		if (!ipvs->sync_buff) {
+	if (!buff) {
+		buff = ip_vs_sync_buff_create(ipvs);
+		if (!buff) {
 			spin_unlock(&ipvs->sync_buff_lock);
 			pr_err("ip_vs_sync_buff_create failed.\n");
 			return;
 		}
+		ms->sync_buff = buff;
+		m = buff->mesg;
 	}
 
-	m = ipvs->sync_buff->mesg;
-	p = ipvs->sync_buff->head;
-	ipvs->sync_buff->head += pad + len;
+	p = buff->head;
+	buff->head += pad + len;
 	m->size += pad + len;
 	/* Add ev. padding from prev. sync_conn */
 	while (pad--)
@@ -834,6 +843,7 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 		kfree(param->pe_data);
 
 	dest = cp->dest;
+	spin_lock(&cp->lock);
 	if ((cp->flags ^ flags) & IP_VS_CONN_F_INACTIVE &&
 	    !(flags & IP_VS_CONN_F_TEMPLATE) && dest) {
 		if (flags & IP_VS_CONN_F_INACTIVE) {
@@ -847,6 +857,7 @@ static void ip_vs_proc_conn(struct net *net, struct ip_vs_conn_param *param,
 		flags &= IP_VS_CONN_F_BACKUP_UPD_MASK;
 		flags |= cp->flags & ~IP_VS_CONN_F_BACKUP_UPD_MASK;
 		cp->flags = flags;
+		spin_unlock(&cp->lock);
 		if (!dest) {
 			dest = ip_vs_try_bind_dest(cp);
 			if (dest)
@@ -1399,9 +1410,15 @@ static int bind_mcastif_addr(struct socket *sock, char *ifname)
 /*
  * Set up sending multicast socket over UDP
  */
-static struct socket *make_send_sock(struct net *net)
+static struct socket *make_send_sock(struct net *net, int id)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	/* multicast addr */
+	struct sockaddr_in mcast_addr = {
+		.sin_family = AF_INET,
+		.sin_port = cpu_to_be16(IP_VS_SYNC_PORT + id),
+		.sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP),
+	};
 	struct socket *sock;
 	int result;
 
@@ -1453,9 +1470,15 @@ error:
 /*
  * Set up receiving multicast socket over UDP
  */
-static struct socket *make_receive_sock(struct net *net)
+static struct socket *make_receive_sock(struct net *net, int id)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	/* multicast addr */
+	struct sockaddr_in mcast_addr = {
+		.sin_family = AF_INET,
+		.sin_port = cpu_to_be16(IP_VS_SYNC_PORT + id),
+		.sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP),
+	};
 	struct socket *sock;
 	int result;
 
@@ -1549,10 +1572,10 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	iov.iov_base = buffer;
 	iov.iov_len = (size_t)buflen;
 
-	len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, 0);
+	len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, MSG_DONTWAIT);
 
 	if (len < 0)
-		return -1;
+		return len;
 
 	LeaveFunction(7);
 	return len;
@@ -1561,44 +1584,47 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 /* Wakeup the master thread for sending */
 static void master_wakeup_work_handler(struct work_struct *work)
 {
-	struct netns_ipvs *ipvs = container_of(work, struct netns_ipvs,
-					       master_wakeup_work.work);
+	struct ipvs_master_sync_state *ms =
+		container_of(work, struct ipvs_master_sync_state,
+			     master_wakeup_work.work);
+	struct netns_ipvs *ipvs = ms->ipvs;
 
 	spin_lock_bh(&ipvs->sync_lock);
-	if (ipvs->sync_queue_len &&
-	    ipvs->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) {
-		ipvs->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE;
-		wake_up_process(ipvs->master_thread);
+	if (ms->sync_queue_len &&
+	    ms->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) {
+		ms->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE;
+		wake_up_process(ms->master_thread);
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 }
 
 /* Get next buffer to send */
 static inline struct ip_vs_sync_buff *
-next_sync_buff(struct netns_ipvs *ipvs)
+next_sync_buff(struct netns_ipvs *ipvs, struct ipvs_master_sync_state *ms)
 {
 	struct ip_vs_sync_buff *sb;
 
-	sb = sb_dequeue(ipvs);
+	sb = sb_dequeue(ipvs, ms);
 	if (sb)
 		return sb;
 	/* Do not delay entries in buffer for more than 2 seconds */
-	return get_curr_sync_buff(ipvs, IPVS_SYNC_FLUSH_TIME);
+	return get_curr_sync_buff(ipvs, ms, IPVS_SYNC_FLUSH_TIME);
 }
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct ipvs_master_sync_state *ms = &ipvs->ms[tinfo->id];
 	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
-		"syncid = %d\n",
-		ipvs->master_mcast_ifn, ipvs->master_syncid);
+		"syncid = %d, id = %d\n",
+		ipvs->master_mcast_ifn, ipvs->master_syncid, tinfo->id);
 
 	for (;;) {
-		sb = next_sync_buff(ipvs);
+		sb = next_sync_buff(ipvs, ms);
 		if (unlikely(kthread_should_stop()))
 			break;
 		if (!sb) {
@@ -1624,12 +1650,12 @@ done:
 	ip_vs_sync_buff_release(sb);
 
 	/* clean up the sync_buff queue */
-	while ((sb = sb_dequeue(ipvs)))
+	while ((sb = sb_dequeue(ipvs, ms)))
 		ip_vs_sync_buff_release(sb);
 	__set_current_state(TASK_RUNNING);
 
 	/* clean up the current sync_buff */
-	sb = get_curr_sync_buff(ipvs, 0);
+	sb = get_curr_sync_buff(ipvs, ms, 0);
 	if (sb)
 		ip_vs_sync_buff_release(sb);
 
@@ -1648,8 +1674,8 @@ static int sync_thread_backup(void *data)
 	int len;
 
 	pr_info("sync thread started: state = BACKUP, mcast_ifn = %s, "
-		"syncid = %d\n",
-		ipvs->backup_mcast_ifn, ipvs->backup_syncid);
+		"syncid = %d, id = %d\n",
+		ipvs->backup_mcast_ifn, ipvs->backup_syncid, tinfo->id);
 
 	while (!kthread_should_stop()) {
 		wait_event_interruptible(*sk_sleep(tinfo->sock->sk),
@@ -1661,7 +1687,8 @@ static int sync_thread_backup(void *data)
 			len = ip_vs_receive(tinfo->sock, tinfo->buf,
 					ipvs->recv_mesg_maxlen);
 			if (len <= 0) {
-				pr_err("receiving message error\n");
+				if (len != -EAGAIN)
+					pr_err("receiving message error\n");
 				break;
 			}
 
@@ -1685,90 +1712,140 @@ static int sync_thread_backup(void *data)
 int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 {
 	struct ip_vs_sync_thread_data *tinfo;
-	struct task_struct **realtask, *task;
+	struct task_struct **array = NULL, *task;
 	struct socket *sock;
 	struct netns_ipvs *ipvs = net_ipvs(net);
-	char *name, *buf = NULL;
+	char *name;
 	int (*threadfn)(void *data);
+	int id, count;
 	int result = -ENOMEM;
 
 	IP_VS_DBG(7, "%s(): pid %d\n", __func__, task_pid_nr(current));
 	IP_VS_DBG(7, "Each ip_vs_sync_conn entry needs %Zd bytes\n",
 		  sizeof(struct ip_vs_sync_conn_v0));
 
+	if (!ipvs->sync_state) {
+		count = clamp(sysctl_sync_ports(ipvs), 1, IPVS_SYNC_PORTS_MAX);
+		ipvs->threads_mask = count - 1;
+	} else
+		count = ipvs->threads_mask + 1;
 
 	if (state == IP_VS_STATE_MASTER) {
-		if (ipvs->master_thread)
+		if (ipvs->ms)
 			return -EEXIST;
 
 		strlcpy(ipvs->master_mcast_ifn, mcast_ifn,
 			sizeof(ipvs->master_mcast_ifn));
 		ipvs->master_syncid = syncid;
-		realtask = &ipvs->master_thread;
-		name = "ipvs_master:%d";
+		name = "ipvs-m:%d:%d";
 		threadfn = sync_thread_master;
-		ipvs->sync_queue_len = 0;
-		ipvs->sync_queue_delay = 0;
-		INIT_DELAYED_WORK(&ipvs->master_wakeup_work,
-				  master_wakeup_work_handler);
-		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
-		if (ipvs->backup_thread)
+		if (ipvs->backup_threads)
 			return -EEXIST;
 
 		strlcpy(ipvs->backup_mcast_ifn, mcast_ifn,
 			sizeof(ipvs->backup_mcast_ifn));
 		ipvs->backup_syncid = syncid;
-		realtask = &ipvs->backup_thread;
-		name = "ipvs_backup:%d";
+		name = "ipvs-b:%d:%d";
 		threadfn = sync_thread_backup;
-		sock = make_receive_sock(net);
 	} else {
 		return -EINVAL;
 	}
 
-	if (IS_ERR(sock)) {
-		result = PTR_ERR(sock);
-		goto out;
-	}
+	if (state == IP_VS_STATE_MASTER) {
+		struct ipvs_master_sync_state *ms;
 
-	set_sync_mesg_maxlen(net, state);
-	if (state == IP_VS_STATE_BACKUP) {
-		buf = kmalloc(ipvs->recv_mesg_maxlen, GFP_KERNEL);
-		if (!buf)
-			goto outsocket;
+		ipvs->ms = kzalloc(count * sizeof(ipvs->ms[0]), GFP_KERNEL);
+		if (!ipvs->ms)
+			goto out;
+		ms = ipvs->ms;
+		for (id = 0; id < count; id++, ms++) {
+			INIT_LIST_HEAD(&ms->sync_queue);
+			ms->sync_queue_len = 0;
+			ms->sync_queue_delay = 0;
+			INIT_DELAYED_WORK(&ms->master_wakeup_work,
+					  master_wakeup_work_handler);
+			ms->ipvs = ipvs;
+		}
+	} else {
+		array = kzalloc(count * sizeof(struct task_struct *),
+				GFP_KERNEL);
+		if (!array)
+			goto out;
 	}
+	set_sync_mesg_maxlen(net, state);
 
-	tinfo = kmalloc(sizeof(*tinfo), GFP_KERNEL);
-	if (!tinfo)
-		goto outbuf;
-
-	tinfo->net = net;
-	tinfo->sock = sock;
-	tinfo->buf = buf;
+	tinfo = NULL;
+	for (id = 0; id < count; id++) {
+		if (state == IP_VS_STATE_MASTER)
+			sock = make_send_sock(net, id);
+		else
+			sock = make_receive_sock(net, id);
+		if (IS_ERR(sock)) {
+			result = PTR_ERR(sock);
+			goto outtinfo;
+		}
+		tinfo = kmalloc(sizeof(*tinfo), GFP_KERNEL);
+		if (!tinfo)
+			goto outsocket;
+		tinfo->net = net;
+		tinfo->sock = sock;
+		if (state == IP_VS_STATE_BACKUP) {
+			tinfo->buf = kmalloc(ipvs->recv_mesg_maxlen,
+					     GFP_KERNEL);
+			if (!tinfo->buf)
+				goto outtinfo;
+		}
+		tinfo->id = id;
 
-	task = kthread_run(threadfn, tinfo, name, ipvs->gen);
-	if (IS_ERR(task)) {
-		result = PTR_ERR(task);
-		goto outtinfo;
+		task = kthread_run(threadfn, tinfo, name, ipvs->gen, id);
+		if (IS_ERR(task)) {
+			result = PTR_ERR(task);
+			goto outtinfo;
+		}
+		tinfo = NULL;
+		if (state == IP_VS_STATE_MASTER)
+			ipvs->ms[id].master_thread = task;
+		else
+			array[id] = task;
 	}
 
 	/* mark as active */
-	*realtask = task;
+
+	if (state == IP_VS_STATE_BACKUP)
+		ipvs->backup_threads = array;
+	spin_lock_bh(&ipvs->sync_buff_lock);
 	ipvs->sync_state |= state;
+	spin_unlock_bh(&ipvs->sync_buff_lock);
 
 	/* increase the module use count */
 	ip_vs_use_count_inc();
 
 	return 0;
 
-outtinfo:
-	kfree(tinfo);
-outbuf:
-	kfree(buf);
 outsocket:
 	sk_release_kernel(sock->sk);
+
+outtinfo:
+	if (tinfo) {
+		sk_release_kernel(tinfo->sock->sk);
+		kfree(tinfo->buf);
+		kfree(tinfo);
+	}
+	count = id;
+	while (count-- > 0) {
+		if (state == IP_VS_STATE_MASTER)
+			kthread_stop(ipvs->ms[count].master_thread);
+		else
+			kthread_stop(array[count]);
+	}
+	kfree(array);
+
 out:
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) {
+		kfree(ipvs->ms);
+		ipvs->ms = NULL;
+	}
 	return result;
 }
 
@@ -1776,39 +1853,60 @@ out:
 int stop_sync_thread(struct net *net, int state)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	struct task_struct **array;
+	int id;
 	int retc = -EINVAL;
 
 	IP_VS_DBG(7, "%s(): pid %d\n", __func__, task_pid_nr(current));
 
 	if (state == IP_VS_STATE_MASTER) {
-		if (!ipvs->master_thread)
+		if (!ipvs->ms)
 			return -ESRCH;
 
-		pr_info("stopping master sync thread %d ...\n",
-			task_pid_nr(ipvs->master_thread));
-
 		/*
 		 * The lock synchronizes with sb_queue_tail(), so that we don't
 		 * add sync buffers to the queue, when we are already in
 		 * progress of stopping the master sync daemon.
 		 */
 
-		spin_lock_bh(&ipvs->sync_lock);
+		spin_lock_bh(&ipvs->sync_buff_lock);
+		spin_lock(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
-		spin_unlock_bh(&ipvs->sync_lock);
-		cancel_delayed_work_sync(&ipvs->master_wakeup_work);
-		retc = kthread_stop(ipvs->master_thread);
-		ipvs->master_thread = NULL;
+		spin_unlock(&ipvs->sync_lock);
+		spin_unlock_bh(&ipvs->sync_buff_lock);
+
+		retc = 0;
+		for (id = ipvs->threads_mask; id >= 0; id--) {
+			struct ipvs_master_sync_state *ms = &ipvs->ms[id];
+			int ret;
+
+			pr_info("stopping master sync thread %d ...\n",
+				task_pid_nr(ms->master_thread));
+			cancel_delayed_work_sync(&ms->master_wakeup_work);
+			ret = kthread_stop(ms->master_thread);
+			if (retc >= 0)
+				retc = ret;
+		}
+		kfree(ipvs->ms);
+		ipvs->ms = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {
-		if (!ipvs->backup_thread)
+		if (!ipvs->backup_threads)
 			return -ESRCH;
 
-		pr_info("stopping backup sync thread %d ...\n",
-			task_pid_nr(ipvs->backup_thread));
-
 		ipvs->sync_state &= ~IP_VS_STATE_BACKUP;
-		retc = kthread_stop(ipvs->backup_thread);
-		ipvs->backup_thread = NULL;
+		array = ipvs->backup_threads;
+		retc = 0;
+		for (id = ipvs->threads_mask; id >= 0; id--) {
+			int ret;
+
+			pr_info("stopping backup sync thread %d ...\n",
+				task_pid_nr(array[id]));
+			ret = kthread_stop(array[id]);
+			if (retc >= 0)
+				retc = ret;
+		}
+		kfree(array);
+		ipvs->backup_threads = NULL;
 	}
 
 	/* decrease the module use count */
@@ -1825,13 +1923,8 @@ int __net_init ip_vs_sync_net_init(struct net *net)
 	struct netns_ipvs *ipvs = net_ipvs(net);
 
 	__mutex_init(&ipvs->sync_mutex, "ipvs->sync_mutex", &__ipvs_sync_key);
-	INIT_LIST_HEAD(&ipvs->sync_queue);
 	spin_lock_init(&ipvs->sync_lock);
 	spin_lock_init(&ipvs->sync_buff_lock);
-
-	ipvs->sync_mcast_addr.sin_family = AF_INET;
-	ipvs->sync_mcast_addr.sin_port = cpu_to_be16(IP_VS_SYNC_PORT);
-	ipvs->sync_mcast_addr.sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP);
 	return 0;
 }
 