about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Sage Weil <sage@newdream.net> 2011-03-04 15:25:05 -0500
committer Sage Weil <sage@newdream.net> 2011-03-04 15:25:05 -0500
commit e00de341fdb76c955703b4438100f9933c452b7f (patch)
tree ab776a5e46f1cb5acaa6b26484763259040f470c
parent e76661d0a59e53e5cc4dccbe4b755d1dc8a968ec (diff)
libceph: fix msgr standby handling
The standby logic used to be pretty dependent on the work requeueing
behavior that changed when we switched to WQ_NON_REENTRANT.  It was also
very fragile.

Restructure things so that:
 - We clear WRITE_PENDING when we set STANDBY.  This ensures we will
   requeue work when we wake up later.
 - con_work backs off if STANDBY is set.  There is nothing to do if we
   are in standby.
 - clear_standby() helper is called by both con_send() and
   con_keepalive(), the two actions that can wake us up again.  Move the
   connect_seq++ logic here.

Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- net/ceph/messenger.c 30
1 files changed, 22 insertions, 8 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3252ea974e8f..05f357828a2f 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1712,14 +1712,6 @@ more:
1712 1712
1713 /* open the socket first? */ 1713 /* open the socket first? */
1714 if (con->sock == NULL) { 1714 if (con->sock == NULL) {
1715 /*
1716 * if we were STANDBY and are reconnecting _this_
1717 * connection, bump connect_seq now. Always bump
1718 * global_seq.
1719 */
1720 if (test_and_clear_bit(STANDBY, &con->state))
1721 con->connect_seq++;
1722
1723 prepare_write_banner(msgr, con); 1715 prepare_write_banner(msgr, con);
1724 prepare_write_connect(msgr, con, 1); 1716 prepare_write_connect(msgr, con, 1);
1725 prepare_read_banner(con); 1717 prepare_read_banner(con);
@@ -1962,6 +1954,10 @@ static void con_work(struct work_struct *work)
1962 } 1954 }
1963 } 1955 }
1964 1956
1957 if (test_bit(STANDBY, &con->state)) {
1958 dout("con_work %p STANDBY\n", con);
1959 goto done;
1960 }
1965 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1961 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1966 dout("con_work CLOSED\n"); 1962 dout("con_work CLOSED\n");
1967 con_close_socket(con); 1963 con_close_socket(con);
@@ -2022,6 +2018,8 @@ static void ceph_fault(struct ceph_connection *con)
2022 * the connection in a STANDBY state */ 2018 * the connection in a STANDBY state */
2023 if (list_empty(&con->out_queue) && 2019 if (list_empty(&con->out_queue) &&
2024 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2020 !test_bit(KEEPALIVE_PENDING, &con->state)) {
2021 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2022 clear_bit(WRITE_PENDING, &con->state);
2025 set_bit(STANDBY, &con->state); 2023 set_bit(STANDBY, &con->state);
2026 } else { 2024 } else {
2027 /* retry after a delay. */ 2025 /* retry after a delay. */
@@ -2117,6 +2115,19 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
2117} 2115}
2118EXPORT_SYMBOL(ceph_messenger_destroy); 2116EXPORT_SYMBOL(ceph_messenger_destroy);
2119 2117
/*
 * clear_standby() - take a connection out of the STANDBY state.
 *
 * Atomically tests and clears the STANDBY bit in con->state.  If the bit
 * was set, connect_seq is incremented while holding con->mutex; if it was
 * not set, the function does nothing.
 *
 * In this patch it is called from ceph_con_send() and ceph_con_keepalive()
 * immediately before they set WRITE_PENDING / KEEPALIVE_PENDING and queue
 * work, and it replaces the connect_seq++ that used to be done when the
 * socket was reopened.
 *
 * NOTE(review): the STANDBY bit is cleared before con->mutex is taken, so
 * the bit change and the connect_seq bump are not one atomic step --
 * confirm that no other path depends on observing them together.
 */
2118static void clear_standby(struct ceph_connection *con)
2119{
2120 /* come back from STANDBY? */
2121 if (test_and_clear_bit(STANDBY, &con->state)) {
2122 mutex_lock(&con->mutex);
2123 dout("clear_standby %p and ++connect_seq\n", con);
2124 con->connect_seq++;
/*
 * ceph_fault() clears WRITE_PENDING when it sets STANDBY, and only sets
 * STANDBY when KEEPALIVE_PENDING is not set, so neither bit is expected
 * to be set here; warn if that does not hold.
 */
2125 WARN_ON(test_bit(WRITE_PENDING, &con->state));
2126 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
2127 mutex_unlock(&con->mutex);
2128 }
2129}
2130
2120/* 2131/*
2121 * Queue up an outgoing message on the given connection. 2132 * Queue up an outgoing message on the given connection.
2122 */ 2133 */
@@ -2149,6 +2160,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2149 2160
2150 /* if there wasn't anything waiting to send before, queue 2161 /* if there wasn't anything waiting to send before, queue
2151 * new work */ 2162 * new work */
2163 clear_standby(con);
2152 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2164 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2153 queue_con(con); 2165 queue_con(con);
2154} 2166}
@@ -2214,6 +2226,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2214 */ 2226 */
2215void ceph_con_keepalive(struct ceph_connection *con) 2227void ceph_con_keepalive(struct ceph_connection *con)
2216{ 2228{
2229 dout("con_keepalive %p\n", con);
2230 clear_standby(con);
2217 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2231 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
2218 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2232 test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2219 queue_con(con); 2233 queue_con(con);