aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authormajianpeng <majianpeng@gmail.com>2013-09-12 01:54:26 -0400
committerSage Weil <sage@inktank.com>2013-12-13 12:13:17 -0500
commite8344e668915a7488def414f016dbf7d9fce84b5 (patch)
treef0ea6b254a90b29a8041b95464351a2976625cef /fs/ceph
parent9f12bd119e408388233e7aeb1152f372a8b5dcad (diff)
ceph: Implement writev/pwritev for sync operation.
For sync writev/pwritev operations, ceph only handled the first iov. I divided the sync write operation into two functions: one for direct writes and the other for non-direct sync writes. This is because for a non-direct sync write we can merge the iovs into one, but for a direct write we cannot merge the iovs. Signed-off-by: Jianpeng Ma <majianpeng@gmail.com> Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com> Signed-off-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/file.c273
1 files changed, 193 insertions, 80 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3de89829e2a1..5cf034e915bb 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
489 } 489 }
490} 490}
491 491
492
492/* 493/*
493 * Synchronous write, straight from __user pointer or user pages (if 494 * Synchronous write, straight from __user pointer or user pages.
494 * O_DIRECT).
495 * 495 *
496 * If write spans object boundary, just do multiple writes. (For a 496 * If write spans object boundary, just do multiple writes. (For a
497 * correct atomic write, we should e.g. take write locks on all 497 * correct atomic write, we should e.g. take write locks on all
498 * objects, rollback on failure, etc.) 498 * objects, rollback on failure, etc.)
499 */ 499 */
500static ssize_t ceph_sync_write(struct file *file, const char __user *data, 500static ssize_t
501 size_t left, loff_t pos, loff_t *ppos) 501ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
502 unsigned long nr_segs, size_t count)
502{ 503{
504 struct file *file = iocb->ki_filp;
503 struct inode *inode = file_inode(file); 505 struct inode *inode = file_inode(file);
504 struct ceph_inode_info *ci = ceph_inode(inode); 506 struct ceph_inode_info *ci = ceph_inode(inode);
505 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 507 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
506 struct ceph_snap_context *snapc; 508 struct ceph_snap_context *snapc;
507 struct ceph_vino vino; 509 struct ceph_vino vino;
508 struct ceph_osd_request *req; 510 struct ceph_osd_request *req;
509 int num_ops = 1;
510 struct page **pages; 511 struct page **pages;
511 int num_pages; 512 int num_pages;
512 u64 len;
513 int written = 0; 513 int written = 0;
514 int flags; 514 int flags;
515 int check_caps = 0; 515 int check_caps = 0;
516 int page_align, io_align; 516 int page_align;
517 unsigned long buf_align;
518 int ret; 517 int ret;
519 struct timespec mtime = CURRENT_TIME; 518 struct timespec mtime = CURRENT_TIME;
520 bool own_pages = false; 519 loff_t pos = iocb->ki_pos;
520 struct iov_iter i;
521 521
522 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 522 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
523 return -EROFS; 523 return -EROFS;
524 524
525 dout("sync_write on file %p %lld~%u %s\n", file, pos, 525 dout("sync_direct_write on file %p %lld~%u\n", file, pos,
526 (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 526 (unsigned)count);
527 527
528 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); 528 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
529 if (ret < 0) 529 if (ret < 0)
530 return ret; 530 return ret;
531 531
532 ret = invalidate_inode_pages2_range(inode->i_mapping, 532 ret = invalidate_inode_pages2_range(inode->i_mapping,
533 pos >> PAGE_CACHE_SHIFT, 533 pos >> PAGE_CACHE_SHIFT,
534 (pos + left) >> PAGE_CACHE_SHIFT); 534 (pos + count) >> PAGE_CACHE_SHIFT);
535 if (ret < 0) 535 if (ret < 0)
536 dout("invalidate_inode_pages2_range returned %d\n", ret); 536 dout("invalidate_inode_pages2_range returned %d\n", ret);
537 537
538 flags = CEPH_OSD_FLAG_ORDERSNAP | 538 flags = CEPH_OSD_FLAG_ORDERSNAP |
539 CEPH_OSD_FLAG_ONDISK | 539 CEPH_OSD_FLAG_ONDISK |
540 CEPH_OSD_FLAG_WRITE; 540 CEPH_OSD_FLAG_WRITE;
541 if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
542 flags |= CEPH_OSD_FLAG_ACK;
543 else
544 num_ops++; /* Also include a 'startsync' command. */
545 541
546 /* 542 iov_iter_init(&i, iov, nr_segs, count, 0);
547 * we may need to do multiple writes here if we span an object 543
548 * boundary. this isn't atomic, unfortunately. :( 544 while (iov_iter_count(&i) > 0) {
549 */ 545 void __user *data = i.iov->iov_base + i.iov_offset;
550more: 546 u64 len = i.iov->iov_len - i.iov_offset;
551 io_align = pos & ~PAGE_MASK; 547
552 buf_align = (unsigned long)data & ~PAGE_MASK; 548 page_align = (unsigned long)data & ~PAGE_MASK;
553 len = left; 549
554 550 snapc = ci->i_snap_realm->cached_context;
555 snapc = ci->i_snap_realm->cached_context; 551 vino = ceph_vino(inode);
556 vino = ceph_vino(inode); 552 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
557 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 553 vino, pos, &len,
558 vino, pos, &len, num_ops, 554 2,/*include a 'startsync' command*/
559 CEPH_OSD_OP_WRITE, flags, snapc, 555 CEPH_OSD_OP_WRITE, flags, snapc,
560 ci->i_truncate_seq, ci->i_truncate_size, 556 ci->i_truncate_seq,
561 false); 557 ci->i_truncate_size,
562 if (IS_ERR(req)) 558 false);
563 return PTR_ERR(req); 559 if (IS_ERR(req)) {
560 ret = PTR_ERR(req);
561 goto out;
562 }
564 563
565 /* write from beginning of first page, regardless of io alignment */ 564 num_pages = calc_pages_for(page_align, len);
566 page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
567 num_pages = calc_pages_for(page_align, len);
568 if (file->f_flags & O_DIRECT) {
569 pages = ceph_get_direct_page_vector(data, num_pages, false); 565 pages = ceph_get_direct_page_vector(data, num_pages, false);
570 if (IS_ERR(pages)) { 566 if (IS_ERR(pages)) {
571 ret = PTR_ERR(pages); 567 ret = PTR_ERR(pages);
@@ -577,60 +573,175 @@ more:
577 * may block. 573 * may block.
578 */ 574 */
579 truncate_inode_pages_range(inode->i_mapping, pos, 575 truncate_inode_pages_range(inode->i_mapping, pos,
580 (pos+len) | (PAGE_CACHE_SIZE-1)); 576 (pos+len) | (PAGE_CACHE_SIZE-1));
581 } else { 577 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
578 false, false);
579
580 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
581 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
582
583 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
584 if (!ret)
585 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
586
587 ceph_put_page_vector(pages, num_pages, false);
588
589out:
590 ceph_osdc_put_request(req);
591 if (ret == 0) {
592 pos += len;
593 written += len;
594 iov_iter_advance(&i, (size_t)len);
595
596 if (pos > i_size_read(inode)) {
597 check_caps = ceph_inode_set_size(inode, pos);
598 if (check_caps)
599 ceph_check_caps(ceph_inode(inode),
600 CHECK_CAPS_AUTHONLY,
601 NULL);
602 }
603 } else
604 break;
605 }
606
607 if (ret != -EOLDSNAPC && written > 0) {
608 iocb->ki_pos = pos;
609 ret = written;
610 }
611 return ret;
612}
613
614
615/*
616 * Synchronous write, straight from __user pointer or user pages.
617 *
618 * If write spans object boundary, just do multiple writes. (For a
619 * correct atomic write, we should e.g. take write locks on all
620 * objects, rollback on failure, etc.)
621 */
622static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
623 unsigned long nr_segs, size_t count)
624{
625 struct file *file = iocb->ki_filp;
626 struct inode *inode = file_inode(file);
627 struct ceph_inode_info *ci = ceph_inode(inode);
628 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
629 struct ceph_snap_context *snapc;
630 struct ceph_vino vino;
631 struct ceph_osd_request *req;
632 struct page **pages;
633 u64 len;
634 int num_pages;
635 int written = 0;
636 int flags;
637 int check_caps = 0;
638 int ret;
639 struct timespec mtime = CURRENT_TIME;
640 loff_t pos = iocb->ki_pos;
641 struct iov_iter i;
642
643 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
644 return -EROFS;
645
646 dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
647
648 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
649 if (ret < 0)
650 return ret;
651
652 ret = invalidate_inode_pages2_range(inode->i_mapping,
653 pos >> PAGE_CACHE_SHIFT,
654 (pos + count) >> PAGE_CACHE_SHIFT);
655 if (ret < 0)
656 dout("invalidate_inode_pages2_range returned %d\n", ret);
657
658 flags = CEPH_OSD_FLAG_ORDERSNAP |
659 CEPH_OSD_FLAG_ONDISK |
660 CEPH_OSD_FLAG_WRITE |
661 CEPH_OSD_FLAG_ACK;
662
663 iov_iter_init(&i, iov, nr_segs, count, 0);
664
665 while ((len = iov_iter_count(&i)) > 0) {
666 size_t left;
667 int n;
668
669 snapc = ci->i_snap_realm->cached_context;
670 vino = ceph_vino(inode);
671 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
672 vino, pos, &len, 1,
673 CEPH_OSD_OP_WRITE, flags, snapc,
674 ci->i_truncate_seq,
675 ci->i_truncate_size,
676 false);
677 if (IS_ERR(req)) {
678 ret = PTR_ERR(req);
679 goto out;
680 }
681
682 /*
683 * write from beginning of first page,
684 * regardless of io alignment
685 */
686 num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
687
582 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); 688 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
583 if (IS_ERR(pages)) { 689 if (IS_ERR(pages)) {
584 ret = PTR_ERR(pages); 690 ret = PTR_ERR(pages);
585 goto out; 691 goto out;
586 } 692 }
587 ret = ceph_copy_user_to_page_vector(pages, data, pos, len); 693
694 left = len;
695 for (n = 0; n < num_pages; n++) {
696 size_t plen = min(left, PAGE_SIZE);
697 ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
698 if (ret != plen) {
699 ret = -EFAULT;
700 break;
701 }
702 left -= ret;
703 iov_iter_advance(&i, ret);
704 }
705
588 if (ret < 0) { 706 if (ret < 0) {
589 ceph_release_page_vector(pages, num_pages); 707 ceph_release_page_vector(pages, num_pages);
590 goto out; 708 goto out;
591 } 709 }
592 710
593 if ((file->f_flags & O_SYNC) == 0) { 711 /* get a second commit callback */
594 /* get a second commit callback */ 712 req->r_unsafe_callback = ceph_sync_write_unsafe;
595 req->r_unsafe_callback = ceph_sync_write_unsafe; 713 req->r_inode = inode;
596 req->r_inode = inode;
597 own_pages = true;
598 }
599 }
600 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
601 false, own_pages);
602 714
603 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 715 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
604 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 716 false, true);
605 717
606 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 718 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
607 if (!ret) 719 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
608 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
609 720
610 if (file->f_flags & O_DIRECT) 721 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
611 ceph_put_page_vector(pages, num_pages, false); 722 if (!ret)
612 else if (file->f_flags & O_SYNC) 723 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
613 ceph_release_page_vector(pages, num_pages);
614 724
615out: 725out:
616 ceph_osdc_put_request(req); 726 ceph_osdc_put_request(req);
617 if (ret == 0) { 727 if (ret == 0) {
618 pos += len; 728 pos += len;
619 written += len; 729 written += len;
620 left -= len; 730
621 data += len; 731 if (pos > i_size_read(inode)) {
622 if (left) 732 check_caps = ceph_inode_set_size(inode, pos);
623 goto more; 733 if (check_caps)
734 ceph_check_caps(ceph_inode(inode),
735 CHECK_CAPS_AUTHONLY,
736 NULL);
737 }
738 } else
739 break;
740 }
624 741
742 if (ret != -EOLDSNAPC && written > 0) {
625 ret = written; 743 ret = written;
626 *ppos = pos; 744 iocb->ki_pos = pos;
627 if (pos > i_size_read(inode))
628 check_caps = ceph_inode_set_size(inode, pos);
629 if (check_caps)
630 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
631 NULL);
632 } else if (ret != -EOLDSNAPC && written > 0) {
633 ret = written;
634 } 745 }
635 return ret; 746 return ret;
636} 747}
@@ -772,11 +883,13 @@ retry_snap:
772 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 883 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
773 884
774 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 885 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
775 (iocb->ki_filp->f_flags & O_DIRECT) || 886 (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
776 (fi->flags & CEPH_F_SYNC)) {
777 mutex_unlock(&inode->i_mutex); 887 mutex_unlock(&inode->i_mutex);
778 written = ceph_sync_write(file, iov->iov_base, count, 888 if (file->f_flags & O_DIRECT)
779 pos, &iocb->ki_pos); 889 written = ceph_sync_direct_write(iocb, iov,
890 nr_segs, count);
891 else
892 written = ceph_sync_write(iocb, iov, nr_segs, count);
780 if (written == -EOLDSNAPC) { 893 if (written == -EOLDSNAPC) {
781 dout("aio_write %p %llx.%llx %llu~%u" 894 dout("aio_write %p %llx.%llx %llu~%u"
782 "got EOLDSNAPC, retrying\n", 895 "got EOLDSNAPC, retrying\n",