aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c21
-rw-r--r--net/ceph/osd_client.c25
-rw-r--r--net/ceph/pagevec.c16
3 files changed, 35 insertions, 27 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0e8157ee5d43..b6ff4a1519ab 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -97,11 +97,9 @@ struct workqueue_struct *ceph_msgr_wq;
97int ceph_msgr_init(void) 97int ceph_msgr_init(void)
98{ 98{
99 ceph_msgr_wq = create_workqueue("ceph-msgr"); 99 ceph_msgr_wq = create_workqueue("ceph-msgr");
100 if (IS_ERR(ceph_msgr_wq)) { 100 if (!ceph_msgr_wq) {
101 int ret = PTR_ERR(ceph_msgr_wq); 101 pr_err("msgr_init failed to create workqueue\n");
102 pr_err("msgr_init failed to create workqueue: %d\n", ret); 102 return -ENOMEM;
103 ceph_msgr_wq = NULL;
104 return ret;
105 } 103 }
106 return 0; 104 return 0;
107} 105}
@@ -540,8 +538,7 @@ static void prepare_write_message(struct ceph_connection *con)
540 /* initialize page iterator */ 538 /* initialize page iterator */
541 con->out_msg_pos.page = 0; 539 con->out_msg_pos.page = 0;
542 if (m->pages) 540 if (m->pages)
543 con->out_msg_pos.page_pos = 541 con->out_msg_pos.page_pos = m->page_alignment;
544 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
545 else 542 else
546 con->out_msg_pos.page_pos = 0; 543 con->out_msg_pos.page_pos = 0;
547 con->out_msg_pos.data_pos = 0; 544 con->out_msg_pos.data_pos = 0;
@@ -1491,7 +1488,7 @@ static int read_partial_message(struct ceph_connection *con)
1491 struct ceph_msg *m = con->in_msg; 1488 struct ceph_msg *m = con->in_msg;
1492 int ret; 1489 int ret;
1493 int to, left; 1490 int to, left;
1494 unsigned front_len, middle_len, data_len, data_off; 1491 unsigned front_len, middle_len, data_len;
1495 int datacrc = con->msgr->nocrc; 1492 int datacrc = con->msgr->nocrc;
1496 int skip; 1493 int skip;
1497 u64 seq; 1494 u64 seq;
@@ -1527,19 +1524,17 @@ static int read_partial_message(struct ceph_connection *con)
1527 data_len = le32_to_cpu(con->in_hdr.data_len); 1524 data_len = le32_to_cpu(con->in_hdr.data_len);
1528 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1525 if (data_len > CEPH_MSG_MAX_DATA_LEN)
1529 return -EIO; 1526 return -EIO;
1530 data_off = le16_to_cpu(con->in_hdr.data_off);
1531 1527
1532 /* verify seq# */ 1528 /* verify seq# */
1533 seq = le64_to_cpu(con->in_hdr.seq); 1529 seq = le64_to_cpu(con->in_hdr.seq);
1534 if ((s64)seq - (s64)con->in_seq < 1) { 1530 if ((s64)seq - (s64)con->in_seq < 1) {
1535 pr_info("skipping %s%lld %s seq %lld, expected %lld\n", 1531 pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1536 ENTITY_NAME(con->peer_name), 1532 ENTITY_NAME(con->peer_name),
1537 ceph_pr_addr(&con->peer_addr.in_addr), 1533 ceph_pr_addr(&con->peer_addr.in_addr),
1538 seq, con->in_seq + 1); 1534 seq, con->in_seq + 1);
1539 con->in_base_pos = -front_len - middle_len - data_len - 1535 con->in_base_pos = -front_len - middle_len - data_len -
1540 sizeof(m->footer); 1536 sizeof(m->footer);
1541 con->in_tag = CEPH_MSGR_TAG_READY; 1537 con->in_tag = CEPH_MSGR_TAG_READY;
1542 con->in_seq++;
1543 return 0; 1538 return 0;
1544 } else if ((s64)seq - (s64)con->in_seq > 1) { 1539 } else if ((s64)seq - (s64)con->in_seq > 1) {
1545 pr_err("read_partial_message bad seq %lld expected %lld\n", 1540 pr_err("read_partial_message bad seq %lld expected %lld\n",
@@ -1576,7 +1571,7 @@ static int read_partial_message(struct ceph_connection *con)
1576 1571
1577 con->in_msg_pos.page = 0; 1572 con->in_msg_pos.page = 0;
1578 if (m->pages) 1573 if (m->pages)
1579 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; 1574 con->in_msg_pos.page_pos = m->page_alignment;
1580 else 1575 else
1581 con->in_msg_pos.page_pos = 0; 1576 con->in_msg_pos.page_pos = 0;
1582 con->in_msg_pos.data_pos = 0; 1577 con->in_msg_pos.data_pos = 0;
@@ -2301,6 +2296,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2301 2296
2302 /* data */ 2297 /* data */
2303 m->nr_pages = 0; 2298 m->nr_pages = 0;
2299 m->page_alignment = 0;
2304 m->pages = NULL; 2300 m->pages = NULL;
2305 m->pagelist = NULL; 2301 m->pagelist = NULL;
2306 m->bio = NULL; 2302 m->bio = NULL;
@@ -2370,6 +2366,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2370 type, front_len); 2366 type, front_len);
2371 return NULL; 2367 return NULL;
2372 } 2368 }
2369 msg->page_alignment = le16_to_cpu(hdr->data_off);
2373 } 2370 }
2374 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2371 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2375 2372
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 79391994b3ed..3e20a122ffa2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -71,6 +71,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
71 op->extent.length = objlen; 71 op->extent.length = objlen;
72 } 72 }
73 req->r_num_pages = calc_pages_for(off, *plen); 73 req->r_num_pages = calc_pages_for(off, *plen);
74 req->r_page_alignment = off & ~PAGE_MASK;
74 if (op->op == CEPH_OSD_OP_WRITE) 75 if (op->op == CEPH_OSD_OP_WRITE)
75 op->payload_len = *plen; 76 op->payload_len = *plen;
76 77
@@ -390,6 +391,8 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
390 req->r_request->hdr.data_len = cpu_to_le32(data_len); 391 req->r_request->hdr.data_len = cpu_to_le32(data_len);
391 } 392 }
392 393
394 req->r_request->page_alignment = req->r_page_alignment;
395
393 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 396 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
394 msg_size = p - msg->front.iov_base; 397 msg_size = p - msg->front.iov_base;
395 msg->front.iov_len = msg_size; 398 msg->front.iov_len = msg_size;
@@ -419,7 +422,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
419 u32 truncate_seq, 422 u32 truncate_seq,
420 u64 truncate_size, 423 u64 truncate_size,
421 struct timespec *mtime, 424 struct timespec *mtime,
422 bool use_mempool, int num_reply) 425 bool use_mempool, int num_reply,
426 int page_align)
423{ 427{
424 struct ceph_osd_req_op ops[3]; 428 struct ceph_osd_req_op ops[3];
425 struct ceph_osd_request *req; 429 struct ceph_osd_request *req;
@@ -447,6 +451,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
447 calc_layout(osdc, vino, layout, off, plen, req, ops); 451 calc_layout(osdc, vino, layout, off, plen, req, ops);
448 req->r_file_layout = *layout; /* keep a copy */ 452 req->r_file_layout = *layout; /* keep a copy */
449 453
454 /* in case it differs from natural alignment that calc_layout
455 filled in for us */
456 req->r_page_alignment = page_align;
457
450 ceph_osdc_build_request(req, off, plen, ops, 458 ceph_osdc_build_request(req, off, plen, ops,
451 snapc, 459 snapc,
452 mtime, 460 mtime,
@@ -1489,7 +1497,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1489 struct ceph_vino vino, struct ceph_file_layout *layout, 1497 struct ceph_vino vino, struct ceph_file_layout *layout,
1490 u64 off, u64 *plen, 1498 u64 off, u64 *plen,
1491 u32 truncate_seq, u64 truncate_size, 1499 u32 truncate_seq, u64 truncate_size,
1492 struct page **pages, int num_pages) 1500 struct page **pages, int num_pages, int page_align)
1493{ 1501{
1494 struct ceph_osd_request *req; 1502 struct ceph_osd_request *req;
1495 int rc = 0; 1503 int rc = 0;
@@ -1499,15 +1507,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1499 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1507 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1500 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1508 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1501 NULL, 0, truncate_seq, truncate_size, NULL, 1509 NULL, 0, truncate_seq, truncate_size, NULL,
1502 false, 1); 1510 false, 1, page_align);
1503 if (!req) 1511 if (!req)
1504 return -ENOMEM; 1512 return -ENOMEM;
1505 1513
1506 /* it may be a short read due to an object boundary */ 1514 /* it may be a short read due to an object boundary */
1507 req->r_pages = pages; 1515 req->r_pages = pages;
1508 1516
1509 dout("readpages final extent is %llu~%llu (%d pages)\n", 1517 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1510 off, *plen, req->r_num_pages); 1518 off, *plen, req->r_num_pages, page_align);
1511 1519
1512 rc = ceph_osdc_start_request(osdc, req, false); 1520 rc = ceph_osdc_start_request(osdc, req, false);
1513 if (!rc) 1521 if (!rc)
@@ -1533,6 +1541,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1533{ 1541{
1534 struct ceph_osd_request *req; 1542 struct ceph_osd_request *req;
1535 int rc = 0; 1543 int rc = 0;
1544 int page_align = off & ~PAGE_MASK;
1536 1545
1537 BUG_ON(vino.snap != CEPH_NOSNAP); 1546 BUG_ON(vino.snap != CEPH_NOSNAP);
1538 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1547 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
@@ -1541,7 +1550,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1541 CEPH_OSD_FLAG_WRITE, 1550 CEPH_OSD_FLAG_WRITE,
1542 snapc, do_sync, 1551 snapc, do_sync,
1543 truncate_seq, truncate_size, mtime, 1552 truncate_seq, truncate_size, mtime,
1544 nofail, 1); 1553 nofail, 1, page_align);
1545 if (!req) 1554 if (!req)
1546 return -ENOMEM; 1555 return -ENOMEM;
1547 1556
@@ -1638,8 +1647,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1638 m = ceph_msg_get(req->r_reply); 1647 m = ceph_msg_get(req->r_reply);
1639 1648
1640 if (data_len > 0) { 1649 if (data_len > 0) {
1641 unsigned data_off = le16_to_cpu(hdr->data_off); 1650 int want = calc_pages_for(req->r_page_alignment, data_len);
1642 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1643 1651
1644 if (unlikely(req->r_num_pages < want)) { 1652 if (unlikely(req->r_num_pages < want)) {
1645 pr_warning("tid %lld reply %d > expected %d pages\n", 1653 pr_warning("tid %lld reply %d > expected %d pages\n",
@@ -1651,6 +1659,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1651 } 1659 }
1652 m->pages = req->r_pages; 1660 m->pages = req->r_pages;
1653 m->nr_pages = req->r_num_pages; 1661 m->nr_pages = req->r_num_pages;
1662 m->page_alignment = req->r_page_alignment;
1654#ifdef CONFIG_BLOCK 1663#ifdef CONFIG_BLOCK
1655 m->bio = req->r_bio; 1664 m->bio = req->r_bio;
1656#endif 1665#endif
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index 54caf0687155..1a040e64c69f 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -13,8 +13,7 @@
13 * build a vector of user pages 13 * build a vector of user pages
14 */ 14 */
15struct page **ceph_get_direct_page_vector(const char __user *data, 15struct page **ceph_get_direct_page_vector(const char __user *data,
16 int num_pages, 16 int num_pages, bool write_page)
17 loff_t off, size_t len)
18{ 17{
19 struct page **pages; 18 struct page **pages;
20 int rc; 19 int rc;
@@ -25,24 +24,27 @@ struct page **ceph_get_direct_page_vector(const char __user *data,
25 24
26 down_read(&current->mm->mmap_sem); 25 down_read(&current->mm->mmap_sem);
27 rc = get_user_pages(current, current->mm, (unsigned long)data, 26 rc = get_user_pages(current, current->mm, (unsigned long)data,
28 num_pages, 0, 0, pages, NULL); 27 num_pages, write_page, 0, pages, NULL);
29 up_read(&current->mm->mmap_sem); 28 up_read(&current->mm->mmap_sem);
30 if (rc < 0) 29 if (rc < num_pages)
31 goto fail; 30 goto fail;
32 return pages; 31 return pages;
33 32
34fail: 33fail:
35 kfree(pages); 34 ceph_put_page_vector(pages, rc > 0 ? rc : 0, false);
36 return ERR_PTR(rc); 35 return ERR_PTR(rc);
37} 36}
38EXPORT_SYMBOL(ceph_get_direct_page_vector); 37EXPORT_SYMBOL(ceph_get_direct_page_vector);
39 38
40void ceph_put_page_vector(struct page **pages, int num_pages) 39void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty)
41{ 40{
42 int i; 41 int i;
43 42
44 for (i = 0; i < num_pages; i++) 43 for (i = 0; i < num_pages; i++) {
44 if (dirty)
45 set_page_dirty_lock(pages[i]);
45 put_page(pages[i]); 46 put_page(pages[i]);
47 }
46 kfree(pages); 48 kfree(pages);
47} 49}
48EXPORT_SYMBOL(ceph_put_page_vector); 50EXPORT_SYMBOL(ceph_put_page_vector);