diff options
author | Jens Axboe <jaxboe@fusionio.com> | 2010-06-01 06:42:12 -0400 |
---|---|---|
committer | Jens Axboe <jaxboe@fusionio.com> | 2010-06-01 06:42:12 -0400 |
commit | b4ca761577535b2b4d153689ee97342797dfff05 (patch) | |
tree | 29054d55508f1faa22ec32acf7c245751af03348 /fs/ceph/messenger.c | |
parent | 28f4197e5d4707311febeec8a0eb97cb5fd93c97 (diff) | |
parent | 67a3e12b05e055c0415c556a315a3d3eb637e29e (diff) |
Merge branch 'master' into for-linus
Conflicts:
fs/pipe.c
Signed-off-by: Jens Axboe <jaxboe@fusionio.com>
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r-- | fs/ceph/messenger.c | 97 |
1 files changed, 42 insertions, 55 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index cd4fadb6491a..64b8b1f7863d 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con); | |||
39 | static void con_work(struct work_struct *); | 39 | static void con_work(struct work_struct *); |
40 | static void ceph_fault(struct ceph_connection *con); | 40 | static void ceph_fault(struct ceph_connection *con); |
41 | 41 | ||
42 | const char *ceph_name_type_str(int t) | ||
43 | { | ||
44 | switch (t) { | ||
45 | case CEPH_ENTITY_TYPE_MON: return "mon"; | ||
46 | case CEPH_ENTITY_TYPE_MDS: return "mds"; | ||
47 | case CEPH_ENTITY_TYPE_OSD: return "osd"; | ||
48 | case CEPH_ENTITY_TYPE_CLIENT: return "client"; | ||
49 | case CEPH_ENTITY_TYPE_ADMIN: return "admin"; | ||
50 | default: return "???"; | ||
51 | } | ||
52 | } | ||
53 | |||
54 | /* | 42 | /* |
55 | * nicely render a sockaddr as a string. | 43 | * nicely render a sockaddr as a string. |
56 | */ | 44 | */ |
@@ -132,6 +120,12 @@ void ceph_msgr_exit(void) | |||
132 | destroy_workqueue(ceph_msgr_wq); | 120 | destroy_workqueue(ceph_msgr_wq); |
133 | } | 121 | } |
134 | 122 | ||
123 | void ceph_msgr_flush() | ||
124 | { | ||
125 | flush_workqueue(ceph_msgr_wq); | ||
126 | } | ||
127 | |||
128 | |||
135 | /* | 129 | /* |
136 | * socket callback functions | 130 | * socket callback functions |
137 | */ | 131 | */ |
@@ -340,6 +334,7 @@ static void reset_connection(struct ceph_connection *con) | |||
340 | ceph_msg_put(con->out_msg); | 334 | ceph_msg_put(con->out_msg); |
341 | con->out_msg = NULL; | 335 | con->out_msg = NULL; |
342 | } | 336 | } |
337 | con->out_keepalive_pending = false; | ||
343 | con->in_seq = 0; | 338 | con->in_seq = 0; |
344 | con->in_seq_acked = 0; | 339 | con->in_seq_acked = 0; |
345 | } | 340 | } |
@@ -357,6 +352,7 @@ void ceph_con_close(struct ceph_connection *con) | |||
357 | clear_bit(WRITE_PENDING, &con->state); | 352 | clear_bit(WRITE_PENDING, &con->state); |
358 | mutex_lock(&con->mutex); | 353 | mutex_lock(&con->mutex); |
359 | reset_connection(con); | 354 | reset_connection(con); |
355 | con->peer_global_seq = 0; | ||
360 | cancel_delayed_work(&con->work); | 356 | cancel_delayed_work(&con->work); |
361 | mutex_unlock(&con->mutex); | 357 | mutex_unlock(&con->mutex); |
362 | queue_con(con); | 358 | queue_con(con); |
@@ -661,7 +657,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, | |||
661 | dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, | 657 | dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, |
662 | con->connect_seq, global_seq, proto); | 658 | con->connect_seq, global_seq, proto); |
663 | 659 | ||
664 | con->out_connect.features = CEPH_FEATURE_SUPPORTED; | 660 | con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT; |
665 | con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); | 661 | con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); |
666 | con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); | 662 | con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); |
667 | con->out_connect.global_seq = cpu_to_le32(global_seq); | 663 | con->out_connect.global_seq = cpu_to_le32(global_seq); |
@@ -1124,8 +1120,8 @@ static void fail_protocol(struct ceph_connection *con) | |||
1124 | 1120 | ||
1125 | static int process_connect(struct ceph_connection *con) | 1121 | static int process_connect(struct ceph_connection *con) |
1126 | { | 1122 | { |
1127 | u64 sup_feat = CEPH_FEATURE_SUPPORTED; | 1123 | u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT; |
1128 | u64 req_feat = CEPH_FEATURE_REQUIRED; | 1124 | u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT; |
1129 | u64 server_feat = le64_to_cpu(con->in_reply.features); | 1125 | u64 server_feat = le64_to_cpu(con->in_reply.features); |
1130 | 1126 | ||
1131 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); | 1127 | dout("process_connect on %p tag %d\n", con, (int)con->in_tag); |
@@ -1233,6 +1229,7 @@ static int process_connect(struct ceph_connection *con) | |||
1233 | clear_bit(CONNECTING, &con->state); | 1229 | clear_bit(CONNECTING, &con->state); |
1234 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1230 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1235 | con->connect_seq++; | 1231 | con->connect_seq++; |
1232 | con->peer_features = server_feat; | ||
1236 | dout("process_connect got READY gseq %d cseq %d (%d)\n", | 1233 | dout("process_connect got READY gseq %d cseq %d (%d)\n", |
1237 | con->peer_global_seq, | 1234 | con->peer_global_seq, |
1238 | le32_to_cpu(con->in_reply.connect_seq), | 1235 | le32_to_cpu(con->in_reply.connect_seq), |
@@ -1402,19 +1399,17 @@ static int read_partial_message(struct ceph_connection *con) | |||
1402 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); | 1399 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); |
1403 | if (skip) { | 1400 | if (skip) { |
1404 | /* skip this message */ | 1401 | /* skip this message */ |
1405 | dout("alloc_msg returned NULL, skipping message\n"); | 1402 | dout("alloc_msg said skip message\n"); |
1406 | con->in_base_pos = -front_len - middle_len - data_len - | 1403 | con->in_base_pos = -front_len - middle_len - data_len - |
1407 | sizeof(m->footer); | 1404 | sizeof(m->footer); |
1408 | con->in_tag = CEPH_MSGR_TAG_READY; | 1405 | con->in_tag = CEPH_MSGR_TAG_READY; |
1409 | con->in_seq++; | 1406 | con->in_seq++; |
1410 | return 0; | 1407 | return 0; |
1411 | } | 1408 | } |
1412 | if (IS_ERR(con->in_msg)) { | 1409 | if (!con->in_msg) { |
1413 | ret = PTR_ERR(con->in_msg); | ||
1414 | con->in_msg = NULL; | ||
1415 | con->error_msg = | 1410 | con->error_msg = |
1416 | "error allocating memory for incoming message"; | 1411 | "error allocating memory for incoming message"; |
1417 | return ret; | 1412 | return -ENOMEM; |
1418 | } | 1413 | } |
1419 | m = con->in_msg; | 1414 | m = con->in_msg; |
1420 | m->front.iov_len = 0; /* haven't read it yet */ | 1415 | m->front.iov_len = 0; /* haven't read it yet */ |
@@ -1514,14 +1509,14 @@ static void process_message(struct ceph_connection *con) | |||
1514 | 1509 | ||
1515 | /* if first message, set peer_name */ | 1510 | /* if first message, set peer_name */ |
1516 | if (con->peer_name.type == 0) | 1511 | if (con->peer_name.type == 0) |
1517 | con->peer_name = msg->hdr.src.name; | 1512 | con->peer_name = msg->hdr.src; |
1518 | 1513 | ||
1519 | con->in_seq++; | 1514 | con->in_seq++; |
1520 | mutex_unlock(&con->mutex); | 1515 | mutex_unlock(&con->mutex); |
1521 | 1516 | ||
1522 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", | 1517 | dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", |
1523 | msg, le64_to_cpu(msg->hdr.seq), | 1518 | msg, le64_to_cpu(msg->hdr.seq), |
1524 | ENTITY_NAME(msg->hdr.src.name), | 1519 | ENTITY_NAME(msg->hdr.src), |
1525 | le16_to_cpu(msg->hdr.type), | 1520 | le16_to_cpu(msg->hdr.type), |
1526 | ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), | 1521 | ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), |
1527 | le32_to_cpu(msg->hdr.front_len), | 1522 | le32_to_cpu(msg->hdr.front_len), |
@@ -1546,7 +1541,6 @@ static int try_write(struct ceph_connection *con) | |||
1546 | dout("try_write start %p state %lu nref %d\n", con, con->state, | 1541 | dout("try_write start %p state %lu nref %d\n", con, con->state, |
1547 | atomic_read(&con->nref)); | 1542 | atomic_read(&con->nref)); |
1548 | 1543 | ||
1549 | mutex_lock(&con->mutex); | ||
1550 | more: | 1544 | more: |
1551 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 1545 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1552 | 1546 | ||
@@ -1639,7 +1633,6 @@ do_next: | |||
1639 | done: | 1633 | done: |
1640 | ret = 0; | 1634 | ret = 0; |
1641 | out: | 1635 | out: |
1642 | mutex_unlock(&con->mutex); | ||
1643 | dout("try_write done on %p\n", con); | 1636 | dout("try_write done on %p\n", con); |
1644 | return ret; | 1637 | return ret; |
1645 | } | 1638 | } |
@@ -1651,7 +1644,6 @@ out: | |||
1651 | */ | 1644 | */ |
1652 | static int try_read(struct ceph_connection *con) | 1645 | static int try_read(struct ceph_connection *con) |
1653 | { | 1646 | { |
1654 | struct ceph_messenger *msgr; | ||
1655 | int ret = -1; | 1647 | int ret = -1; |
1656 | 1648 | ||
1657 | if (!con->sock) | 1649 | if (!con->sock) |
@@ -1661,9 +1653,6 @@ static int try_read(struct ceph_connection *con) | |||
1661 | return 0; | 1653 | return 0; |
1662 | 1654 | ||
1663 | dout("try_read start on %p\n", con); | 1655 | dout("try_read start on %p\n", con); |
1664 | msgr = con->msgr; | ||
1665 | |||
1666 | mutex_lock(&con->mutex); | ||
1667 | 1656 | ||
1668 | more: | 1657 | more: |
1669 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 1658 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
@@ -1758,7 +1747,6 @@ more: | |||
1758 | done: | 1747 | done: |
1759 | ret = 0; | 1748 | ret = 0; |
1760 | out: | 1749 | out: |
1761 | mutex_unlock(&con->mutex); | ||
1762 | dout("try_read done on %p\n", con); | 1750 | dout("try_read done on %p\n", con); |
1763 | return ret; | 1751 | return ret; |
1764 | 1752 | ||
@@ -1830,6 +1818,8 @@ more: | |||
1830 | dout("con_work %p start, clearing QUEUED\n", con); | 1818 | dout("con_work %p start, clearing QUEUED\n", con); |
1831 | clear_bit(QUEUED, &con->state); | 1819 | clear_bit(QUEUED, &con->state); |
1832 | 1820 | ||
1821 | mutex_lock(&con->mutex); | ||
1822 | |||
1833 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ | 1823 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ |
1834 | dout("con_work CLOSED\n"); | 1824 | dout("con_work CLOSED\n"); |
1835 | con_close_socket(con); | 1825 | con_close_socket(con); |
@@ -1844,11 +1834,16 @@ more: | |||
1844 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || | 1834 | if (test_and_clear_bit(SOCK_CLOSED, &con->state) || |
1845 | try_read(con) < 0 || | 1835 | try_read(con) < 0 || |
1846 | try_write(con) < 0) { | 1836 | try_write(con) < 0) { |
1837 | mutex_unlock(&con->mutex); | ||
1847 | backoff = 1; | 1838 | backoff = 1; |
1848 | ceph_fault(con); /* error/fault path */ | 1839 | ceph_fault(con); /* error/fault path */ |
1840 | goto done_unlocked; | ||
1849 | } | 1841 | } |
1850 | 1842 | ||
1851 | done: | 1843 | done: |
1844 | mutex_unlock(&con->mutex); | ||
1845 | |||
1846 | done_unlocked: | ||
1852 | clear_bit(BUSY, &con->state); | 1847 | clear_bit(BUSY, &con->state); |
1853 | dout("con->state=%lu\n", con->state); | 1848 | dout("con->state=%lu\n", con->state); |
1854 | if (test_bit(QUEUED, &con->state)) { | 1849 | if (test_bit(QUEUED, &con->state)) { |
@@ -1947,7 +1942,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) | |||
1947 | 1942 | ||
1948 | /* the zero page is needed if a request is "canceled" while the message | 1943 | /* the zero page is needed if a request is "canceled" while the message |
1949 | * is being written over the socket */ | 1944 | * is being written over the socket */ |
1950 | msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); | 1945 | msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); |
1951 | if (!msgr->zero_page) { | 1946 | if (!msgr->zero_page) { |
1952 | kfree(msgr); | 1947 | kfree(msgr); |
1953 | return ERR_PTR(-ENOMEM); | 1948 | return ERR_PTR(-ENOMEM); |
@@ -1987,9 +1982,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
1987 | } | 1982 | } |
1988 | 1983 | ||
1989 | /* set src+dst */ | 1984 | /* set src+dst */ |
1990 | msg->hdr.src.name = con->msgr->inst.name; | 1985 | msg->hdr.src = con->msgr->inst.name; |
1991 | msg->hdr.src.addr = con->msgr->my_enc_addr; | ||
1992 | msg->hdr.orig_src = msg->hdr.src; | ||
1993 | 1986 | ||
1994 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); | 1987 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); |
1995 | 1988 | ||
@@ -2083,12 +2076,11 @@ void ceph_con_keepalive(struct ceph_connection *con) | |||
2083 | * construct a new message with given type, size | 2076 | * construct a new message with given type, size |
2084 | * the new msg has a ref count of 1. | 2077 | * the new msg has a ref count of 1. |
2085 | */ | 2078 | */ |
2086 | struct ceph_msg *ceph_msg_new(int type, int front_len, | 2079 | struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) |
2087 | int page_len, int page_off, struct page **pages) | ||
2088 | { | 2080 | { |
2089 | struct ceph_msg *m; | 2081 | struct ceph_msg *m; |
2090 | 2082 | ||
2091 | m = kmalloc(sizeof(*m), GFP_NOFS); | 2083 | m = kmalloc(sizeof(*m), flags); |
2092 | if (m == NULL) | 2084 | if (m == NULL) |
2093 | goto out; | 2085 | goto out; |
2094 | kref_init(&m->kref); | 2086 | kref_init(&m->kref); |
@@ -2100,8 +2092,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2100 | m->hdr.version = 0; | 2092 | m->hdr.version = 0; |
2101 | m->hdr.front_len = cpu_to_le32(front_len); | 2093 | m->hdr.front_len = cpu_to_le32(front_len); |
2102 | m->hdr.middle_len = 0; | 2094 | m->hdr.middle_len = 0; |
2103 | m->hdr.data_len = cpu_to_le32(page_len); | 2095 | m->hdr.data_len = 0; |
2104 | m->hdr.data_off = cpu_to_le16(page_off); | 2096 | m->hdr.data_off = 0; |
2105 | m->hdr.reserved = 0; | 2097 | m->hdr.reserved = 0; |
2106 | m->footer.front_crc = 0; | 2098 | m->footer.front_crc = 0; |
2107 | m->footer.middle_crc = 0; | 2099 | m->footer.middle_crc = 0; |
@@ -2115,11 +2107,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2115 | /* front */ | 2107 | /* front */ |
2116 | if (front_len) { | 2108 | if (front_len) { |
2117 | if (front_len > PAGE_CACHE_SIZE) { | 2109 | if (front_len > PAGE_CACHE_SIZE) { |
2118 | m->front.iov_base = __vmalloc(front_len, GFP_NOFS, | 2110 | m->front.iov_base = __vmalloc(front_len, flags, |
2119 | PAGE_KERNEL); | 2111 | PAGE_KERNEL); |
2120 | m->front_is_vmalloc = true; | 2112 | m->front_is_vmalloc = true; |
2121 | } else { | 2113 | } else { |
2122 | m->front.iov_base = kmalloc(front_len, GFP_NOFS); | 2114 | m->front.iov_base = kmalloc(front_len, flags); |
2123 | } | 2115 | } |
2124 | if (m->front.iov_base == NULL) { | 2116 | if (m->front.iov_base == NULL) { |
2125 | pr_err("msg_new can't allocate %d bytes\n", | 2117 | pr_err("msg_new can't allocate %d bytes\n", |
@@ -2135,19 +2127,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
2135 | m->middle = NULL; | 2127 | m->middle = NULL; |
2136 | 2128 | ||
2137 | /* data */ | 2129 | /* data */ |
2138 | m->nr_pages = calc_pages_for(page_off, page_len); | 2130 | m->nr_pages = 0; |
2139 | m->pages = pages; | 2131 | m->pages = NULL; |
2140 | m->pagelist = NULL; | 2132 | m->pagelist = NULL; |
2141 | 2133 | ||
2142 | dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, | 2134 | dout("ceph_msg_new %p front %d\n", m, front_len); |
2143 | m->nr_pages); | ||
2144 | return m; | 2135 | return m; |
2145 | 2136 | ||
2146 | out2: | 2137 | out2: |
2147 | ceph_msg_put(m); | 2138 | ceph_msg_put(m); |
2148 | out: | 2139 | out: |
2149 | pr_err("msg_new can't create type %d len %d\n", type, front_len); | 2140 | pr_err("msg_new can't create type %d front %d\n", type, front_len); |
2150 | return ERR_PTR(-ENOMEM); | 2141 | return NULL; |
2151 | } | 2142 | } |
2152 | 2143 | ||
2153 | /* | 2144 | /* |
@@ -2190,29 +2181,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | |||
2190 | mutex_unlock(&con->mutex); | 2181 | mutex_unlock(&con->mutex); |
2191 | msg = con->ops->alloc_msg(con, hdr, skip); | 2182 | msg = con->ops->alloc_msg(con, hdr, skip); |
2192 | mutex_lock(&con->mutex); | 2183 | mutex_lock(&con->mutex); |
2193 | if (IS_ERR(msg)) | 2184 | if (!msg || *skip) |
2194 | return msg; | ||
2195 | |||
2196 | if (*skip) | ||
2197 | return NULL; | 2185 | return NULL; |
2198 | } | 2186 | } |
2199 | if (!msg) { | 2187 | if (!msg) { |
2200 | *skip = 0; | 2188 | *skip = 0; |
2201 | msg = ceph_msg_new(type, front_len, 0, 0, NULL); | 2189 | msg = ceph_msg_new(type, front_len, GFP_NOFS); |
2202 | if (!msg) { | 2190 | if (!msg) { |
2203 | pr_err("unable to allocate msg type %d len %d\n", | 2191 | pr_err("unable to allocate msg type %d len %d\n", |
2204 | type, front_len); | 2192 | type, front_len); |
2205 | return ERR_PTR(-ENOMEM); | 2193 | return NULL; |
2206 | } | 2194 | } |
2207 | } | 2195 | } |
2208 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 2196 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
2209 | 2197 | ||
2210 | if (middle_len) { | 2198 | if (middle_len && !msg->middle) { |
2211 | ret = ceph_alloc_middle(con, msg); | 2199 | ret = ceph_alloc_middle(con, msg); |
2212 | |||
2213 | if (ret < 0) { | 2200 | if (ret < 0) { |
2214 | ceph_msg_put(msg); | 2201 | ceph_msg_put(msg); |
2215 | return msg; | 2202 | return NULL; |
2216 | } | 2203 | } |
2217 | } | 2204 | } |
2218 | 2205 | ||