aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/cluster
diff options
context:
space:
mode:
authorAndrew Beekhof <abeekhof@suse.de>2006-12-04 08:04:55 -0500
committerMark Fasheh <mark.fasheh@oracle.com>2006-12-11 17:26:44 -0500
commit828ae6afbef03bfe107a4a8cc38798419d6a2765 (patch)
treeaa9cd680db9af2070f124cfd66aad88da279a5b9 /fs/ocfs2/cluster
parentb5dd80304da482d77b2320e1a01a189e656b9770 (diff)
[patch 3/3] OCFS2 Configurable timeouts - Protocol changes
Modify the OCFS2 handshake to ensure essential timeouts are configured identically on all nodes. Only allow changes when there are no connected peers Improves the logic in o2net_advance_rx() which broke now that sizeof(struct o2net_handshake) is greater than sizeof(struct o2net_msg) Included is the field for userspace-heartbeat timeout to avoid the need for further protocol changes. Uses a global spinlock to ensure the decisions to update configfs entries are made on the correct value. The region covered by the spinlock when incrementing the counter is much larger as this is the more critical case. Small cleanup contributed by Adrian Bunk <bunk@stusta.de> Signed-off-by: Andrew Beekhof <abeekhof@suse.de> Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r--fs/ocfs2/cluster/nodemanager.c30
-rw-r--r--fs/ocfs2/cluster/tcp.c92
-rw-r--r--fs/ocfs2/cluster/tcp.h1
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h9
4 files changed, 116 insertions, 16 deletions
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index 234f83f2897f..357f1d551771 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -573,12 +573,21 @@ static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
573 ret = o2nm_cluster_attr_write(page, count, &val); 573 ret = o2nm_cluster_attr_write(page, count, &val);
574 574
575 if (ret > 0) { 575 if (ret > 0) {
576 if (val <= cluster->cl_keepalive_delay_ms) { 576 if (cluster->cl_idle_timeout_ms != val
577 && o2net_num_connected_peers()) {
578 mlog(ML_NOTICE,
579 "o2net: cannot change idle timeout after "
580 "the first peer has agreed to it."
581 " %d connected peers\n",
582 o2net_num_connected_peers());
583 ret = -EINVAL;
584 } else if (val <= cluster->cl_keepalive_delay_ms) {
577 mlog(ML_NOTICE, "o2net: idle timeout must be larger " 585 mlog(ML_NOTICE, "o2net: idle timeout must be larger "
578 "than keepalive delay\n"); 586 "than keepalive delay\n");
579 return -EINVAL; 587 ret = -EINVAL;
588 } else {
589 cluster->cl_idle_timeout_ms = val;
580 } 590 }
581 cluster->cl_idle_timeout_ms = val;
582 } 591 }
583 592
584 return ret; 593 return ret;
@@ -599,12 +608,21 @@ static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
599 ret = o2nm_cluster_attr_write(page, count, &val); 608 ret = o2nm_cluster_attr_write(page, count, &val);
600 609
601 if (ret > 0) { 610 if (ret > 0) {
602 if (val >= cluster->cl_idle_timeout_ms) { 611 if (cluster->cl_keepalive_delay_ms != val
612 && o2net_num_connected_peers()) {
613 mlog(ML_NOTICE,
614 "o2net: cannot change keepalive delay after"
615 " the first peer has agreed to it."
616 " %d connected peers\n",
617 o2net_num_connected_peers());
618 ret = -EINVAL;
619 } else if (val >= cluster->cl_idle_timeout_ms) {
603 mlog(ML_NOTICE, "o2net: keepalive delay must be " 620 mlog(ML_NOTICE, "o2net: keepalive delay must be "
604 "smaller than idle timeout\n"); 621 "smaller than idle timeout\n");
605 return -EINVAL; 622 ret = -EINVAL;
623 } else {
624 cluster->cl_keepalive_delay_ms = val;
606 } 625 }
607 cluster->cl_keepalive_delay_ms = val;
608 } 626 }
609 627
610 return ret; 628 return ret;
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ebbaee664c66..457753df1ae7 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -380,6 +380,13 @@ static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
380 sc_put(sc); 380 sc_put(sc);
381} 381}
382 382
383static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
384
385int o2net_num_connected_peers(void)
386{
387 return atomic_read(&o2net_connected_peers);
388}
389
383static void o2net_set_nn_state(struct o2net_node *nn, 390static void o2net_set_nn_state(struct o2net_node *nn,
384 struct o2net_sock_container *sc, 391 struct o2net_sock_container *sc,
385 unsigned valid, int err) 392 unsigned valid, int err)
@@ -390,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
390 397
391 assert_spin_locked(&nn->nn_lock); 398 assert_spin_locked(&nn->nn_lock);
392 399
400 if (old_sc && !sc)
401 atomic_dec(&o2net_connected_peers);
402 else if (!old_sc && sc)
403 atomic_inc(&o2net_connected_peers);
404
393 /* the node num comparison and single connect/accept path should stop 405 /* the node num comparison and single connect/accept path should stop
394 * an non-null sc from being overwritten with another */ 406 * an non-null sc from being overwritten with another */
395 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); 407 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -1123,6 +1135,44 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
1123 return -1; 1135 return -1;
1124 } 1136 }
1125 1137
1138 /*
1139 * Ensure timeouts are consistent with other nodes, otherwise
1140 * we can end up with one node thinking that the other must be down,
1141 * but isn't. This can ultimately cause corruption.
1142 */
1143 if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
1144 o2net_idle_timeout(sc->sc_node)) {
1145 mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
1146 "%u ms, but we use %u ms locally. disconnecting\n",
1147 SC_NODEF_ARGS(sc),
1148 be32_to_cpu(hand->o2net_idle_timeout_ms),
1149 o2net_idle_timeout(sc->sc_node));
1150 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1151 return -1;
1152 }
1153
1154 if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
1155 o2net_keepalive_delay(sc->sc_node)) {
1156 mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
1157 "%u ms, but we use %u ms locally. disconnecting\n",
1158 SC_NODEF_ARGS(sc),
1159 be32_to_cpu(hand->o2net_keepalive_delay_ms),
1160 o2net_keepalive_delay(sc->sc_node));
1161 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1162 return -1;
1163 }
1164
1165 if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
1166 O2HB_MAX_WRITE_TIMEOUT_MS) {
1167 mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
1168 "%u ms, but we use %u ms locally. disconnecting\n",
1169 SC_NODEF_ARGS(sc),
1170 be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
1171 O2HB_MAX_WRITE_TIMEOUT_MS);
1172 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1173 return -1;
1174 }
1175
1126 sc->sc_handshake_ok = 1; 1176 sc->sc_handshake_ok = 1;
1127 1177
1128 spin_lock(&nn->nn_lock); 1178 spin_lock(&nn->nn_lock);
@@ -1155,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1155 sclog(sc, "receiving\n"); 1205 sclog(sc, "receiving\n");
1156 do_gettimeofday(&sc->sc_tv_advance_start); 1206 do_gettimeofday(&sc->sc_tv_advance_start);
1157 1207
1208 if (unlikely(sc->sc_handshake_ok == 0)) {
1209 if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
1210 data = page_address(sc->sc_page) + sc->sc_page_off;
1211 datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
1212 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1213 if (ret > 0)
1214 sc->sc_page_off += ret;
1215 }
1216
1217 if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
1218 o2net_check_handshake(sc);
1219 if (unlikely(sc->sc_handshake_ok == 0))
1220 ret = -EPROTO;
1221 }
1222 goto out;
1223 }
1224
1158 /* do we need more header? */ 1225 /* do we need more header? */
1159 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1226 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1160 data = page_address(sc->sc_page) + sc->sc_page_off; 1227 data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1162,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1162 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1229 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1163 if (ret > 0) { 1230 if (ret > 0) {
1164 sc->sc_page_off += ret; 1231 sc->sc_page_off += ret;
1165
1166 /* this working relies on the handshake being
1167 * smaller than the normal message header */
1168 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
1169 !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
1170 ret = -EPROTO;
1171 goto out;
1172 }
1173
1174 /* only swab incoming here.. we can 1232 /* only swab incoming here.. we can
1175 * only get here once as we cross from 1233 * only get here once as we cross from
1176 * being under to over */ 1234 * being under to over */
@@ -1272,6 +1330,18 @@ static int o2net_set_nodelay(struct socket *sock)
1272 return ret; 1330 return ret;
1273} 1331}
1274 1332
1333static void o2net_initialize_handshake(void)
1334{
1335 o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
1336 O2HB_MAX_WRITE_TIMEOUT_MS);
1337 o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
1338 o2net_idle_timeout(NULL));
1339 o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
1340 o2net_keepalive_delay(NULL));
1341 o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
1342 o2net_reconnect_delay(NULL));
1343}
1344
1275/* ------------------------------------------------------------ */ 1345/* ------------------------------------------------------------ */
1276 1346
1277/* called when a connect completes and after a sock is accepted. the 1347/* called when a connect completes and after a sock is accepted. the
@@ -1286,6 +1356,7 @@ static void o2net_sc_connect_completed(struct work_struct *work)
1286 (unsigned long long)O2NET_PROTOCOL_VERSION, 1356 (unsigned long long)O2NET_PROTOCOL_VERSION,
1287 (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); 1357 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1288 1358
1359 o2net_initialize_handshake();
1289 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1360 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1290 sc_put(sc); 1361 sc_put(sc);
1291} 1362}
@@ -1514,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1514 1585
1515 if (node_num != o2nm_this_node()) 1586 if (node_num != o2nm_this_node())
1516 o2net_disconnect_node(node); 1587 o2net_disconnect_node(node);
1588
1589 BUG_ON(atomic_read(&o2net_connected_peers) < 0);
1517} 1590}
1518 1591
1519static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, 1592static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1677,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
1677 o2net_register_callbacks(sc->sc_sock->sk, sc); 1750 o2net_register_callbacks(sc->sc_sock->sk, sc);
1678 o2net_sc_queue_work(sc, &sc->sc_rx_work); 1751 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1679 1752
1753 o2net_initialize_handshake();
1680 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1754 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1681 1755
1682out: 1756out:
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 2e08976050fb..21a4e43df836 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -108,6 +108,7 @@ void o2net_unregister_hb_callbacks(void);
108int o2net_start_listening(struct o2nm_node *node); 108int o2net_start_listening(struct o2nm_node *node);
109void o2net_stop_listening(struct o2nm_node *node); 109void o2net_stop_listening(struct o2nm_node *node);
110void o2net_disconnect_node(struct o2nm_node *node); 110void o2net_disconnect_node(struct o2nm_node *node);
111int o2net_num_connected_peers(void);
111 112
112int o2net_init(void); 113int o2net_init(void);
113void o2net_exit(void); 114void o2net_exit(void);
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index 56f7ee1d2547..b700dc9624d1 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,9 @@
38 * locking semantics of the file system using the protocol. It should 38 * locking semantics of the file system using the protocol. It should
39 * be somewhere else, I'm sure, but right now it isn't. 39 * be somewhere else, I'm sure, but right now it isn't.
40 * 40 *
41 * New in version 5:
42 * - Network timeout checking protocol
43 *
41 * New in version 4: 44 * New in version 4:
42 * - Remove i_generation from lock names for better stat performance. 45 * - Remove i_generation from lock names for better stat performance.
43 * 46 *
@@ -48,10 +51,14 @@
48 * - full 64 bit i_size in the metadata lock lvbs 51 * - full 64 bit i_size in the metadata lock lvbs
49 * - introduction of "rw" lock and pushing meta/data locking down 52 * - introduction of "rw" lock and pushing meta/data locking down
50 */ 53 */
51#define O2NET_PROTOCOL_VERSION 4ULL 54#define O2NET_PROTOCOL_VERSION 5ULL
52struct o2net_handshake { 55struct o2net_handshake {
53 __be64 protocol_version; 56 __be64 protocol_version;
54 __be64 connector_id; 57 __be64 connector_id;
58 __be32 o2hb_heartbeat_timeout_ms;
59 __be32 o2net_idle_timeout_ms;
60 __be32 o2net_keepalive_delay_ms;
61 __be32 o2net_reconnect_delay_ms;
55}; 62};
56 63
57struct o2net_node { 64struct o2net_node {