aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/cluster/tcp.c
diff options
context:
space:
mode:
authorAndrew Beekhof <abeekhof@suse.de>2006-12-04 08:04:55 -0500
committerMark Fasheh <mark.fasheh@oracle.com>2006-12-11 17:26:44 -0500
commit828ae6afbef03bfe107a4a8cc38798419d6a2765 (patch)
treeaa9cd680db9af2070f124cfd66aad88da279a5b9 /fs/ocfs2/cluster/tcp.c
parentb5dd80304da482d77b2320e1a01a189e656b9770 (diff)
[patch 3/3] OCFS2 Configurable timeouts - Protocol changes
Modify the OCFS2 handshake to ensure essential timeouts are configured identically on all nodes. Only allow changes when there are no connected peers Improves the logic in o2net_advance_rx() which broke now that sizeof(struct o2net_handshake) is greater than sizeof(struct o2net_msg) Included is the field for userspace-heartbeat timeout to avoid the need for further protocol changes. Uses a global spinlock to ensure the decisions to update configfs entries are made on the correct value. The region covered by the spinlock when incrementing the counter is much larger as this is the more critical case. Small cleanup contributed by Adrian Bunk <bunk@stusta.de> Signed-off-by: Andrew Beekhof <abeekhof@suse.de> Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Diffstat (limited to 'fs/ocfs2/cluster/tcp.c')
-rw-r--r--fs/ocfs2/cluster/tcp.c92
1 files changed, 83 insertions, 9 deletions
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ebbaee664c66..457753df1ae7 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -380,6 +380,13 @@ static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
380 sc_put(sc); 380 sc_put(sc);
381} 381}
382 382
383static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
384
385int o2net_num_connected_peers(void)
386{
387 return atomic_read(&o2net_connected_peers);
388}
389
383static void o2net_set_nn_state(struct o2net_node *nn, 390static void o2net_set_nn_state(struct o2net_node *nn,
384 struct o2net_sock_container *sc, 391 struct o2net_sock_container *sc,
385 unsigned valid, int err) 392 unsigned valid, int err)
@@ -390,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
390 397
391 assert_spin_locked(&nn->nn_lock); 398 assert_spin_locked(&nn->nn_lock);
392 399
400 if (old_sc && !sc)
401 atomic_dec(&o2net_connected_peers);
402 else if (!old_sc && sc)
403 atomic_inc(&o2net_connected_peers);
404
393 /* the node num comparison and single connect/accept path should stop 405 /* the node num comparison and single connect/accept path should stop
394 * an non-null sc from being overwritten with another */ 406 * an non-null sc from being overwritten with another */
395 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); 407 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -1123,6 +1135,44 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
1123 return -1; 1135 return -1;
1124 } 1136 }
1125 1137
1138 /*
1139 * Ensure timeouts are consistent with other nodes, otherwise
1140 * we can end up with one node thinking that the other must be down,
1141 * but isn't. This can ultimately cause corruption.
1142 */
1143 if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
1144 o2net_idle_timeout(sc->sc_node)) {
1145 mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
1146 "%u ms, but we use %u ms locally. disconnecting\n",
1147 SC_NODEF_ARGS(sc),
1148 be32_to_cpu(hand->o2net_idle_timeout_ms),
1149 o2net_idle_timeout(sc->sc_node));
1150 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1151 return -1;
1152 }
1153
1154 if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
1155 o2net_keepalive_delay(sc->sc_node)) {
1156 mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
1157 "%u ms, but we use %u ms locally. disconnecting\n",
1158 SC_NODEF_ARGS(sc),
1159 be32_to_cpu(hand->o2net_keepalive_delay_ms),
1160 o2net_keepalive_delay(sc->sc_node));
1161 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1162 return -1;
1163 }
1164
1165 if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
1166 O2HB_MAX_WRITE_TIMEOUT_MS) {
1167 mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
1168 "%u ms, but we use %u ms locally. disconnecting\n",
1169 SC_NODEF_ARGS(sc),
1170 be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
1171 O2HB_MAX_WRITE_TIMEOUT_MS);
1172 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1173 return -1;
1174 }
1175
1126 sc->sc_handshake_ok = 1; 1176 sc->sc_handshake_ok = 1;
1127 1177
1128 spin_lock(&nn->nn_lock); 1178 spin_lock(&nn->nn_lock);
@@ -1155,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1155 sclog(sc, "receiving\n"); 1205 sclog(sc, "receiving\n");
1156 do_gettimeofday(&sc->sc_tv_advance_start); 1206 do_gettimeofday(&sc->sc_tv_advance_start);
1157 1207
1208 if (unlikely(sc->sc_handshake_ok == 0)) {
1209 if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
1210 data = page_address(sc->sc_page) + sc->sc_page_off;
1211 datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
1212 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1213 if (ret > 0)
1214 sc->sc_page_off += ret;
1215 }
1216
1217 if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
1218 o2net_check_handshake(sc);
1219 if (unlikely(sc->sc_handshake_ok == 0))
1220 ret = -EPROTO;
1221 }
1222 goto out;
1223 }
1224
1158 /* do we need more header? */ 1225 /* do we need more header? */
1159 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1226 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1160 data = page_address(sc->sc_page) + sc->sc_page_off; 1227 data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1162,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1162 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1229 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1163 if (ret > 0) { 1230 if (ret > 0) {
1164 sc->sc_page_off += ret; 1231 sc->sc_page_off += ret;
1165
1166 /* this working relies on the handshake being
1167 * smaller than the normal message header */
1168 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
1169 !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
1170 ret = -EPROTO;
1171 goto out;
1172 }
1173
1174 /* only swab incoming here.. we can 1232 /* only swab incoming here.. we can
1175 * only get here once as we cross from 1233 * only get here once as we cross from
1176 * being under to over */ 1234 * being under to over */
@@ -1272,6 +1330,18 @@ static int o2net_set_nodelay(struct socket *sock)
1272 return ret; 1330 return ret;
1273} 1331}
1274 1332
1333static void o2net_initialize_handshake(void)
1334{
1335 o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
1336 O2HB_MAX_WRITE_TIMEOUT_MS);
1337 o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
1338 o2net_idle_timeout(NULL));
1339 o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
1340 o2net_keepalive_delay(NULL));
1341 o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
1342 o2net_reconnect_delay(NULL));
1343}
1344
1275/* ------------------------------------------------------------ */ 1345/* ------------------------------------------------------------ */
1276 1346
1277/* called when a connect completes and after a sock is accepted. the 1347/* called when a connect completes and after a sock is accepted. the
@@ -1286,6 +1356,7 @@ static void o2net_sc_connect_completed(struct work_struct *work)
1286 (unsigned long long)O2NET_PROTOCOL_VERSION, 1356 (unsigned long long)O2NET_PROTOCOL_VERSION,
1287 (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); 1357 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1288 1358
1359 o2net_initialize_handshake();
1289 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1360 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1290 sc_put(sc); 1361 sc_put(sc);
1291} 1362}
@@ -1514,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1514 1585
1515 if (node_num != o2nm_this_node()) 1586 if (node_num != o2nm_this_node())
1516 o2net_disconnect_node(node); 1587 o2net_disconnect_node(node);
1588
1589 BUG_ON(atomic_read(&o2net_connected_peers) < 0);
1517} 1590}
1518 1591
1519static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, 1592static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1677,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
1677 o2net_register_callbacks(sc->sc_sock->sk, sc); 1750 o2net_register_callbacks(sc->sc_sock->sk, sc);
1678 o2net_sc_queue_work(sc, &sc->sc_rx_work); 1751 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1679 1752
1753 o2net_initialize_handshake();
1680 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1754 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1681 1755
1682out: 1756out: