about summary refs log tree commit diff stats
path: root/fs/ocfs2/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r-- fs/ocfs2/cluster/nodemanager.c 30
-rw-r--r-- fs/ocfs2/cluster/tcp.c 92
-rw-r--r-- fs/ocfs2/cluster/tcp.h 1
-rw-r--r-- fs/ocfs2/cluster/tcp_internal.h 9
4 files changed, 116 insertions, 16 deletions
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index 234f83f2897f..357f1d551771 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -573,12 +573,21 @@ static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
573 ret = o2nm_cluster_attr_write(page, count, &val); 573 ret = o2nm_cluster_attr_write(page, count, &val);
574 574
575 if (ret > 0) { 575 if (ret > 0) {
576 if (val <= cluster->cl_keepalive_delay_ms) { 576 if (cluster->cl_idle_timeout_ms != val
577 && o2net_num_connected_peers()) {
578 mlog(ML_NOTICE,
579 "o2net: cannot change idle timeout after "
580 "the first peer has agreed to it."
581 " %d connected peers\n",
582 o2net_num_connected_peers());
583 ret = -EINVAL;
584 } else if (val <= cluster->cl_keepalive_delay_ms) {
577 mlog(ML_NOTICE, "o2net: idle timeout must be larger " 585 mlog(ML_NOTICE, "o2net: idle timeout must be larger "
578 "than keepalive delay\n"); 586 "than keepalive delay\n");
579 return -EINVAL; 587 ret = -EINVAL;
588 } else {
589 cluster->cl_idle_timeout_ms = val;
580 } 590 }
581 cluster->cl_idle_timeout_ms = val;
582 } 591 }
583 592
584 return ret; 593 return ret;
@@ -599,12 +608,21 @@ static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
599 ret = o2nm_cluster_attr_write(page, count, &val); 608 ret = o2nm_cluster_attr_write(page, count, &val);
600 609
601 if (ret > 0) { 610 if (ret > 0) {
602 if (val >= cluster->cl_idle_timeout_ms) { 611 if (cluster->cl_keepalive_delay_ms != val
612 && o2net_num_connected_peers()) {
613 mlog(ML_NOTICE,
614 "o2net: cannot change keepalive delay after"
615 " the first peer has agreed to it."
616 " %d connected peers\n",
617 o2net_num_connected_peers());
618 ret = -EINVAL;
619 } else if (val >= cluster->cl_idle_timeout_ms) {
603 mlog(ML_NOTICE, "o2net: keepalive delay must be " 620 mlog(ML_NOTICE, "o2net: keepalive delay must be "
604 "smaller than idle timeout\n"); 621 "smaller than idle timeout\n");
605 return -EINVAL; 622 ret = -EINVAL;
623 } else {
624 cluster->cl_keepalive_delay_ms = val;
606 } 625 }
607 cluster->cl_keepalive_delay_ms = val;
608 } 626 }
609 627
610 return ret; 628 return ret;
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ebbaee664c66..457753df1ae7 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -380,6 +380,13 @@ static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
380 sc_put(sc); 380 sc_put(sc);
381} 381}
382 382
383static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
384
385int o2net_num_connected_peers(void)
386{
387 return atomic_read(&o2net_connected_peers);
388}
389
383static void o2net_set_nn_state(struct o2net_node *nn, 390static void o2net_set_nn_state(struct o2net_node *nn,
384 struct o2net_sock_container *sc, 391 struct o2net_sock_container *sc,
385 unsigned valid, int err) 392 unsigned valid, int err)
@@ -390,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
390 397
391 assert_spin_locked(&nn->nn_lock); 398 assert_spin_locked(&nn->nn_lock);
392 399
400 if (old_sc && !sc)
401 atomic_dec(&o2net_connected_peers);
402 else if (!old_sc && sc)
403 atomic_inc(&o2net_connected_peers);
404
393 /* the node num comparison and single connect/accept path should stop 405 /* the node num comparison and single connect/accept path should stop
394 * an non-null sc from being overwritten with another */ 406 * an non-null sc from being overwritten with another */
395 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); 407 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -1123,6 +1135,44 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
1123 return -1; 1135 return -1;
1124 } 1136 }
1125 1137
1138 /*
1139 * Ensure timeouts are consistent with other nodes, otherwise
1140 * we can end up with one node thinking that the other must be down,
1141 * but isn't. This can ultimately cause corruption.
1142 */
1143 if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
1144 o2net_idle_timeout(sc->sc_node)) {
1145 mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
1146 "%u ms, but we use %u ms locally. disconnecting\n",
1147 SC_NODEF_ARGS(sc),
1148 be32_to_cpu(hand->o2net_idle_timeout_ms),
1149 o2net_idle_timeout(sc->sc_node));
1150 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1151 return -1;
1152 }
1153
1154 if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
1155 o2net_keepalive_delay(sc->sc_node)) {
1156 mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
1157 "%u ms, but we use %u ms locally. disconnecting\n",
1158 SC_NODEF_ARGS(sc),
1159 be32_to_cpu(hand->o2net_keepalive_delay_ms),
1160 o2net_keepalive_delay(sc->sc_node));
1161 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1162 return -1;
1163 }
1164
1165 if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
1166 O2HB_MAX_WRITE_TIMEOUT_MS) {
1167 mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
1168 "%u ms, but we use %u ms locally. disconnecting\n",
1169 SC_NODEF_ARGS(sc),
1170 be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
1171 O2HB_MAX_WRITE_TIMEOUT_MS);
1172 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1173 return -1;
1174 }
1175
1126 sc->sc_handshake_ok = 1; 1176 sc->sc_handshake_ok = 1;
1127 1177
1128 spin_lock(&nn->nn_lock); 1178 spin_lock(&nn->nn_lock);
@@ -1155,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1155 sclog(sc, "receiving\n"); 1205 sclog(sc, "receiving\n");
1156 do_gettimeofday(&sc->sc_tv_advance_start); 1206 do_gettimeofday(&sc->sc_tv_advance_start);
1157 1207
1208 if (unlikely(sc->sc_handshake_ok == 0)) {
1209 if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
1210 data = page_address(sc->sc_page) + sc->sc_page_off;
1211 datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
1212 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1213 if (ret > 0)
1214 sc->sc_page_off += ret;
1215 }
1216
1217 if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
1218 o2net_check_handshake(sc);
1219 if (unlikely(sc->sc_handshake_ok == 0))
1220 ret = -EPROTO;
1221 }
1222 goto out;
1223 }
1224
1158 /* do we need more header? */ 1225 /* do we need more header? */
1159 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1226 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1160 data = page_address(sc->sc_page) + sc->sc_page_off; 1227 data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1162,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1162 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1229 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1163 if (ret > 0) { 1230 if (ret > 0) {
1164 sc->sc_page_off += ret; 1231 sc->sc_page_off += ret;
1165
1166 /* this working relies on the handshake being
1167 * smaller than the normal message header */
1168 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
1169 !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
1170 ret = -EPROTO;
1171 goto out;
1172 }
1173
1174 /* only swab incoming here.. we can 1232 /* only swab incoming here.. we can
1175 * only get here once as we cross from 1233 * only get here once as we cross from
1176 * being under to over */ 1234 * being under to over */
@@ -1272,6 +1330,18 @@ static int o2net_set_nodelay(struct socket *sock)
1272 return ret; 1330 return ret;
1273} 1331}
1274 1332
1333static void o2net_initialize_handshake(void)
1334{
1335 o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
1336 O2HB_MAX_WRITE_TIMEOUT_MS);
1337 o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
1338 o2net_idle_timeout(NULL));
1339 o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
1340 o2net_keepalive_delay(NULL));
1341 o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
1342 o2net_reconnect_delay(NULL));
1343}
1344
1275/* ------------------------------------------------------------ */ 1345/* ------------------------------------------------------------ */
1276 1346
1277/* called when a connect completes and after a sock is accepted. the 1347/* called when a connect completes and after a sock is accepted. the
@@ -1286,6 +1356,7 @@ static void o2net_sc_connect_completed(struct work_struct *work)
1286 (unsigned long long)O2NET_PROTOCOL_VERSION, 1356 (unsigned long long)O2NET_PROTOCOL_VERSION,
1287 (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); 1357 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1288 1358
1359 o2net_initialize_handshake();
1289 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1360 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1290 sc_put(sc); 1361 sc_put(sc);
1291} 1362}
@@ -1514,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1514 1585
1515 if (node_num != o2nm_this_node()) 1586 if (node_num != o2nm_this_node())
1516 o2net_disconnect_node(node); 1587 o2net_disconnect_node(node);
1588
1589 BUG_ON(atomic_read(&o2net_connected_peers) < 0);
1517} 1590}
1518 1591
1519static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, 1592static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1677,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
1677 o2net_register_callbacks(sc->sc_sock->sk, sc); 1750 o2net_register_callbacks(sc->sc_sock->sk, sc);
1678 o2net_sc_queue_work(sc, &sc->sc_rx_work); 1751 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1679 1752
1753 o2net_initialize_handshake();
1680 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1754 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1681 1755
1682out: 1756out:
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 2e08976050fb..21a4e43df836 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -108,6 +108,7 @@ void o2net_unregister_hb_callbacks(void);
108int o2net_start_listening(struct o2nm_node *node); 108int o2net_start_listening(struct o2nm_node *node);
109void o2net_stop_listening(struct o2nm_node *node); 109void o2net_stop_listening(struct o2nm_node *node);
110void o2net_disconnect_node(struct o2nm_node *node); 110void o2net_disconnect_node(struct o2nm_node *node);
111int o2net_num_connected_peers(void);
111 112
112int o2net_init(void); 113int o2net_init(void);
113void o2net_exit(void); 114void o2net_exit(void);
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index 56f7ee1d2547..b700dc9624d1 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,9 @@
38 * locking semantics of the file system using the protocol. It should 38 * locking semantics of the file system using the protocol. It should
39 * be somewhere else, I'm sure, but right now it isn't. 39 * be somewhere else, I'm sure, but right now it isn't.
40 * 40 *
41 * New in version 5:
42 * - Network timeout checking protocol
43 *
41 * New in version 4: 44 * New in version 4:
42 * - Remove i_generation from lock names for better stat performance. 45 * - Remove i_generation from lock names for better stat performance.
43 * 46 *
@@ -48,10 +51,14 @@
48 * - full 64 bit i_size in the metadata lock lvbs 51 * - full 64 bit i_size in the metadata lock lvbs
49 * - introduction of "rw" lock and pushing meta/data locking down 52 * - introduction of "rw" lock and pushing meta/data locking down
50 */ 53 */
51#define O2NET_PROTOCOL_VERSION 4ULL 54#define O2NET_PROTOCOL_VERSION 5ULL
52struct o2net_handshake { 55struct o2net_handshake {
53 __be64 protocol_version; 56 __be64 protocol_version;
54 __be64 connector_id; 57 __be64 connector_id;
58 __be32 o2hb_heartbeat_timeout_ms;
59 __be32 o2net_idle_timeout_ms;
60 __be32 o2net_keepalive_delay_ms;
61 __be32 o2net_reconnect_delay_ms;
55}; 62};
56 63
57struct o2net_node { 64struct o2net_node {