path: root/net/ceph
author     Sage Weil <sage@newdream.net>  2011-05-19 14:21:05 -0400
committer  Sage Weil <sage@newdream.net>  2011-05-19 14:21:05 -0400
commit     0da5d70369e87f80adf794080cfff1ca15a34198 (patch)
tree       b9d2fcaa52903e1c9b87ad7edfc24fb294320bce /net/ceph
parent     3b663780347ce532b08be1c859b1df14f0eea4c8 (diff)
libceph: handle connection reopen race with callbacks
If a connection is closed and/or reopened (ceph_con_close, ceph_con_open) it can race with a callback. con_work does various state checks for closed or reopened sockets at the beginning, but drops con->mutex before making callbacks. We need to check for state bit changes after retaking the lock to ensure we restart con_work and execute those CLOSED/OPENING tests, or else we may end up operating under stale assumptions. In Jim's case, this was causing 'bad tag' errors.

There are four cases where we re-take the con->mutex inside con_work: catch them all and return EAGAIN from try_{read,write} so that we can restart con_work.

Reported-by: Jim Schutt <jaschut@sandia.gov>
Tested-by: Jim Schutt <jaschut@sandia.gov>
Signed-off-by: Sage Weil <sage@newdream.net>
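The fix reduces to one convention: any path that drops and retakes con->mutex re-checks the CLOSED/OPENING bits once it holds the lock again, returns -EAGAIN if a close or reopen raced in, and con_work restarts its state checks from the top instead of continuing on stale state. Below is a minimal userspace sketch of that control flow; the types and helpers (struct conn, do_callback_step) are made-up stand-ins, not the real messenger code.

/*
 * Minimal sketch of the "re-check after retaking the lock" pattern.
 * Stand-in types and pthreads, not the actual struct ceph_connection API.
 */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>

struct conn {
        pthread_mutex_t mutex;
        bool closed;    /* stand-in for the CLOSED state bit  */
        bool opening;   /* stand-in for the OPENING state bit */
};

/* A step that must drop the lock to call back into other code. */
static int do_callback_step(struct conn *c)
{
        pthread_mutex_unlock(&c->mutex);
        /* ... callback runs here; a close/reopen may race with us ... */
        pthread_mutex_lock(&c->mutex);

        /* Re-check the connection state after retaking the lock. */
        if (c->closed || c->opening)
                return -EAGAIN;
        return 0;
}

static void worker(struct conn *c)
{
        pthread_mutex_lock(&c->mutex);
restart:
        /* ... the usual CLOSED/OPENING/backoff checks happen here ... */
        if (do_callback_step(c) == -EAGAIN)
                goto restart;   /* redo the state checks from the top */
        pthread_mutex_unlock(&c->mutex);
}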
Diffstat (limited to 'net/ceph')
-rw-r--r--  net/ceph/messenger.c  64
1 file changed, 51 insertions, 13 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e15a82ccc05f..b140dd3515de 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -598,7 +598,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
  * Connection negotiation.
  */

-static void prepare_connect_authorizer(struct ceph_connection *con)
+static int prepare_connect_authorizer(struct ceph_connection *con)
 {
         void *auth_buf;
         int auth_len = 0;
@@ -612,6 +612,10 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
                                             con->auth_retry);
         mutex_lock(&con->mutex);

+        if (test_bit(CLOSED, &con->state) ||
+            test_bit(OPENING, &con->state))
+                return -EAGAIN;
+
         con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
         con->out_connect.authorizer_len = cpu_to_le32(auth_len);

@@ -619,6 +623,8 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
         con->out_kvec[con->out_kvec_left].iov_len = auth_len;
         con->out_kvec_left++;
         con->out_kvec_bytes += auth_len;
+
+        return 0;
 }

 /*
@@ -640,9 +646,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr,
         set_bit(WRITE_PENDING, &con->state);
 }

-static void prepare_write_connect(struct ceph_messenger *msgr,
-                                  struct ceph_connection *con,
-                                  int after_banner)
+static int prepare_write_connect(struct ceph_messenger *msgr,
+                                 struct ceph_connection *con,
+                                 int after_banner)
 {
         unsigned global_seq = get_global_seq(con->msgr, 0);
         int proto;
@@ -683,7 +689,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
         con->out_more = 0;
         set_bit(WRITE_PENDING, &con->state);

-        prepare_connect_authorizer(con);
+        return prepare_connect_authorizer(con);
 }


@@ -1216,6 +1222,7 @@ static int process_connect(struct ceph_connection *con)
         u64 sup_feat = con->msgr->supported_features;
         u64 req_feat = con->msgr->required_features;
         u64 server_feat = le64_to_cpu(con->in_reply.features);
+        int ret;

         dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

@@ -1250,7 +1257,9 @@ static int process_connect(struct ceph_connection *con)
                         return -1;
                 }
                 con->auth_retry = 1;
-                prepare_write_connect(con->msgr, con, 0);
+                ret = prepare_write_connect(con->msgr, con, 0);
+                if (ret < 0)
+                        return ret;
                 prepare_read_connect(con);
                 break;

@@ -1277,6 +1286,9 @@ static int process_connect(struct ceph_connection *con)
                 if (con->ops->peer_reset)
                         con->ops->peer_reset(con);
                 mutex_lock(&con->mutex);
+                if (test_bit(CLOSED, &con->state) ||
+                    test_bit(OPENING, &con->state))
+                        return -EAGAIN;
                 break;

         case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1810,6 +1822,17 @@ static int try_read(struct ceph_connection *con)
 more:
         dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
              con->in_base_pos);
+
+        /*
+         * process_connect and process_message drop and re-take
+         * con->mutex.  make sure we handle a racing close or reopen.
+         */
+        if (test_bit(CLOSED, &con->state) ||
+            test_bit(OPENING, &con->state)) {
+                ret = -EAGAIN;
+                goto out;
+        }
+
         if (test_bit(CONNECTING, &con->state)) {
                 if (!test_bit(NEGOTIATING, &con->state)) {
                         dout("try_read connecting\n");
@@ -1938,8 +1961,10 @@ static void con_work(struct work_struct *work)
 {
         struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                     work.work);
+        int ret;

         mutex_lock(&con->mutex);
+restart:
         if (test_and_clear_bit(BACKOFF, &con->state)) {
                 dout("con_work %p backing off\n", con);
                 if (queue_delayed_work(ceph_msgr_wq, &con->work,
@@ -1969,18 +1994,31 @@ static void con_work(struct work_struct *work)
                 con_close_socket(con);
         }

-        if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
-            try_read(con) < 0 ||
-            try_write(con) < 0) {
-                mutex_unlock(&con->mutex);
-                ceph_fault(con);     /* error/fault path */
-                goto done_unlocked;
-        }
+        if (test_and_clear_bit(SOCK_CLOSED, &con->state))
+                goto fault;
+
+        ret = try_read(con);
+        if (ret == -EAGAIN)
+                goto restart;
+        if (ret < 0)
+                goto fault;
+
+        ret = try_write(con);
+        if (ret == -EAGAIN)
+                goto restart;
+        if (ret < 0)
+                goto fault;

 done:
         mutex_unlock(&con->mutex);
 done_unlocked:
         con->ops->put(con);
+        return;
+
+fault:
+        mutex_unlock(&con->mutex);
+        ceph_fault(con);     /* error/fault path */
+        goto done_unlocked;
 }

