aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c108
1 files changed, 51 insertions, 57 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 509f57d9ccb3..60b74839ebec 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -39,18 +39,6 @@ static void queue_con(struct ceph_connection *con);
39static void con_work(struct work_struct *); 39static void con_work(struct work_struct *);
40static void ceph_fault(struct ceph_connection *con); 40static void ceph_fault(struct ceph_connection *con);
41 41
42const char *ceph_name_type_str(int t)
43{
44 switch (t) {
45 case CEPH_ENTITY_TYPE_MON: return "mon";
46 case CEPH_ENTITY_TYPE_MDS: return "mds";
47 case CEPH_ENTITY_TYPE_OSD: return "osd";
48 case CEPH_ENTITY_TYPE_CLIENT: return "client";
49 case CEPH_ENTITY_TYPE_ADMIN: return "admin";
50 default: return "???";
51 }
52}
53
54/* 42/*
55 * nicely render a sockaddr as a string. 43 * nicely render a sockaddr as a string.
56 */ 44 */
@@ -340,6 +328,7 @@ static void reset_connection(struct ceph_connection *con)
340 ceph_msg_put(con->out_msg); 328 ceph_msg_put(con->out_msg);
341 con->out_msg = NULL; 329 con->out_msg = NULL;
342 } 330 }
331 con->out_keepalive_pending = false;
343 con->in_seq = 0; 332 con->in_seq = 0;
344 con->in_seq_acked = 0; 333 con->in_seq_acked = 0;
345} 334}
@@ -357,6 +346,7 @@ void ceph_con_close(struct ceph_connection *con)
357 clear_bit(WRITE_PENDING, &con->state); 346 clear_bit(WRITE_PENDING, &con->state);
358 mutex_lock(&con->mutex); 347 mutex_lock(&con->mutex);
359 reset_connection(con); 348 reset_connection(con);
349 con->peer_global_seq = 0;
360 cancel_delayed_work(&con->work); 350 cancel_delayed_work(&con->work);
361 mutex_unlock(&con->mutex); 351 mutex_unlock(&con->mutex);
362 queue_con(con); 352 queue_con(con);
@@ -492,7 +482,14 @@ static void prepare_write_message(struct ceph_connection *con)
492 list_move_tail(&m->list_head, &con->out_sent); 482 list_move_tail(&m->list_head, &con->out_sent);
493 } 483 }
494 484
495 m->hdr.seq = cpu_to_le64(++con->out_seq); 485 /*
486 * only assign outgoing seq # if we haven't sent this message
487 * yet. if it is requeued, resend with it's original seq.
488 */
489 if (m->needs_out_seq) {
490 m->hdr.seq = cpu_to_le64(++con->out_seq);
491 m->needs_out_seq = false;
492 }
496 493
497 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 494 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
498 m, con->out_seq, le16_to_cpu(m->hdr.type), 495 m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -654,7 +651,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
654 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 651 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
655 con->connect_seq, global_seq, proto); 652 con->connect_seq, global_seq, proto);
656 653
657 con->out_connect.features = CEPH_FEATURE_SUPPORTED; 654 con->out_connect.features = CEPH_FEATURE_SUPPORTED_CLIENT;
658 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 655 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
659 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 656 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
660 con->out_connect.global_seq = cpu_to_le32(global_seq); 657 con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1117,8 +1114,8 @@ static void fail_protocol(struct ceph_connection *con)
1117 1114
1118static int process_connect(struct ceph_connection *con) 1115static int process_connect(struct ceph_connection *con)
1119{ 1116{
1120 u64 sup_feat = CEPH_FEATURE_SUPPORTED; 1117 u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
1121 u64 req_feat = CEPH_FEATURE_REQUIRED; 1118 u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
1122 u64 server_feat = le64_to_cpu(con->in_reply.features); 1119 u64 server_feat = le64_to_cpu(con->in_reply.features);
1123 1120
1124 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1121 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1226,6 +1223,7 @@ static int process_connect(struct ceph_connection *con)
1226 clear_bit(CONNECTING, &con->state); 1223 clear_bit(CONNECTING, &con->state);
1227 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1224 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1228 con->connect_seq++; 1225 con->connect_seq++;
1226 con->peer_features = server_feat;
1229 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1227 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1230 con->peer_global_seq, 1228 con->peer_global_seq,
1231 le32_to_cpu(con->in_reply.connect_seq), 1229 le32_to_cpu(con->in_reply.connect_seq),
@@ -1395,19 +1393,17 @@ static int read_partial_message(struct ceph_connection *con)
1395 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1393 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
1396 if (skip) { 1394 if (skip) {
1397 /* skip this message */ 1395 /* skip this message */
1398 dout("alloc_msg returned NULL, skipping message\n"); 1396 dout("alloc_msg said skip message\n");
1399 con->in_base_pos = -front_len - middle_len - data_len - 1397 con->in_base_pos = -front_len - middle_len - data_len -
1400 sizeof(m->footer); 1398 sizeof(m->footer);
1401 con->in_tag = CEPH_MSGR_TAG_READY; 1399 con->in_tag = CEPH_MSGR_TAG_READY;
1402 con->in_seq++; 1400 con->in_seq++;
1403 return 0; 1401 return 0;
1404 } 1402 }
1405 if (IS_ERR(con->in_msg)) { 1403 if (!con->in_msg) {
1406 ret = PTR_ERR(con->in_msg);
1407 con->in_msg = NULL;
1408 con->error_msg = 1404 con->error_msg =
1409 "error allocating memory for incoming message"; 1405 "error allocating memory for incoming message";
1410 return ret; 1406 return -ENOMEM;
1411 } 1407 }
1412 m = con->in_msg; 1408 m = con->in_msg;
1413 m->front.iov_len = 0; /* haven't read it yet */ 1409 m->front.iov_len = 0; /* haven't read it yet */
@@ -1507,14 +1503,14 @@ static void process_message(struct ceph_connection *con)
1507 1503
1508 /* if first message, set peer_name */ 1504 /* if first message, set peer_name */
1509 if (con->peer_name.type == 0) 1505 if (con->peer_name.type == 0)
1510 con->peer_name = msg->hdr.src.name; 1506 con->peer_name = msg->hdr.src;
1511 1507
1512 con->in_seq++; 1508 con->in_seq++;
1513 mutex_unlock(&con->mutex); 1509 mutex_unlock(&con->mutex);
1514 1510
1515 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1511 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1516 msg, le64_to_cpu(msg->hdr.seq), 1512 msg, le64_to_cpu(msg->hdr.seq),
1517 ENTITY_NAME(msg->hdr.src.name), 1513 ENTITY_NAME(msg->hdr.src),
1518 le16_to_cpu(msg->hdr.type), 1514 le16_to_cpu(msg->hdr.type),
1519 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1515 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1520 le32_to_cpu(msg->hdr.front_len), 1516 le32_to_cpu(msg->hdr.front_len),
@@ -1539,7 +1535,6 @@ static int try_write(struct ceph_connection *con)
1539 dout("try_write start %p state %lu nref %d\n", con, con->state, 1535 dout("try_write start %p state %lu nref %d\n", con, con->state,
1540 atomic_read(&con->nref)); 1536 atomic_read(&con->nref));
1541 1537
1542 mutex_lock(&con->mutex);
1543more: 1538more:
1544 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1539 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1545 1540
@@ -1632,7 +1627,6 @@ do_next:
1632done: 1627done:
1633 ret = 0; 1628 ret = 0;
1634out: 1629out:
1635 mutex_unlock(&con->mutex);
1636 dout("try_write done on %p\n", con); 1630 dout("try_write done on %p\n", con);
1637 return ret; 1631 return ret;
1638} 1632}
@@ -1644,7 +1638,6 @@ out:
1644 */ 1638 */
1645static int try_read(struct ceph_connection *con) 1639static int try_read(struct ceph_connection *con)
1646{ 1640{
1647 struct ceph_messenger *msgr;
1648 int ret = -1; 1641 int ret = -1;
1649 1642
1650 if (!con->sock) 1643 if (!con->sock)
@@ -1654,9 +1647,6 @@ static int try_read(struct ceph_connection *con)
1654 return 0; 1647 return 0;
1655 1648
1656 dout("try_read start on %p\n", con); 1649 dout("try_read start on %p\n", con);
1657 msgr = con->msgr;
1658
1659 mutex_lock(&con->mutex);
1660 1650
1661more: 1651more:
1662 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1652 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
@@ -1751,7 +1741,6 @@ more:
1751done: 1741done:
1752 ret = 0; 1742 ret = 0;
1753out: 1743out:
1754 mutex_unlock(&con->mutex);
1755 dout("try_read done on %p\n", con); 1744 dout("try_read done on %p\n", con);
1756 return ret; 1745 return ret;
1757 1746
@@ -1823,6 +1812,8 @@ more:
1823 dout("con_work %p start, clearing QUEUED\n", con); 1812 dout("con_work %p start, clearing QUEUED\n", con);
1824 clear_bit(QUEUED, &con->state); 1813 clear_bit(QUEUED, &con->state);
1825 1814
1815 mutex_lock(&con->mutex);
1816
1826 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1817 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1827 dout("con_work CLOSED\n"); 1818 dout("con_work CLOSED\n");
1828 con_close_socket(con); 1819 con_close_socket(con);
@@ -1837,11 +1828,16 @@ more:
1837 if (test_and_clear_bit(SOCK_CLOSED, &con->state) || 1828 if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
1838 try_read(con) < 0 || 1829 try_read(con) < 0 ||
1839 try_write(con) < 0) { 1830 try_write(con) < 0) {
1831 mutex_unlock(&con->mutex);
1840 backoff = 1; 1832 backoff = 1;
1841 ceph_fault(con); /* error/fault path */ 1833 ceph_fault(con); /* error/fault path */
1834 goto done_unlocked;
1842 } 1835 }
1843 1836
1844done: 1837done:
1838 mutex_unlock(&con->mutex);
1839
1840done_unlocked:
1845 clear_bit(BUSY, &con->state); 1841 clear_bit(BUSY, &con->state);
1846 dout("con->state=%lu\n", con->state); 1842 dout("con->state=%lu\n", con->state);
1847 if (test_bit(QUEUED, &con->state)) { 1843 if (test_bit(QUEUED, &con->state)) {
@@ -1940,7 +1936,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
1940 1936
1941 /* the zero page is needed if a request is "canceled" while the message 1937 /* the zero page is needed if a request is "canceled" while the message
1942 * is being written over the socket */ 1938 * is being written over the socket */
1943 msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); 1939 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
1944 if (!msgr->zero_page) { 1940 if (!msgr->zero_page) {
1945 kfree(msgr); 1941 kfree(msgr);
1946 return ERR_PTR(-ENOMEM); 1942 return ERR_PTR(-ENOMEM);
@@ -1980,12 +1976,12 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1980 } 1976 }
1981 1977
1982 /* set src+dst */ 1978 /* set src+dst */
1983 msg->hdr.src.name = con->msgr->inst.name; 1979 msg->hdr.src = con->msgr->inst.name;
1984 msg->hdr.src.addr = con->msgr->my_enc_addr;
1985 msg->hdr.orig_src = msg->hdr.src;
1986 1980
1987 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 1981 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
1988 1982
1983 msg->needs_out_seq = true;
1984
1989 /* queue */ 1985 /* queue */
1990 mutex_lock(&con->mutex); 1986 mutex_lock(&con->mutex);
1991 BUG_ON(!list_empty(&msg->list_head)); 1987 BUG_ON(!list_empty(&msg->list_head));
@@ -2074,26 +2070,29 @@ void ceph_con_keepalive(struct ceph_connection *con)
2074 * construct a new message with given type, size 2070 * construct a new message with given type, size
2075 * the new msg has a ref count of 1. 2071 * the new msg has a ref count of 1.
2076 */ 2072 */
2077struct ceph_msg *ceph_msg_new(int type, int front_len, 2073struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2078 int page_len, int page_off, struct page **pages)
2079{ 2074{
2080 struct ceph_msg *m; 2075 struct ceph_msg *m;
2081 2076
2082 m = kmalloc(sizeof(*m), GFP_NOFS); 2077 m = kmalloc(sizeof(*m), flags);
2083 if (m == NULL) 2078 if (m == NULL)
2084 goto out; 2079 goto out;
2085 kref_init(&m->kref); 2080 kref_init(&m->kref);
2086 INIT_LIST_HEAD(&m->list_head); 2081 INIT_LIST_HEAD(&m->list_head);
2087 2082
2083 m->hdr.tid = 0;
2088 m->hdr.type = cpu_to_le16(type); 2084 m->hdr.type = cpu_to_le16(type);
2085 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
2086 m->hdr.version = 0;
2089 m->hdr.front_len = cpu_to_le32(front_len); 2087 m->hdr.front_len = cpu_to_le32(front_len);
2090 m->hdr.middle_len = 0; 2088 m->hdr.middle_len = 0;
2091 m->hdr.data_len = cpu_to_le32(page_len); 2089 m->hdr.data_len = 0;
2092 m->hdr.data_off = cpu_to_le16(page_off); 2090 m->hdr.data_off = 0;
2093 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2091 m->hdr.reserved = 0;
2094 m->footer.front_crc = 0; 2092 m->footer.front_crc = 0;
2095 m->footer.middle_crc = 0; 2093 m->footer.middle_crc = 0;
2096 m->footer.data_crc = 0; 2094 m->footer.data_crc = 0;
2095 m->footer.flags = 0;
2097 m->front_max = front_len; 2096 m->front_max = front_len;
2098 m->front_is_vmalloc = false; 2097 m->front_is_vmalloc = false;
2099 m->more_to_follow = false; 2098 m->more_to_follow = false;
@@ -2102,11 +2101,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2102 /* front */ 2101 /* front */
2103 if (front_len) { 2102 if (front_len) {
2104 if (front_len > PAGE_CACHE_SIZE) { 2103 if (front_len > PAGE_CACHE_SIZE) {
2105 m->front.iov_base = __vmalloc(front_len, GFP_NOFS, 2104 m->front.iov_base = __vmalloc(front_len, flags,
2106 PAGE_KERNEL); 2105 PAGE_KERNEL);
2107 m->front_is_vmalloc = true; 2106 m->front_is_vmalloc = true;
2108 } else { 2107 } else {
2109 m->front.iov_base = kmalloc(front_len, GFP_NOFS); 2108 m->front.iov_base = kmalloc(front_len, flags);
2110 } 2109 }
2111 if (m->front.iov_base == NULL) { 2110 if (m->front.iov_base == NULL) {
2112 pr_err("msg_new can't allocate %d bytes\n", 2111 pr_err("msg_new can't allocate %d bytes\n",
@@ -2122,19 +2121,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2122 m->middle = NULL; 2121 m->middle = NULL;
2123 2122
2124 /* data */ 2123 /* data */
2125 m->nr_pages = calc_pages_for(page_off, page_len); 2124 m->nr_pages = 0;
2126 m->pages = pages; 2125 m->pages = NULL;
2127 m->pagelist = NULL; 2126 m->pagelist = NULL;
2128 2127
2129 dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, 2128 dout("ceph_msg_new %p front %d\n", m, front_len);
2130 m->nr_pages);
2131 return m; 2129 return m;
2132 2130
2133out2: 2131out2:
2134 ceph_msg_put(m); 2132 ceph_msg_put(m);
2135out: 2133out:
2136 pr_err("msg_new can't create type %d len %d\n", type, front_len); 2134 pr_err("msg_new can't create type %d front %d\n", type, front_len);
2137 return ERR_PTR(-ENOMEM); 2135 return NULL;
2138} 2136}
2139 2137
2140/* 2138/*
@@ -2177,29 +2175,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2177 mutex_unlock(&con->mutex); 2175 mutex_unlock(&con->mutex);
2178 msg = con->ops->alloc_msg(con, hdr, skip); 2176 msg = con->ops->alloc_msg(con, hdr, skip);
2179 mutex_lock(&con->mutex); 2177 mutex_lock(&con->mutex);
2180 if (IS_ERR(msg)) 2178 if (!msg || *skip)
2181 return msg;
2182
2183 if (*skip)
2184 return NULL; 2179 return NULL;
2185 } 2180 }
2186 if (!msg) { 2181 if (!msg) {
2187 *skip = 0; 2182 *skip = 0;
2188 msg = ceph_msg_new(type, front_len, 0, 0, NULL); 2183 msg = ceph_msg_new(type, front_len, GFP_NOFS);
2189 if (!msg) { 2184 if (!msg) {
2190 pr_err("unable to allocate msg type %d len %d\n", 2185 pr_err("unable to allocate msg type %d len %d\n",
2191 type, front_len); 2186 type, front_len);
2192 return ERR_PTR(-ENOMEM); 2187 return NULL;
2193 } 2188 }
2194 } 2189 }
2195 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2190 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2196 2191
2197 if (middle_len) { 2192 if (middle_len && !msg->middle) {
2198 ret = ceph_alloc_middle(con, msg); 2193 ret = ceph_alloc_middle(con, msg);
2199
2200 if (ret < 0) { 2194 if (ret < 0) {
2201 ceph_msg_put(msg); 2195 ceph_msg_put(msg);
2202 return msg; 2196 return NULL;
2203 } 2197 }
2204 } 2198 }
2205 2199