aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Sage Weil <sage@newdream.net> 2010-11-09 15:43:12 -0500
committer Sage Weil <sage@newdream.net> 2010-11-09 15:43:12 -0500
commit b7495fc2ff941db6a118a93ab8d61149e3f4cef8 (patch)
tree 231c339d74760e2fa13e5e6f41c10bc28cea51b3
parent e98b6fed84d0f0155d7b398e0dfeac74c792f2d0 (diff)
ceph: make page alignment explicit in osd interface
We used to infer alignment of IOs within a page based on the file offset, which assumed they matched. This broke with direct IO that was not aligned to pages (e.g., 512-byte aligned IO). We were also trusting the alignment specified in the OSD reply, which could have been adjusted by the server.

Explicitly specify the page alignment when setting up OSD IO requests.

Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- fs/ceph/addr.c 6
-rw-r--r-- fs/ceph/file.c 26
-rw-r--r-- fs/ceph/inode.c 2
-rw-r--r-- include/linux/ceph/osd_client.h 7
-rw-r--r-- net/ceph/osd_client.c 22
5 files changed, 44 insertions, 19 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 51bcc5ce3230..4aa857763037 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -204,7 +204,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 204 err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
205 page->index << PAGE_CACHE_SHIFT, &len, 205 page->index << PAGE_CACHE_SHIFT, &len,
206 ci->i_truncate_seq, ci->i_truncate_size, 206 ci->i_truncate_seq, ci->i_truncate_size,
207 &page, 1); 207 &page, 1, 0);
208 if (err == -ENOENT) 208 if (err == -ENOENT)
209 err = 0; 209 err = 0;
210 if (err < 0) { 210 if (err < 0) {
@@ -287,7 +287,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
287 rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 287 rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
288 offset, &len, 288 offset, &len,
289 ci->i_truncate_seq, ci->i_truncate_size, 289 ci->i_truncate_seq, ci->i_truncate_size,
290 pages, nr_pages); 290 pages, nr_pages, 0);
291 if (rc == -ENOENT) 291 if (rc == -ENOENT)
292 rc = 0; 292 rc = 0;
293 if (rc < 0) 293 if (rc < 0)
@@ -782,7 +782,7 @@ get_more_pages:
782 snapc, do_sync, 782 snapc, do_sync,
783 ci->i_truncate_seq, 783 ci->i_truncate_seq,
784 ci->i_truncate_size, 784 ci->i_truncate_size,
785 &inode->i_mtime, true, 1); 785 &inode->i_mtime, true, 1, 0);
786 max_pages = req->r_num_pages; 786 max_pages = req->r_num_pages;
787 787
788 alloc_page_vec(fsc, req); 788 alloc_page_vec(fsc, req);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 603fd00af0a6..8d79b8912e31 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -282,11 +282,12 @@ int ceph_release(struct inode *inode, struct file *file)
282static int striped_read(struct inode *inode, 282static int striped_read(struct inode *inode,
283 u64 off, u64 len, 283 u64 off, u64 len,
284 struct page **pages, int num_pages, 284 struct page **pages, int num_pages,
285 int *checkeof) 285 int *checkeof, bool align_to_pages)
286{ 286{
287 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 287 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
288 struct ceph_inode_info *ci = ceph_inode(inode); 288 struct ceph_inode_info *ci = ceph_inode(inode);
289 u64 pos, this_len; 289 u64 pos, this_len;
290 int io_align, page_align;
290 int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */ 291 int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */
291 int left, pages_left; 292 int left, pages_left;
292 int read; 293 int read;
@@ -302,14 +303,19 @@ static int striped_read(struct inode *inode,
302 page_pos = pages; 303 page_pos = pages;
303 pages_left = num_pages; 304 pages_left = num_pages;
304 read = 0; 305 read = 0;
306 io_align = off & ~PAGE_MASK;
305 307
306more: 308more:
309 if (align_to_pages)
310 page_align = (pos - io_align) & ~PAGE_MASK;
311 else
312 page_align = pos & ~PAGE_MASK;
307 this_len = left; 313 this_len = left;
308 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), 314 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
309 &ci->i_layout, pos, &this_len, 315 &ci->i_layout, pos, &this_len,
310 ci->i_truncate_seq, 316 ci->i_truncate_seq,
311 ci->i_truncate_size, 317 ci->i_truncate_size,
312 page_pos, pages_left); 318 page_pos, pages_left, page_align);
313 hit_stripe = this_len < left; 319 hit_stripe = this_len < left;
314 was_short = ret >= 0 && ret < this_len; 320 was_short = ret >= 0 && ret < this_len;
315 if (ret == -ENOENT) 321 if (ret == -ENOENT)
@@ -393,7 +399,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
393 if (ret < 0) 399 if (ret < 0)
394 goto done; 400 goto done;
395 401
396 ret = striped_read(inode, off, len, pages, num_pages, checkeof); 402 ret = striped_read(inode, off, len, pages, num_pages, checkeof,
403 file->f_flags & O_DIRECT);
397 404
398 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) 405 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
399 ret = ceph_copy_page_vector_to_user(pages, data, off, ret); 406 ret = ceph_copy_page_vector_to_user(pages, data, off, ret);
@@ -448,6 +455,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
448 int flags; 455 int flags;
449 int do_sync = 0; 456 int do_sync = 0;
450 int check_caps = 0; 457 int check_caps = 0;
458 int page_align, io_align;
451 int ret; 459 int ret;
452 struct timespec mtime = CURRENT_TIME; 460 struct timespec mtime = CURRENT_TIME;
453 461
@@ -462,6 +470,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
462 else 470 else
463 pos = *offset; 471 pos = *offset;
464 472
473 io_align = pos & ~PAGE_MASK;
474
465 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); 475 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
466 if (ret < 0) 476 if (ret < 0)
467 return ret; 477 return ret;
@@ -486,20 +496,26 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
486 */ 496 */
487more: 497more:
488 len = left; 498 len = left;
499 if (file->f_flags & O_DIRECT)
500 /* write from beginning of first page, regardless of
501 io alignment */
502 page_align = (pos - io_align) & ~PAGE_MASK;
503 else
504 page_align = pos & ~PAGE_MASK;
489 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 505 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
490 ceph_vino(inode), pos, &len, 506 ceph_vino(inode), pos, &len,
491 CEPH_OSD_OP_WRITE, flags, 507 CEPH_OSD_OP_WRITE, flags,
492 ci->i_snap_realm->cached_context, 508 ci->i_snap_realm->cached_context,
493 do_sync, 509 do_sync,
494 ci->i_truncate_seq, ci->i_truncate_size, 510 ci->i_truncate_seq, ci->i_truncate_size,
495 &mtime, false, 2); 511 &mtime, false, 2, page_align);
496 if (!req) 512 if (!req)
497 return -ENOMEM; 513 return -ENOMEM;
498 514
499 num_pages = calc_pages_for(pos, len); 515 num_pages = calc_pages_for(pos, len);
500 516
501 if (file->f_flags & O_DIRECT) { 517 if (file->f_flags & O_DIRECT) {
502 pages = ceph_get_direct_page_vector(data, num_pages, pos, len); 518 pages = ceph_get_direct_page_vector(data, num_pages);
503 if (IS_ERR(pages)) { 519 if (IS_ERR(pages)) {
504 ret = PTR_ERR(pages); 520 ret = PTR_ERR(pages);
505 goto out; 521 goto out;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 7bc0fbd26af2..8153ee5a8d74 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1752,7 +1752,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
1752 return 0; 1752 return 0;
1753 } 1753 }
1754 1754
1755 dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask)); 1755 dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
1756 if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) 1756 if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
1757 return 0; 1757 return 0;
1758 1758
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 6c91fb032c39..a1af29648fb5 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -79,6 +79,7 @@ struct ceph_osd_request {
79 struct ceph_file_layout r_file_layout; 79 struct ceph_file_layout r_file_layout;
80 struct ceph_snap_context *r_snapc; /* snap context for writes */ 80 struct ceph_snap_context *r_snapc; /* snap context for writes */
81 unsigned r_num_pages; /* size of page array (follows) */ 81 unsigned r_num_pages; /* size of page array (follows) */
82 unsigned r_page_alignment; /* io offset in first page */
82 struct page **r_pages; /* pages for data payload */ 83 struct page **r_pages; /* pages for data payload */
83 int r_pages_from_pool; 84 int r_pages_from_pool;
84 int r_own_pages; /* if true, i own page list */ 85 int r_own_pages; /* if true, i own page list */
@@ -194,7 +195,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
194 int do_sync, u32 truncate_seq, 195 int do_sync, u32 truncate_seq,
195 u64 truncate_size, 196 u64 truncate_size,
196 struct timespec *mtime, 197 struct timespec *mtime,
197 bool use_mempool, int num_reply); 198 bool use_mempool, int num_reply,
199 int page_align);
198 200
199static inline void ceph_osdc_get_request(struct ceph_osd_request *req) 201static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
200{ 202{
@@ -218,7 +220,8 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
218 struct ceph_file_layout *layout, 220 struct ceph_file_layout *layout,
219 u64 off, u64 *plen, 221 u64 off, u64 *plen,
220 u32 truncate_seq, u64 truncate_size, 222 u32 truncate_seq, u64 truncate_size,
221 struct page **pages, int nr_pages); 223 struct page **pages, int nr_pages,
224 int page_align);
222 225
223extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, 226extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
224 struct ceph_vino vino, 227 struct ceph_vino vino,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 79391994b3ed..6c096239660c 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -71,6 +71,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
71 op->extent.length = objlen; 71 op->extent.length = objlen;
72 } 72 }
73 req->r_num_pages = calc_pages_for(off, *plen); 73 req->r_num_pages = calc_pages_for(off, *plen);
74 req->r_page_alignment = off & ~PAGE_MASK;
74 if (op->op == CEPH_OSD_OP_WRITE) 75 if (op->op == CEPH_OSD_OP_WRITE)
75 op->payload_len = *plen; 76 op->payload_len = *plen;
76 77
@@ -419,7 +420,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
419 u32 truncate_seq, 420 u32 truncate_seq,
420 u64 truncate_size, 421 u64 truncate_size,
421 struct timespec *mtime, 422 struct timespec *mtime,
422 bool use_mempool, int num_reply) 423 bool use_mempool, int num_reply,
424 int page_align)
423{ 425{
424 struct ceph_osd_req_op ops[3]; 426 struct ceph_osd_req_op ops[3];
425 struct ceph_osd_request *req; 427 struct ceph_osd_request *req;
@@ -447,6 +449,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
447 calc_layout(osdc, vino, layout, off, plen, req, ops); 449 calc_layout(osdc, vino, layout, off, plen, req, ops);
448 req->r_file_layout = *layout; /* keep a copy */ 450 req->r_file_layout = *layout; /* keep a copy */
449 451
452 /* in case it differs from natural alignment that calc_layout
453 filled in for us */
454 req->r_page_alignment = page_align;
455
450 ceph_osdc_build_request(req, off, plen, ops, 456 ceph_osdc_build_request(req, off, plen, ops,
451 snapc, 457 snapc,
452 mtime, 458 mtime,
@@ -1489,7 +1495,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1489 struct ceph_vino vino, struct ceph_file_layout *layout, 1495 struct ceph_vino vino, struct ceph_file_layout *layout,
1490 u64 off, u64 *plen, 1496 u64 off, u64 *plen,
1491 u32 truncate_seq, u64 truncate_size, 1497 u32 truncate_seq, u64 truncate_size,
1492 struct page **pages, int num_pages) 1498 struct page **pages, int num_pages, int page_align)
1493{ 1499{
1494 struct ceph_osd_request *req; 1500 struct ceph_osd_request *req;
1495 int rc = 0; 1501 int rc = 0;
@@ -1499,15 +1505,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1499 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1505 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1500 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1506 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1501 NULL, 0, truncate_seq, truncate_size, NULL, 1507 NULL, 0, truncate_seq, truncate_size, NULL,
1502 false, 1); 1508 false, 1, page_align);
1503 if (!req) 1509 if (!req)
1504 return -ENOMEM; 1510 return -ENOMEM;
1505 1511
1506 /* it may be a short read due to an object boundary */ 1512 /* it may be a short read due to an object boundary */
1507 req->r_pages = pages; 1513 req->r_pages = pages;
1508 1514
1509 dout("readpages final extent is %llu~%llu (%d pages)\n", 1515 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1510 off, *plen, req->r_num_pages); 1516 off, *plen, req->r_num_pages, page_align);
1511 1517
1512 rc = ceph_osdc_start_request(osdc, req, false); 1518 rc = ceph_osdc_start_request(osdc, req, false);
1513 if (!rc) 1519 if (!rc)
@@ -1533,6 +1539,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1533{ 1539{
1534 struct ceph_osd_request *req; 1540 struct ceph_osd_request *req;
1535 int rc = 0; 1541 int rc = 0;
1542 int page_align = off & ~PAGE_MASK;
1536 1543
1537 BUG_ON(vino.snap != CEPH_NOSNAP); 1544 BUG_ON(vino.snap != CEPH_NOSNAP);
1538 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1545 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
@@ -1541,7 +1548,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1541 CEPH_OSD_FLAG_WRITE, 1548 CEPH_OSD_FLAG_WRITE,
1542 snapc, do_sync, 1549 snapc, do_sync,
1543 truncate_seq, truncate_size, mtime, 1550 truncate_seq, truncate_size, mtime,
1544 nofail, 1); 1551 nofail, 1, page_align);
1545 if (!req) 1552 if (!req)
1546 return -ENOMEM; 1553 return -ENOMEM;
1547 1554
@@ -1638,8 +1645,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1638 m = ceph_msg_get(req->r_reply); 1645 m = ceph_msg_get(req->r_reply);
1639 1646
1640 if (data_len > 0) { 1647 if (data_len > 0) {
1641 unsigned data_off = le16_to_cpu(hdr->data_off); 1648 int want = calc_pages_for(req->r_page_alignment, data_len);
1642 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1643 1649
1644 if (unlikely(req->r_num_pages < want)) { 1650 if (unlikely(req->r_num_pages < want)) {
1645 pr_warning("tid %lld reply %d > expected %d pages\n", 1651 pr_warning("tid %lld reply %d > expected %d pages\n",