diff options
author | Sage Weil <sage@newdream.net> | 2011-05-19 14:21:05 -0400 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2011-05-19 14:21:05 -0400 |
commit | 0da5d70369e87f80adf794080cfff1ca15a34198 (patch) | |
tree | b9d2fcaa52903e1c9b87ad7edfc24fb294320bce | |
parent | 3b663780347ce532b08be1c859b1df14f0eea4c8 (diff) |
libceph: handle connection reopen race with callbacks
If a connection is closed and/or reopened (ceph_con_close, ceph_con_open),
the close or reopen can race with a callback. con_work does various state
checks for closed or reopened sockets at the beginning, but drops con->mutex
before making callbacks. We need to check for state bit changes after retaking
the lock so that we restart con_work and re-run those CLOSED/OPENING
tests; otherwise we may end up operating under stale assumptions.
In Jim's case, this was causing 'bad tag' errors.
There are four places where con->mutex is re-taken inside con_work. Catch
them all and return -EAGAIN from try_{read,write} so that con_work can be
restarted.
Reported-by: Jim Schutt <jaschut@sandia.gov>
Tested-by: Jim Schutt <jaschut@sandia.gov>
Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- | net/ceph/messenger.c | 64 |
1 files changed, 51 insertions, 13 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e15a82ccc05f..b140dd3515de 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -598,7 +598,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) | |||
598 | * Connection negotiation. | 598 | * Connection negotiation. |
599 | */ | 599 | */ |
600 | 600 | ||
601 | static void prepare_connect_authorizer(struct ceph_connection *con) | 601 | static int prepare_connect_authorizer(struct ceph_connection *con) |
602 | { | 602 | { |
603 | void *auth_buf; | 603 | void *auth_buf; |
604 | int auth_len = 0; | 604 | int auth_len = 0; |
@@ -612,6 +612,10 @@ static void prepare_connect_authorizer(struct ceph_connection *con) | |||
612 | con->auth_retry); | 612 | con->auth_retry); |
613 | mutex_lock(&con->mutex); | 613 | mutex_lock(&con->mutex); |
614 | 614 | ||
615 | if (test_bit(CLOSED, &con->state) || | ||
616 | test_bit(OPENING, &con->state)) | ||
617 | return -EAGAIN; | ||
618 | |||
615 | con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); | 619 | con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); |
616 | con->out_connect.authorizer_len = cpu_to_le32(auth_len); | 620 | con->out_connect.authorizer_len = cpu_to_le32(auth_len); |
617 | 621 | ||
@@ -619,6 +623,8 @@ static void prepare_connect_authorizer(struct ceph_connection *con) | |||
619 | con->out_kvec[con->out_kvec_left].iov_len = auth_len; | 623 | con->out_kvec[con->out_kvec_left].iov_len = auth_len; |
620 | con->out_kvec_left++; | 624 | con->out_kvec_left++; |
621 | con->out_kvec_bytes += auth_len; | 625 | con->out_kvec_bytes += auth_len; |
626 | |||
627 | return 0; | ||
622 | } | 628 | } |
623 | 629 | ||
624 | /* | 630 | /* |
@@ -640,9 +646,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr, | |||
640 | set_bit(WRITE_PENDING, &con->state); | 646 | set_bit(WRITE_PENDING, &con->state); |
641 | } | 647 | } |
642 | 648 | ||
643 | static void prepare_write_connect(struct ceph_messenger *msgr, | 649 | static int prepare_write_connect(struct ceph_messenger *msgr, |
644 | struct ceph_connection *con, | 650 | struct ceph_connection *con, |
645 | int after_banner) | 651 | int after_banner) |
646 | { | 652 | { |
647 | unsigned global_seq = get_global_seq(con->msgr, 0); | 653 | unsigned global_seq = get_global_seq(con->msgr, 0); |
648 | int proto; | 654 | int proto; |
@@ -683,7 +689,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, | |||
683 | con->out_more = 0; | 689 | con->out_more = 0; |
684 | set_bit(WRITE_PENDING, &con->state); | 690 | set_bit(WRITE_PENDING, &con->state); |
685 | 691 | ||
686 | prepare_connect_authorizer(con); | 692 | return prepare_connect_authorizer(con); |
687 | } | 693 | } |
688 | 694 | ||
689 | 695 | ||
@@ -1216,6 +1222,7 @@ static int process_connect(struct ceph_connection *con) | |||
1216 | u64 sup_feat = con->msgr->supported_features; | 1222 | u64 sup_feat = con->msgr->supported_features; |
1217 | u64 req_feat = con->msgr->required_features; | 1223 | u64 req_feat = con->msgr->required_features; |
1218 | u64 server_feat = le64_to_cpu(con->in_reply.features); | 1224 | u64 server_feat = le64_to_cpu(con->in_reply.features); |
1225 | int ret; | ||
1219 | 1226 | ||
1220 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); | 1227 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); |
1221 | 1228 | ||
@@ -1250,7 +1257,9 @@ static int process_connect(struct ceph_connection *con) | |||
1250 | return -1; | 1257 | return -1; |
1251 | } | 1258 | } |
1252 | con->auth_retry = 1; | 1259 | con->auth_retry = 1; |
1253 | prepare_write_connect(con->msgr, con, 0); | 1260 | ret = prepare_write_connect(con->msgr, con, 0); |
1261 | if (ret < 0) | ||
1262 | return ret; | ||
1254 | prepare_read_connect(con); | 1263 | prepare_read_connect(con); |
1255 | break; | 1264 | break; |
1256 | 1265 | ||
@@ -1277,6 +1286,9 @@ static int process_connect(struct ceph_connection *con) | |||
1277 | if (con->ops->peer_reset) | 1286 | if (con->ops->peer_reset) |
1278 | con->ops->peer_reset(con); | 1287 | con->ops->peer_reset(con); |
1279 | mutex_lock(&con->mutex); | 1288 | mutex_lock(&con->mutex); |
1289 | if (test_bit(CLOSED, &con->state) || | ||
1290 | test_bit(OPENING, &con->state)) | ||
1291 | return -EAGAIN; | ||
1280 | break; | 1292 | break; |
1281 | 1293 | ||
1282 | case CEPH_MSGR_TAG_RETRY_SESSION: | 1294 | case CEPH_MSGR_TAG_RETRY_SESSION: |
@@ -1810,6 +1822,17 @@ static int try_read(struct ceph_connection *con) | |||
1810 | more: | 1822 | more: |
1811 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 1823 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
1812 | con->in_base_pos); | 1824 | con->in_base_pos); |
1825 | |||
1826 | /* | ||
1827 | * process_connect and process_message drop and re-take | ||
1828 | * con->mutex. make sure we handle a racing close or reopen. | ||
1829 | */ | ||
1830 | if (test_bit(CLOSED, &con->state) || | ||
1831 | test_bit(OPENING, &con->state)) { | ||
1832 | ret = -EAGAIN; | ||
1833 | goto out; | ||
1834 | } | ||
1835 | |||
1813 | if (test_bit(CONNECTING, &con->state)) { | 1836 | if (test_bit(CONNECTING, &con->state)) { |
1814 | if (!test_bit(NEGOTIATING, &con->state)) { | 1837 | if (!test_bit(NEGOTIATING, &con->state)) { |
1815 | dout("try_read connecting\n"); | 1838 | dout("try_read connecting\n"); |
@@ -1938,8 +1961,10 @@ static void con_work(struct work_struct *work) | |||
1938 | { | 1961 | { |
1939 | struct ceph_connection *con = container_of(work, struct ceph_connection, | 1962 | struct ceph_connection *con = container_of(work, struct ceph_connection, |
1940 | work.work); | 1963 | work.work); |
1964 | int ret; | ||
1941 | 1965 | ||
1942 | mutex_lock(&con->mutex); | 1966 | mutex_lock(&con->mutex); |
1967 | restart: | ||
1943 | if (test_and_clear_bit(BACKOFF, &con->state)) { | 1968 | if (test_and_clear_bit(BACKOFF, &con->state)) { |
1944 | dout("con_work %p backing off\n", con); | 1969 | dout("con_work %p backing off\n", con); |
1945 | if (queue_delayed_work(ceph_msgr_wq, &con->work, | 1970 | if (queue_delayed_work(ceph_msgr_wq, &con->work, |
@@ -1969,18 +1994,31 @@ static void con_work(struct work_struct *work) | |||
1969 | con_close_socket(con); | 1994 | con_close_socket(con); |
1970 | } | 1995 | } |
1971 | 1996 | ||
1972 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || | 1997 | if (test_and_clear_bit(SOCK_CLOSED, &con->state)) |
1973 | try_read(con) < 0 || | 1998 | goto fault; |
1974 | try_write(con) < 0) { | 1999 | |
1975 | mutex_unlock(&con->mutex); | 2000 | ret = try_read(con); |
1976 | ceph_fault(con); /* error/fault path */ | 2001 | if (ret == -EAGAIN) |
1977 | goto done_unlocked; | 2002 | goto restart; |
1978 | } | 2003 | if (ret < 0) |
2004 | goto fault; | ||
2005 | |||
2006 | ret = try_write(con); | ||
2007 | if (ret == -EAGAIN) | ||
2008 | goto restart; | ||
2009 | if (ret < 0) | ||
2010 | goto fault; | ||
1979 | 2011 | ||
1980 | done: | 2012 | done: |
1981 | mutex_unlock(&con->mutex); | 2013 | mutex_unlock(&con->mutex); |
1982 | done_unlocked: | 2014 | done_unlocked: |
1983 | con->ops->put(con); | 2015 | con->ops->put(con); |
2016 | return; | ||
2017 | |||
2018 | fault: | ||
2019 | mutex_unlock(&con->mutex); | ||
2020 | ceph_fault(con); /* error/fault path */ | ||
2021 | goto done_unlocked; | ||
1984 | } | 2022 | } |
1985 | 2023 | ||
1986 | 2024 | ||