diff options
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r-- | fs/ceph/messenger.c | 91 |
1 files changed, 36 insertions, 55 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index cd4fadb6491a..60b74839ebec 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con); | |||
39 | static void con_work(struct work_struct *); | 39 | static void con_work(struct work_struct *); |
40 | static void ceph_fault(struct ceph_connection *con); | 40 | static void ceph_fault(struct ceph_connection *con); |
41 | 41 | ||
42 | const char *ceph_name_type_str(int t) | ||
43 | { | ||
44 | switch (t) { | ||
45 | case CEPH_ENTITY_TYPE_MON: return "mon"; | ||
46 | case CEPH_ENTITY_TYPE_MDS: return "mds"; | ||
47 | case CEPH_ENTITY_TYPE_OSD: return "osd"; | ||
48 | case CEPH_ENTITY_TYPE_CLIENT: return "client"; | ||
49 | case CEPH_ENTITY_TYPE_ADMIN: return "admin"; | ||
50 | default: return "???"; | ||
51 | } | ||
52 | } | ||
53 | |||
54 | /* | 42 | /* |
55 | * nicely render a sockaddr as a string. | 43 | * nicely render a sockaddr as a string. |
56 | */ | 44 | */ |
@@ -340,6 +328,7 @@ static void reset_connection(struct ceph_connection *con) | |||
340 | ceph_msg_put(con->out_msg); | 328 | ceph_msg_put(con->out_msg); |
341 | con->out_msg = NULL; | 329 | con->out_msg = NULL; |
342 | } | 330 | } |
331 | con->out_keepalive_pending = false; | ||
343 | con->in_seq = 0; | 332 | con->in_seq = 0; |
344 | con->in_seq_acked = 0; | 333 | con->in_seq_acked = 0; |
345 | } | 334 | } |
@@ -357,6 +346,7 @@ void ceph_con_close(struct ceph_connection *con) | |||
357 | clear_bit(WRITE_PENDING, &con->state); | 346 | clear_bit(WRITE_PENDING, &con->state); |
358 | mutex_lock(&con->mutex); | 347 | mutex_lock(&con->mutex); |
359 | reset_connection(con); | 348 | reset_connection(con); |
349 | con->peer_global_seq = 0; | ||
360 | cancel_delayed_work(&con->work); | 350 | cancel_delayed_work(&con->work); |
361 | mutex_unlock(&con->mutex); | 351 | mutex_unlock(&con->mutex); |
362 | queue_con(con); | 352 | queue_con(con); |
@@ -661,7 +651,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, | |||
661 | dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, | 651 | dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, |
662 | con->connect_seq, global_seq, proto); | 652 | con->connect_seq, global_seq, proto); |
663 | 653 | ||
664 | con->out_connect.features = CEPH_FEATURE_SUPPORTED; | 654 | con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT; |
665 | con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); | 655 | con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); |
666 | con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); | 656 | con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); |
667 | con->out_connect.global_seq = cpu_to_le32(global_seq); | 657 | con->out_connect.global_seq = cpu_to_le32(global_seq); |
@@ -1124,8 +1114,8 @@ static void fail_protocol(struct ceph_connection *con) | |||
1124 | 1114 | ||
1125 | static int process_connect(struct ceph_connection *con) | 1115 | static int process_connect(struct ceph_connection *con) |
1126 | { | 1116 | { |
1127 | u64 sup_feat = CEPH_FEATURE_SUPPORTED; | 1117 | u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT; |
1128 | u64 req_feat = CEPH_FEATURE_REQUIRED; | 1118 | u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT; |
1129 | u64 server_feat = le64_to_cpu(con->in_reply.features); | 1119 | u64 server_feat = le64_to_cpu(con->in_reply.features); |
1130 | 1120 | ||
1131 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); | 1121 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); |
@@ -1233,6 +1223,7 @@ static int process_connect(struct ceph_connection *con) | |||
1233 | clear_bit(CONNECTING, &con->state); | 1223 | clear_bit(CONNECTING, &con->state); |
1234 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1224 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1235 | con->connect_seq++; | 1225 | con->connect_seq++; |
1226 | con->peer_features = server_feat; | ||
1236 | dout("process_connect got READY gseq %d cseq %d (%d)\n", | 1227 | dout("process_connect got READY gseq %d cseq %d (%d)\n", |
1237 | con->peer_global_seq, | 1228 | con->peer_global_seq, |
1238 | le32_to_cpu(con->in_reply.connect_seq), | 1229 | le32_to_cpu(con->in_reply.connect_seq), |
@@ -1402,19 +1393,17 @@ static int read_partial_message(struct ceph_connection *con) | |||
1402 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); | 1393 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); |
1403 | if (skip) { | 1394 | if (skip) { |
1404 | /* skip this message */ | 1395 | /* skip this message */ |
1405 | dout("alloc_msg returned NULL, skipping message\n"); | 1396 | dout("alloc_msg said skip message\n"); |
1406 | con->in_base_pos = -front_len - middle_len - data_len - | 1397 | con->in_base_pos = -front_len - middle_len - data_len - |
1407 | sizeof(m->footer); | 1398 | sizeof(m->footer); |
1408 | con->in_tag = CEPH_MSGR_TAG_READY; | 1399 | con->in_tag = CEPH_MSGR_TAG_READY; |
1409 | con->in_seq++; | 1400 | con->in_seq++; |
1410 | return 0; | 1401 | return 0; |
1411 | } | 1402 | } |
1412 | if (IS_ERR(con->in_msg)) { | 1403 | if (!con->in_msg) { |
1413 | ret = PTR_ERR(con->in_msg); | ||
1414 | con->in_msg = NULL; | ||
1415 | con->error_msg = | 1404 | con->error_msg = |
1416 | "error allocating memory for incoming message"; | 1405 | "error allocating memory for incoming message"; |
1417 | return ret; | 1406 | return -ENOMEM; |
1418 | } | 1407 | } |
1419 | m = con->in_msg; | 1408 | m = con->in_msg; |
1420 | m->front.iov_len = 0; /* haven't read it yet */ | 1409 | m->front.iov_len = 0; /* haven't read it yet */ |
@@ -1514,14 +1503,14 @@ static void process_message(struct ceph_connection *con) | |||
1514 | 1503 | ||
1515 | /* if first message, set peer_name */ | 1504 | /* if first message, set peer_name */ |
1516 | if (con->peer_name.type == 0) | 1505 | if (con->peer_name.type == 0) |
1517 | con->peer_name = msg->hdr.src.name; | 1506 | con->peer_name = msg->hdr.src; |
1518 | 1507 | ||
1519 | con->in_seq++; | 1508 | con->in_seq++; |
1520 | mutex_unlock(&con->mutex); | 1509 | mutex_unlock(&con->mutex); |
1521 | 1510 | ||
1522 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", | 1511 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", |
1523 | msg, le64_to_cpu(msg->hdr.seq), | 1512 | msg, le64_to_cpu(msg->hdr.seq), |
1524 | ENTITY_NAME(msg->hdr.src.name), | 1513 | ENTITY_NAME(msg->hdr.src), |
1525 | le16_to_cpu(msg->hdr.type), | 1514 | le16_to_cpu(msg->hdr.type), |
1526 | ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), | 1515 | ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), |
1527 | le32_to_cpu(msg->hdr.front_len), | 1516 | le32_to_cpu(msg->hdr.front_len), |
@@ -1546,7 +1535,6 @@ static int try_write(struct ceph_connection *con) | |||
1546 | dout("try_write start %p state %lu nref %d\n", con, con->state, | 1535 | dout("try_write start %p state %lu nref %d\n", con, con->state, |
1547 | atomic_read(&con->nref)); | 1536 | atomic_read(&con->nref)); |
1548 | 1537 | ||
1549 | mutex_lock(&con->mutex); | ||
1550 | more: | 1538 | more: |
1551 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 1539 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1552 | 1540 | ||
@@ -1639,7 +1627,6 @@ do_next: | |||
1639 | done: | 1627 | done: |
1640 | ret = 0; | 1628 | ret = 0; |
1641 | out: | 1629 | out: |
1642 | mutex_unlock(&con->mutex); | ||
1643 | dout("try_write done on %p\n", con); | 1630 | dout("try_write done on %p\n", con); |
1644 | return ret; | 1631 | return ret; |
1645 | } | 1632 | } |
@@ -1651,7 +1638,6 @@ out: | |||
1651 | */ | 1638 | */ |
1652 | static int try_read(struct ceph_connection *con) | 1639 | static int try_read(struct ceph_connection *con) |
1653 | { | 1640 | { |
1654 | struct ceph_messenger *msgr; | ||
1655 | int ret = -1; | 1641 | int ret = -1; |
1656 | 1642 | ||
1657 | if (!con->sock) | 1643 | if (!con->sock) |
@@ -1661,9 +1647,6 @@ static int try_read(struct ceph_connection *con) | |||
1661 | return 0; | 1647 | return 0; |
1662 | 1648 | ||
1663 | dout("try_read start on %p\n", con); | 1649 | dout("try_read start on %p\n", con); |
1664 | msgr = con->msgr; | ||
1665 | |||
1666 | mutex_lock(&con->mutex); | ||
1667 | 1650 | ||
1668 | more: | 1651 | more: |
1669 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 1652 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
@@ -1758,7 +1741,6 @@ more: | |||
1758 | done: | 1741 | done: |
1759 | ret = 0; | 1742 | ret = 0; |
1760 | out: | 1743 | out: |
1761 | mutex_unlock(&con->mutex); | ||
1762 | dout("try_read done on %p\n", con); | 1744 | dout("try_read done on %p\n", con); |
1763 | return ret; | 1745 | return ret; |
1764 | 1746 | ||
@@ -1830,6 +1812,8 @@ more: | |||
1830 | dout("con_work %p start, clearing QUEUED\n", con); | 1812 | dout("con_work %p start, clearing QUEUED\n", con); |
1831 | clear_bit(QUEUED, &con->state); | 1813 | clear_bit(QUEUED, &con->state); |
1832 | 1814 | ||
1815 | mutex_lock(&con->mutex); | ||
1816 | |||
1833 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ | 1817 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ |
1834 | dout("con_work CLOSED\n"); | 1818 | dout("con_work CLOSED\n"); |
1835 | con_close_socket(con); | 1819 | con_close_socket(con); |
@@ -1844,11 +1828,16 @@ more: | |||
1844 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || | 1828 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || |
1845 | try_read(con) < 0 || | 1829 | try_read(con) < 0 || |
1846 | try_write(con) < 0) { | 1830 | try_write(con) < 0) { |
1831 | mutex_unlock(&con->mutex); | ||
1847 | backoff = 1; | 1832 | backoff = 1; |
1848 | ceph_fault(con); /* error/fault path */ | 1833 | ceph_fault(con); /* error/fault path */ |
1834 | goto done_unlocked; | ||
1849 | } | 1835 | } |
1850 | 1836 | ||
1851 | done: | 1837 | done: |
1838 | mutex_unlock(&con->mutex); | ||
1839 | |||
1840 | done_unlocked: | ||
1852 | clear_bit(BUSY, &con->state); | 1841 | clear_bit(BUSY, &con->state); |
1853 | dout("con->state=%lu\n", con->state); | 1842 | dout("con->state=%lu\n", con->state); |
1854 | if (test_bit(QUEUED, &con->state)) { | 1843 | if (test_bit(QUEUED, &con->state)) { |
@@ -1947,7 +1936,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) | |||
1947 | 1936 | ||
1948 | /* the zero page is needed if a request is "canceled" while the message | 1937 | /* the zero page is needed if a request is "canceled" while the message |
1949 | * is being written over the socket */ | 1938 | * is being written over the socket */ |
1950 | msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); | 1939 | msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); |
1951 | if (!msgr->zero_page) { | 1940 | if (!msgr->zero_page) { |
1952 | kfree(msgr); | 1941 | kfree(msgr); |
1953 | return ERR_PTR(-ENOMEM); | 1942 | return ERR_PTR(-ENOMEM); |
@@ -1987,9 +1976,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
1987 | } | 1976 | } |
1988 | 1977 | ||
1989 | /* set src+dst */ | 1978 | /* set src+dst */ |
1990 | msg->hdr.src.name = con->msgr->inst.name; | 1979 | msg->hdr.src = con->msgr->inst.name; |
1991 | msg->hdr.src.addr = con->msgr->my_enc_addr; | ||
1992 | msg->hdr.orig_src = msg->hdr.src; | ||
1993 | 1980 | ||
1994 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); | 1981 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); |
1995 | 1982 | ||
@@ -2083,12 +2070,11 @@ void ceph_con_keepalive(struct ceph_connection *con) | |||
2083 | * construct a new message with given type, size | 2070 | * construct a new message with given type, size |
2084 | * the new msg has a ref count of 1. | 2071 | * the new msg has a ref count of 1. |
2085 | */ | 2072 | */ |
2086 | struct ceph_msg *ceph_msg_new(int type, int front_len, | 2073 | struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) |
2087 | int page_len, int page_off, struct page **pages) | ||
2088 | { | 2074 | { |
2089 | struct ceph_msg *m; | 2075 | struct ceph_msg *m; |
2090 | 2076 | ||
2091 | m = kmalloc(sizeof(*m), GFP_NOFS); | 2077 | m = kmalloc(sizeof(*m), flags); |
2092 | if (m == NULL) | 2078 | if (m == NULL) |
2093 | goto out; | 2079 | goto out; |
2094 | kref_init(&m->kref); | 2080 | kref_init(&m->kref); |
@@ -2100,8 +2086,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2100 | m->hdr.version = 0; | 2086 | m->hdr.version = 0; |
2101 | m->hdr.front_len = cpu_to_le32(front_len); | 2087 | m->hdr.front_len = cpu_to_le32(front_len); |
2102 | m->hdr.middle_len = 0; | 2088 | m->hdr.middle_len = 0; |
2103 | m->hdr.data_len = cpu_to_le32(page_len); | 2089 | m->hdr.data_len = 0; |
2104 | m->hdr.data_off = cpu_to_le16(page_off); | 2090 | m->hdr.data_off = 0; |
2105 | m->hdr.reserved = 0; | 2091 | m->hdr.reserved = 0; |
2106 | m->footer.front_crc = 0; | 2092 | m->footer.front_crc = 0; |
2107 | m->footer.middle_crc = 0; | 2093 | m->footer.middle_crc = 0; |
@@ -2115,11 +2101,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2115 | /* front */ | 2101 | /* front */ |
2116 | if (front_len) { | 2102 | if (front_len) { |
2117 | if (front_len > PAGE_CACHE_SIZE) { | 2103 | if (front_len > PAGE_CACHE_SIZE) { |
2118 | m->front.iov_base = __vmalloc(front_len, GFP_NOFS, | 2104 | m->front.iov_base = __vmalloc(front_len, flags, |
2119 | PAGE_KERNEL); | 2105 | PAGE_KERNEL); |
2120 | m->front_is_vmalloc = true; | 2106 | m->front_is_vmalloc = true; |
2121 | } else { | 2107 | } else { |
2122 | m->front.iov_base = kmalloc(front_len, GFP_NOFS); | 2108 | m->front.iov_base = kmalloc(front_len, flags); |
2123 | } | 2109 | } |
2124 | if (m->front.iov_base == NULL) { | 2110 | if (m->front.iov_base == NULL) { |
2125 | pr_err("msg_new can't allocate %d bytes\n", | 2111 | pr_err("msg_new can't allocate %d bytes\n", |
@@ -2135,19 +2121,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2135 | m->middle = NULL; | 2121 | m->middle = NULL; |
2136 | 2122 | ||
2137 | /* data */ | 2123 | /* data */ |
2138 | m->nr_pages = calc_pages_for(page_off, page_len); | 2124 | m->nr_pages = 0; |
2139 | m->pages = pages; | 2125 | m->pages = NULL; |
2140 | m->pagelist = NULL; | 2126 | m->pagelist = NULL; |
2141 | 2127 | ||
2142 | dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, | 2128 | dout("ceph_msg_new %p front %d\n", m, front_len); |
2143 | m->nr_pages); | ||
2144 | return m; | 2129 | return m; |
2145 | 2130 | ||
2146 | out2: | 2131 | out2: |
2147 | ceph_msg_put(m); | 2132 | ceph_msg_put(m); |
2148 | out: | 2133 | out: |
2149 | pr_err("msg_new can't create type %d len %d\n", type, front_len); | 2134 | pr_err("msg_new can't create type %d front %d\n", type, front_len); |
2150 | return ERR_PTR(-ENOMEM); | 2135 | return NULL; |
2151 | } | 2136 | } |
2152 | 2137 | ||
2153 | /* | 2138 | /* |
@@ -2190,29 +2175,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | |||
2190 | mutex_unlock(&con->mutex); | 2175 | mutex_unlock(&con->mutex); |
2191 | msg = con->ops->alloc_msg(con, hdr, skip); | 2176 | msg = con->ops->alloc_msg(con, hdr, skip); |
2192 | mutex_lock(&con->mutex); | 2177 | mutex_lock(&con->mutex); |
2193 | if (IS_ERR(msg)) | 2178 | if (!msg || *skip) |
2194 | return msg; | ||
2195 | |||
2196 | if (*skip) | ||
2197 | return NULL; | 2179 | return NULL; |
2198 | } | 2180 | } |
2199 | if (!msg) { | 2181 | if (!msg) { |
2200 | *skip = 0; | 2182 | *skip = 0; |
2201 | msg = ceph_msg_new(type, front_len, 0, 0, NULL); | 2183 | msg = ceph_msg_new(type, front_len, GFP_NOFS); |
2202 | if (!msg) { | 2184 | if (!msg) { |
2203 | pr_err("unable to allocate msg type %d len %d\n", | 2185 | pr_err("unable to allocate msg type %d len %d\n", |
2204 | type, front_len); | 2186 | type, front_len); |
2205 | return ERR_PTR(-ENOMEM); | 2187 | return NULL; |
2206 | } | 2188 | } |
2207 | } | 2189 | } |
2208 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 2190 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
2209 | 2191 | ||
2210 | if (middle_len) { | 2192 | if (middle_len && !msg->middle) { |
2211 | ret = ceph_alloc_middle(con, msg); | 2193 | ret = ceph_alloc_middle(con, msg); |
2212 | |||
2213 | if (ret < 0) { | 2194 | if (ret < 0) { |
2214 | ceph_msg_put(msg); | 2195 | ceph_msg_put(msg); |
2215 | return msg; | 2196 | return NULL; |
2216 | } | 2197 | } |
2217 | } | 2198 | } |
2218 | 2199 | ||