aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-12-22 13:43:42 -0500
committerSage Weil <sage@newdream.net>2009-12-23 11:17:19 -0500
commitec302645f4a9bd9ec757c30d185557e1c0972c1a (patch)
treeb03aee886f401affba79f8473b83bd190a69570a /fs/ceph/messenger.c
parent529cfcc46ffa2cbe4d07641c11e65f67fe7b66e4 (diff)
ceph: use connection mutex to protect read and write stages
Use a single mutex (previously out_mutex) to protect both read and write activity from concurrent ceph_con_* calls. Drop the mutex when doing callbacks to avoid nested locking (the callback may need to call something like ceph_con_close). Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c49
1 files changed, 31 insertions, 18 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2e4e9773c46..c03b4185c14 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -316,7 +316,6 @@ static void reset_connection(struct ceph_connection *con)
316{ 316{
317 /* reset connection, out_queue, msg_ and connect_seq */ 317 /* reset connection, out_queue, msg_ and connect_seq */
318 /* discard existing out_queue and msg_seq */ 318 /* discard existing out_queue and msg_seq */
319 mutex_lock(&con->out_mutex);
320 ceph_msg_remove_list(&con->out_queue); 319 ceph_msg_remove_list(&con->out_queue);
321 ceph_msg_remove_list(&con->out_sent); 320 ceph_msg_remove_list(&con->out_sent);
322 321
@@ -332,7 +331,6 @@ static void reset_connection(struct ceph_connection *con)
332 con->out_msg = NULL; 331 con->out_msg = NULL;
333 } 332 }
334 con->in_seq = 0; 333 con->in_seq = 0;
335 mutex_unlock(&con->out_mutex);
336} 334}
337 335
338/* 336/*
@@ -343,7 +341,9 @@ void ceph_con_close(struct ceph_connection *con)
343 dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr)); 341 dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
344 set_bit(CLOSED, &con->state); /* in case there's queued work */ 342 set_bit(CLOSED, &con->state); /* in case there's queued work */
345 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 343 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
344 mutex_lock(&con->mutex);
346 reset_connection(con); 345 reset_connection(con);
346 mutex_unlock(&con->mutex);
347 queue_con(con); 347 queue_con(con);
348} 348}
349 349
@@ -392,7 +392,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
392 memset(con, 0, sizeof(*con)); 392 memset(con, 0, sizeof(*con));
393 atomic_set(&con->nref, 1); 393 atomic_set(&con->nref, 1);
394 con->msgr = msgr; 394 con->msgr = msgr;
395 mutex_init(&con->out_mutex); 395 mutex_init(&con->mutex);
396 INIT_LIST_HEAD(&con->out_queue); 396 INIT_LIST_HEAD(&con->out_queue);
397 INIT_LIST_HEAD(&con->out_sent); 397 INIT_LIST_HEAD(&con->out_sent);
398 INIT_DELAYED_WORK(&con->work, con_work); 398 INIT_DELAYED_WORK(&con->work, con_work);
@@ -571,11 +571,13 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
571 int auth_len = 0; 571 int auth_len = 0;
572 int auth_protocol = 0; 572 int auth_protocol = 0;
573 573
574 mutex_unlock(&con->mutex);
574 if (con->ops->get_authorizer) 575 if (con->ops->get_authorizer)
575 con->ops->get_authorizer(con, &auth_buf, &auth_len, 576 con->ops->get_authorizer(con, &auth_buf, &auth_len,
576 &auth_protocol, &con->auth_reply_buf, 577 &auth_protocol, &con->auth_reply_buf,
577 &con->auth_reply_buf_len, 578 &con->auth_reply_buf_len,
578 con->auth_retry); 579 con->auth_retry);
580 mutex_lock(&con->mutex);
579 581
580 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 582 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
581 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 583 con->out_connect.authorizer_len = cpu_to_le32(auth_len);
@@ -1094,10 +1096,13 @@ static int process_connect(struct ceph_connection *con)
1094 le32_to_cpu(con->out_connect.protocol_version), 1096 le32_to_cpu(con->out_connect.protocol_version),
1095 le32_to_cpu(con->in_reply.protocol_version)); 1097 le32_to_cpu(con->in_reply.protocol_version));
1096 con->error_msg = "protocol version mismatch"; 1098 con->error_msg = "protocol version mismatch";
1097 if (con->ops->bad_proto)
1098 con->ops->bad_proto(con);
1099 reset_connection(con); 1099 reset_connection(con);
1100 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1100 set_bit(CLOSED, &con->state); /* in case there's queued work */
1101
1102 mutex_unlock(&con->mutex);
1103 if (con->ops->bad_proto)
1104 con->ops->bad_proto(con);
1105 mutex_lock(&con->mutex);
1101 return -1; 1106 return -1;
1102 1107
1103 case CEPH_MSGR_TAG_BADAUTHORIZER: 1108 case CEPH_MSGR_TAG_BADAUTHORIZER:
@@ -1133,9 +1138,11 @@ static int process_connect(struct ceph_connection *con)
1133 prepare_read_connect(con); 1138 prepare_read_connect(con);
1134 1139
1135 /* Tell ceph about it. */ 1140 /* Tell ceph about it. */
1141 mutex_unlock(&con->mutex);
1136 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); 1142 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
1137 if (con->ops->peer_reset) 1143 if (con->ops->peer_reset)
1138 con->ops->peer_reset(con); 1144 con->ops->peer_reset(con);
1145 mutex_lock(&con->mutex);
1139 break; 1146 break;
1140 1147
1141 case CEPH_MSGR_TAG_RETRY_SESSION: 1148 case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1221,7 +1228,6 @@ static void process_ack(struct ceph_connection *con)
1221 u64 ack = le64_to_cpu(con->in_temp_ack); 1228 u64 ack = le64_to_cpu(con->in_temp_ack);
1222 u64 seq; 1229 u64 seq;
1223 1230
1224 mutex_lock(&con->out_mutex);
1225 while (!list_empty(&con->out_sent)) { 1231 while (!list_empty(&con->out_sent)) {
1226 m = list_first_entry(&con->out_sent, struct ceph_msg, 1232 m = list_first_entry(&con->out_sent, struct ceph_msg,
1227 list_head); 1233 list_head);
@@ -1232,7 +1238,6 @@ static void process_ack(struct ceph_connection *con)
1232 le16_to_cpu(m->hdr.type), m); 1238 le16_to_cpu(m->hdr.type), m);
1233 ceph_msg_remove(m); 1239 ceph_msg_remove(m);
1234 } 1240 }
1235 mutex_unlock(&con->out_mutex);
1236 prepare_read_tag(con); 1241 prepare_read_tag(con);
1237} 1242}
1238 1243
@@ -1366,8 +1371,10 @@ static int read_partial_message(struct ceph_connection *con)
1366 /* find pages for data payload */ 1371 /* find pages for data payload */
1367 want = calc_pages_for(data_off & ~PAGE_MASK, data_len); 1372 want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1368 ret = -1; 1373 ret = -1;
1374 mutex_unlock(&con->mutex);
1369 if (con->ops->prepare_pages) 1375 if (con->ops->prepare_pages)
1370 ret = con->ops->prepare_pages(con, m, want); 1376 ret = con->ops->prepare_pages(con, m, want);
1377 mutex_lock(&con->mutex);
1371 if (ret < 0) { 1378 if (ret < 0) {
1372 dout("%p prepare_pages failed, skipping payload\n", m); 1379 dout("%p prepare_pages failed, skipping payload\n", m);
1373 con->in_base_pos = -data_len - sizeof(m->footer); 1380 con->in_base_pos = -data_len - sizeof(m->footer);
@@ -1454,9 +1461,8 @@ static void process_message(struct ceph_connection *con)
1454 if (con->peer_name.type == 0) 1461 if (con->peer_name.type == 0)
1455 con->peer_name = msg->hdr.src.name; 1462 con->peer_name = msg->hdr.src.name;
1456 1463
1457 mutex_lock(&con->out_mutex);
1458 con->in_seq++; 1464 con->in_seq++;
1459 mutex_unlock(&con->out_mutex); 1465 mutex_unlock(&con->mutex);
1460 1466
1461 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1467 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1462 msg, le64_to_cpu(msg->hdr.seq), 1468 msg, le64_to_cpu(msg->hdr.seq),
@@ -1467,6 +1473,8 @@ static void process_message(struct ceph_connection *con)
1467 le32_to_cpu(msg->hdr.data_len), 1473 le32_to_cpu(msg->hdr.data_len),
1468 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1474 con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1469 con->ops->dispatch(con, msg); 1475 con->ops->dispatch(con, msg);
1476
1477 mutex_lock(&con->mutex);
1470 prepare_read_tag(con); 1478 prepare_read_tag(con);
1471} 1479}
1472 1480
@@ -1483,7 +1491,7 @@ static int try_write(struct ceph_connection *con)
1483 dout("try_write start %p state %lu nref %d\n", con, con->state, 1491 dout("try_write start %p state %lu nref %d\n", con, con->state,
1484 atomic_read(&con->nref)); 1492 atomic_read(&con->nref));
1485 1493
1486 mutex_lock(&con->out_mutex); 1494 mutex_lock(&con->mutex);
1487more: 1495more:
1488 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1496 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1489 1497
@@ -1576,7 +1584,7 @@ do_next:
1576done: 1584done:
1577 ret = 0; 1585 ret = 0;
1578out: 1586out:
1579 mutex_unlock(&con->out_mutex); 1587 mutex_unlock(&con->mutex);
1580 dout("try_write done on %p\n", con); 1588 dout("try_write done on %p\n", con);
1581 return ret; 1589 return ret;
1582} 1590}
@@ -1600,6 +1608,8 @@ static int try_read(struct ceph_connection *con)
1600 dout("try_read start on %p\n", con); 1608 dout("try_read start on %p\n", con);
1601 msgr = con->msgr; 1609 msgr = con->msgr;
1602 1610
1611 mutex_lock(&con->mutex);
1612
1603more: 1613more:
1604 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1614 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1605 con->in_base_pos); 1615 con->in_base_pos);
@@ -1693,6 +1703,7 @@ more:
1693done: 1703done:
1694 ret = 0; 1704 ret = 0;
1695out: 1705out:
1706 mutex_unlock(&con->mutex);
1696 dout("try_read done on %p\n", con); 1707 dout("try_read done on %p\n", con);
1697 return ret; 1708 return ret;
1698 1709
@@ -1818,6 +1829,8 @@ static void ceph_fault(struct ceph_connection *con)
1818 1829
1819 clear_bit(BUSY, &con->state); /* to avoid an improbable race */ 1830 clear_bit(BUSY, &con->state); /* to avoid an improbable race */
1820 1831
1832 mutex_lock(&con->mutex);
1833
1821 con_close_socket(con); 1834 con_close_socket(con);
1822 1835
1823 if (con->in_msg) { 1836 if (con->in_msg) {
@@ -1827,24 +1840,24 @@ static void ceph_fault(struct ceph_connection *con)
1827 1840
1828 /* If there are no messages in the queue, place the connection 1841 /* If there are no messages in the queue, place the connection
1829 * in a STANDBY state (i.e., don't try to reconnect just yet). */ 1842 * in a STANDBY state (i.e., don't try to reconnect just yet). */
1830 mutex_lock(&con->out_mutex);
1831 if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { 1843 if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
1832 dout("fault setting STANDBY\n"); 1844 dout("fault setting STANDBY\n");
1833 set_bit(STANDBY, &con->state); 1845 set_bit(STANDBY, &con->state);
1834 mutex_unlock(&con->out_mutex); 1846 mutex_unlock(&con->mutex);
1835 goto out; 1847 goto out;
1836 } 1848 }
1837 1849
1838 /* Requeue anything that hasn't been acked, and retry after a 1850 /* Requeue anything that hasn't been acked, and retry after a
1839 * delay. */ 1851 * delay. */
1840 list_splice_init(&con->out_sent, &con->out_queue); 1852 list_splice_init(&con->out_sent, &con->out_queue);
1841 mutex_unlock(&con->out_mutex);
1842 1853
1843 if (con->delay == 0) 1854 if (con->delay == 0)
1844 con->delay = BASE_DELAY_INTERVAL; 1855 con->delay = BASE_DELAY_INTERVAL;
1845 else if (con->delay < MAX_DELAY_INTERVAL) 1856 else if (con->delay < MAX_DELAY_INTERVAL)
1846 con->delay *= 2; 1857 con->delay *= 2;
1847 1858
1859 mutex_unlock(&con->mutex);
1860
1848 /* explicitly schedule work to try to reconnect again later. */ 1861 /* explicitly schedule work to try to reconnect again later. */
1849 dout("fault queueing %p delay %lu\n", con, con->delay); 1862 dout("fault queueing %p delay %lu\n", con, con->delay);
1850 con->ops->get(con); 1863 con->ops->get(con);
@@ -1920,7 +1933,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1920 msg->hdr.dst_erank = con->peer_addr.erank; 1933 msg->hdr.dst_erank = con->peer_addr.erank;
1921 1934
1922 /* queue */ 1935 /* queue */
1923 mutex_lock(&con->out_mutex); 1936 mutex_lock(&con->mutex);
1924 BUG_ON(!list_empty(&msg->list_head)); 1937 BUG_ON(!list_empty(&msg->list_head));
1925 list_add_tail(&msg->list_head, &con->out_queue); 1938 list_add_tail(&msg->list_head, &con->out_queue);
1926 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 1939 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -1929,7 +1942,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1929 le32_to_cpu(msg->hdr.front_len), 1942 le32_to_cpu(msg->hdr.front_len),
1930 le32_to_cpu(msg->hdr.middle_len), 1943 le32_to_cpu(msg->hdr.middle_len),
1931 le32_to_cpu(msg->hdr.data_len)); 1944 le32_to_cpu(msg->hdr.data_len));
1932 mutex_unlock(&con->out_mutex); 1945 mutex_unlock(&con->mutex);
1933 1946
1934 /* if there wasn't anything waiting to send before, queue 1947 /* if there wasn't anything waiting to send before, queue
1935 * new work */ 1948 * new work */
@@ -1942,7 +1955,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1942 */ 1955 */
1943void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 1956void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1944{ 1957{
1945 mutex_lock(&con->out_mutex); 1958 mutex_lock(&con->mutex);
1946 if (!list_empty(&msg->list_head)) { 1959 if (!list_empty(&msg->list_head)) {
1947 dout("con_revoke %p msg %p\n", con, msg); 1960 dout("con_revoke %p msg %p\n", con, msg);
1948 list_del_init(&msg->list_head); 1961 list_del_init(&msg->list_head);
@@ -1959,7 +1972,7 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1959 } else { 1972 } else {
1960 dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); 1973 dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
1961 } 1974 }
1962 mutex_unlock(&con->out_mutex); 1975 mutex_unlock(&con->mutex);
1963} 1976}
1964 1977
1965/* 1978/*