author     Pablo Neira Ayuso <pablo@netfilter.org>    2012-05-08 13:39:49 -0400
committer  Pablo Neira Ayuso <pablo@netfilter.org>    2012-05-08 13:39:53 -0400
commit     1c003b1580e20ff9f500846677303a695b1837cc (patch)
tree       d00c951b6e4c88edd403b4d8ead96a57b9bfc808
parent     cdcc5e905d59026fbf2e7f74f9cc834203b6207b (diff)
ipvs: wakeup master thread
High rate of sync messages in master can lead to overflowing the socket
buffer and dropping the messages. Fixed sleep of 1 second without wakeup
events is not suitable for loaded masters.

Use delayed_work to schedule sending for queued messages and limit the
delay to IPVS_SYNC_SEND_DELAY (20ms). This will reduce the rate of
wakeups but to avoid sending long bursts we wakeup the master thread
after IPVS_SYNC_WAKEUP_RATE (8) messages.

Add hard limit for the queued messages before sending by using
"sync_qlen_max" sysctl var. It defaults to 1/32 of the memory pages but
actually represents number of messages. It will protect us from
allocating large parts of memory when the sending rate is lower than the
queuing rate.

As suggested by Pablo, add new sysctl var "sync_sock_size" to configure
the SNDBUF (master) or RCVBUF (slave) socket limit. Default value is 0
(preserve system defaults).

Change the master thread to detect and block on SNDBUF overflow, so that
we do not drop messages when the socket limit is low but the
sync_qlen_max limit is not reached. On ENOBUFS or other errors just drop
the messages.

Change master thread to enter TASK_INTERRUPTIBLE state early, so that we
do not miss wakeups due to messages or kthread_should_stop event.

Thanks to Pablo Neira Ayuso for his valuable feedback!

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
-rw-r--r--  include/net/ip_vs.h              |  29
-rw-r--r--  net/netfilter/ipvs/ip_vs_ctl.c   |  16
-rw-r--r--  net/netfilter/ipvs/ip_vs_sync.c  | 149
3 files changed, 162 insertions(+), 32 deletions(-)
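
The two new knobs land under the existing IPVS sysctl tree. As a usage
sketch (not part of the patch): assuming the standard
/proc/sys/net/ipv4/vs/ location that backs the vs_vars table touched
below, and with illustrative values, a hypothetical helper could drive
them like this:

#include <stdio.h>

/* hypothetical helper: write an IPVS sync sysctl via procfs */
static int set_vs_sysctl(const char *name, long val)
{
	char path[128];
	FILE *f;

	snprintf(path, sizeof(path), "/proc/sys/net/ipv4/vs/%s", name);
	f = fopen(path, "w");
	if (!f)
		return -1;
	fprintf(f, "%ld\n", val);
	return fclose(f);
}

int main(void)
{
	/* example values: cap the master queue at 10000 sync messages
	 * and request a 1 MiB SNDBUF/RCVBUF on the sync sockets */
	set_vs_sysctl("sync_qlen_max", 10000);
	set_vs_sysctl("sync_sock_size", 1 << 20);
	return 0;
}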
diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 93b81aa73429..30e43c8c0283 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -869,6 +869,8 @@ struct netns_ipvs {
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_qlen_max;
+	int			sysctl_sync_sock_size;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -889,6 +891,9 @@ struct netns_ipvs {
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
 	struct list_head	sync_queue;
+	int			sync_queue_len;
+	unsigned int		sync_queue_delay;
+	struct delayed_work	master_wakeup_work;
 	spinlock_t		sync_lock;
 	struct ip_vs_sync_buff	*sync_buff;
 	spinlock_t		sync_buff_lock;
@@ -911,6 +916,10 @@ struct netns_ipvs {
 #define DEFAULT_SYNC_THRESHOLD	3
 #define DEFAULT_SYNC_PERIOD	50
 #define DEFAULT_SYNC_VER	1
+#define IPVS_SYNC_WAKEUP_RATE	8
+#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
+#define IPVS_SYNC_SEND_DELAY	(HZ / 50)
+#define IPVS_SYNC_CHECK_PERIOD	HZ
 
 #ifdef CONFIG_SYSCTL
 
@@ -929,6 +938,16 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_qlen_max;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_sock_size;
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -946,6 +965,16 @@ static inline int sysctl_sync_ver(struct netns_ipvs *ipvs)
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return IPVS_SYNC_QLEN_MAX;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return 0;
+}
+
 #endif
 
 /*
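
A note on the new constants: HZ jiffies always equal one second, so
IPVS_SYNC_SEND_DELAY = HZ / 50 is 20ms regardless of the kernel's HZ,
and the !CONFIG_SYSCTL fallback cap IPVS_SYNC_QLEN_MAX works out to
8 * 4 = 32 messages. A standalone sketch of the arithmetic (the HZ
value below is assumed only for the printout):

#include <stdio.h>

#define HZ			1000	/* assumed; any HZ yields 20ms */
#define IPVS_SYNC_WAKEUP_RATE	8
#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
#define IPVS_SYNC_SEND_DELAY	(HZ / 50)

int main(void)
{
	printf("send delay: %d jiffies = %d ms\n",
	       IPVS_SYNC_SEND_DELAY, IPVS_SYNC_SEND_DELAY * 1000 / HZ);
	printf("wakeup after %d queued messages, fallback cap %d\n",
	       IPVS_SYNC_WAKEUP_RATE, IPVS_SYNC_QLEN_MAX);
	return 0;
}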
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index 37b91996bfba..bd3827ec25c9 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1718,6 +1718,18 @@ static struct ctl_table vs_vars[] = {
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_qlen_max",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
+		.procname	= "sync_sock_size",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3655,6 +3667,10 @@ int __net_init ip_vs_control_net_init_sysctl(struct net *net)
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
+	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
+	ipvs->sysctl_sync_sock_size = 0;
+	tbl[idx++].data = &ipvs->sysctl_sync_sock_size;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
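
For a feel of the nr_free_buffer_pages() / 32 default: the result counts
messages, not pages, as the commit message stresses. Under assumed
figures of ~4 GiB visible to nr_free_buffer_pages() and 4 KiB pages, a
rough sketch:

#include <stdio.h>

int main(void)
{
	/* assumptions: ~4 GiB of buffer-zone memory, 4 KiB pages */
	long pages = (4L << 30) / 4096;

	printf("default sync_qlen_max ~= %ld messages\n", pages / 32);
	return 0;
}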
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index d2df694405f1..b3235b230139 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -307,11 +307,15 @@ static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs)
 	spin_lock_bh(&ipvs->sync_lock);
 	if (list_empty(&ipvs->sync_queue)) {
 		sb = NULL;
+		__set_current_state(TASK_INTERRUPTIBLE);
 	} else {
 		sb = list_entry(ipvs->sync_queue.next,
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
+		if (!ipvs->sync_queue_len)
+			ipvs->sync_queue_delay = 0;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -358,9 +362,16 @@ static inline void sb_queue_tail(struct netns_ipvs *ipvs)
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
+		if (!ipvs->sync_queue_len)
+			schedule_delayed_work(&ipvs->master_wakeup_work,
+					      max(IPVS_SYNC_SEND_DELAY, 1));
+		ipvs->sync_queue_len++;
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+			wake_up_process(ipvs->master_thread);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
@@ -379,6 +390,7 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
 	    time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) {
 		sb = ipvs->sync_buff;
 		ipvs->sync_buff = NULL;
+		__set_current_state(TASK_RUNNING);
 	} else
 		sb = NULL;
 	spin_unlock_bh(&ipvs->sync_buff_lock);
@@ -392,26 +404,23 @@ get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time)
 void ip_vs_sync_switch_mode(struct net *net, int mode)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
+	struct ip_vs_sync_buff *sb;
 
+	spin_lock_bh(&ipvs->sync_buff_lock);
 	if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
-		return;
-	if (mode == sysctl_sync_ver(ipvs) || !ipvs->sync_buff)
-		return;
+		goto unlock;
+	sb = ipvs->sync_buff;
+	if (mode == sysctl_sync_ver(ipvs) || !sb)
+		goto unlock;
 
-	spin_lock_bh(&ipvs->sync_buff_lock);
 	/* Buffer empty ? then let buf_create do the job  */
-	if (ipvs->sync_buff->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
-		kfree(ipvs->sync_buff);
+	if (sb->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
+		ip_vs_sync_buff_release(sb);
 		ipvs->sync_buff = NULL;
-	} else {
-		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
-			list_add_tail(&ipvs->sync_buff->list,
-				      &ipvs->sync_queue);
-		else
-			ip_vs_sync_buff_release(ipvs->sync_buff);
-		spin_unlock_bh(&ipvs->sync_lock);
-	}
+	} else
+		sb_queue_tail(ipvs);
+
+unlock:
 	spin_unlock_bh(&ipvs->sync_buff_lock);
 }
 
@@ -1130,6 +1139,28 @@ static void ip_vs_process_message(struct net *net, __u8 *buffer,
 
 
 /*
+ * Setup sndbuf (mode=1) or rcvbuf (mode=0)
+ */
+static void set_sock_size(struct sock *sk, int mode, int val)
+{
+	/* setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)); */
+	/* setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)); */
+	lock_sock(sk);
+	if (mode) {
+		val = clamp_t(int, val, (SOCK_MIN_SNDBUF + 1) / 2,
+			      sysctl_wmem_max);
+		sk->sk_sndbuf = val * 2;
+		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
+	} else {
+		val = clamp_t(int, val, (SOCK_MIN_RCVBUF + 1) / 2,
+			      sysctl_rmem_max);
+		sk->sk_rcvbuf = val * 2;
+		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
+	}
+	release_sock(sk);
+}
+
+/*
  * Setup loopback of outgoing multicasts on a sending socket
  */
 static void set_mcast_loop(struct sock *sk, u_char loop)
@@ -1305,6 +1336,9 @@ static struct socket *make_send_sock(struct net *net)
 
 	set_mcast_loop(sock->sk, 0);
 	set_mcast_ttl(sock->sk, 1);
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 1, result);
 
 	result = bind_mcastif_addr(sock, ipvs->master_mcast_ifn);
 	if (result < 0) {
@@ -1350,6 +1384,9 @@ static struct socket *make_receive_sock(struct net *net)
 	sk_change_net(sock->sk, net);
 	/* it is equivalent to the REUSEADDR option in user-space */
 	sock->sk->sk_reuse = SK_CAN_REUSE;
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 0, result);
 
 	result = sock->ops->bind(sock, (struct sockaddr *) &mcast_addr,
 			sizeof(struct sockaddr));
@@ -1392,18 +1429,22 @@ ip_vs_send_async(struct socket *sock, const char *buffer, const size_t length)
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,36 +1469,75 @@ ip_vs_receive(struct socket *sock, char *buffer, const size_t buflen)
 	return len;
 }
 
+/* Wakeup the master thread for sending */
+static void master_wakeup_work_handler(struct work_struct *work)
+{
+	struct netns_ipvs *ipvs = container_of(work, struct netns_ipvs,
+					       master_wakeup_work.work);
+
+	spin_lock_bh(&ipvs->sync_lock);
+	if (ipvs->sync_queue_len &&
+	    ipvs->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) {
+		ipvs->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE;
+		wake_up_process(ipvs->master_thread);
+	}
+	spin_unlock_bh(&ipvs->sync_lock);
+}
+
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (unlikely(kthread_should_stop()))
+			break;
+		if (!sb) {
+			schedule_timeout(IPVS_SYNC_CHECK_PERIOD);
+			continue;
 		}
-
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+		while (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
+
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			if (unlikely(kthread_should_stop()))
+				goto done;
 		}
-
-		schedule_timeout_interruptible(HZ);
+		ip_vs_sync_buff_release(sb);
 	}
 
+done:
+	__set_current_state(TASK_RUNNING);
+	if (sb)
+		ip_vs_sync_buff_release(sb);
+
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
 		ip_vs_sync_buff_release(sb);
+	__set_current_state(TASK_RUNNING);
 
 	/* clean up the current sync_buff */
 	sb = get_curr_sync_buff(ipvs, 0);
@@ -1538,6 +1618,10 @@ int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid)
 		realtask = &ipvs->master_thread;
 		name = "ipvs_master:%d";
 		threadfn = sync_thread_master;
+		ipvs->sync_queue_len = 0;
+		ipvs->sync_queue_delay = 0;
+		INIT_DELAYED_WORK(&ipvs->master_wakeup_work,
+				  master_wakeup_work_handler);
 		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
 		if (ipvs->backup_thread)
@@ -1623,6 +1707,7 @@ int stop_sync_thread(struct net *net, int state)
 		spin_lock_bh(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
 		spin_unlock_bh(&ipvs->sync_lock);
+		cancel_delayed_work_sync(&ipvs->master_wakeup_work);
 		retc = kthread_stop(ipvs->master_thread);
 		ipvs->master_thread = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {
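
One subtlety in set_sock_size() above: it assigns val * 2, mirroring the
kernel's own SO_SNDBUF/SO_RCVBUF handling, where the requested size is
doubled to cover bookkeeping overhead and getsockopt() reports the
doubled value. A userspace sketch of the equivalent behavior on Linux
(values illustrative):

#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	int s = socket(AF_INET, SOCK_DGRAM, 0);
	int val = 64 * 1024, out;
	socklen_t len = sizeof(out);

	if (s < 0)
		return 1;
	/* the kernel stores roughly val * 2 (clamped by net.core.wmem_max) */
	setsockopt(s, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
	getsockopt(s, SOL_SOCKET, SO_SNDBUF, &out, &len);
	printf("requested %d, kernel reports %d\n", val, out);
	close(s);
	return 0;
}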