author     Yan, Zheng <zheng.z.yan@intel.com>    2013-05-31 04:48:29 -0400
committer  Sage Weil <sage@inktank.com>          2013-07-03 18:32:47 -0400
commit     fc2744aa12da7182509b1059aa3ab53754d0c83a (patch)
tree       3a601a5e8a6bb5fe60141c21e07d99ebb8472d6a
parent     3803da4963db01da6a983ab589ebe2e6ccb97ba9 (diff)
ceph: fix race between page writeback and truncate
The client can receive a truncate request from the MDS at any time, so the
page writeback code needs to read i_size, truncate_seq and truncate_size
atomically.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: Sage Weil <sage@inktank.com>
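
For readers unfamiliar with the pattern, below is a minimal user-space sketch
of what the patch does in writepage_nounlock() and ceph_writepages_start():
take one consistent snapshot of all truncate-related fields under a single
lock, then work only from the local copies, so a concurrent truncate can never
be observed half-applied. The struct, field names and the pthread mutex are
illustrative stand-ins, not the kernel code (the real code uses
ci->i_ceph_lock and a spinlock).

/* Simplified user-space analogue of the locking pattern; assumptions only. */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

struct inode_state {
	pthread_mutex_t lock;		/* stands in for ci->i_ceph_lock */
	uint64_t i_size;
	uint64_t truncate_size;
	uint32_t truncate_seq;
};

/* truncate path: update every related field under the same lock */
static void apply_truncate(struct inode_state *s, uint64_t new_size,
			   uint32_t seq)
{
	pthread_mutex_lock(&s->lock);
	s->i_size = new_size;
	s->truncate_size = new_size;
	s->truncate_seq = seq;
	pthread_mutex_unlock(&s->lock);
}

/* writeback path: snapshot everything once, then use only the locals */
static void writepage(struct inode_state *s, uint64_t page_off)
{
	uint64_t snap_size, truncate_size;
	uint32_t truncate_seq;

	pthread_mutex_lock(&s->lock);
	truncate_seq = s->truncate_seq;
	truncate_size = s->truncate_size;
	snap_size = s->i_size;
	pthread_mutex_unlock(&s->lock);

	if (page_off >= snap_size) {
		printf("page at %llu is past eof %llu, skipping\n",
		       (unsigned long long)page_off,
		       (unsigned long long)snap_size);
		return;
	}
	/* issue the write using the snapshotted seq/size */
	printf("write at %llu with truncate_seq %u, truncate_size %llu\n",
	       (unsigned long long)page_off, (unsigned)truncate_seq,
	       (unsigned long long)truncate_size);
}

int main(void)
{
	struct inode_state s = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.i_size = 8192, .truncate_size = 8192, .truncate_seq = 1,
	};

	writepage(&s, 4096);
	apply_truncate(&s, 1024, 2);
	writepage(&s, 4096);	/* now consistently sees the truncated size */
	return 0;
}
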
-rw-r--r--  fs/ceph/addr.c  84
1 file changed, 40 insertions(+), 44 deletions(-)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 3e68ac101040..3500b74c32ed 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -438,13 +438,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	struct ceph_inode_info *ci;
 	struct ceph_fs_client *fsc;
 	struct ceph_osd_client *osdc;
-	loff_t page_off = page_offset(page);
-	int len = PAGE_CACHE_SIZE;
-	loff_t i_size;
-	int err = 0;
 	struct ceph_snap_context *snapc, *oldest;
-	u64 snap_size = 0;
+	loff_t page_off = page_offset(page);
 	long writeback_stat;
+	u64 truncate_size, snap_size = 0;
+	u32 truncate_seq;
+	int err = 0, len = PAGE_CACHE_SIZE;
 
 	dout("writepage %p idx %lu\n", page, page->index);
 
@@ -474,13 +473,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	}
 	ceph_put_snap_context(oldest);
 
+	spin_lock(&ci->i_ceph_lock);
+	truncate_seq = ci->i_truncate_seq;
+	truncate_size = ci->i_truncate_size;
+	if (!snap_size)
+		snap_size = i_size_read(inode);
+	spin_unlock(&ci->i_ceph_lock);
+
 	/* is this a partial page at end of file? */
-	if (snap_size)
-		i_size = snap_size;
-	else
-		i_size = i_size_read(inode);
-	if (i_size < page_off + len)
-		len = i_size - page_off;
+	if (page_off >= snap_size) {
+		dout("%p page eof %llu\n", page, snap_size);
+		goto out;
+	}
+	if (snap_size < page_off + len)
+		len = snap_size - page_off;
 
 	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
 	     inode, page, page->index, page_off, len, snapc);
@@ -494,7 +500,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
 				   &ci->i_layout, snapc,
 				   page_off, len,
-				   ci->i_truncate_seq, ci->i_truncate_size,
+				   truncate_seq, truncate_size,
 				   &inode->i_mtime, &page, 1);
 	if (err < 0) {
 		dout("writepage setting page/mapping error %d %p\n", err, page);
@@ -631,25 +637,6 @@ static void writepages_finish(struct ceph_osd_request *req,
 	ceph_osdc_put_request(req);
 }
 
-static struct ceph_osd_request *
-ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
-			    struct ceph_snap_context *snapc, int num_ops)
-{
-	struct ceph_fs_client *fsc;
-	struct ceph_inode_info *ci;
-	struct ceph_vino vino;
-
-	fsc = ceph_inode_to_client(inode);
-	ci = ceph_inode(inode);
-	vino = ceph_vino(inode);
-	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
-
-	return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-			vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
-			CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
-			snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
-}
-
 /*
  * initiate async writeback
  */
@@ -658,7 +645,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 {
 	struct inode *inode = mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_vino vino = ceph_vino(inode);
 	pgoff_t index, start, end;
 	int range_whole = 0;
 	int should_loop = 1;
@@ -670,7 +658,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 	unsigned wsize = 1 << inode->i_blkbits;
 	struct ceph_osd_request *req = NULL;
 	int do_sync;
-	u64 snap_size;
+	u64 truncate_size, snap_size;
+	u32 truncate_seq;
 
 	/*
 	 * Include a 'sync' in the OSD request if this is a data
@@ -685,7 +674,6 @@ static int ceph_writepages_start(struct address_space *mapping,
 	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
 	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-	fsc = ceph_inode_to_client(inode);
 	if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
 		pr_warning("writepage_start %p on forced umount\n", inode);
 		return -EIO; /* we're in a forced umount, don't write! */
@@ -728,6 +716,14 @@ retry:
 		snap_size = i_size_read(inode);
 	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
 	     snapc, snapc->seq, snapc->num_snaps);
+
+	spin_lock(&ci->i_ceph_lock);
+	truncate_seq = ci->i_truncate_seq;
+	truncate_size = ci->i_truncate_size;
+	if (!snap_size)
+		snap_size = i_size_read(inode);
+	spin_unlock(&ci->i_ceph_lock);
+
 	if (last_snapc && snapc != last_snapc) {
 		/* if we switched to a newer snapc, restart our scan at the
 		 * start of the original file range. */
@@ -739,7 +735,6 @@ retry:
 
 	while (!done && index <= end) {
 		int num_ops = do_sync ? 2 : 1;
-		struct ceph_vino vino;
 		unsigned i;
 		int first;
 		pgoff_t next;
@@ -833,17 +828,18 @@ get_more_pages:
 			 * that it will use.
 			 */
 			if (locked_pages == 0) {
-				size_t size;
-
 				BUG_ON(pages);
-
 				/* prepare async write request */
 				offset = (u64)page_offset(page);
 				len = wsize;
-				req = ceph_writepages_osd_request(inode,
-							offset, &len, snapc,
-							num_ops);
-
+				req = ceph_osdc_new_request(&fsc->client->osdc,
+							&ci->i_layout, vino,
+							offset, &len, num_ops,
+							CEPH_OSD_OP_WRITE,
+							CEPH_OSD_FLAG_WRITE |
+							CEPH_OSD_FLAG_ONDISK,
+							snapc, truncate_seq,
+							truncate_size, true);
 				if (IS_ERR(req)) {
 					rc = PTR_ERR(req);
 					unlock_page(page);
@@ -854,8 +850,8 @@ get_more_pages:
 			req->r_inode = inode;
 
 			max_pages = calc_pages_for(0, (u64)len);
-			size = max_pages * sizeof (*pages);
-			pages = kmalloc(size, GFP_NOFS);
+			pages = kmalloc(max_pages * sizeof (*pages),
+					GFP_NOFS);
 			if (!pages) {
 				pool = fsc->wb_pagevec_pool;
 				pages = mempool_alloc(pool, GFP_NOFS);