diff options
author | Sage Weil <sage@newdream.net> | 2009-12-22 13:43:42 -0500 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2009-12-23 11:17:19 -0500 |
commit | ec302645f4a9bd9ec757c30d185557e1c0972c1a (patch) | |
tree | b03aee886f401affba79f8473b83bd190a69570a /fs | |
parent | 529cfcc46ffa2cbe4d07641c11e65f67fe7b66e4 (diff) |
ceph: use connection mutex to protect read and write stages
Use a single mutex (previously out_mutex) to protect both read and write
activity from concurrent ceph_con_* calls. Drop the mutex when doing
callbacks to avoid nested locking (the callback may need to call something
like ceph_con_close).
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/messenger.c | 49 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 3 |
2 files changed, 33 insertions, 19 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 2e4e9773c46..c03b4185c14 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -316,7 +316,6 @@ static void reset_connection(struct ceph_connection *con) | |||
316 | { | 316 | { |
317 | /* reset connection, out_queue, msg_ and connect_seq */ | 317 | /* reset connection, out_queue, msg_ and connect_seq */ |
318 | /* discard existing out_queue and msg_seq */ | 318 | /* discard existing out_queue and msg_seq */ |
319 | mutex_lock(&con->out_mutex); | ||
320 | ceph_msg_remove_list(&con->out_queue); | 319 | ceph_msg_remove_list(&con->out_queue); |
321 | ceph_msg_remove_list(&con->out_sent); | 320 | ceph_msg_remove_list(&con->out_sent); |
322 | 321 | ||
@@ -332,7 +331,6 @@ static void reset_connection(struct ceph_connection *con) | |||
332 | con->out_msg = NULL; | 331 | con->out_msg = NULL; |
333 | } | 332 | } |
334 | con->in_seq = 0; | 333 | con->in_seq = 0; |
335 | mutex_unlock(&con->out_mutex); | ||
336 | } | 334 | } |
337 | 335 | ||
338 | /* | 336 | /* |
@@ -343,7 +341,9 @@ void ceph_con_close(struct ceph_connection *con) | |||
343 | dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr)); | 341 | dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr)); |
344 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 342 | set_bit(CLOSED, &con->state); /* in case there's queued work */ |
345 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ | 343 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ |
344 | mutex_lock(&con->mutex); | ||
346 | reset_connection(con); | 345 | reset_connection(con); |
346 | mutex_unlock(&con->mutex); | ||
347 | queue_con(con); | 347 | queue_con(con); |
348 | } | 348 | } |
349 | 349 | ||
@@ -392,7 +392,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) | |||
392 | memset(con, 0, sizeof(*con)); | 392 | memset(con, 0, sizeof(*con)); |
393 | atomic_set(&con->nref, 1); | 393 | atomic_set(&con->nref, 1); |
394 | con->msgr = msgr; | 394 | con->msgr = msgr; |
395 | mutex_init(&con->out_mutex); | 395 | mutex_init(&con->mutex); |
396 | INIT_LIST_HEAD(&con->out_queue); | 396 | INIT_LIST_HEAD(&con->out_queue); |
397 | INIT_LIST_HEAD(&con->out_sent); | 397 | INIT_LIST_HEAD(&con->out_sent); |
398 | INIT_DELAYED_WORK(&con->work, con_work); | 398 | INIT_DELAYED_WORK(&con->work, con_work); |
@@ -571,11 +571,13 @@ static void prepare_connect_authorizer(struct ceph_connection *con) | |||
571 | int auth_len = 0; | 571 | int auth_len = 0; |
572 | int auth_protocol = 0; | 572 | int auth_protocol = 0; |
573 | 573 | ||
574 | mutex_unlock(&con->mutex); | ||
574 | if (con->ops->get_authorizer) | 575 | if (con->ops->get_authorizer) |
575 | con->ops->get_authorizer(con, &auth_buf, &auth_len, | 576 | con->ops->get_authorizer(con, &auth_buf, &auth_len, |
576 | &auth_protocol, &con->auth_reply_buf, | 577 | &auth_protocol, &con->auth_reply_buf, |
577 | &con->auth_reply_buf_len, | 578 | &con->auth_reply_buf_len, |
578 | con->auth_retry); | 579 | con->auth_retry); |
580 | mutex_lock(&con->mutex); | ||
579 | 581 | ||
580 | con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); | 582 | con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); |
581 | con->out_connect.authorizer_len = cpu_to_le32(auth_len); | 583 | con->out_connect.authorizer_len = cpu_to_le32(auth_len); |
@@ -1094,10 +1096,13 @@ static int process_connect(struct ceph_connection *con) | |||
1094 | le32_to_cpu(con->out_connect.protocol_version), | 1096 | le32_to_cpu(con->out_connect.protocol_version), |
1095 | le32_to_cpu(con->in_reply.protocol_version)); | 1097 | le32_to_cpu(con->in_reply.protocol_version)); |
1096 | con->error_msg = "protocol version mismatch"; | 1098 | con->error_msg = "protocol version mismatch"; |
1097 | if (con->ops->bad_proto) | ||
1098 | con->ops->bad_proto(con); | ||
1099 | reset_connection(con); | 1099 | reset_connection(con); |
1100 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 1100 | set_bit(CLOSED, &con->state); /* in case there's queued work */ |
1101 | |||
1102 | mutex_unlock(&con->mutex); | ||
1103 | if (con->ops->bad_proto) | ||
1104 | con->ops->bad_proto(con); | ||
1105 | mutex_lock(&con->mutex); | ||
1101 | return -1; | 1106 | return -1; |
1102 | 1107 | ||
1103 | case CEPH_MSGR_TAG_BADAUTHORIZER: | 1108 | case CEPH_MSGR_TAG_BADAUTHORIZER: |
@@ -1133,9 +1138,11 @@ static int process_connect(struct ceph_connection *con) | |||
1133 | prepare_read_connect(con); | 1138 | prepare_read_connect(con); |
1134 | 1139 | ||
1135 | /* Tell ceph about it. */ | 1140 | /* Tell ceph about it. */ |
1141 | mutex_unlock(&con->mutex); | ||
1136 | pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); | 1142 | pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); |
1137 | if (con->ops->peer_reset) | 1143 | if (con->ops->peer_reset) |
1138 | con->ops->peer_reset(con); | 1144 | con->ops->peer_reset(con); |
1145 | mutex_lock(&con->mutex); | ||
1139 | break; | 1146 | break; |
1140 | 1147 | ||
1141 | case CEPH_MSGR_TAG_RETRY_SESSION: | 1148 | case CEPH_MSGR_TAG_RETRY_SESSION: |
@@ -1221,7 +1228,6 @@ static void process_ack(struct ceph_connection *con) | |||
1221 | u64 ack = le64_to_cpu(con->in_temp_ack); | 1228 | u64 ack = le64_to_cpu(con->in_temp_ack); |
1222 | u64 seq; | 1229 | u64 seq; |
1223 | 1230 | ||
1224 | mutex_lock(&con->out_mutex); | ||
1225 | while (!list_empty(&con->out_sent)) { | 1231 | while (!list_empty(&con->out_sent)) { |
1226 | m = list_first_entry(&con->out_sent, struct ceph_msg, | 1232 | m = list_first_entry(&con->out_sent, struct ceph_msg, |
1227 | list_head); | 1233 | list_head); |
@@ -1232,7 +1238,6 @@ static void process_ack(struct ceph_connection *con) | |||
1232 | le16_to_cpu(m->hdr.type), m); | 1238 | le16_to_cpu(m->hdr.type), m); |
1233 | ceph_msg_remove(m); | 1239 | ceph_msg_remove(m); |
1234 | } | 1240 | } |
1235 | mutex_unlock(&con->out_mutex); | ||
1236 | prepare_read_tag(con); | 1241 | prepare_read_tag(con); |
1237 | } | 1242 | } |
1238 | 1243 | ||
@@ -1366,8 +1371,10 @@ static int read_partial_message(struct ceph_connection *con) | |||
1366 | /* find pages for data payload */ | 1371 | /* find pages for data payload */ |
1367 | want = calc_pages_for(data_off & ~PAGE_MASK, data_len); | 1372 | want = calc_pages_for(data_off & ~PAGE_MASK, data_len); |
1368 | ret = -1; | 1373 | ret = -1; |
1374 | mutex_unlock(&con->mutex); | ||
1369 | if (con->ops->prepare_pages) | 1375 | if (con->ops->prepare_pages) |
1370 | ret = con->ops->prepare_pages(con, m, want); | 1376 | ret = con->ops->prepare_pages(con, m, want); |
1377 | mutex_lock(&con->mutex); | ||
1371 | if (ret < 0) { | 1378 | if (ret < 0) { |
1372 | dout("%p prepare_pages failed, skipping payload\n", m); | 1379 | dout("%p prepare_pages failed, skipping payload\n", m); |
1373 | con->in_base_pos = -data_len - sizeof(m->footer); | 1380 | con->in_base_pos = -data_len - sizeof(m->footer); |
@@ -1454,9 +1461,8 @@ static void process_message(struct ceph_connection *con) | |||
1454 | if (con->peer_name.type == 0) | 1461 | if (con->peer_name.type == 0) |
1455 | con->peer_name = msg->hdr.src.name; | 1462 | con->peer_name = msg->hdr.src.name; |
1456 | 1463 | ||
1457 | mutex_lock(&con->out_mutex); | ||
1458 | con->in_seq++; | 1464 | con->in_seq++; |
1459 | mutex_unlock(&con->out_mutex); | 1465 | mutex_unlock(&con->mutex); |
1460 | 1466 | ||
1461 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", | 1467 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", |
1462 | msg, le64_to_cpu(msg->hdr.seq), | 1468 | msg, le64_to_cpu(msg->hdr.seq), |
@@ -1467,6 +1473,8 @@ static void process_message(struct ceph_connection *con) | |||
1467 | le32_to_cpu(msg->hdr.data_len), | 1473 | le32_to_cpu(msg->hdr.data_len), |
1468 | con->in_front_crc, con->in_middle_crc, con->in_data_crc); | 1474 | con->in_front_crc, con->in_middle_crc, con->in_data_crc); |
1469 | con->ops->dispatch(con, msg); | 1475 | con->ops->dispatch(con, msg); |
1476 | |||
1477 | mutex_lock(&con->mutex); | ||
1470 | prepare_read_tag(con); | 1478 | prepare_read_tag(con); |
1471 | } | 1479 | } |
1472 | 1480 | ||
@@ -1483,7 +1491,7 @@ static int try_write(struct ceph_connection *con) | |||
1483 | dout("try_write start %p state %lu nref %d\n", con, con->state, | 1491 | dout("try_write start %p state %lu nref %d\n", con, con->state, |
1484 | atomic_read(&con->nref)); | 1492 | atomic_read(&con->nref)); |
1485 | 1493 | ||
1486 | mutex_lock(&con->out_mutex); | 1494 | mutex_lock(&con->mutex); |
1487 | more: | 1495 | more: |
1488 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 1496 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1489 | 1497 | ||
@@ -1576,7 +1584,7 @@ do_next: | |||
1576 | done: | 1584 | done: |
1577 | ret = 0; | 1585 | ret = 0; |
1578 | out: | 1586 | out: |
1579 | mutex_unlock(&con->out_mutex); | 1587 | mutex_unlock(&con->mutex); |
1580 | dout("try_write done on %p\n", con); | 1588 | dout("try_write done on %p\n", con); |
1581 | return ret; | 1589 | return ret; |
1582 | } | 1590 | } |
@@ -1600,6 +1608,8 @@ static int try_read(struct ceph_connection *con) | |||
1600 | dout("try_read start on %p\n", con); | 1608 | dout("try_read start on %p\n", con); |
1601 | msgr = con->msgr; | 1609 | msgr = con->msgr; |
1602 | 1610 | ||
1611 | mutex_lock(&con->mutex); | ||
1612 | |||
1603 | more: | 1613 | more: |
1604 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 1614 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
1605 | con->in_base_pos); | 1615 | con->in_base_pos); |
@@ -1693,6 +1703,7 @@ more: | |||
1693 | done: | 1703 | done: |
1694 | ret = 0; | 1704 | ret = 0; |
1695 | out: | 1705 | out: |
1706 | mutex_unlock(&con->mutex); | ||
1696 | dout("try_read done on %p\n", con); | 1707 | dout("try_read done on %p\n", con); |
1697 | return ret; | 1708 | return ret; |
1698 | 1709 | ||
@@ -1818,6 +1829,8 @@ static void ceph_fault(struct ceph_connection *con) | |||
1818 | 1829 | ||
1819 | clear_bit(BUSY, &con->state); /* to avoid an improbable race */ | 1830 | clear_bit(BUSY, &con->state); /* to avoid an improbable race */ |
1820 | 1831 | ||
1832 | mutex_lock(&con->mutex); | ||
1833 | |||
1821 | con_close_socket(con); | 1834 | con_close_socket(con); |
1822 | 1835 | ||
1823 | if (con->in_msg) { | 1836 | if (con->in_msg) { |
@@ -1827,24 +1840,24 @@ static void ceph_fault(struct ceph_connection *con) | |||
1827 | 1840 | ||
1828 | /* If there are no messages in the queue, place the connection | 1841 | /* If there are no messages in the queue, place the connection |
1829 | * in a STANDBY state (i.e., don't try to reconnect just yet). */ | 1842 | * in a STANDBY state (i.e., don't try to reconnect just yet). */ |
1830 | mutex_lock(&con->out_mutex); | ||
1831 | if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { | 1843 | if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { |
1832 | dout("fault setting STANDBY\n"); | 1844 | dout("fault setting STANDBY\n"); |
1833 | set_bit(STANDBY, &con->state); | 1845 | set_bit(STANDBY, &con->state); |
1834 | mutex_unlock(&con->out_mutex); | 1846 | mutex_unlock(&con->mutex); |
1835 | goto out; | 1847 | goto out; |
1836 | } | 1848 | } |
1837 | 1849 | ||
1838 | /* Requeue anything that hasn't been acked, and retry after a | 1850 | /* Requeue anything that hasn't been acked, and retry after a |
1839 | * delay. */ | 1851 | * delay. */ |
1840 | list_splice_init(&con->out_sent, &con->out_queue); | 1852 | list_splice_init(&con->out_sent, &con->out_queue); |
1841 | mutex_unlock(&con->out_mutex); | ||
1842 | 1853 | ||
1843 | if (con->delay == 0) | 1854 | if (con->delay == 0) |
1844 | con->delay = BASE_DELAY_INTERVAL; | 1855 | con->delay = BASE_DELAY_INTERVAL; |
1845 | else if (con->delay < MAX_DELAY_INTERVAL) | 1856 | else if (con->delay < MAX_DELAY_INTERVAL) |
1846 | con->delay *= 2; | 1857 | con->delay *= 2; |
1847 | 1858 | ||
1859 | mutex_unlock(&con->mutex); | ||
1860 | |||
1848 | /* explicitly schedule work to try to reconnect again later. */ | 1861 | /* explicitly schedule work to try to reconnect again later. */ |
1849 | dout("fault queueing %p delay %lu\n", con, con->delay); | 1862 | dout("fault queueing %p delay %lu\n", con, con->delay); |
1850 | con->ops->get(con); | 1863 | con->ops->get(con); |
@@ -1920,7 +1933,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
1920 | msg->hdr.dst_erank = con->peer_addr.erank; | 1933 | msg->hdr.dst_erank = con->peer_addr.erank; |
1921 | 1934 | ||
1922 | /* queue */ | 1935 | /* queue */ |
1923 | mutex_lock(&con->out_mutex); | 1936 | mutex_lock(&con->mutex); |
1924 | BUG_ON(!list_empty(&msg->list_head)); | 1937 | BUG_ON(!list_empty(&msg->list_head)); |
1925 | list_add_tail(&msg->list_head, &con->out_queue); | 1938 | list_add_tail(&msg->list_head, &con->out_queue); |
1926 | dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, | 1939 | dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, |
@@ -1929,7 +1942,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
1929 | le32_to_cpu(msg->hdr.front_len), | 1942 | le32_to_cpu(msg->hdr.front_len), |
1930 | le32_to_cpu(msg->hdr.middle_len), | 1943 | le32_to_cpu(msg->hdr.middle_len), |
1931 | le32_to_cpu(msg->hdr.data_len)); | 1944 | le32_to_cpu(msg->hdr.data_len)); |
1932 | mutex_unlock(&con->out_mutex); | 1945 | mutex_unlock(&con->mutex); |
1933 | 1946 | ||
1934 | /* if there wasn't anything waiting to send before, queue | 1947 | /* if there wasn't anything waiting to send before, queue |
1935 | * new work */ | 1948 | * new work */ |
@@ -1942,7 +1955,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
1942 | */ | 1955 | */ |
1943 | void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | 1956 | void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) |
1944 | { | 1957 | { |
1945 | mutex_lock(&con->out_mutex); | 1958 | mutex_lock(&con->mutex); |
1946 | if (!list_empty(&msg->list_head)) { | 1959 | if (!list_empty(&msg->list_head)) { |
1947 | dout("con_revoke %p msg %p\n", con, msg); | 1960 | dout("con_revoke %p msg %p\n", con, msg); |
1948 | list_del_init(&msg->list_head); | 1961 | list_del_init(&msg->list_head); |
@@ -1959,7 +1972,7 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | |||
1959 | } else { | 1972 | } else { |
1960 | dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); | 1973 | dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); |
1961 | } | 1974 | } |
1962 | mutex_unlock(&con->out_mutex); | 1975 | mutex_unlock(&con->mutex); |
1963 | } | 1976 | } |
1964 | 1977 | ||
1965 | /* | 1978 | /* |
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index e04c214b4f6..94b55de9033 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h | |||
@@ -155,8 +155,9 @@ struct ceph_connection { | |||
155 | void *auth_reply_buf; /* where to put the authorizer reply */ | 155 | void *auth_reply_buf; /* where to put the authorizer reply */ |
156 | int auth_reply_buf_len; | 156 | int auth_reply_buf_len; |
157 | 157 | ||
158 | struct mutex mutex; | ||
159 | |||
158 | /* out queue */ | 160 | /* out queue */ |
159 | struct mutex out_mutex; | ||
160 | struct list_head out_queue; | 161 | struct list_head out_queue; |
161 | struct list_head out_sent; /* sending or sent but unacked */ | 162 | struct list_head out_sent; /* sending or sent but unacked */ |
162 | u64 out_seq; /* last message queued for send */ | 163 | u64 out_seq; /* last message queued for send */ |