diff options
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r-- | fs/ceph/messenger.c | 108 |
1 files changed, 51 insertions, 57 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 509f57d9ccb3..60b74839ebec 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con); | |||
39 | static void con_work(struct work_struct *); | 39 | static void con_work(struct work_struct *); |
40 | static void ceph_fault(struct ceph_connection *con); | 40 | static void ceph_fault(struct ceph_connection *con); |
41 | 41 | ||
42 | const char *ceph_name_type_str(int t) | ||
43 | { | ||
44 | switch (t) { | ||
45 | case CEPH_ENTITY_TYPE_MON: return "mon"; | ||
46 | case CEPH_ENTITY_TYPE_MDS: return "mds"; | ||
47 | case CEPH_ENTITY_TYPE_OSD: return "osd"; | ||
48 | case CEPH_ENTITY_TYPE_CLIENT: return "client"; | ||
49 | case CEPH_ENTITY_TYPE_ADMIN: return "admin"; | ||
50 | default: return "???"; | ||
51 | } | ||
52 | } | ||
53 | |||
54 | /* | 42 | /* |
55 | * nicely render a sockaddr as a string. | 43 | * nicely render a sockaddr as a string. |
56 | */ | 44 | */ |
@@ -340,6 +328,7 @@ static void reset_connection(struct ceph_connection *con) | |||
340 | ceph_msg_put(con->out_msg); | 328 | ceph_msg_put(con->out_msg); |
341 | con->out_msg = NULL; | 329 | con->out_msg = NULL; |
342 | } | 330 | } |
331 | con->out_keepalive_pending = false; | ||
343 | con->in_seq = 0; | 332 | con->in_seq = 0; |
344 | con->in_seq_acked = 0; | 333 | con->in_seq_acked = 0; |
345 | } | 334 | } |
@@ -357,6 +346,7 @@ void ceph_con_close(struct ceph_connection *con) | |||
357 | clear_bit(WRITE_PENDING, &con->state); | 346 | clear_bit(WRITE_PENDING, &con->state); |
358 | mutex_lock(&con->mutex); | 347 | mutex_lock(&con->mutex); |
359 | reset_connection(con); | 348 | reset_connection(con); |
349 | con->peer_global_seq = 0; | ||
360 | cancel_delayed_work(&con->work); | 350 | cancel_delayed_work(&con->work); |
361 | mutex_unlock(&con->mutex); | 351 | mutex_unlock(&con->mutex); |
362 | queue_con(con); | 352 | queue_con(con); |
@@ -492,7 +482,14 @@ static void prepare_write_message(struct ceph_connection *con) | |||
492 | list_move_tail(&m->list_head, &con->out_sent); | 482 | list_move_tail(&m->list_head, &con->out_sent); |
493 | } | 483 | } |
494 | 484 | ||
495 | m->hdr.seq = cpu_to_le64(++con->out_seq); | 485 | /* |
486 | * only assign outgoing seq # if we haven't sent this message | ||
487 | * yet. if it is requeued, resend with it's original seq. | ||
488 | */ | ||
489 | if (m->needs_out_seq) { | ||
490 | m->hdr.seq = cpu_to_le64(++con->out_seq); | ||
491 | m->needs_out_seq = false; | ||
492 | } | ||
496 | 493 | ||
497 | dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", | 494 | dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", |
498 | m, con->out_seq, le16_to_cpu(m->hdr.type), | 495 | m, con->out_seq, le16_to_cpu(m->hdr.type), |
@@ -654,7 +651,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, | |||
654 | dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, | 651 | dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, |
655 | con->connect_seq, global_seq, proto); | 652 | con->connect_seq, global_seq, proto); |
656 | 653 | ||
657 | con->out_connect.features = CEPH_FEATURE_SUPPORTED; | 654 | con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT; |
658 | con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); | 655 | con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); |
659 | con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); | 656 | con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); |
660 | con->out_connect.global_seq = cpu_to_le32(global_seq); | 657 | con->out_connect.global_seq = cpu_to_le32(global_seq); |
@@ -1117,8 +1114,8 @@ static void fail_protocol(struct ceph_connection *con) | |||
1117 | 1114 | ||
1118 | static int process_connect(struct ceph_connection *con) | 1115 | static int process_connect(struct ceph_connection *con) |
1119 | { | 1116 | { |
1120 | u64 sup_feat = CEPH_FEATURE_SUPPORTED; | 1117 | u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT; |
1121 | u64 req_feat = CEPH_FEATURE_REQUIRED; | 1118 | u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT; |
1122 | u64 server_feat = le64_to_cpu(con->in_reply.features); | 1119 | u64 server_feat = le64_to_cpu(con->in_reply.features); |
1123 | 1120 | ||
1124 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); | 1121 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); |
@@ -1226,6 +1223,7 @@ static int process_connect(struct ceph_connection *con) | |||
1226 | clear_bit(CONNECTING, &con->state); | 1223 | clear_bit(CONNECTING, &con->state); |
1227 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1224 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1228 | con->connect_seq++; | 1225 | con->connect_seq++; |
1226 | con->peer_features = server_feat; | ||
1229 | dout("process_connect got READY gseq %d cseq %d (%d)\n", | 1227 | dout("process_connect got READY gseq %d cseq %d (%d)\n", |
1230 | con->peer_global_seq, | 1228 | con->peer_global_seq, |
1231 | le32_to_cpu(con->in_reply.connect_seq), | 1229 | le32_to_cpu(con->in_reply.connect_seq), |
@@ -1395,19 +1393,17 @@ static int read_partial_message(struct ceph_connection *con) | |||
1395 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); | 1393 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); |
1396 | if (skip) { | 1394 | if (skip) { |
1397 | /* skip this message */ | 1395 | /* skip this message */ |
1398 | dout("alloc_msg returned NULL, skipping message\n"); | 1396 | dout("alloc_msg said skip message\n"); |
1399 | con->in_base_pos = -front_len - middle_len - data_len - | 1397 | con->in_base_pos = -front_len - middle_len - data_len - |
1400 | sizeof(m->footer); | 1398 | sizeof(m->footer); |
1401 | con->in_tag = CEPH_MSGR_TAG_READY; | 1399 | con->in_tag = CEPH_MSGR_TAG_READY; |
1402 | con->in_seq++; | 1400 | con->in_seq++; |
1403 | return 0; | 1401 | return 0; |
1404 | } | 1402 | } |
1405 | if (IS_ERR(con->in_msg)) { | 1403 | if (!con->in_msg) { |
1406 | ret = PTR_ERR(con->in_msg); | ||
1407 | con->in_msg = NULL; | ||
1408 | con->error_msg = | 1404 | con->error_msg = |
1409 | "error allocating memory for incoming message"; | 1405 | "error allocating memory for incoming message"; |
1410 | return ret; | 1406 | return -ENOMEM; |
1411 | } | 1407 | } |
1412 | m = con->in_msg; | 1408 | m = con->in_msg; |
1413 | m->front.iov_len = 0; /* haven't read it yet */ | 1409 | m->front.iov_len = 0; /* haven't read it yet */ |
@@ -1507,14 +1503,14 @@ static void process_message(struct ceph_connection *con) | |||
1507 | 1503 | ||
1508 | /* if first message, set peer_name */ | 1504 | /* if first message, set peer_name */ |
1509 | if (con->peer_name.type == 0) | 1505 | if (con->peer_name.type == 0) |
1510 | con->peer_name = msg->hdr.src.name; | 1506 | con->peer_name = msg->hdr.src; |
1511 | 1507 | ||
1512 | con->in_seq++; | 1508 | con->in_seq++; |
1513 | mutex_unlock(&con->mutex); | 1509 | mutex_unlock(&con->mutex); |
1514 | 1510 | ||
1515 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", | 1511 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", |
1516 | msg, le64_to_cpu(msg->hdr.seq), | 1512 | msg, le64_to_cpu(msg->hdr.seq), |
1517 | ENTITY_NAME(msg->hdr.src.name), | 1513 | ENTITY_NAME(msg->hdr.src), |
1518 | le16_to_cpu(msg->hdr.type), | 1514 | le16_to_cpu(msg->hdr.type), |
1519 | ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), | 1515 | ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), |
1520 | le32_to_cpu(msg->hdr.front_len), | 1516 | le32_to_cpu(msg->hdr.front_len), |
@@ -1539,7 +1535,6 @@ static int try_write(struct ceph_connection *con) | |||
1539 | dout("try_write start %p state %lu nref %d\n", con, con->state, | 1535 | dout("try_write start %p state %lu nref %d\n", con, con->state, |
1540 | atomic_read(&con->nref)); | 1536 | atomic_read(&con->nref)); |
1541 | 1537 | ||
1542 | mutex_lock(&con->mutex); | ||
1543 | more: | 1538 | more: |
1544 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 1539 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1545 | 1540 | ||
@@ -1632,7 +1627,6 @@ do_next: | |||
1632 | done: | 1627 | done: |
1633 | ret = 0; | 1628 | ret = 0; |
1634 | out: | 1629 | out: |
1635 | mutex_unlock(&con->mutex); | ||
1636 | dout("try_write done on %p\n", con); | 1630 | dout("try_write done on %p\n", con); |
1637 | return ret; | 1631 | return ret; |
1638 | } | 1632 | } |
@@ -1644,7 +1638,6 @@ out: | |||
1644 | */ | 1638 | */ |
1645 | static int try_read(struct ceph_connection *con) | 1639 | static int try_read(struct ceph_connection *con) |
1646 | { | 1640 | { |
1647 | struct ceph_messenger *msgr; | ||
1648 | int ret = -1; | 1641 | int ret = -1; |
1649 | 1642 | ||
1650 | if (!con->sock) | 1643 | if (!con->sock) |
@@ -1654,9 +1647,6 @@ static int try_read(struct ceph_connection *con) | |||
1654 | return 0; | 1647 | return 0; |
1655 | 1648 | ||
1656 | dout("try_read start on %p\n", con); | 1649 | dout("try_read start on %p\n", con); |
1657 | msgr = con->msgr; | ||
1658 | |||
1659 | mutex_lock(&con->mutex); | ||
1660 | 1650 | ||
1661 | more: | 1651 | more: |
1662 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 1652 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
@@ -1751,7 +1741,6 @@ more: | |||
1751 | done: | 1741 | done: |
1752 | ret = 0; | 1742 | ret = 0; |
1753 | out: | 1743 | out: |
1754 | mutex_unlock(&con->mutex); | ||
1755 | dout("try_read done on %p\n", con); | 1744 | dout("try_read done on %p\n", con); |
1756 | return ret; | 1745 | return ret; |
1757 | 1746 | ||
@@ -1823,6 +1812,8 @@ more: | |||
1823 | dout("con_work %p start, clearing QUEUED\n", con); | 1812 | dout("con_work %p start, clearing QUEUED\n", con); |
1824 | clear_bit(QUEUED, &con->state); | 1813 | clear_bit(QUEUED, &con->state); |
1825 | 1814 | ||
1815 | mutex_lock(&con->mutex); | ||
1816 | |||
1826 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ | 1817 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ |
1827 | dout("con_work CLOSED\n"); | 1818 | dout("con_work CLOSED\n"); |
1828 | con_close_socket(con); | 1819 | con_close_socket(con); |
@@ -1837,11 +1828,16 @@ more: | |||
1837 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || | 1828 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || |
1838 | try_read(con) < 0 || | 1829 | try_read(con) < 0 || |
1839 | try_write(con) < 0) { | 1830 | try_write(con) < 0) { |
1831 | mutex_unlock(&con->mutex); | ||
1840 | backoff = 1; | 1832 | backoff = 1; |
1841 | ceph_fault(con); /* error/fault path */ | 1833 | ceph_fault(con); /* error/fault path */ |
1834 | goto done_unlocked; | ||
1842 | } | 1835 | } |
1843 | 1836 | ||
1844 | done: | 1837 | done: |
1838 | mutex_unlock(&con->mutex); | ||
1839 | |||
1840 | done_unlocked: | ||
1845 | clear_bit(BUSY, &con->state); | 1841 | clear_bit(BUSY, &con->state); |
1846 | dout("con->state=%lu\n", con->state); | 1842 | dout("con->state=%lu\n", con->state); |
1847 | if (test_bit(QUEUED, &con->state)) { | 1843 | if (test_bit(QUEUED, &con->state)) { |
@@ -1940,7 +1936,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) | |||
1940 | 1936 | ||
1941 | /* the zero page is needed if a request is "canceled" while the message | 1937 | /* the zero page is needed if a request is "canceled" while the message |
1942 | * is being written over the socket */ | 1938 | * is being written over the socket */ |
1943 | msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); | 1939 | msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); |
1944 | if (!msgr->zero_page) { | 1940 | if (!msgr->zero_page) { |
1945 | kfree(msgr); | 1941 | kfree(msgr); |
1946 | return ERR_PTR(-ENOMEM); | 1942 | return ERR_PTR(-ENOMEM); |
@@ -1980,12 +1976,12 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
1980 | } | 1976 | } |
1981 | 1977 | ||
1982 | /* set src+dst */ | 1978 | /* set src+dst */ |
1983 | msg->hdr.src.name = con->msgr->inst.name; | 1979 | msg->hdr.src = con->msgr->inst.name; |
1984 | msg->hdr.src.addr = con->msgr->my_enc_addr; | ||
1985 | msg->hdr.orig_src = msg->hdr.src; | ||
1986 | 1980 | ||
1987 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); | 1981 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); |
1988 | 1982 | ||
1983 | msg->needs_out_seq = true; | ||
1984 | |||
1989 | /* queue */ | 1985 | /* queue */ |
1990 | mutex_lock(&con->mutex); | 1986 | mutex_lock(&con->mutex); |
1991 | BUG_ON(!list_empty(&msg->list_head)); | 1987 | BUG_ON(!list_empty(&msg->list_head)); |
@@ -2074,26 +2070,29 @@ void ceph_con_keepalive(struct ceph_connection *con) | |||
2074 | * construct a new message with given type, size | 2070 | * construct a new message with given type, size |
2075 | * the new msg has a ref count of 1. | 2071 | * the new msg has a ref count of 1. |
2076 | */ | 2072 | */ |
2077 | struct ceph_msg *ceph_msg_new(int type, int front_len, | 2073 | struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) |
2078 | int page_len, int page_off, struct page **pages) | ||
2079 | { | 2074 | { |
2080 | struct ceph_msg *m; | 2075 | struct ceph_msg *m; |
2081 | 2076 | ||
2082 | m = kmalloc(sizeof(*m), GFP_NOFS); | 2077 | m = kmalloc(sizeof(*m), flags); |
2083 | if (m == NULL) | 2078 | if (m == NULL) |
2084 | goto out; | 2079 | goto out; |
2085 | kref_init(&m->kref); | 2080 | kref_init(&m->kref); |
2086 | INIT_LIST_HEAD(&m->list_head); | 2081 | INIT_LIST_HEAD(&m->list_head); |
2087 | 2082 | ||
2083 | m->hdr.tid = 0; | ||
2088 | m->hdr.type = cpu_to_le16(type); | 2084 | m->hdr.type = cpu_to_le16(type); |
2085 | m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); | ||
2086 | m->hdr.version = 0; | ||
2089 | m->hdr.front_len = cpu_to_le32(front_len); | 2087 | m->hdr.front_len = cpu_to_le32(front_len); |
2090 | m->hdr.middle_len = 0; | 2088 | m->hdr.middle_len = 0; |
2091 | m->hdr.data_len = cpu_to_le32(page_len); | 2089 | m->hdr.data_len = 0; |
2092 | m->hdr.data_off = cpu_to_le16(page_off); | 2090 | m->hdr.data_off = 0; |
2093 | m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); | 2091 | m->hdr.reserved = 0; |
2094 | m->footer.front_crc = 0; | 2092 | m->footer.front_crc = 0; |
2095 | m->footer.middle_crc = 0; | 2093 | m->footer.middle_crc = 0; |
2096 | m->footer.data_crc = 0; | 2094 | m->footer.data_crc = 0; |
2095 | m->footer.flags = 0; | ||
2097 | m->front_max = front_len; | 2096 | m->front_max = front_len; |
2098 | m->front_is_vmalloc = false; | 2097 | m->front_is_vmalloc = false; |
2099 | m->more_to_follow = false; | 2098 | m->more_to_follow = false; |
@@ -2102,11 +2101,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2102 | /* front */ | 2101 | /* front */ |
2103 | if (front_len) { | 2102 | if (front_len) { |
2104 | if (front_len > PAGE_CACHE_SIZE) { | 2103 | if (front_len > PAGE_CACHE_SIZE) { |
2105 | m->front.iov_base = __vmalloc(front_len, GFP_NOFS, | 2104 | m->front.iov_base = __vmalloc(front_len, flags, |
2106 | PAGE_KERNEL); | 2105 | PAGE_KERNEL); |
2107 | m->front_is_vmalloc = true; | 2106 | m->front_is_vmalloc = true; |
2108 | } else { | 2107 | } else { |
2109 | m->front.iov_base = kmalloc(front_len, GFP_NOFS); | 2108 | m->front.iov_base = kmalloc(front_len, flags); |
2110 | } | 2109 | } |
2111 | if (m->front.iov_base == NULL) { | 2110 | if (m->front.iov_base == NULL) { |
2112 | pr_err("msg_new can't allocate %d bytes\n", | 2111 | pr_err("msg_new can't allocate %d bytes\n", |
@@ -2122,19 +2121,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2122 | m->middle = NULL; | 2121 | m->middle = NULL; |
2123 | 2122 | ||
2124 | /* data */ | 2123 | /* data */ |
2125 | m->nr_pages = calc_pages_for(page_off, page_len); | 2124 | m->nr_pages = 0; |
2126 | m->pages = pages; | 2125 | m->pages = NULL; |
2127 | m->pagelist = NULL; | 2126 | m->pagelist = NULL; |
2128 | 2127 | ||
2129 | dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, | 2128 | dout("ceph_msg_new %p front %d\n", m, front_len); |
2130 | m->nr_pages); | ||
2131 | return m; | 2129 | return m; |
2132 | 2130 | ||
2133 | out2: | 2131 | out2: |
2134 | ceph_msg_put(m); | 2132 | ceph_msg_put(m); |
2135 | out: | 2133 | out: |
2136 | pr_err("msg_new can't create type %d len %d\n", type, front_len); | 2134 | pr_err("msg_new can't create type %d front %d\n", type, front_len); |
2137 | return ERR_PTR(-ENOMEM); | 2135 | return NULL; |
2138 | } | 2136 | } |
2139 | 2137 | ||
2140 | /* | 2138 | /* |
@@ -2177,29 +2175,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | |||
2177 | mutex_unlock(&con->mutex); | 2175 | mutex_unlock(&con->mutex); |
2178 | msg = con->ops->alloc_msg(con, hdr, skip); | 2176 | msg = con->ops->alloc_msg(con, hdr, skip); |
2179 | mutex_lock(&con->mutex); | 2177 | mutex_lock(&con->mutex); |
2180 | if (IS_ERR(msg)) | 2178 | if (!msg || *skip) |
2181 | return msg; | ||
2182 | |||
2183 | if (*skip) | ||
2184 | return NULL; | 2179 | return NULL; |
2185 | } | 2180 | } |
2186 | if (!msg) { | 2181 | if (!msg) { |
2187 | *skip = 0; | 2182 | *skip = 0; |
2188 | msg = ceph_msg_new(type, front_len, 0, 0, NULL); | 2183 | msg = ceph_msg_new(type, front_len, GFP_NOFS); |
2189 | if (!msg) { | 2184 | if (!msg) { |
2190 | pr_err("unable to allocate msg type %d len %d\n", | 2185 | pr_err("unable to allocate msg type %d len %d\n", |
2191 | type, front_len); | 2186 | type, front_len); |
2192 | return ERR_PTR(-ENOMEM); | 2187 | return NULL; |
2193 | } | 2188 | } |
2194 | } | 2189 | } |
2195 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 2190 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
2196 | 2191 | ||
2197 | if (middle_len) { | 2192 | if (middle_len && !msg->middle) { |
2198 | ret = ceph_alloc_middle(con, msg); | 2193 | ret = ceph_alloc_middle(con, msg); |
2199 | |||
2200 | if (ret < 0) { | 2194 | if (ret < 0) { |
2201 | ceph_msg_put(msg); | 2195 | ceph_msg_put(msg); |
2202 | return msg; | 2196 | return NULL; |
2203 | } | 2197 | } |
2204 | } | 2198 | } |
2205 | 2199 | ||