aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
authorJens Axboe <jaxboe@fusionio.com>2010-06-01 06:42:12 -0400
committerJens Axboe <jaxboe@fusionio.com>2010-06-01 06:42:12 -0400
commitb4ca761577535b2b4d153689ee97342797dfff05 (patch)
tree29054d55508f1faa22ec32acf7c245751af03348 /fs/ceph/messenger.c
parent28f4197e5d4707311febeec8a0eb97cb5fd93c97 (diff)
parent67a3e12b05e055c0415c556a315a3d3eb637e29e (diff)
Merge branch 'master' into for-linus
Conflicts: fs/pipe.c Signed-off-by: Jens Axboe <jaxboe@fusionio.com>
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c97
1 files changed, 42 insertions, 55 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index cd4fadb6491a..64b8b1f7863d 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con);
39static void con_work(struct work_struct *); 39static void con_work(struct work_struct *);
40static void ceph_fault(struct ceph_connection *con); 40static void ceph_fault(struct ceph_connection *con);
41 41
42const char *ceph_name_type_str(int t)
43{
44 switch (t) {
45 case CEPH_ENTITY_TYPE_MON: return "mon";
46 case CEPH_ENTITY_TYPE_MDS: return "mds";
47 case CEPH_ENTITY_TYPE_OSD: return "osd";
48 case CEPH_ENTITY_TYPE_CLIENT: return "client";
49 case CEPH_ENTITY_TYPE_ADMIN: return "admin";
50 default: return "???";
51 }
52}
53
54/* 42/*
55 * nicely render a sockaddr as a string. 43 * nicely render a sockaddr as a string.
56 */ 44 */
@@ -132,6 +120,12 @@ void ceph_msgr_exit(void)
132 destroy_workqueue(ceph_msgr_wq); 120 destroy_workqueue(ceph_msgr_wq);
133} 121}
134 122
123void ceph_msgr_flush()
124{
125 flush_workqueue(ceph_msgr_wq);
126}
127
128
135/* 129/*
136 * socket callback functions 130 * socket callback functions
137 */ 131 */
@@ -340,6 +334,7 @@ static void reset_connection(struct ceph_connection *con)
340 ceph_msg_put(con->out_msg); 334 ceph_msg_put(con->out_msg);
341 con->out_msg = NULL; 335 con->out_msg = NULL;
342 } 336 }
337 con->out_keepalive_pending = false;
343 con->in_seq = 0; 338 con->in_seq = 0;
344 con->in_seq_acked = 0; 339 con->in_seq_acked = 0;
345} 340}
@@ -357,6 +352,7 @@ void ceph_con_close(struct ceph_connection *con)
357 clear_bit(WRITE_PENDING, &con->state); 352 clear_bit(WRITE_PENDING, &con->state);
358 mutex_lock(&con->mutex); 353 mutex_lock(&con->mutex);
359 reset_connection(con); 354 reset_connection(con);
355 con->peer_global_seq = 0;
360 cancel_delayed_work(&con->work); 356 cancel_delayed_work(&con->work);
361 mutex_unlock(&con->mutex); 357 mutex_unlock(&con->mutex);
362 queue_con(con); 358 queue_con(con);
@@ -661,7 +657,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
661 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 657 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
662 con->connect_seq, global_seq, proto); 658 con->connect_seq, global_seq, proto);
663 659
664 con->out_connect.features = CEPH_FEATURE_SUPPORTED; 660 con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT;
665 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 661 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
666 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 662 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
667 con->out_connect.global_seq = cpu_to_le32(global_seq); 663 con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1124,8 +1120,8 @@ static void fail_protocol(struct ceph_connection *con)
1124 1120
1125static int process_connect(struct ceph_connection *con) 1121static int process_connect(struct ceph_connection *con)
1126{ 1122{
1127 u64 sup_feat = CEPH_FEATURE_SUPPORTED; 1123 u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
1128 u64 req_feat = CEPH_FEATURE_REQUIRED; 1124 u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
1129 u64 server_feat = le64_to_cpu(con->in_reply.features); 1125 u64 server_feat = le64_to_cpu(con->in_reply.features);
1130 1126
1131 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1127 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1233,6 +1229,7 @@ static int process_connect(struct ceph_connection *con)
1233 clear_bit(CONNECTING, &con->state); 1229 clear_bit(CONNECTING, &con->state);
1234 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1230 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1235 con->connect_seq++; 1231 con->connect_seq++;
1232 con->peer_features = server_feat;
1236 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1233 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1237 con->peer_global_seq, 1234 con->peer_global_seq,
1238 le32_to_cpu(con->in_reply.connect_seq), 1235 le32_to_cpu(con->in_reply.connect_seq),
@@ -1402,19 +1399,17 @@ static int read_partial_message(struct ceph_connection *con)
1402 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1399 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
1403 if (skip) { 1400 if (skip) {
1404 /* skip this message */ 1401 /* skip this message */
1405 dout("alloc_msg returned NULL, skipping message\n"); 1402 dout("alloc_msg said skip message\n");
1406 con->in_base_pos = -front_len - middle_len - data_len - 1403 con->in_base_pos = -front_len - middle_len - data_len -
1407 sizeof(m->footer); 1404 sizeof(m->footer);
1408 con->in_tag = CEPH_MSGR_TAG_READY; 1405 con->in_tag = CEPH_MSGR_TAG_READY;
1409 con->in_seq++; 1406 con->in_seq++;
1410 return 0; 1407 return 0;
1411 } 1408 }
1412 if (IS_ERR(con->in_msg)) { 1409 if (!con->in_msg) {
1413 ret = PTR_ERR(con->in_msg);
1414 con->in_msg = NULL;
1415 con->error_msg = 1410 con->error_msg =
1416 "error allocating memory for incoming message"; 1411 "error allocating memory for incoming message";
1417 return ret; 1412 return -ENOMEM;
1418 } 1413 }
1419 m = con->in_msg; 1414 m = con->in_msg;
1420 m->front.iov_len = 0; /* haven't read it yet */ 1415 m->front.iov_len = 0; /* haven't read it yet */
@@ -1514,14 +1509,14 @@ static void process_message(struct ceph_connection *con)
1514 1509
1515 /* if first message, set peer_name */ 1510 /* if first message, set peer_name */
1516 if (con->peer_name.type == 0) 1511 if (con->peer_name.type == 0)
1517 con->peer_name = msg->hdr.src.name; 1512 con->peer_name = msg->hdr.src;
1518 1513
1519 con->in_seq++; 1514 con->in_seq++;
1520 mutex_unlock(&con->mutex); 1515 mutex_unlock(&con->mutex);
1521 1516
1522 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1517 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1523 msg, le64_to_cpu(msg->hdr.seq), 1518 msg, le64_to_cpu(msg->hdr.seq),
1524 ENTITY_NAME(msg->hdr.src.name), 1519 ENTITY_NAME(msg->hdr.src),
1525 le16_to_cpu(msg->hdr.type), 1520 le16_to_cpu(msg->hdr.type),
1526 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1521 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1527 le32_to_cpu(msg->hdr.front_len), 1522 le32_to_cpu(msg->hdr.front_len),
@@ -1546,7 +1541,6 @@ static int try_write(struct ceph_connection *con)
1546 dout("try_write start %p state %lu nref %d\n", con, con->state, 1541 dout("try_write start %p state %lu nref %d\n", con, con->state,
1547 atomic_read(&con->nref)); 1542 atomic_read(&con->nref));
1548 1543
1549 mutex_lock(&con->mutex);
1550more: 1544more:
1551 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1545 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1552 1546
@@ -1639,7 +1633,6 @@ do_next:
1639done: 1633done:
1640 ret = 0; 1634 ret = 0;
1641out: 1635out:
1642 mutex_unlock(&con->mutex);
1643 dout("try_write done on %p\n", con); 1636 dout("try_write done on %p\n", con);
1644 return ret; 1637 return ret;
1645} 1638}
@@ -1651,7 +1644,6 @@ out:
1651 */ 1644 */
1652static int try_read(struct ceph_connection *con) 1645static int try_read(struct ceph_connection *con)
1653{ 1646{
1654 struct ceph_messenger *msgr;
1655 int ret = -1; 1647 int ret = -1;
1656 1648
1657 if (!con->sock) 1649 if (!con->sock)
@@ -1661,9 +1653,6 @@ static int try_read(struct ceph_connection *con)
1661 return 0; 1653 return 0;
1662 1654
1663 dout("try_read start on %p\n", con); 1655 dout("try_read start on %p\n", con);
1664 msgr = con->msgr;
1665
1666 mutex_lock(&con->mutex);
1667 1656
1668more: 1657more:
1669 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1658 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
@@ -1758,7 +1747,6 @@ more:
1758done: 1747done:
1759 ret = 0; 1748 ret = 0;
1760out: 1749out:
1761 mutex_unlock(&con->mutex);
1762 dout("try_read done on %p\n", con); 1750 dout("try_read done on %p\n", con);
1763 return ret; 1751 return ret;
1764 1752
@@ -1830,6 +1818,8 @@ more:
1830 dout("con_work %p start, clearing QUEUED\n", con); 1818 dout("con_work %p start, clearing QUEUED\n", con);
1831 clear_bit(QUEUED, &con->state); 1819 clear_bit(QUEUED, &con->state);
1832 1820
1821 mutex_lock(&con->mutex);
1822
1833 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1823 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1834 dout("con_work CLOSED\n"); 1824 dout("con_work CLOSED\n");
1835 con_close_socket(con); 1825 con_close_socket(con);
@@ -1844,11 +1834,16 @@ more:
1844 if (test_and_clear_bit(SOCK_CLOSED, &con->state) || 1834 if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
1845 try_read(con) < 0 || 1835 try_read(con) < 0 ||
1846 try_write(con) < 0) { 1836 try_write(con) < 0) {
1837 mutex_unlock(&con->mutex);
1847 backoff = 1; 1838 backoff = 1;
1848 ceph_fault(con); /* error/fault path */ 1839 ceph_fault(con); /* error/fault path */
1840 goto done_unlocked;
1849 } 1841 }
1850 1842
1851done: 1843done:
1844 mutex_unlock(&con->mutex);
1845
1846done_unlocked:
1852 clear_bit(BUSY, &con->state); 1847 clear_bit(BUSY, &con->state);
1853 dout("con->state=%lu\n", con->state); 1848 dout("con->state=%lu\n", con->state);
1854 if (test_bit(QUEUED, &con->state)) { 1849 if (test_bit(QUEUED, &con->state)) {
@@ -1947,7 +1942,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
1947 1942
1948 /* the zero page is needed if a request is "canceled" while the message 1943 /* the zero page is needed if a request is "canceled" while the message
1949 * is being written over the socket */ 1944 * is being written over the socket */
1950 msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); 1945 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
1951 if (!msgr->zero_page) { 1946 if (!msgr->zero_page) {
1952 kfree(msgr); 1947 kfree(msgr);
1953 return ERR_PTR(-ENOMEM); 1948 return ERR_PTR(-ENOMEM);
@@ -1987,9 +1982,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1987 } 1982 }
1988 1983
1989 /* set src+dst */ 1984 /* set src+dst */
1990 msg->hdr.src.name = con->msgr->inst.name; 1985 msg->hdr.src = con->msgr->inst.name;
1991 msg->hdr.src.addr = con->msgr->my_enc_addr;
1992 msg->hdr.orig_src = msg->hdr.src;
1993 1986
1994 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 1987 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
1995 1988
@@ -2083,12 +2076,11 @@ void ceph_con_keepalive(struct ceph_connection *con)
2083 * construct a new message with given type, size 2076 * construct a new message with given type, size
2084 * the new msg has a ref count of 1. 2077 * the new msg has a ref count of 1.
2085 */ 2078 */
2086struct ceph_msg *ceph_msg_new(int type, int front_len, 2079struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2087 int page_len, int page_off, struct page **pages)
2088{ 2080{
2089 struct ceph_msg *m; 2081 struct ceph_msg *m;
2090 2082
2091 m = kmalloc(sizeof(*m), GFP_NOFS); 2083 m = kmalloc(sizeof(*m), flags);
2092 if (m == NULL) 2084 if (m == NULL)
2093 goto out; 2085 goto out;
2094 kref_init(&m->kref); 2086 kref_init(&m->kref);
@@ -2100,8 +2092,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2100 m->hdr.version = 0; 2092 m->hdr.version = 0;
2101 m->hdr.front_len = cpu_to_le32(front_len); 2093 m->hdr.front_len = cpu_to_le32(front_len);
2102 m->hdr.middle_len = 0; 2094 m->hdr.middle_len = 0;
2103 m->hdr.data_len = cpu_to_le32(page_len); 2095 m->hdr.data_len = 0;
2104 m->hdr.data_off = cpu_to_le16(page_off); 2096 m->hdr.data_off = 0;
2105 m->hdr.reserved = 0; 2097 m->hdr.reserved = 0;
2106 m->footer.front_crc = 0; 2098 m->footer.front_crc = 0;
2107 m->footer.middle_crc = 0; 2099 m->footer.middle_crc = 0;
@@ -2115,11 +2107,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2115 /* front */ 2107 /* front */
2116 if (front_len) { 2108 if (front_len) {
2117 if (front_len > PAGE_CACHE_SIZE) { 2109 if (front_len > PAGE_CACHE_SIZE) {
2118 m->front.iov_base = __vmalloc(front_len, GFP_NOFS, 2110 m->front.iov_base = __vmalloc(front_len, flags,
2119 PAGE_KERNEL); 2111 PAGE_KERNEL);
2120 m->front_is_vmalloc = true; 2112 m->front_is_vmalloc = true;
2121 } else { 2113 } else {
2122 m->front.iov_base = kmalloc(front_len, GFP_NOFS); 2114 m->front.iov_base = kmalloc(front_len, flags);
2123 } 2115 }
2124 if (m->front.iov_base == NULL) { 2116 if (m->front.iov_base == NULL) {
2125 pr_err("msg_new can't allocate %d bytes\n", 2117 pr_err("msg_new can't allocate %d bytes\n",
@@ -2135,19 +2127,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2135 m->middle = NULL; 2127 m->middle = NULL;
2136 2128
2137 /* data */ 2129 /* data */
2138 m->nr_pages = calc_pages_for(page_off, page_len); 2130 m->nr_pages = 0;
2139 m->pages = pages; 2131 m->pages = NULL;
2140 m->pagelist = NULL; 2132 m->pagelist = NULL;
2141 2133
2142 dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, 2134 dout("ceph_msg_new %p front %d\n", m, front_len);
2143 m->nr_pages);
2144 return m; 2135 return m;
2145 2136
2146out2: 2137out2:
2147 ceph_msg_put(m); 2138 ceph_msg_put(m);
2148out: 2139out:
2149 pr_err("msg_new can't create type %d len %d\n", type, front_len); 2140 pr_err("msg_new can't create type %d front %d\n", type, front_len);
2150 return ERR_PTR(-ENOMEM); 2141 return NULL;
2151} 2142}
2152 2143
2153/* 2144/*
@@ -2190,29 +2181,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2190 mutex_unlock(&con->mutex); 2181 mutex_unlock(&con->mutex);
2191 msg = con->ops->alloc_msg(con, hdr, skip); 2182 msg = con->ops->alloc_msg(con, hdr, skip);
2192 mutex_lock(&con->mutex); 2183 mutex_lock(&con->mutex);
2193 if (IS_ERR(msg)) 2184 if (!msg || *skip)
2194 return msg;
2195
2196 if (*skip)
2197 return NULL; 2185 return NULL;
2198 } 2186 }
2199 if (!msg) { 2187 if (!msg) {
2200 *skip = 0; 2188 *skip = 0;
2201 msg = ceph_msg_new(type, front_len, 0, 0, NULL); 2189 msg = ceph_msg_new(type, front_len, GFP_NOFS);
2202 if (!msg) { 2190 if (!msg) {
2203 pr_err("unable to allocate msg type %d len %d\n", 2191 pr_err("unable to allocate msg type %d len %d\n",
2204 type, front_len); 2192 type, front_len);
2205 return ERR_PTR(-ENOMEM); 2193 return NULL;
2206 } 2194 }
2207 } 2195 }
2208 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2196 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2209 2197
2210 if (middle_len) { 2198 if (middle_len && !msg->middle) {
2211 ret = ceph_alloc_middle(con, msg); 2199 ret = ceph_alloc_middle(con, msg);
2212
2213 if (ret < 0) { 2200 if (ret < 0) {
2214 ceph_msg_put(msg); 2201 ceph_msg_put(msg);
2215 return msg; 2202 return NULL;
2216 } 2203 }
2217 } 2204 }
2218 2205