about summary refs log tree commit diff stats
path: root/fs/ocfs2/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r-- fs/ocfs2/cluster/heartbeat.c 31
-rw-r--r-- fs/ocfs2/cluster/nodemanager.c 198
-rw-r--r-- fs/ocfs2/cluster/nodemanager.h 17
-rw-r--r-- fs/ocfs2/cluster/quorum.c 4
-rw-r--r-- fs/ocfs2/cluster/tcp.c 240
-rw-r--r-- fs/ocfs2/cluster/tcp.h 8
-rw-r--r-- fs/ocfs2/cluster/tcp_internal.h 23
7 files changed, 427 insertions, 94 deletions
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 305cba3681fe..277ca67a2ad6 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -141,7 +141,7 @@ struct o2hb_region {
141 * recognizes a node going up and down in one iteration */ 141 * recognizes a node going up and down in one iteration */
142 u64 hr_generation; 142 u64 hr_generation;
143 143
144 struct work_struct hr_write_timeout_work; 144 struct delayed_work hr_write_timeout_work;
145 unsigned long hr_last_timeout_start; 145 unsigned long hr_last_timeout_start;
146 146
147 /* Used during o2hb_check_slot to hold a copy of the block 147 /* Used during o2hb_check_slot to hold a copy of the block
@@ -156,9 +156,11 @@ struct o2hb_bio_wait_ctxt {
156 int wc_error; 156 int wc_error;
157}; 157};
158 158
159static void o2hb_write_timeout(void *arg) 159static void o2hb_write_timeout(struct work_struct *work)
160{ 160{
161 struct o2hb_region *reg = arg; 161 struct o2hb_region *reg =
162 container_of(work, struct o2hb_region,
163 hr_write_timeout_work.work);
162 164
163 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " 165 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
164 "milliseconds\n", reg->hr_dev_name, 166 "milliseconds\n", reg->hr_dev_name,
@@ -1404,7 +1406,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1404 goto out; 1406 goto out;
1405 } 1407 }
1406 1408
1407 INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg); 1409 INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1408 1410
1409 /* 1411 /*
1410 * A node is considered live after it has beat LIVE_THRESHOLD 1412 * A node is considered live after it has beat LIVE_THRESHOLD
@@ -1445,6 +1447,15 @@ out:
1445 return ret; 1447 return ret;
1446} 1448}
1447 1449
1450static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
1451 char *page)
1452{
1453 if (!reg->hr_task)
1454 return 0;
1455
1456 return sprintf(page, "%u\n", reg->hr_task->pid);
1457}
1458
1448struct o2hb_region_attribute { 1459struct o2hb_region_attribute {
1449 struct configfs_attribute attr; 1460 struct configfs_attribute attr;
1450 ssize_t (*show)(struct o2hb_region *, char *); 1461 ssize_t (*show)(struct o2hb_region *, char *);
@@ -1483,11 +1494,19 @@ static struct o2hb_region_attribute o2hb_region_attr_dev = {
1483 .store = o2hb_region_dev_write, 1494 .store = o2hb_region_dev_write,
1484}; 1495};
1485 1496
1497static struct o2hb_region_attribute o2hb_region_attr_pid = {
1498 .attr = { .ca_owner = THIS_MODULE,
1499 .ca_name = "pid",
1500 .ca_mode = S_IRUGO | S_IRUSR },
1501 .show = o2hb_region_pid_read,
1502};
1503
1486static struct configfs_attribute *o2hb_region_attrs[] = { 1504static struct configfs_attribute *o2hb_region_attrs[] = {
1487 &o2hb_region_attr_block_bytes.attr, 1505 &o2hb_region_attr_block_bytes.attr,
1488 &o2hb_region_attr_start_block.attr, 1506 &o2hb_region_attr_start_block.attr,
1489 &o2hb_region_attr_blocks.attr, 1507 &o2hb_region_attr_blocks.attr,
1490 &o2hb_region_attr_dev.attr, 1508 &o2hb_region_attr_dev.attr,
1509 &o2hb_region_attr_pid.attr,
1491 NULL, 1510 NULL,
1492}; 1511};
1493 1512
@@ -1551,7 +1570,7 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g
1551 struct o2hb_region *reg = NULL; 1570 struct o2hb_region *reg = NULL;
1552 struct config_item *ret = NULL; 1571 struct config_item *ret = NULL;
1553 1572
1554 reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL); 1573 reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
1555 if (reg == NULL) 1574 if (reg == NULL)
1556 goto out; /* ENOMEM */ 1575 goto out; /* ENOMEM */
1557 1576
@@ -1677,7 +1696,7 @@ struct config_group *o2hb_alloc_hb_set(void)
1677 struct o2hb_heartbeat_group *hs = NULL; 1696 struct o2hb_heartbeat_group *hs = NULL;
1678 struct config_group *ret = NULL; 1697 struct config_group *ret = NULL;
1679 1698
1680 hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); 1699 hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1681 if (hs == NULL) 1700 if (hs == NULL)
1682 goto out; 1701 goto out;
1683 1702
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index d11753c50bc1..b17333a0606b 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -35,7 +35,7 @@
35/* for now we operate under the assertion that there can be only one 35/* for now we operate under the assertion that there can be only one
36 * cluster active at a time. Changing this will require trickling 36 * cluster active at a time. Changing this will require trickling
37 * cluster references throughout where nodes are looked up */ 37 * cluster references throughout where nodes are looked up */
38static struct o2nm_cluster *o2nm_single_cluster = NULL; 38struct o2nm_cluster *o2nm_single_cluster = NULL;
39 39
40#define OCFS2_MAX_HB_CTL_PATH 256 40#define OCFS2_MAX_HB_CTL_PATH 256
41static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl"; 41static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl";
@@ -97,17 +97,6 @@ const char *o2nm_get_hb_ctl_path(void)
97} 97}
98EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path); 98EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path);
99 99
100struct o2nm_cluster {
101 struct config_group cl_group;
102 unsigned cl_has_local:1;
103 u8 cl_local_node;
104 rwlock_t cl_nodes_lock;
105 struct o2nm_node *cl_nodes[O2NM_MAX_NODES];
106 struct rb_root cl_node_ip_tree;
107 /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
108 unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
109};
110
111struct o2nm_node *o2nm_get_node_by_num(u8 node_num) 100struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
112{ 101{
113 struct o2nm_node *node = NULL; 102 struct o2nm_node *node = NULL;
@@ -543,6 +532,179 @@ static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
543} 532}
544#endif 533#endif
545 534
535struct o2nm_cluster_attribute {
536 struct configfs_attribute attr;
537 ssize_t (*show)(struct o2nm_cluster *, char *);
538 ssize_t (*store)(struct o2nm_cluster *, const char *, size_t);
539};
540
541static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
542 unsigned int *val)
543{
544 unsigned long tmp;
545 char *p = (char *)page;
546
547 tmp = simple_strtoul(p, &p, 0);
548 if (!p || (*p && (*p != '\n')))
549 return -EINVAL;
550
551 if (tmp == 0)
552 return -EINVAL;
553 if (tmp >= (u32)-1)
554 return -ERANGE;
555
556 *val = tmp;
557
558 return count;
559}
560
561static ssize_t o2nm_cluster_attr_idle_timeout_ms_read(
562 struct o2nm_cluster *cluster, char *page)
563{
564 return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
565}
566
567static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
568 struct o2nm_cluster *cluster, const char *page, size_t count)
569{
570 ssize_t ret;
571 unsigned int val;
572
573 ret = o2nm_cluster_attr_write(page, count, &val);
574
575 if (ret > 0) {
576 if (cluster->cl_idle_timeout_ms != val
577 && o2net_num_connected_peers()) {
578 mlog(ML_NOTICE,
579 "o2net: cannot change idle timeout after "
580 "the first peer has agreed to it."
581 " %d connected peers\n",
582 o2net_num_connected_peers());
583 ret = -EINVAL;
584 } else if (val <= cluster->cl_keepalive_delay_ms) {
585 mlog(ML_NOTICE, "o2net: idle timeout must be larger "
586 "than keepalive delay\n");
587 ret = -EINVAL;
588 } else {
589 cluster->cl_idle_timeout_ms = val;
590 }
591 }
592
593 return ret;
594}
595
596static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read(
597 struct o2nm_cluster *cluster, char *page)
598{
599 return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
600}
601
602static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
603 struct o2nm_cluster *cluster, const char *page, size_t count)
604{
605 ssize_t ret;
606 unsigned int val;
607
608 ret = o2nm_cluster_attr_write(page, count, &val);
609
610 if (ret > 0) {
611 if (cluster->cl_keepalive_delay_ms != val
612 && o2net_num_connected_peers()) {
613 mlog(ML_NOTICE,
614 "o2net: cannot change keepalive delay after"
615 " the first peer has agreed to it."
616 " %d connected peers\n",
617 o2net_num_connected_peers());
618 ret = -EINVAL;
619 } else if (val >= cluster->cl_idle_timeout_ms) {
620 mlog(ML_NOTICE, "o2net: keepalive delay must be "
621 "smaller than idle timeout\n");
622 ret = -EINVAL;
623 } else {
624 cluster->cl_keepalive_delay_ms = val;
625 }
626 }
627
628 return ret;
629}
630
631static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read(
632 struct o2nm_cluster *cluster, char *page)
633{
634 return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
635}
636
637static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write(
638 struct o2nm_cluster *cluster, const char *page, size_t count)
639{
640 return o2nm_cluster_attr_write(page, count,
641 &cluster->cl_reconnect_delay_ms);
642}
643static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = {
644 .attr = { .ca_owner = THIS_MODULE,
645 .ca_name = "idle_timeout_ms",
646 .ca_mode = S_IRUGO | S_IWUSR },
647 .show = o2nm_cluster_attr_idle_timeout_ms_read,
648 .store = o2nm_cluster_attr_idle_timeout_ms_write,
649};
650
651static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = {
652 .attr = { .ca_owner = THIS_MODULE,
653 .ca_name = "keepalive_delay_ms",
654 .ca_mode = S_IRUGO | S_IWUSR },
655 .show = o2nm_cluster_attr_keepalive_delay_ms_read,
656 .store = o2nm_cluster_attr_keepalive_delay_ms_write,
657};
658
659static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = {
660 .attr = { .ca_owner = THIS_MODULE,
661 .ca_name = "reconnect_delay_ms",
662 .ca_mode = S_IRUGO | S_IWUSR },
663 .show = o2nm_cluster_attr_reconnect_delay_ms_read,
664 .store = o2nm_cluster_attr_reconnect_delay_ms_write,
665};
666
667static struct configfs_attribute *o2nm_cluster_attrs[] = {
668 &o2nm_cluster_attr_idle_timeout_ms.attr,
669 &o2nm_cluster_attr_keepalive_delay_ms.attr,
670 &o2nm_cluster_attr_reconnect_delay_ms.attr,
671 NULL,
672};
673static ssize_t o2nm_cluster_show(struct config_item *item,
674 struct configfs_attribute *attr,
675 char *page)
676{
677 struct o2nm_cluster *cluster = to_o2nm_cluster(item);
678 struct o2nm_cluster_attribute *o2nm_cluster_attr =
679 container_of(attr, struct o2nm_cluster_attribute, attr);
680 ssize_t ret = 0;
681
682 if (o2nm_cluster_attr->show)
683 ret = o2nm_cluster_attr->show(cluster, page);
684 return ret;
685}
686
687static ssize_t o2nm_cluster_store(struct config_item *item,
688 struct configfs_attribute *attr,
689 const char *page, size_t count)
690{
691 struct o2nm_cluster *cluster = to_o2nm_cluster(item);
692 struct o2nm_cluster_attribute *o2nm_cluster_attr =
693 container_of(attr, struct o2nm_cluster_attribute, attr);
694 ssize_t ret;
695
696 if (o2nm_cluster_attr->store == NULL) {
697 ret = -EINVAL;
698 goto out;
699 }
700
701 ret = o2nm_cluster_attr->store(cluster, page, count);
702 if (ret < count)
703 goto out;
704out:
705 return ret;
706}
707
546static struct config_item *o2nm_node_group_make_item(struct config_group *group, 708static struct config_item *o2nm_node_group_make_item(struct config_group *group,
547 const char *name) 709 const char *name)
548{ 710{
@@ -552,7 +714,7 @@ static struct config_item *o2nm_node_group_make_item(struct config_group *group,
552 if (strlen(name) > O2NM_MAX_NAME_LEN) 714 if (strlen(name) > O2NM_MAX_NAME_LEN)
553 goto out; /* ENAMETOOLONG */ 715 goto out; /* ENAMETOOLONG */
554 716
555 node = kcalloc(1, sizeof(struct o2nm_node), GFP_KERNEL); 717 node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
556 if (node == NULL) 718 if (node == NULL)
557 goto out; /* ENOMEM */ 719 goto out; /* ENOMEM */
558 720
@@ -624,10 +786,13 @@ static void o2nm_cluster_release(struct config_item *item)
624 786
625static struct configfs_item_operations o2nm_cluster_item_ops = { 787static struct configfs_item_operations o2nm_cluster_item_ops = {
626 .release = o2nm_cluster_release, 788 .release = o2nm_cluster_release,
789 .show_attribute = o2nm_cluster_show,
790 .store_attribute = o2nm_cluster_store,
627}; 791};
628 792
629static struct config_item_type o2nm_cluster_type = { 793static struct config_item_type o2nm_cluster_type = {
630 .ct_item_ops = &o2nm_cluster_item_ops, 794 .ct_item_ops = &o2nm_cluster_item_ops,
795 .ct_attrs = o2nm_cluster_attrs,
631 .ct_owner = THIS_MODULE, 796 .ct_owner = THIS_MODULE,
632}; 797};
633 798
@@ -660,8 +825,8 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g
660 if (o2nm_single_cluster) 825 if (o2nm_single_cluster)
661 goto out; /* ENOSPC */ 826 goto out; /* ENOSPC */
662 827
663 cluster = kcalloc(1, sizeof(struct o2nm_cluster), GFP_KERNEL); 828 cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
664 ns = kcalloc(1, sizeof(struct o2nm_node_group), GFP_KERNEL); 829 ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
665 defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL); 830 defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
666 o2hb_group = o2hb_alloc_hb_set(); 831 o2hb_group = o2hb_alloc_hb_set();
667 if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL) 832 if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL)
@@ -678,6 +843,9 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g
678 cluster->cl_group.default_groups[2] = NULL; 843 cluster->cl_group.default_groups[2] = NULL;
679 rwlock_init(&cluster->cl_nodes_lock); 844 rwlock_init(&cluster->cl_nodes_lock);
680 cluster->cl_node_ip_tree = RB_ROOT; 845 cluster->cl_node_ip_tree = RB_ROOT;
846 cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
847 cluster->cl_idle_timeout_ms = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
848 cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
681 849
682 ret = &cluster->cl_group; 850 ret = &cluster->cl_group;
683 o2nm_single_cluster = cluster; 851 o2nm_single_cluster = cluster;
diff --git a/fs/ocfs2/cluster/nodemanager.h b/fs/ocfs2/cluster/nodemanager.h
index fce8033c310f..8fb23cacc2f5 100644
--- a/fs/ocfs2/cluster/nodemanager.h
+++ b/fs/ocfs2/cluster/nodemanager.h
@@ -53,6 +53,23 @@ struct o2nm_node {
53 unsigned long nd_set_attributes; 53 unsigned long nd_set_attributes;
54}; 54};
55 55
56struct o2nm_cluster {
57 struct config_group cl_group;
58 unsigned cl_has_local:1;
59 u8 cl_local_node;
60 rwlock_t cl_nodes_lock;
61 struct o2nm_node *cl_nodes[O2NM_MAX_NODES];
62 struct rb_root cl_node_ip_tree;
63 unsigned int cl_idle_timeout_ms;
64 unsigned int cl_keepalive_delay_ms;
65 unsigned int cl_reconnect_delay_ms;
66
67 /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
68 unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
69};
70
71extern struct o2nm_cluster *o2nm_single_cluster;
72
56u8 o2nm_this_node(void); 73u8 o2nm_this_node(void);
57 74
58int o2nm_configured_node_map(unsigned long *map, unsigned bytes); 75int o2nm_configured_node_map(unsigned long *map, unsigned bytes);
diff --git a/fs/ocfs2/cluster/quorum.c b/fs/ocfs2/cluster/quorum.c
index 7bba98fbfc15..4705d659fe57 100644
--- a/fs/ocfs2/cluster/quorum.c
+++ b/fs/ocfs2/cluster/quorum.c
@@ -88,7 +88,7 @@ void o2quo_disk_timeout(void)
88 o2quo_fence_self(); 88 o2quo_fence_self();
89} 89}
90 90
91static void o2quo_make_decision(void *arg) 91static void o2quo_make_decision(struct work_struct *work)
92{ 92{
93 int quorum; 93 int quorum;
94 int lowest_hb, lowest_reachable = 0, fence = 0; 94 int lowest_hb, lowest_reachable = 0, fence = 0;
@@ -306,7 +306,7 @@ void o2quo_init(void)
306 struct o2quo_state *qs = &o2quo_state; 306 struct o2quo_state *qs = &o2quo_state;
307 307
308 spin_lock_init(&qs->qs_lock); 308 spin_lock_init(&qs->qs_lock);
309 INIT_WORK(&qs->qs_work, o2quo_make_decision, NULL); 309 INIT_WORK(&qs->qs_work, o2quo_make_decision);
310} 310}
311 311
312void o2quo_exit(void) 312void o2quo_exit(void)
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index b650efa8c8be..ae4ff4a6636b 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -140,13 +140,35 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] =
140 [O2NET_ERR_DIED] = -EHOSTDOWN,}; 140 [O2NET_ERR_DIED] = -EHOSTDOWN,};
141 141
142/* can't quite avoid *all* internal declarations :/ */ 142/* can't quite avoid *all* internal declarations :/ */
143static void o2net_sc_connect_completed(void *arg); 143static void o2net_sc_connect_completed(struct work_struct *work);
144static void o2net_rx_until_empty(void *arg); 144static void o2net_rx_until_empty(struct work_struct *work);
145static void o2net_shutdown_sc(void *arg); 145static void o2net_shutdown_sc(struct work_struct *work);
146static void o2net_listen_data_ready(struct sock *sk, int bytes); 146static void o2net_listen_data_ready(struct sock *sk, int bytes);
147static void o2net_sc_send_keep_req(void *arg); 147static void o2net_sc_send_keep_req(struct work_struct *work);
148static void o2net_idle_timer(unsigned long data); 148static void o2net_idle_timer(unsigned long data);
149static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); 149static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
150static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
151
152/*
153 * FIXME: These should use to_o2nm_cluster_from_node(), but we end up
154 * losing our parent link to the cluster during shutdown. This can be
155 * solved by adding a pre-removal callback to configfs, or passing
156 * around the cluster with the node. -jeffm
157 */
158static inline int o2net_reconnect_delay(struct o2nm_node *node)
159{
160 return o2nm_single_cluster->cl_reconnect_delay_ms;
161}
162
163static inline int o2net_keepalive_delay(struct o2nm_node *node)
164{
165 return o2nm_single_cluster->cl_keepalive_delay_ms;
166}
167
168static inline int o2net_idle_timeout(struct o2nm_node *node)
169{
170 return o2nm_single_cluster->cl_idle_timeout_ms;
171}
150 172
151static inline int o2net_sys_err_to_errno(enum o2net_system_error err) 173static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
152{ 174{
@@ -271,6 +293,8 @@ static void sc_kref_release(struct kref *kref)
271{ 293{
272 struct o2net_sock_container *sc = container_of(kref, 294 struct o2net_sock_container *sc = container_of(kref,
273 struct o2net_sock_container, sc_kref); 295 struct o2net_sock_container, sc_kref);
296 BUG_ON(timer_pending(&sc->sc_idle_timeout));
297
274 sclog(sc, "releasing\n"); 298 sclog(sc, "releasing\n");
275 299
276 if (sc->sc_sock) { 300 if (sc->sc_sock) {
@@ -300,7 +324,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
300 struct page *page = NULL; 324 struct page *page = NULL;
301 325
302 page = alloc_page(GFP_NOFS); 326 page = alloc_page(GFP_NOFS);
303 sc = kcalloc(1, sizeof(*sc), GFP_NOFS); 327 sc = kzalloc(sizeof(*sc), GFP_NOFS);
304 if (sc == NULL || page == NULL) 328 if (sc == NULL || page == NULL)
305 goto out; 329 goto out;
306 330
@@ -308,10 +332,10 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
308 o2nm_node_get(node); 332 o2nm_node_get(node);
309 sc->sc_node = node; 333 sc->sc_node = node;
310 334
311 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc); 335 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed);
312 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc); 336 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty);
313 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc); 337 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc);
314 INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc); 338 INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req);
315 339
316 init_timer(&sc->sc_idle_timeout); 340 init_timer(&sc->sc_idle_timeout);
317 sc->sc_idle_timeout.function = o2net_idle_timer; 341 sc->sc_idle_timeout.function = o2net_idle_timer;
@@ -342,7 +366,7 @@ static void o2net_sc_queue_work(struct o2net_sock_container *sc,
342 sc_put(sc); 366 sc_put(sc);
343} 367}
344static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc, 368static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
345 struct work_struct *work, 369 struct delayed_work *work,
346 int delay) 370 int delay)
347{ 371{
348 sc_get(sc); 372 sc_get(sc);
@@ -350,12 +374,19 @@ static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
350 sc_put(sc); 374 sc_put(sc);
351} 375}
352static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc, 376static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
353 struct work_struct *work) 377 struct delayed_work *work)
354{ 378{
355 if (cancel_delayed_work(work)) 379 if (cancel_delayed_work(work))
356 sc_put(sc); 380 sc_put(sc);
357} 381}
358 382
383static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
384
385int o2net_num_connected_peers(void)
386{
387 return atomic_read(&o2net_connected_peers);
388}
389
359static void o2net_set_nn_state(struct o2net_node *nn, 390static void o2net_set_nn_state(struct o2net_node *nn,
360 struct o2net_sock_container *sc, 391 struct o2net_sock_container *sc,
361 unsigned valid, int err) 392 unsigned valid, int err)
@@ -366,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
366 397
367 assert_spin_locked(&nn->nn_lock); 398 assert_spin_locked(&nn->nn_lock);
368 399
400 if (old_sc && !sc)
401 atomic_dec(&o2net_connected_peers);
402 else if (!old_sc && sc)
403 atomic_inc(&o2net_connected_peers);
404
369 /* the node num comparison and single connect/accept path should stop 405 /* the node num comparison and single connect/accept path should stop
370 * an non-null sc from being overwritten with another */ 406 * an non-null sc from being overwritten with another */
371 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); 407 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -424,9 +460,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
424 /* delay if we're within a RECONNECT_DELAY of the 460 /* delay if we're within a RECONNECT_DELAY of the
425 * last attempt */ 461 * last attempt */
426 delay = (nn->nn_last_connect_attempt + 462 delay = (nn->nn_last_connect_attempt +
427 msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) 463 msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
428 - jiffies; 464 - jiffies;
429 if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) 465 if (delay > msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
430 delay = 0; 466 delay = 0;
431 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay); 467 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
432 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay); 468 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
@@ -564,9 +600,11 @@ static void o2net_ensure_shutdown(struct o2net_node *nn,
564 * ourselves as state_change couldn't get the nn_lock and call set_nn_state 600 * ourselves as state_change couldn't get the nn_lock and call set_nn_state
565 * itself. 601 * itself.
566 */ 602 */
567static void o2net_shutdown_sc(void *arg) 603static void o2net_shutdown_sc(struct work_struct *work)
568{ 604{
569 struct o2net_sock_container *sc = arg; 605 struct o2net_sock_container *sc =
606 container_of(work, struct o2net_sock_container,
607 sc_shutdown_work);
570 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); 608 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
571 609
572 sclog(sc, "shutting down\n"); 610 sclog(sc, "shutting down\n");
@@ -676,7 +714,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
676 goto out; 714 goto out;
677 } 715 }
678 716
679 nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS); 717 nmh = kzalloc(sizeof(struct o2net_msg_handler), GFP_NOFS);
680 if (nmh == NULL) { 718 if (nmh == NULL) {
681 ret = -ENOMEM; 719 ret = -ENOMEM;
682 goto out; 720 goto out;
@@ -1097,13 +1135,51 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
1097 return -1; 1135 return -1;
1098 } 1136 }
1099 1137
1138 /*
1139 * Ensure timeouts are consistent with other nodes, otherwise
1140 * we can end up with one node thinking that the other must be down,
1141 * but isn't. This can ultimately cause corruption.
1142 */
1143 if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
1144 o2net_idle_timeout(sc->sc_node)) {
1145 mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
1146 "%u ms, but we use %u ms locally. disconnecting\n",
1147 SC_NODEF_ARGS(sc),
1148 be32_to_cpu(hand->o2net_idle_timeout_ms),
1149 o2net_idle_timeout(sc->sc_node));
1150 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1151 return -1;
1152 }
1153
1154 if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
1155 o2net_keepalive_delay(sc->sc_node)) {
1156 mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
1157 "%u ms, but we use %u ms locally. disconnecting\n",
1158 SC_NODEF_ARGS(sc),
1159 be32_to_cpu(hand->o2net_keepalive_delay_ms),
1160 o2net_keepalive_delay(sc->sc_node));
1161 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1162 return -1;
1163 }
1164
1165 if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
1166 O2HB_MAX_WRITE_TIMEOUT_MS) {
1167 mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
1168 "%u ms, but we use %u ms locally. disconnecting\n",
1169 SC_NODEF_ARGS(sc),
1170 be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
1171 O2HB_MAX_WRITE_TIMEOUT_MS);
1172 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1173 return -1;
1174 }
1175
1100 sc->sc_handshake_ok = 1; 1176 sc->sc_handshake_ok = 1;
1101 1177
1102 spin_lock(&nn->nn_lock); 1178 spin_lock(&nn->nn_lock);
1103 /* set valid and queue the idle timers only if it hasn't been 1179 /* set valid and queue the idle timers only if it hasn't been
1104 * shut down already */ 1180 * shut down already */
1105 if (nn->nn_sc == sc) { 1181 if (nn->nn_sc == sc) {
1106 o2net_sc_postpone_idle(sc); 1182 o2net_sc_reset_idle_timer(sc);
1107 o2net_set_nn_state(nn, sc, 1, 0); 1183 o2net_set_nn_state(nn, sc, 1, 0);
1108 } 1184 }
1109 spin_unlock(&nn->nn_lock); 1185 spin_unlock(&nn->nn_lock);
@@ -1129,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1129 sclog(sc, "receiving\n"); 1205 sclog(sc, "receiving\n");
1130 do_gettimeofday(&sc->sc_tv_advance_start); 1206 do_gettimeofday(&sc->sc_tv_advance_start);
1131 1207
1208 if (unlikely(sc->sc_handshake_ok == 0)) {
1209 if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
1210 data = page_address(sc->sc_page) + sc->sc_page_off;
1211 datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
1212 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1213 if (ret > 0)
1214 sc->sc_page_off += ret;
1215 }
1216
1217 if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
1218 o2net_check_handshake(sc);
1219 if (unlikely(sc->sc_handshake_ok == 0))
1220 ret = -EPROTO;
1221 }
1222 goto out;
1223 }
1224
1132 /* do we need more header? */ 1225 /* do we need more header? */
1133 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1226 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1134 data = page_address(sc->sc_page) + sc->sc_page_off; 1227 data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1136,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1136 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1229 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1137 if (ret > 0) { 1230 if (ret > 0) {
1138 sc->sc_page_off += ret; 1231 sc->sc_page_off += ret;
1139
1140 /* this working relies on the handshake being
1141 * smaller than the normal message header */
1142 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
1143 !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
1144 ret = -EPROTO;
1145 goto out;
1146 }
1147
1148 /* only swab incoming here.. we can 1232 /* only swab incoming here.. we can
1149 * only get here once as we cross from 1233 * only get here once as we cross from
1150 * being under to over */ 1234 * being under to over */
@@ -1201,9 +1285,10 @@ out:
1201/* this work func is triggered by data ready. it reads until it can read no 1285/* this work func is triggered by data ready. it reads until it can read no
1202 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing 1286 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing
1203 * our work the work struct will be marked and we'll be called again. */ 1287 * our work the work struct will be marked and we'll be called again. */
1204static void o2net_rx_until_empty(void *arg) 1288static void o2net_rx_until_empty(struct work_struct *work)
1205{ 1289{
1206 struct o2net_sock_container *sc = arg; 1290 struct o2net_sock_container *sc =
1291 container_of(work, struct o2net_sock_container, sc_rx_work);
1207 int ret; 1292 int ret;
1208 1293
1209 do { 1294 do {
@@ -1245,26 +1330,43 @@ static int o2net_set_nodelay(struct socket *sock)
1245 return ret; 1330 return ret;
1246} 1331}
1247 1332
1333static void o2net_initialize_handshake(void)
1334{
1335 o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
1336 O2HB_MAX_WRITE_TIMEOUT_MS);
1337 o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
1338 o2net_idle_timeout(NULL));
1339 o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
1340 o2net_keepalive_delay(NULL));
1341 o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
1342 o2net_reconnect_delay(NULL));
1343}
1344
1248/* ------------------------------------------------------------ */ 1345/* ------------------------------------------------------------ */
1249 1346
1250/* called when a connect completes and after a sock is accepted. the 1347/* called when a connect completes and after a sock is accepted. the
1251 * rx path will see the response and mark the sc valid */ 1348 * rx path will see the response and mark the sc valid */
1252static void o2net_sc_connect_completed(void *arg) 1349static void o2net_sc_connect_completed(struct work_struct *work)
1253{ 1350{
1254 struct o2net_sock_container *sc = arg; 1351 struct o2net_sock_container *sc =
1352 container_of(work, struct o2net_sock_container,
1353 sc_connect_work);
1255 1354
1256 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n", 1355 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
1257 (unsigned long long)O2NET_PROTOCOL_VERSION, 1356 (unsigned long long)O2NET_PROTOCOL_VERSION,
1258 (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); 1357 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1259 1358
1359 o2net_initialize_handshake();
1260 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1360 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1261 sc_put(sc); 1361 sc_put(sc);
1262} 1362}
1263 1363
1264/* this is called as a work_struct func. */ 1364/* this is called as a work_struct func. */
1265static void o2net_sc_send_keep_req(void *arg) 1365static void o2net_sc_send_keep_req(struct work_struct *work)
1266{ 1366{
1267 struct o2net_sock_container *sc = arg; 1367 struct o2net_sock_container *sc =
1368 container_of(work, struct o2net_sock_container,
1369 sc_keepalive_work.work);
1268 1370
1269 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req)); 1371 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req));
1270 sc_put(sc); 1372 sc_put(sc);
@@ -1280,8 +1382,10 @@ static void o2net_idle_timer(unsigned long data)
1280 1382
1281 do_gettimeofday(&now); 1383 do_gettimeofday(&now);
1282 1384
1283 printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 " 1385 printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u "
1284 "seconds, shutting it down.\n", SC_NODEF_ARGS(sc)); 1386 "seconds, shutting it down.\n", SC_NODEF_ARGS(sc),
1387 o2net_idle_timeout(sc->sc_node) / 1000,
1388 o2net_idle_timeout(sc->sc_node) % 1000);
1285 mlog(ML_NOTICE, "here are some times that might help debug the " 1389 mlog(ML_NOTICE, "here are some times that might help debug the "
1286 "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " 1390 "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
1287 "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", 1391 "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
@@ -1299,14 +1403,21 @@ static void o2net_idle_timer(unsigned long data)
1299 o2net_sc_queue_work(sc, &sc->sc_shutdown_work); 1403 o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
1300} 1404}
1301 1405
1302static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) 1406static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
1303{ 1407{
1304 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); 1408 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
1305 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, 1409 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
1306 O2NET_KEEPALIVE_DELAY_SECS * HZ); 1410 msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node)));
1307 do_gettimeofday(&sc->sc_tv_timer); 1411 do_gettimeofday(&sc->sc_tv_timer);
1308 mod_timer(&sc->sc_idle_timeout, 1412 mod_timer(&sc->sc_idle_timeout,
1309 jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ)); 1413 jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node)));
1414}
1415
1416static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
1417{
1418 /* Only push out an existing timer */
1419 if (timer_pending(&sc->sc_idle_timeout))
1420 o2net_sc_reset_idle_timer(sc);
1310} 1421}
1311 1422
1312/* this work func is kicked whenever a path sets the nn state which doesn't 1423/* this work func is kicked whenever a path sets the nn state which doesn't
@@ -1314,14 +1425,15 @@ static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
1314 * having a connect attempt fail, etc. This centralizes the logic which decides 1425 * having a connect attempt fail, etc. This centralizes the logic which decides
1315 * if a connect attempt should be made or if we should give up and all future 1426 * if a connect attempt should be made or if we should give up and all future
1316 * transmit attempts should fail */ 1427 * transmit attempts should fail */
1317static void o2net_start_connect(void *arg) 1428static void o2net_start_connect(struct work_struct *work)
1318{ 1429{
1319 struct o2net_node *nn = arg; 1430 struct o2net_node *nn =
1431 container_of(work, struct o2net_node, nn_connect_work.work);
1320 struct o2net_sock_container *sc = NULL; 1432 struct o2net_sock_container *sc = NULL;
1321 struct o2nm_node *node = NULL, *mynode = NULL; 1433 struct o2nm_node *node = NULL, *mynode = NULL;
1322 struct socket *sock = NULL; 1434 struct socket *sock = NULL;
1323 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; 1435 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1324 int ret = 0; 1436 int ret = 0, stop;
1325 1437
1326 /* if we're greater we initiate tx, otherwise we accept */ 1438 /* if we're greater we initiate tx, otherwise we accept */
1327 if (o2nm_this_node() <= o2net_num_from_nn(nn)) 1439 if (o2nm_this_node() <= o2net_num_from_nn(nn))
@@ -1342,10 +1454,9 @@ static void o2net_start_connect(void *arg)
1342 1454
1343 spin_lock(&nn->nn_lock); 1455 spin_lock(&nn->nn_lock);
1344 /* see if we already have one pending or have given up */ 1456 /* see if we already have one pending or have given up */
1345 if (nn->nn_sc || nn->nn_persistent_error) 1457 stop = (nn->nn_sc || nn->nn_persistent_error);
1346 arg = NULL;
1347 spin_unlock(&nn->nn_lock); 1458 spin_unlock(&nn->nn_lock);
1348 if (arg == NULL) /* *shrug*, needed some indicator */ 1459 if (stop)
1349 goto out; 1460 goto out;
1350 1461
1351 nn->nn_last_connect_attempt = jiffies; 1462 nn->nn_last_connect_attempt = jiffies;
@@ -1421,24 +1532,29 @@ out:
1421 return; 1532 return;
1422} 1533}
1423 1534
1424static void o2net_connect_expired(void *arg) 1535static void o2net_connect_expired(struct work_struct *work)
1425{ 1536{
1426 struct o2net_node *nn = arg; 1537 struct o2net_node *nn =
1538 container_of(work, struct o2net_node, nn_connect_expired.work);
1427 1539
1428 spin_lock(&nn->nn_lock); 1540 spin_lock(&nn->nn_lock);
1429 if (!nn->nn_sc_valid) { 1541 if (!nn->nn_sc_valid) {
1542 struct o2nm_node *node = nn->nn_sc->sc_node;
1430 mlog(ML_ERROR, "no connection established with node %u after " 1543 mlog(ML_ERROR, "no connection established with node %u after "
1431 "%u seconds, giving up and returning errors.\n", 1544 "%u.%u seconds, giving up and returning errors.\n",
1432 o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS); 1545 o2net_num_from_nn(nn),
1546 o2net_idle_timeout(node) / 1000,
1547 o2net_idle_timeout(node) % 1000);
1433 1548
1434 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); 1549 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1435 } 1550 }
1436 spin_unlock(&nn->nn_lock); 1551 spin_unlock(&nn->nn_lock);
1437} 1552}
1438 1553
1439static void o2net_still_up(void *arg) 1554static void o2net_still_up(struct work_struct *work)
1440{ 1555{
1441 struct o2net_node *nn = arg; 1556 struct o2net_node *nn =
1557 container_of(work, struct o2net_node, nn_still_up.work);
1442 1558
1443 o2quo_hb_still_up(o2net_num_from_nn(nn)); 1559 o2quo_hb_still_up(o2net_num_from_nn(nn));
1444} 1560}
@@ -1469,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1469 1585
1470 if (node_num != o2nm_this_node()) 1586 if (node_num != o2nm_this_node())
1471 o2net_disconnect_node(node); 1587 o2net_disconnect_node(node);
1588
1589 BUG_ON(atomic_read(&o2net_connected_peers) < 0);
1472} 1590}
1473 1591
1474static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, 1592static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1480,14 +1598,14 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
1480 1598
1481 /* ensure an immediate connect attempt */ 1599 /* ensure an immediate connect attempt */
1482 nn->nn_last_connect_attempt = jiffies - 1600 nn->nn_last_connect_attempt = jiffies -
1483 (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1); 1601 (msecs_to_jiffies(o2net_reconnect_delay(node)) + 1);
1484 1602
1485 if (node_num != o2nm_this_node()) { 1603 if (node_num != o2nm_this_node()) {
1486 /* heartbeat doesn't work unless a local node number is 1604 /* heartbeat doesn't work unless a local node number is
1487 * configured and doing so brings up the o2net_wq, so we can 1605 * configured and doing so brings up the o2net_wq, so we can
1488 * use it.. */ 1606 * use it.. */
1489 queue_delayed_work(o2net_wq, &nn->nn_connect_expired, 1607 queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
1490 O2NET_IDLE_TIMEOUT_SECS * HZ); 1608 msecs_to_jiffies(o2net_idle_timeout(node)));
1491 1609
1492 * believe it or not, accept and node heartbeating testing 1610 * believe it or not, accept and node heartbeating testing
1493 * can succeed for this node before we got here.. so 1611 * can succeed for this node before we got here.. so
@@ -1632,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
1632 o2net_register_callbacks(sc->sc_sock->sk, sc); 1750 o2net_register_callbacks(sc->sc_sock->sk, sc);
1633 o2net_sc_queue_work(sc, &sc->sc_rx_work); 1751 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1634 1752
1753 o2net_initialize_handshake();
1635 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1754 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1636 1755
1637out: 1756out:
@@ -1644,9 +1763,9 @@ out:
1644 return ret; 1763 return ret;
1645} 1764}
1646 1765
1647static void o2net_accept_many(void *arg) 1766static void o2net_accept_many(struct work_struct *work)
1648{ 1767{
1649 struct socket *sock = arg; 1768 struct socket *sock = o2net_listen_sock;
1650 while (o2net_accept_one(sock) == 0) 1769 while (o2net_accept_one(sock) == 0)
1651 cond_resched(); 1770 cond_resched();
1652} 1771}
@@ -1700,7 +1819,7 @@ static int o2net_open_listening_sock(__be16 port)
1700 write_unlock_bh(&sock->sk->sk_callback_lock); 1819 write_unlock_bh(&sock->sk->sk_callback_lock);
1701 1820
1702 o2net_listen_sock = sock; 1821 o2net_listen_sock = sock;
1703 INIT_WORK(&o2net_listen_work, o2net_accept_many, sock); 1822 INIT_WORK(&o2net_listen_work, o2net_accept_many);
1704 1823
1705 sock->sk->sk_reuse = 1; 1824 sock->sk->sk_reuse = 1;
1706 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); 1825 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
@@ -1799,9 +1918,9 @@ int o2net_init(void)
1799 1918
1800 o2quo_init(); 1919 o2quo_init();
1801 1920
1802 o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL); 1921 o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
1803 o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); 1922 o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
1804 o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); 1923 o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
1805 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) { 1924 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
1806 kfree(o2net_hand); 1925 kfree(o2net_hand);
1807 kfree(o2net_keep_req); 1926 kfree(o2net_keep_req);
@@ -1819,9 +1938,10 @@ int o2net_init(void)
1819 struct o2net_node *nn = o2net_nn_from_num(i); 1938 struct o2net_node *nn = o2net_nn_from_num(i);
1820 1939
1821 spin_lock_init(&nn->nn_lock); 1940 spin_lock_init(&nn->nn_lock);
1822 INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn); 1941 INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect);
1823 INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn); 1942 INIT_DELAYED_WORK(&nn->nn_connect_expired,
1824 INIT_WORK(&nn->nn_still_up, o2net_still_up, nn); 1943 o2net_connect_expired);
1944 INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up);
1825 /* until we see hb from a node we'll return -ENOTCONN */ 1945 /* until we see hb from a node we'll return -ENOTCONN */
1826 nn->nn_persistent_error = -ENOTCONN; 1946 nn->nn_persistent_error = -ENOTCONN;
1827 init_waitqueue_head(&nn->nn_sc_wq); 1947 init_waitqueue_head(&nn->nn_sc_wq);
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 616ff2b8434a..21a4e43df836 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -54,6 +54,13 @@ typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data)
54 54
55#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) 55#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg))
56 56
57/* same as hb delay, we're waiting for another node to recognize our hb */
58#define O2NET_RECONNECT_DELAY_MS_DEFAULT 2000
59
60#define O2NET_KEEPALIVE_DELAY_MS_DEFAULT 5000
61#define O2NET_IDLE_TIMEOUT_MS_DEFAULT 10000
62
63
57/* TODO: figure this out.... */ 64/* TODO: figure this out.... */
58static inline int o2net_link_down(int err, struct socket *sock) 65static inline int o2net_link_down(int err, struct socket *sock)
59{ 66{
@@ -101,6 +108,7 @@ void o2net_unregister_hb_callbacks(void);
101int o2net_start_listening(struct o2nm_node *node); 108int o2net_start_listening(struct o2nm_node *node);
102void o2net_stop_listening(struct o2nm_node *node); 109void o2net_stop_listening(struct o2nm_node *node);
103void o2net_disconnect_node(struct o2nm_node *node); 110void o2net_disconnect_node(struct o2nm_node *node);
111int o2net_num_connected_peers(void);
104 112
105int o2net_init(void); 113int o2net_init(void);
106void o2net_exit(void); 114void o2net_exit(void);
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index 4b46aac7d243..b700dc9624d1 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -27,23 +27,20 @@
27#define O2NET_MSG_KEEP_REQ_MAGIC ((u16)0xfa57) 27#define O2NET_MSG_KEEP_REQ_MAGIC ((u16)0xfa57)
28#define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58) 28#define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58)
29 29
30/* same as hb delay, we're waiting for another node to recognize our hb */
31#define O2NET_RECONNECT_DELAY_MS O2HB_REGION_TIMEOUT_MS
32
33/* we're delaying our quorum decision so that heartbeat will have timed 30/* we're delaying our quorum decision so that heartbeat will have timed
34 * out truly dead nodes by the time we come around to making decisions 31 * out truly dead nodes by the time we come around to making decisions
35 * on their number */ 32 * on their number */
36#define O2NET_QUORUM_DELAY_MS ((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS) 33#define O2NET_QUORUM_DELAY_MS ((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS)
37 34
38#define O2NET_KEEPALIVE_DELAY_SECS 5
39#define O2NET_IDLE_TIMEOUT_SECS 10
40
41/* 35/*
42 * This version number represents quite a lot, unfortunately. It not 36 * This version number represents quite a lot, unfortunately. It not
43 * only represents the raw network message protocol on the wire but also 37 * only represents the raw network message protocol on the wire but also
44 * locking semantics of the file system using the protocol. It should 38 * locking semantics of the file system using the protocol. It should
45 * be somewhere else, I'm sure, but right now it isn't. 39 * be somewhere else, I'm sure, but right now it isn't.
46 * 40 *
41 * New in version 5:
42 * - Network timeout checking protocol
43 *
47 * New in version 4: 44 * New in version 4:
48 * - Remove i_generation from lock names for better stat performance. 45 * - Remove i_generation from lock names for better stat performance.
49 * 46 *
@@ -54,10 +51,14 @@
54 * - full 64 bit i_size in the metadata lock lvbs 51 * - full 64 bit i_size in the metadata lock lvbs
55 * - introduction of "rw" lock and pushing meta/data locking down 52 * - introduction of "rw" lock and pushing meta/data locking down
56 */ 53 */
57#define O2NET_PROTOCOL_VERSION 4ULL 54#define O2NET_PROTOCOL_VERSION 5ULL
58struct o2net_handshake { 55struct o2net_handshake {
59 __be64 protocol_version; 56 __be64 protocol_version;
60 __be64 connector_id; 57 __be64 connector_id;
58 __be32 o2hb_heartbeat_timeout_ms;
59 __be32 o2net_idle_timeout_ms;
60 __be32 o2net_keepalive_delay_ms;
61 __be32 o2net_reconnect_delay_ms;
61}; 62};
62 63
63struct o2net_node { 64struct o2net_node {
@@ -86,18 +87,18 @@ struct o2net_node {
86 * connect attempt fails and so can be self-arming. shutdown is 87 * connect attempt fails and so can be self-arming. shutdown is
87 * careful to first mark the nn such that no connects will be attempted 88 * careful to first mark the nn such that no connects will be attempted
88 * before canceling delayed connect work and flushing the queue. */ 89 * before canceling delayed connect work and flushing the queue. */
89 struct work_struct nn_connect_work; 90 struct delayed_work nn_connect_work;
90 unsigned long nn_last_connect_attempt; 91 unsigned long nn_last_connect_attempt;
91 92
92 /* this is queued as nodes come up and is canceled when a connection is 93 /* this is queued as nodes come up and is canceled when a connection is
93 * established. this expiring gives up on the node and errors out 94 * established. this expiring gives up on the node and errors out
94 * transmits */ 95 * transmits */
95 struct work_struct nn_connect_expired; 96 struct delayed_work nn_connect_expired;
96 97
97 /* after we give up on a socket we wait a while before deciding 98 /* after we give up on a socket we wait a while before deciding
98 * that it is still heartbeating and that we should do some 99 * that it is still heartbeating and that we should do some
99 * quorum work */ 100 * quorum work */
100 struct work_struct nn_still_up; 101 struct delayed_work nn_still_up;
101}; 102};
102 103
103struct o2net_sock_container { 104struct o2net_sock_container {
@@ -129,7 +130,7 @@ struct o2net_sock_container {
129 struct work_struct sc_shutdown_work; 130 struct work_struct sc_shutdown_work;
130 131
131 struct timer_list sc_idle_timeout; 132 struct timer_list sc_idle_timeout;
132 struct work_struct sc_keepalive_work; 133 struct delayed_work sc_keepalive_work;
133 134
134 unsigned sc_handshake_ok:1; 135 unsigned sc_handshake_ok:1;
135 136