about | summary | refs | log | tree | commit | diff | stats
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--  fs/ceph/messenger.c   219
-rw-r--r--  fs/ceph/messenger.h   4
-rw-r--r--  fs/ceph/osd_client.c  249
-rw-r--r--  fs/ceph/osd_client.h  61
-rw-r--r--  fs/ceph/pagelist.c    2
-rw-r--r--  fs/ceph/pagelist.h    2
6 files changed, 436 insertions, 101 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76fcec1..17a09b32a591 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
9#include <linux/slab.h> 9#include <linux/slab.h>
10#include <linux/socket.h> 10#include <linux/socket.h>
11#include <linux/string.h> 11#include <linux/string.h>
12#include <linux/bio.h>
13#include <linux/blkdev.h>
12#include <net/tcp.h> 14#include <net/tcp.h>
13 15
14#include "super.h" 16#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
529 if (le32_to_cpu(m->hdr.data_len) > 0) { 531 if (le32_to_cpu(m->hdr.data_len) > 0) {
530 /* initialize page iterator */ 532 /* initialize page iterator */
531 con->out_msg_pos.page = 0; 533 con->out_msg_pos.page = 0;
532 con->out_msg_pos.page_pos = 534 if (m->pages)
533 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; 535 con->out_msg_pos.page_pos =
536 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
537 else
538 con->out_msg_pos.page_pos = 0;
534 con->out_msg_pos.data_pos = 0; 539 con->out_msg_pos.data_pos = 0;
535 con->out_msg_pos.did_page_crc = 0; 540 con->out_msg_pos.did_page_crc = 0;
536 con->out_more = 1; /* data + footer will follow */ 541 con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
712 return ret; /* done! */ 717 return ret; /* done! */
713} 718}
714 719
720#ifdef CONFIG_BLOCK
721static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
722{
723 if (!bio) {
724 *iter = NULL;
725 *seg = 0;
726 return;
727 }
728 *iter = bio;
729 *seg = bio->bi_idx;
730}
731
732static void iter_bio_next(struct bio **bio_iter, int *seg)
733{
734 if (*bio_iter == NULL)
735 return;
736
737 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
738
739 (*seg)++;
740 if (*seg == (*bio_iter)->bi_vcnt)
741 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
742}
743#endif
744
715/* 745/*
716 * Write as much message data payload as we can. If we finish, queue 746 * Write as much message data payload as we can. If we finish, queue
717 * up the footer. 747 * up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
726 size_t len; 756 size_t len;
727 int crc = con->msgr->nocrc; 757 int crc = con->msgr->nocrc;
728 int ret; 758 int ret;
759 int total_max_write;
760 int in_trail = 0;
761 size_t trail_len = (msg->trail ? msg->trail->length : 0);
729 762
730 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 763 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
731 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 764 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
732 con->out_msg_pos.page_pos); 765 con->out_msg_pos.page_pos);
733 766
734 while (con->out_msg_pos.page < con->out_msg->nr_pages) { 767#ifdef CONFIG_BLOCK
768 if (msg->bio && !msg->bio_iter)
769 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
770#endif
771
772 while (data_len > con->out_msg_pos.data_pos) {
735 struct page *page = NULL; 773 struct page *page = NULL;
736 void *kaddr = NULL; 774 void *kaddr = NULL;
775 int max_write = PAGE_SIZE;
776 int page_shift = 0;
777
778 total_max_write = data_len - trail_len -
779 con->out_msg_pos.data_pos;
737 780
738 /* 781 /*
739 * if we are calculating the data crc (the default), we need 782 * if we are calculating the data crc (the default), we need
740 * to map the page. if our pages[] has been revoked, use the 783 * to map the page. if our pages[] has been revoked, use the
741 * zero page. 784 * zero page.
742 */ 785 */
743 if (msg->pages) { 786
787 /* have we reached the trail part of the data? */
788 if (con->out_msg_pos.data_pos >= data_len - trail_len) {
789 in_trail = 1;
790
791 total_max_write = data_len - con->out_msg_pos.data_pos;
792
793 page = list_first_entry(&msg->trail->head,
794 struct page, lru);
795 if (crc)
796 kaddr = kmap(page);
797 max_write = PAGE_SIZE;
798 } else if (msg->pages) {
744 page = msg->pages[con->out_msg_pos.page]; 799 page = msg->pages[con->out_msg_pos.page];
745 if (crc) 800 if (crc)
746 kaddr = kmap(page); 801 kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
749 struct page, lru); 804 struct page, lru);
750 if (crc) 805 if (crc)
751 kaddr = kmap(page); 806 kaddr = kmap(page);
807#ifdef CONFIG_BLOCK
808 } else if (msg->bio) {
809 struct bio_vec *bv;
810
811 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
812 page = bv->bv_page;
813 page_shift = bv->bv_offset;
814 if (crc)
815 kaddr = kmap(page) + page_shift;
816 max_write = bv->bv_len;
817#endif
752 } else { 818 } else {
753 page = con->msgr->zero_page; 819 page = con->msgr->zero_page;
754 if (crc) 820 if (crc)
755 kaddr = page_address(con->msgr->zero_page); 821 kaddr = page_address(con->msgr->zero_page);
756 } 822 }
757 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 823 len = min_t(int, max_write - con->out_msg_pos.page_pos,
758 (int)(data_len - con->out_msg_pos.data_pos)); 824 total_max_write);
825
759 if (crc && !con->out_msg_pos.did_page_crc) { 826 if (crc && !con->out_msg_pos.did_page_crc) {
760 void *base = kaddr + con->out_msg_pos.page_pos; 827 void *base = kaddr + con->out_msg_pos.page_pos;
761 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 828 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
765 cpu_to_le32(crc32c(tmpcrc, base, len)); 832 cpu_to_le32(crc32c(tmpcrc, base, len));
766 con->out_msg_pos.did_page_crc = 1; 833 con->out_msg_pos.did_page_crc = 1;
767 } 834 }
768
769 ret = kernel_sendpage(con->sock, page, 835 ret = kernel_sendpage(con->sock, page,
770 con->out_msg_pos.page_pos, len, 836 con->out_msg_pos.page_pos + page_shift,
837 len,
771 MSG_DONTWAIT | MSG_NOSIGNAL | 838 MSG_DONTWAIT | MSG_NOSIGNAL |
772 MSG_MORE); 839 MSG_MORE);
773 840
774 if (crc && (msg->pages || msg->pagelist)) 841 if (crc &&
842 (msg->pages || msg->pagelist || msg->bio || in_trail))
775 kunmap(page); 843 kunmap(page);
776 844
777 if (ret <= 0) 845 if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
783 con->out_msg_pos.page_pos = 0; 851 con->out_msg_pos.page_pos = 0;
784 con->out_msg_pos.page++; 852 con->out_msg_pos.page++;
785 con->out_msg_pos.did_page_crc = 0; 853 con->out_msg_pos.did_page_crc = 0;
786 if (msg->pagelist) 854 if (in_trail)
855 list_move_tail(&page->lru,
856 &msg->trail->head);
857 else if (msg->pagelist)
787 list_move_tail(&page->lru, 858 list_move_tail(&page->lru,
788 &msg->pagelist->head); 859 &msg->pagelist->head);
860#ifdef CONFIG_BLOCK
861 else if (msg->bio)
862 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
863#endif
789 } 864 }
790 } 865 }
791 866
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
1305 struct kvec *section, 1380 struct kvec *section,
1306 unsigned int sec_len, u32 *crc) 1381 unsigned int sec_len, u32 *crc)
1307{ 1382{
1308 int left; 1383 int ret, left;
1309 int ret;
1310 1384
1311 BUG_ON(!section); 1385 BUG_ON(!section);
1312 1386
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
1329static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1403static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1330 struct ceph_msg_header *hdr, 1404 struct ceph_msg_header *hdr,
1331 int *skip); 1405 int *skip);
1406
1407
1408static int read_partial_message_pages(struct ceph_connection *con,
1409 struct page **pages,
1410 unsigned data_len, int datacrc)
1411{
1412 void *p;
1413 int ret;
1414 int left;
1415
1416 left = min((int)(data_len - con->in_msg_pos.data_pos),
1417 (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1418 /* (page) data */
1419 BUG_ON(pages == NULL);
1420 p = kmap(pages[con->in_msg_pos.page]);
1421 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1422 left);
1423 if (ret > 0 && datacrc)
1424 con->in_data_crc =
1425 crc32c(con->in_data_crc,
1426 p + con->in_msg_pos.page_pos, ret);
1427 kunmap(pages[con->in_msg_pos.page]);
1428 if (ret <= 0)
1429 return ret;
1430 con->in_msg_pos.data_pos += ret;
1431 con->in_msg_pos.page_pos += ret;
1432 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1433 con->in_msg_pos.page_pos = 0;
1434 con->in_msg_pos.page++;
1435 }
1436
1437 return ret;
1438}
1439
1440#ifdef CONFIG_BLOCK
1441static int read_partial_message_bio(struct ceph_connection *con,
1442 struct bio **bio_iter, int *bio_seg,
1443 unsigned data_len, int datacrc)
1444{
1445 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1446 void *p;
1447 int ret, left;
1448
1449 if (IS_ERR(bv))
1450 return PTR_ERR(bv);
1451
1452 left = min((int)(data_len - con->in_msg_pos.data_pos),
1453 (int)(bv->bv_len - con->in_msg_pos.page_pos));
1454
1455 p = kmap(bv->bv_page) + bv->bv_offset;
1456
1457 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1458 left);
1459 if (ret > 0 && datacrc)
1460 con->in_data_crc =
1461 crc32c(con->in_data_crc,
1462 p + con->in_msg_pos.page_pos, ret);
1463 kunmap(bv->bv_page);
1464 if (ret <= 0)
1465 return ret;
1466 con->in_msg_pos.data_pos += ret;
1467 con->in_msg_pos.page_pos += ret;
1468 if (con->in_msg_pos.page_pos == bv->bv_len) {
1469 con->in_msg_pos.page_pos = 0;
1470 iter_bio_next(bio_iter, bio_seg);
1471 }
1472
1473 return ret;
1474}
1475#endif
1476
1332/* 1477/*
1333 * read (part of) a message. 1478 * read (part of) a message.
1334 */ 1479 */
1335static int read_partial_message(struct ceph_connection *con) 1480static int read_partial_message(struct ceph_connection *con)
1336{ 1481{
1337 struct ceph_msg *m = con->in_msg; 1482 struct ceph_msg *m = con->in_msg;
1338 void *p;
1339 int ret; 1483 int ret;
1340 int to, left; 1484 int to, left;
1341 unsigned front_len, middle_len, data_len, data_off; 1485 unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
1422 m->middle->vec.iov_len = 0; 1566 m->middle->vec.iov_len = 0;
1423 1567
1424 con->in_msg_pos.page = 0; 1568 con->in_msg_pos.page = 0;
1425 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; 1569 if (m->pages)
1570 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1571 else
1572 con->in_msg_pos.page_pos = 0;
1426 con->in_msg_pos.data_pos = 0; 1573 con->in_msg_pos.data_pos = 0;
1427 } 1574 }
1428 1575
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
1440 if (ret <= 0) 1587 if (ret <= 0)
1441 return ret; 1588 return ret;
1442 } 1589 }
1590#ifdef CONFIG_BLOCK
1591 if (m->bio && !m->bio_iter)
1592 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1593#endif
1443 1594
1444 /* (page) data */ 1595 /* (page) data */
1445 while (con->in_msg_pos.data_pos < data_len) { 1596 while (con->in_msg_pos.data_pos < data_len) {
1446 left = min((int)(data_len - con->in_msg_pos.data_pos), 1597 if (m->pages) {
1447 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1598 ret = read_partial_message_pages(con, m->pages,
1448 BUG_ON(m->pages == NULL); 1599 data_len, datacrc);
1449 p = kmap(m->pages[con->in_msg_pos.page]); 1600 if (ret <= 0)
1450 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1601 return ret;
1451 left); 1602#ifdef CONFIG_BLOCK
1452 if (ret > 0 && datacrc) 1603 } else if (m->bio) {
1453 con->in_data_crc = 1604
1454 crc32c(con->in_data_crc, 1605 ret = read_partial_message_bio(con,
1455 p + con->in_msg_pos.page_pos, ret); 1606 &m->bio_iter, &m->bio_seg,
1456 kunmap(m->pages[con->in_msg_pos.page]); 1607 data_len, datacrc);
1457 if (ret <= 0) 1608 if (ret <= 0)
1458 return ret; 1609 return ret;
1459 con->in_msg_pos.data_pos += ret; 1610#endif
1460 con->in_msg_pos.page_pos += ret; 1611 } else {
1461 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1612 BUG_ON(1);
1462 con->in_msg_pos.page_pos = 0;
1463 con->in_msg_pos.page++;
1464 } 1613 }
1465 } 1614 }
1466 1615
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2136 m->nr_pages = 0; 2285 m->nr_pages = 0;
2137 m->pages = NULL; 2286 m->pages = NULL;
2138 m->pagelist = NULL; 2287 m->pagelist = NULL;
2288 m->bio = NULL;
2289 m->bio_iter = NULL;
2290 m->bio_seg = 0;
2291 m->trail = NULL;
2139 2292
2140 dout("ceph_msg_new %p front %d\n", m, front_len); 2293 dout("ceph_msg_new %p front %d\n", m, front_len);
2141 return m; 2294 return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
2250 m->pagelist = NULL; 2403 m->pagelist = NULL;
2251 } 2404 }
2252 2405
2406 m->trail = NULL;
2407
2253 if (m->pool) 2408 if (m->pool)
2254 ceph_msgpool_put(m->pool, m); 2409 ceph_msgpool_put(m->pool, m);
2255 else 2410 else
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 76fbc957bc13..5a79450604ef 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -82,6 +82,10 @@ struct ceph_msg {
82 struct ceph_pagelist *pagelist; /* instead of pages */ 82 struct ceph_pagelist *pagelist; /* instead of pages */
83 struct list_head list_head; 83 struct list_head list_head;
84 struct kref kref; 84 struct kref kref;
85 struct bio *bio; /* instead of pages/pagelist */
86 struct bio *bio_iter; /* bio iterator */
87 int bio_seg; /* current bio segment */
88 struct ceph_pagelist *trail; /* the trailing part of the data */
85 bool front_is_vmalloc; 89 bool front_is_vmalloc;
86 bool more_to_follow; 90 bool more_to_follow;
87 bool needs_out_seq; 91 bool needs_out_seq;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 2647dafd96f5..c5d818e73add 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
6#include <linux/pagemap.h> 6#include <linux/pagemap.h>
7#include <linux/slab.h> 7#include <linux/slab.h>
8#include <linux/uaccess.h> 8#include <linux/uaccess.h>
9#ifdef CONFIG_BLOCK
10#include <linux/bio.h>
11#endif
9 12
10#include "super.h" 13#include "super.h"
11#include "osd_client.h" 14#include "osd_client.h"
12#include "messenger.h" 15#include "messenger.h"
13#include "decode.h" 16#include "decode.h"
14#include "auth.h" 17#include "auth.h"
18#include "pagelist.h"
15 19
16#define OSD_OP_FRONT_LEN 4096 20#define OSD_OP_FRONT_LEN 4096
17#define OSD_OPREPLY_FRONT_LEN 512 21#define OSD_OPREPLY_FRONT_LEN 512
@@ -22,29 +26,50 @@ static int __kick_requests(struct ceph_osd_client *osdc,
22 26
23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
24 28
29static int op_needs_trail(int op)
30{
31 switch (op) {
32 case CEPH_OSD_OP_GETXATTR:
33 case CEPH_OSD_OP_SETXATTR:
34 case CEPH_OSD_OP_CMPXATTR:
35 case CEPH_OSD_OP_CALL:
36 return 1;
37 default:
38 return 0;
39 }
40}
41
42static int op_has_extent(int op)
43{
44 return (op == CEPH_OSD_OP_READ ||
45 op == CEPH_OSD_OP_WRITE);
46}
47
25void ceph_calc_raw_layout(struct ceph_osd_client *osdc, 48void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
26 struct ceph_file_layout *layout, 49 struct ceph_file_layout *layout,
27 u64 snapid, 50 u64 snapid,
28 u64 off, u64 len, u64 *bno, 51 u64 off, u64 *plen, u64 *bno,
29 struct ceph_osd_request *req) 52 struct ceph_osd_request *req,
53 struct ceph_osd_req_op *op)
30{ 54{
31 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 55 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
32 struct ceph_osd_op *op = (void *)(reqhead + 1); 56 u64 orig_len = *plen;
33 u64 orig_len = len;
34 u64 objoff, objlen; /* extent in object */ 57 u64 objoff, objlen; /* extent in object */
35 58
36 reqhead->snapid = cpu_to_le64(snapid); 59 reqhead->snapid = cpu_to_le64(snapid);
37 60
38 /* object extent? */ 61 /* object extent? */
39 ceph_calc_file_object_mapping(layout, off, &len, bno, 62 ceph_calc_file_object_mapping(layout, off, plen, bno,
40 &objoff, &objlen); 63 &objoff, &objlen);
41 if (len < orig_len) 64 if (*plen < orig_len)
42 dout(" skipping last %llu, final file extent %llu~%llu\n", 65 dout(" skipping last %llu, final file extent %llu~%llu\n",
43 orig_len - len, off, len); 66 orig_len - *plen, off, *plen);
44 67
45 op->extent.offset = cpu_to_le64(objoff); 68 if (op_has_extent(op->op)) {
46 op->extent.length = cpu_to_le64(objlen); 69 op->extent.offset = objoff;
47 req->r_num_pages = calc_pages_for(off, len); 70 op->extent.length = objlen;
71 }
72 req->r_num_pages = calc_pages_for(off, *plen);
48 73
49 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", 74 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
50 *bno, objoff, objlen, req->r_num_pages); 75 *bno, objoff, objlen, req->r_num_pages);
@@ -80,11 +105,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
80 struct ceph_vino vino, 105 struct ceph_vino vino,
81 struct ceph_file_layout *layout, 106 struct ceph_file_layout *layout,
82 u64 off, u64 *plen, 107 u64 off, u64 *plen,
83 struct ceph_osd_request *req) 108 struct ceph_osd_request *req,
109 struct ceph_osd_req_op *op)
84{ 110{
85 u64 bno; 111 u64 bno;
86 112
87 ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req); 113 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
114 plen, &bno, req, op);
88 115
89 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); 116 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
90 req->r_oid_len = strlen(req->r_oid); 117 req->r_oid_len = strlen(req->r_oid);
@@ -113,35 +140,64 @@ void ceph_osdc_release_request(struct kref *kref)
113 if (req->r_own_pages) 140 if (req->r_own_pages)
114 ceph_release_page_vector(req->r_pages, 141 ceph_release_page_vector(req->r_pages,
115 req->r_num_pages); 142 req->r_num_pages);
143#ifdef CONFIG_BLOCK
144 if (req->r_bio)
145 bio_put(req->r_bio);
146#endif
116 ceph_put_snap_context(req->r_snapc); 147 ceph_put_snap_context(req->r_snapc);
148 if (req->r_trail) {
149 ceph_pagelist_release(req->r_trail);
150 kfree(req->r_trail);
151 }
117 if (req->r_mempool) 152 if (req->r_mempool)
118 mempool_free(req, req->r_osdc->req_mempool); 153 mempool_free(req, req->r_osdc->req_mempool);
119 else 154 else
120 kfree(req); 155 kfree(req);
121} 156}
122 157
158static int op_needs_trail(int op)
159{
160 switch (op) {
161 case CEPH_OSD_OP_GETXATTR:
162 case CEPH_OSD_OP_SETXATTR:
163 case CEPH_OSD_OP_CMPXATTR:
164 return 1;
165 default:
166 return 0;
167 }
168}
169
170static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
171{
172 int i = 0;
173
174 if (needs_trail)
175 *needs_trail = 0;
176 while (ops[i].op) {
177 if (needs_trail && op_needs_trail(ops[i].op))
178 *needs_trail = 1;
179 i++;
180 }
181
182 return i;
183}
184
123struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 185struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
124 int flags, 186 int flags,
125 struct ceph_snap_context *snapc, 187 struct ceph_snap_context *snapc,
126 int do_sync, 188 struct ceph_osd_req_op *ops,
127 bool use_mempool, 189 bool use_mempool,
128 gfp_t gfp_flags, 190 gfp_t gfp_flags,
129 struct page **pages) 191 struct page **pages,
192 struct bio *bio)
130{ 193{
131 struct ceph_osd_request *req; 194 struct ceph_osd_request *req;
132 struct ceph_msg *msg; 195 struct ceph_msg *msg;
133 int num_op = 1 + do_sync; 196 int needs_trail;
134 size_t msg_size = sizeof(struct ceph_osd_request_head) + 197 int num_op = get_num_ops(ops, &needs_trail);
135 num_op*sizeof(struct ceph_osd_op); 198 size_t msg_size = sizeof(struct ceph_osd_request_head);
136 199
137 if (use_mempool) { 200 msg_size += num_op*sizeof(struct ceph_osd_op);
138 req = mempool_alloc(osdc->req_mempool, gfp_flags);
139 memset(req, 0, sizeof(*req));
140 } else {
141 req = kzalloc(sizeof(*req), gfp_flags);
142 }
143 if (!req)
144 return NULL;
145 201
146 if (use_mempool) { 202 if (use_mempool) {
147 req = mempool_alloc(osdc->req_mempool, gfp_flags); 203 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
154 210
155 req->r_osdc = osdc; 211 req->r_osdc = osdc;
156 req->r_mempool = use_mempool; 212 req->r_mempool = use_mempool;
213
157 kref_init(&req->r_kref); 214 kref_init(&req->r_kref);
158 init_completion(&req->r_completion); 215 init_completion(&req->r_completion);
159 init_completion(&req->r_safe_completion); 216 init_completion(&req->r_safe_completion);
@@ -174,6 +231,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
174 } 231 }
175 req->r_reply = msg; 232 req->r_reply = msg;
176 233
234 /* allocate space for the trailing data */
235 if (needs_trail) {
236 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
237 if (!req->r_trail) {
238 ceph_osdc_put_request(req);
239 return NULL;
240 }
241 ceph_pagelist_init(req->r_trail);
242 }
177 /* create request message; allow space for oid */ 243 /* create request message; allow space for oid */
178 msg_size += 40; 244 msg_size += 40;
179 if (snapc) 245 if (snapc)
@@ -186,38 +252,87 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
186 ceph_osdc_put_request(req); 252 ceph_osdc_put_request(req);
187 return NULL; 253 return NULL;
188 } 254 }
255
189 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 256 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
190 memset(msg->front.iov_base, 0, msg->front.iov_len); 257 memset(msg->front.iov_base, 0, msg->front.iov_len);
191 258
192 req->r_request = msg; 259 req->r_request = msg;
193 req->r_pages = pages; 260 req->r_pages = pages;
261#ifdef CONFIG_BLOCK
262 if (bio) {
263 req->r_bio = bio;
264 bio_get(req->r_bio);
265 }
266#endif
194 267
195 return req; 268 return req;
196} 269}
197 270
271static void osd_req_encode_op(struct ceph_osd_request *req,
272 struct ceph_osd_op *dst,
273 struct ceph_osd_req_op *src)
274{
275 dst->op = cpu_to_le16(src->op);
276
277 switch (dst->op) {
278 case CEPH_OSD_OP_READ:
279 case CEPH_OSD_OP_WRITE:
280 dst->extent.offset =
281 cpu_to_le64(src->extent.offset);
282 dst->extent.length =
283 cpu_to_le64(src->extent.length);
284 dst->extent.truncate_size =
285 cpu_to_le64(src->extent.truncate_size);
286 dst->extent.truncate_seq =
287 cpu_to_le32(src->extent.truncate_seq);
288 break;
289
290 case CEPH_OSD_OP_GETXATTR:
291 case CEPH_OSD_OP_SETXATTR:
292 case CEPH_OSD_OP_CMPXATTR:
293 BUG_ON(!req->r_trail);
294
295 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
296 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
297 dst->xattr.cmp_op = src->xattr.cmp_op;
298 dst->xattr.cmp_mode = src->xattr.cmp_mode;
299 ceph_pagelist_append(req->r_trail, src->xattr.name,
300 src->xattr.name_len);
301 ceph_pagelist_append(req->r_trail, src->xattr.val,
302 src->xattr.value_len);
303 break;
304 case CEPH_OSD_OP_STARTSYNC:
305 break;
306 default:
307 pr_err("unrecognized osd opcode %d\n", dst->op);
308 WARN_ON(1);
309 break;
310 }
311 dst->payload_len = cpu_to_le32(src->payload_len);
312}
313
198/* 314/*
199 * build new request AND message 315 * build new request AND message
200 * 316 *
201 */ 317 */
202void ceph_osdc_build_request(struct ceph_osd_request *req, 318void ceph_osdc_build_request(struct ceph_osd_request *req,
203 u64 off, u64 *plen, 319 u64 off, u64 *plen,
204 int opcode, 320 struct ceph_osd_req_op *src_ops,
205 struct ceph_snap_context *snapc, 321 struct ceph_snap_context *snapc,
206 int do_sync, 322 struct timespec *mtime,
207 u32 truncate_seq, 323 const char *oid,
208 u64 truncate_size, 324 int oid_len)
209 struct timespec *mtime,
210 const char *oid,
211 int oid_len)
212{ 325{
213 struct ceph_msg *msg = req->r_request; 326 struct ceph_msg *msg = req->r_request;
214 struct ceph_osd_request_head *head; 327 struct ceph_osd_request_head *head;
328 struct ceph_osd_req_op *src_op;
215 struct ceph_osd_op *op; 329 struct ceph_osd_op *op;
216 void *p; 330 void *p;
217 int num_op = 1 + do_sync; 331 int num_op = get_num_ops(src_ops, NULL);
218 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 332 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
219 int i;
220 int flags = req->r_flags; 333 int flags = req->r_flags;
334 u64 data_len = 0;
335 int i;
221 336
222 head = msg->front.iov_base; 337 head = msg->front.iov_base;
223 op = (void *)(head + 1); 338 op = (void *)(head + 1);
@@ -230,25 +345,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
230 if (flags & CEPH_OSD_FLAG_WRITE) 345 if (flags & CEPH_OSD_FLAG_WRITE)
231 ceph_encode_timespec(&head->mtime, mtime); 346 ceph_encode_timespec(&head->mtime, mtime);
232 head->num_ops = cpu_to_le16(num_op); 347 head->num_ops = cpu_to_le16(num_op);
233 op->op = cpu_to_le16(opcode);
234 348
235 if (flags & CEPH_OSD_FLAG_WRITE) {
236 req->r_request->hdr.data_off = cpu_to_le16(off);
237 req->r_request->hdr.data_len = cpu_to_le32(*plen);
238 op->payload_len = cpu_to_le32(*plen);
239 }
240 op->extent.truncate_size = cpu_to_le64(truncate_size);
241 op->extent.truncate_seq = cpu_to_le32(truncate_seq);
242 349
243 /* fill in oid */ 350 /* fill in oid */
244 head->object_len = cpu_to_le32(oid_len); 351 head->object_len = cpu_to_le32(oid_len);
245 memcpy(p, oid, oid_len); 352 memcpy(p, oid, oid_len);
246 p += oid_len; 353 p += oid_len;
247 354
248 if (do_sync) { 355 src_op = src_ops;
356 while (src_op->op) {
357 osd_req_encode_op(req, op, src_op);
358 src_op++;
249 op++; 359 op++;
250 op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
251 } 360 }
361
362 if (req->r_trail)
363 data_len += req->r_trail->length;
364
252 if (snapc) { 365 if (snapc) {
253 head->snap_seq = cpu_to_le64(snapc->seq); 366 head->snap_seq = cpu_to_le64(snapc->seq);
254 head->num_snaps = cpu_to_le32(snapc->num_snaps); 367 head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +371,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
258 } 371 }
259 } 372 }
260 373
374 if (flags & CEPH_OSD_FLAG_WRITE) {
375 req->r_request->hdr.data_off = cpu_to_le16(off);
376 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
377 } else if (data_len) {
378 req->r_request->hdr.data_off = 0;
379 req->r_request->hdr.data_len = cpu_to_le32(data_len);
380 }
381
261 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 382 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
262 msg_size = p - msg->front.iov_base; 383 msg_size = p - msg->front.iov_base;
263 msg->front.iov_len = msg_size; 384 msg->front.iov_len = msg_size;
@@ -288,21 +409,34 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
288 struct timespec *mtime, 409 struct timespec *mtime,
289 bool use_mempool, int num_reply) 410 bool use_mempool, int num_reply)
290{ 411{
291 struct ceph_osd_request *req = 412 struct ceph_osd_req_op ops[3];
292 ceph_osdc_alloc_request(osdc, flags, 413 struct ceph_osd_request *req;
293 snapc, do_sync, 414
415 ops[0].op = opcode;
416 ops[0].extent.truncate_seq = truncate_seq;
417 ops[0].extent.truncate_size = truncate_size;
418 ops[0].payload_len = 0;
419
420 if (do_sync) {
421 ops[1].op = CEPH_OSD_OP_STARTSYNC;
422 ops[1].payload_len = 0;
423 ops[2].op = 0;
424 } else
425 ops[1].op = 0;
426
427 req = ceph_osdc_alloc_request(osdc, flags,
428 snapc, ops,
294 use_mempool, 429 use_mempool,
295 GFP_NOFS, NULL); 430 GFP_NOFS, NULL, NULL);
296 if (IS_ERR(req)) 431 if (IS_ERR(req))
297 return req; 432 return req;
298 433
299 /* calculate max write size */ 434 /* calculate max write size */
300 calc_layout(osdc, vino, layout, off, plen, req); 435 calc_layout(osdc, vino, layout, off, plen, req, ops);
301 req->r_file_layout = *layout; /* keep a copy */ 436 req->r_file_layout = *layout; /* keep a copy */
302 437
303 ceph_osdc_build_request(req, off, plen, opcode, 438 ceph_osdc_build_request(req, off, plen, ops,
304 snapc, do_sync, 439 snapc,
305 truncate_seq, truncate_size,
306 mtime, 440 mtime,
307 req->r_oid, req->r_oid_len); 441 req->r_oid, req->r_oid_len);
308 442
@@ -1177,6 +1311,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1177 1311
1178 req->r_request->pages = req->r_pages; 1312 req->r_request->pages = req->r_pages;
1179 req->r_request->nr_pages = req->r_num_pages; 1313 req->r_request->nr_pages = req->r_num_pages;
1314#ifdef CONFIG_BLOCK
1315 req->r_request->bio = req->r_bio;
1316#endif
1317 req->r_request->trail = req->r_trail;
1180 1318
1181 register_request(osdc, req); 1319 register_request(osdc, req);
1182 1320
@@ -1493,6 +1631,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1493 } 1631 }
1494 m->pages = req->r_pages; 1632 m->pages = req->r_pages;
1495 m->nr_pages = req->r_num_pages; 1633 m->nr_pages = req->r_num_pages;
1634#ifdef CONFIG_BLOCK
1635 m->bio = req->r_bio;
1636#endif
1496 } 1637 }
1497 *skip = 0; 1638 *skip = 0;
1498 req->r_con_filling_msg = ceph_con_get(con); 1639 req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b687c2ea72e6..d583d1bf6cd9 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -15,6 +15,7 @@ struct ceph_snap_context;
15struct ceph_osd_request; 15struct ceph_osd_request;
16struct ceph_osd_client; 16struct ceph_osd_client;
17struct ceph_authorizer; 17struct ceph_authorizer;
18struct ceph_pagelist;
18 19
19/* 20/*
20 * completion callback for async writepages 21 * completion callback for async writepages
@@ -80,6 +81,11 @@ struct ceph_osd_request {
80 struct page **r_pages; /* pages for data payload */ 81 struct page **r_pages; /* pages for data payload */
81 int r_pages_from_pool; 82 int r_pages_from_pool;
82 int r_own_pages; /* if true, i own page list */ 83 int r_own_pages; /* if true, i own page list */
84#ifdef CONFIG_BLOCK
85 struct bio *r_bio; /* instead of pages */
86#endif
87
88 struct ceph_pagelist *r_trail; /* trailing part of the data */
83}; 89};
84 90
85struct ceph_osd_client { 91struct ceph_osd_client {
@@ -110,6 +116,36 @@ struct ceph_osd_client {
110 struct ceph_msgpool msgpool_op_reply; 116 struct ceph_msgpool msgpool_op_reply;
111}; 117};
112 118
119struct ceph_osd_req_op {
120 u16 op; /* CEPH_OSD_OP_* */
121 u32 flags; /* CEPH_OSD_FLAG_* */
122 union {
123 struct {
124 u64 offset, length;
125 u64 truncate_size;
126 u32 truncate_seq;
127 } extent;
128 struct {
129 const char *name;
130 u32 name_len;
131 const char *val;
132 u32 value_len;
133 __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
134 __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
135 } xattr;
136 struct {
137 __u8 class_len;
138 __u8 method_len;
139 __u8 argc;
140 u32 indata_len;
141 } cls;
142 struct {
143 u64 cookie, count;
144 } pgls;
145 };
146 u32 payload_len;
147};
148
113extern int ceph_osdc_init(struct ceph_osd_client *osdc, 149extern int ceph_osdc_init(struct ceph_osd_client *osdc,
114 struct ceph_client *client); 150 struct ceph_client *client);
115extern void ceph_osdc_stop(struct ceph_osd_client *osdc); 151extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
@@ -122,27 +158,26 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
122extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc, 158extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
123 struct ceph_file_layout *layout, 159 struct ceph_file_layout *layout,
124 u64 snapid, 160 u64 snapid,
125 u64 off, u64 len, u64 *bno, 161 u64 off, u64 *plen, u64 *bno,
126 struct ceph_osd_request *req); 162 struct ceph_osd_request *req,
163 struct ceph_osd_req_op *op);
127 164
128extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 165extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
129 int flags, 166 int flags,
130 struct ceph_snap_context *snapc, 167 struct ceph_snap_context *snapc,
131 int do_sync, 168 struct ceph_osd_req_op *ops,
132 bool use_mempool, 169 bool use_mempool,
133 gfp_t gfp_flags, 170 gfp_t gfp_flags,
134 struct page **pages); 171 struct page **pages,
172 struct bio *bio);
135 173
136extern void ceph_osdc_build_request(struct ceph_osd_request *req, 174extern void ceph_osdc_build_request(struct ceph_osd_request *req,
137 u64 off, u64 *plen, 175 u64 off, u64 *plen,
138 int opcode, 176 struct ceph_osd_req_op *src_ops,
139 struct ceph_snap_context *snapc, 177 struct ceph_snap_context *snapc,
140 int do_sync, 178 struct timespec *mtime,
141 u32 truncate_seq, 179 const char *oid,
142 u64 truncate_size, 180 int oid_len);
143 struct timespec *mtime,
144 const char *oid,
145 int oid_len);
146 181
147extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, 182extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
148 struct ceph_file_layout *layout, 183 struct ceph_file_layout *layout,
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index 46a368b6dce5..326e1c04176f 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -39,7 +39,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
39 return 0; 39 return 0;
40} 40}
41 41
42int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len) 42int ceph_pagelist_append(struct ceph_pagelist *pl, const void *buf, size_t len)
43{ 43{
44 while (pl->room < len) { 44 while (pl->room < len) {
45 size_t bit = pl->room; 45 size_t bit = pl->room;
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
index e8a4187e1087..cc9327aa1c98 100644
--- a/fs/ceph/pagelist.h
+++ b/fs/ceph/pagelist.h
@@ -19,7 +19,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
19} 19}
20extern int ceph_pagelist_release(struct ceph_pagelist *pl); 20extern int ceph_pagelist_release(struct ceph_pagelist *pl);
21 21
22extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l); 22extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
23 23
24static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v) 24static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
25{ 25{