aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/file.c
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2014-01-28 14:02:23 -0500
committer Linus Torvalds <torvalds@linux-foundation.org> 2014-01-28 14:02:23 -0500
commit d891ea23d5203e5c47439b2a174f86a00b356a6c (patch)
tree 3876cefcced9df5519f437cd8eb275cb979b93f6 /fs/ceph/file.c
parent 08d21b5f93eb92a781daea71b6fcb3a340909141 (diff)
parent 125d725c923527a85876c031028c7f55c28b74b3 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil:
"This is a big batch. From Ilya we have:
 - rbd support for more than ~250 mapped devices (now uses same scheme that SCSI does for device major/minor numbering)
 - crush updates for new mapping behaviors (will be needed for coming erasure coding support, among other things)
 - preliminary support for tiered storage pools
There is also a big series fixing a pile of cephfs bugs with clustered MDSs from Yan Zheng, ACL support for cephfs from Guangliang Zhao, ceph fscache improvements from Li Wang, improved behavior when we get ENOSPC from Josh Durgin, some readv/writev improvements from Majianpeng, and the usual mix of small cleanups"
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (76 commits)
  ceph: cast PAGE_SIZE to size_t in ceph_sync_write()
  ceph: fix dout() compile warnings in ceph_filemap_fault()
  libceph: support CEPH_FEATURE_OSD_CACHEPOOL feature
  libceph: follow redirect replies from osds
  libceph: rename ceph_osd_request::r_{oloc,oid} to r_base_{oloc,oid}
  libceph: follow {read,write}_tier fields on osd request submission
  libceph: add ceph_pg_pool_by_id()
  libceph: CEPH_OSD_FLAG_* enum update
  libceph: replace ceph_calc_ceph_pg() with ceph_oloc_oid_to_pg()
  libceph: introduce and start using oid abstraction
  libceph: rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN
  libceph: move ceph_file_layout helpers to ceph_fs.h
  libceph: start using oloc abstraction
  libceph: dout() is missing a newline
  libceph: add ceph_kv{malloc,free}() and switch to them
  libceph: support CEPH_FEATURE_EXPORT_PEER
  ceph: add imported caps when handling cap export message
  ceph: add open export target session helper
  ceph: remove exported caps when handling cap import message
  ceph: handle session flush message
  ...
Diffstat (limited to 'fs/ceph/file.c')
-rw-r--r-- fs/ceph/file.c 437
1 files changed, 310 insertions, 127 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3de89829e2a1..dfd2ce3419f8 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -408,51 +408,92 @@ more:
408 * 408 *
409 * If the read spans object boundary, just do multiple reads. 409 * If the read spans object boundary, just do multiple reads.
410 */ 410 */
411static ssize_t ceph_sync_read(struct file *file, char __user *data, 411static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
412 unsigned len, loff_t *poff, int *checkeof) 412 int *checkeof)
413{ 413{
414 struct file *file = iocb->ki_filp;
414 struct inode *inode = file_inode(file); 415 struct inode *inode = file_inode(file);
415 struct page **pages; 416 struct page **pages;
416 u64 off = *poff; 417 u64 off = iocb->ki_pos;
417 int num_pages, ret; 418 int num_pages, ret;
419 size_t len = i->count;
418 420
419 dout("sync_read on file %p %llu~%u %s\n", file, off, len, 421 dout("sync_read on file %p %llu~%u %s\n", file, off,
422 (unsigned)len,
420 (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 423 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
421
422 if (file->f_flags & O_DIRECT) {
423 num_pages = calc_pages_for((unsigned long)data, len);
424 pages = ceph_get_direct_page_vector(data, num_pages, true);
425 } else {
426 num_pages = calc_pages_for(off, len);
427 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
428 }
429 if (IS_ERR(pages))
430 return PTR_ERR(pages);
431
432 /* 424 /*
433 * flush any page cache pages in this range. this 425 * flush any page cache pages in this range. this
434 * will make concurrent normal and sync io slow, 426 * will make concurrent normal and sync io slow,
435 * but it will at least behave sensibly when they are 427 * but it will at least behave sensibly when they are
436 * in sequence. 428 * in sequence.
437 */ 429 */
438 ret = filemap_write_and_wait(inode->i_mapping); 430 ret = filemap_write_and_wait_range(inode->i_mapping, off,
431 off + len);
439 if (ret < 0) 432 if (ret < 0)
440 goto done; 433 return ret;
441 434
442 ret = striped_read(inode, off, len, pages, num_pages, checkeof, 435 if (file->f_flags & O_DIRECT) {
443 file->f_flags & O_DIRECT, 436 while (iov_iter_count(i)) {
444 (unsigned long)data & ~PAGE_MASK); 437 void __user *data = i->iov[0].iov_base + i->iov_offset;
438 size_t len = i->iov[0].iov_len - i->iov_offset;
439
440 num_pages = calc_pages_for((unsigned long)data, len);
441 pages = ceph_get_direct_page_vector(data,
442 num_pages, true);
443 if (IS_ERR(pages))
444 return PTR_ERR(pages);
445
446 ret = striped_read(inode, off, len,
447 pages, num_pages, checkeof,
448 1, (unsigned long)data & ~PAGE_MASK);
449 ceph_put_page_vector(pages, num_pages, true);
450
451 if (ret <= 0)
452 break;
453 off += ret;
454 iov_iter_advance(i, ret);
455 if (ret < len)
456 break;
457 }
458 } else {
459 num_pages = calc_pages_for(off, len);
460 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
461 if (IS_ERR(pages))
462 return PTR_ERR(pages);
463 ret = striped_read(inode, off, len, pages,
464 num_pages, checkeof, 0, 0);
465 if (ret > 0) {
466 int l, k = 0;
467 size_t left = len = ret;
468
469 while (left) {
470 void __user *data = i->iov[0].iov_base
471 + i->iov_offset;
472 l = min(i->iov[0].iov_len - i->iov_offset,
473 left);
474
475 ret = ceph_copy_page_vector_to_user(&pages[k],
476 data, off,
477 l);
478 if (ret > 0) {
479 iov_iter_advance(i, ret);
480 left -= ret;
481 off += ret;
482 k = calc_pages_for(iocb->ki_pos,
483 len - left + 1) - 1;
484 BUG_ON(k >= num_pages && left);
485 } else
486 break;
487 }
488 }
489 ceph_release_page_vector(pages, num_pages);
490 }
445 491
446 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) 492 if (off > iocb->ki_pos) {
447 ret = ceph_copy_page_vector_to_user(pages, data, off, ret); 493 ret = off - iocb->ki_pos;
448 if (ret >= 0) 494 iocb->ki_pos = off;
449 *poff = off + ret; 495 }
450 496
451done:
452 if (file->f_flags & O_DIRECT)
453 ceph_put_page_vector(pages, num_pages, true);
454 else
455 ceph_release_page_vector(pages, num_pages);
456 dout("sync_read result %d\n", ret); 497 dout("sync_read result %d\n", ret);
457 return ret; 498 return ret;
458} 499}
@@ -489,83 +530,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
489 } 530 }
490} 531}
491 532
533
492/* 534/*
493 * Synchronous write, straight from __user pointer or user pages (if 535 * Synchronous write, straight from __user pointer or user pages.
494 * O_DIRECT).
495 * 536 *
496 * If write spans object boundary, just do multiple writes. (For a 537 * If write spans object boundary, just do multiple writes. (For a
497 * correct atomic write, we should e.g. take write locks on all 538 * correct atomic write, we should e.g. take write locks on all
498 * objects, rollback on failure, etc.) 539 * objects, rollback on failure, etc.)
499 */ 540 */
500static ssize_t ceph_sync_write(struct file *file, const char __user *data, 541static ssize_t
501 size_t left, loff_t pos, loff_t *ppos) 542ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
543 unsigned long nr_segs, size_t count)
502{ 544{
545 struct file *file = iocb->ki_filp;
503 struct inode *inode = file_inode(file); 546 struct inode *inode = file_inode(file);
504 struct ceph_inode_info *ci = ceph_inode(inode); 547 struct ceph_inode_info *ci = ceph_inode(inode);
505 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 548 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
506 struct ceph_snap_context *snapc; 549 struct ceph_snap_context *snapc;
507 struct ceph_vino vino; 550 struct ceph_vino vino;
508 struct ceph_osd_request *req; 551 struct ceph_osd_request *req;
509 int num_ops = 1;
510 struct page **pages; 552 struct page **pages;
511 int num_pages; 553 int num_pages;
512 u64 len;
513 int written = 0; 554 int written = 0;
514 int flags; 555 int flags;
515 int check_caps = 0; 556 int check_caps = 0;
516 int page_align, io_align; 557 int page_align;
517 unsigned long buf_align;
518 int ret; 558 int ret;
519 struct timespec mtime = CURRENT_TIME; 559 struct timespec mtime = CURRENT_TIME;
520 bool own_pages = false; 560 loff_t pos = iocb->ki_pos;
561 struct iov_iter i;
521 562
522 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 563 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
523 return -EROFS; 564 return -EROFS;
524 565
525 dout("sync_write on file %p %lld~%u %s\n", file, pos, 566 dout("sync_direct_write on file %p %lld~%u\n", file, pos,
526 (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 567 (unsigned)count);
527 568
528 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); 569 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
529 if (ret < 0) 570 if (ret < 0)
530 return ret; 571 return ret;
531 572
532 ret = invalidate_inode_pages2_range(inode->i_mapping, 573 ret = invalidate_inode_pages2_range(inode->i_mapping,
533 pos >> PAGE_CACHE_SHIFT, 574 pos >> PAGE_CACHE_SHIFT,
534 (pos + left) >> PAGE_CACHE_SHIFT); 575 (pos + count) >> PAGE_CACHE_SHIFT);
535 if (ret < 0) 576 if (ret < 0)
536 dout("invalidate_inode_pages2_range returned %d\n", ret); 577 dout("invalidate_inode_pages2_range returned %d\n", ret);
537 578
538 flags = CEPH_OSD_FLAG_ORDERSNAP | 579 flags = CEPH_OSD_FLAG_ORDERSNAP |
539 CEPH_OSD_FLAG_ONDISK | 580 CEPH_OSD_FLAG_ONDISK |
540 CEPH_OSD_FLAG_WRITE; 581 CEPH_OSD_FLAG_WRITE;
541 if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
542 flags |= CEPH_OSD_FLAG_ACK;
543 else
544 num_ops++; /* Also include a 'startsync' command. */
545 582
546 /* 583 iov_iter_init(&i, iov, nr_segs, count, 0);
547 * we may need to do multiple writes here if we span an object 584
548 * boundary. this isn't atomic, unfortunately. :( 585 while (iov_iter_count(&i) > 0) {
549 */ 586 void __user *data = i.iov->iov_base + i.iov_offset;
550more: 587 u64 len = i.iov->iov_len - i.iov_offset;
551 io_align = pos & ~PAGE_MASK; 588
552 buf_align = (unsigned long)data & ~PAGE_MASK; 589 page_align = (unsigned long)data & ~PAGE_MASK;
553 len = left; 590
554 591 snapc = ci->i_snap_realm->cached_context;
555 snapc = ci->i_snap_realm->cached_context; 592 vino = ceph_vino(inode);
556 vino = ceph_vino(inode); 593 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
557 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 594 vino, pos, &len,
558 vino, pos, &len, num_ops, 595 2,/*include a 'startsync' command*/
559 CEPH_OSD_OP_WRITE, flags, snapc, 596 CEPH_OSD_OP_WRITE, flags, snapc,
560 ci->i_truncate_seq, ci->i_truncate_size, 597 ci->i_truncate_seq,
561 false); 598 ci->i_truncate_size,
562 if (IS_ERR(req)) 599 false);
563 return PTR_ERR(req); 600 if (IS_ERR(req)) {
601 ret = PTR_ERR(req);
602 goto out;
603 }
564 604
565 /* write from beginning of first page, regardless of io alignment */ 605 num_pages = calc_pages_for(page_align, len);
566 page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
567 num_pages = calc_pages_for(page_align, len);
568 if (file->f_flags & O_DIRECT) {
569 pages = ceph_get_direct_page_vector(data, num_pages, false); 606 pages = ceph_get_direct_page_vector(data, num_pages, false);
570 if (IS_ERR(pages)) { 607 if (IS_ERR(pages)) {
571 ret = PTR_ERR(pages); 608 ret = PTR_ERR(pages);
@@ -577,60 +614,175 @@ more:
577 * may block. 614 * may block.
578 */ 615 */
579 truncate_inode_pages_range(inode->i_mapping, pos, 616 truncate_inode_pages_range(inode->i_mapping, pos,
580 (pos+len) | (PAGE_CACHE_SIZE-1)); 617 (pos+len) | (PAGE_CACHE_SIZE-1));
581 } else { 618 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
619 false, false);
620
621 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
622 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
623
624 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
625 if (!ret)
626 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
627
628 ceph_put_page_vector(pages, num_pages, false);
629
630out:
631 ceph_osdc_put_request(req);
632 if (ret == 0) {
633 pos += len;
634 written += len;
635 iov_iter_advance(&i, (size_t)len);
636
637 if (pos > i_size_read(inode)) {
638 check_caps = ceph_inode_set_size(inode, pos);
639 if (check_caps)
640 ceph_check_caps(ceph_inode(inode),
641 CHECK_CAPS_AUTHONLY,
642 NULL);
643 }
644 } else
645 break;
646 }
647
648 if (ret != -EOLDSNAPC && written > 0) {
649 iocb->ki_pos = pos;
650 ret = written;
651 }
652 return ret;
653}
654
655
656/*
657 * Synchronous write, straight from __user pointer or user pages.
658 *
659 * If write spans object boundary, just do multiple writes. (For a
660 * correct atomic write, we should e.g. take write locks on all
661 * objects, rollback on failure, etc.)
662 */
663static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
664 unsigned long nr_segs, size_t count)
665{
666 struct file *file = iocb->ki_filp;
667 struct inode *inode = file_inode(file);
668 struct ceph_inode_info *ci = ceph_inode(inode);
669 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
670 struct ceph_snap_context *snapc;
671 struct ceph_vino vino;
672 struct ceph_osd_request *req;
673 struct page **pages;
674 u64 len;
675 int num_pages;
676 int written = 0;
677 int flags;
678 int check_caps = 0;
679 int ret;
680 struct timespec mtime = CURRENT_TIME;
681 loff_t pos = iocb->ki_pos;
682 struct iov_iter i;
683
684 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
685 return -EROFS;
686
687 dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
688
689 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
690 if (ret < 0)
691 return ret;
692
693 ret = invalidate_inode_pages2_range(inode->i_mapping,
694 pos >> PAGE_CACHE_SHIFT,
695 (pos + count) >> PAGE_CACHE_SHIFT);
696 if (ret < 0)
697 dout("invalidate_inode_pages2_range returned %d\n", ret);
698
699 flags = CEPH_OSD_FLAG_ORDERSNAP |
700 CEPH_OSD_FLAG_ONDISK |
701 CEPH_OSD_FLAG_WRITE |
702 CEPH_OSD_FLAG_ACK;
703
704 iov_iter_init(&i, iov, nr_segs, count, 0);
705
706 while ((len = iov_iter_count(&i)) > 0) {
707 size_t left;
708 int n;
709
710 snapc = ci->i_snap_realm->cached_context;
711 vino = ceph_vino(inode);
712 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
713 vino, pos, &len, 1,
714 CEPH_OSD_OP_WRITE, flags, snapc,
715 ci->i_truncate_seq,
716 ci->i_truncate_size,
717 false);
718 if (IS_ERR(req)) {
719 ret = PTR_ERR(req);
720 goto out;
721 }
722
723 /*
724 * write from beginning of first page,
725 * regardless of io alignment
726 */
727 num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
728
582 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); 729 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
583 if (IS_ERR(pages)) { 730 if (IS_ERR(pages)) {
584 ret = PTR_ERR(pages); 731 ret = PTR_ERR(pages);
585 goto out; 732 goto out;
586 } 733 }
587 ret = ceph_copy_user_to_page_vector(pages, data, pos, len); 734
735 left = len;
736 for (n = 0; n < num_pages; n++) {
737 size_t plen = min_t(size_t, left, PAGE_SIZE);
738 ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
739 if (ret != plen) {
740 ret = -EFAULT;
741 break;
742 }
743 left -= ret;
744 iov_iter_advance(&i, ret);
745 }
746
588 if (ret < 0) { 747 if (ret < 0) {
589 ceph_release_page_vector(pages, num_pages); 748 ceph_release_page_vector(pages, num_pages);
590 goto out; 749 goto out;
591 } 750 }
592 751
593 if ((file->f_flags & O_SYNC) == 0) { 752 /* get a second commit callback */
594 /* get a second commit callback */ 753 req->r_unsafe_callback = ceph_sync_write_unsafe;
595 req->r_unsafe_callback = ceph_sync_write_unsafe; 754 req->r_inode = inode;
596 req->r_inode = inode;
597 own_pages = true;
598 }
599 }
600 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
601 false, own_pages);
602 755
603 /* BUG_ON(vino.snap != CEPH_NOSNAP); */ 756 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
604 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 757 false, true);
605 758
606 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 759 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
607 if (!ret) 760 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
608 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
609 761
610 if (file->f_flags & O_DIRECT) 762 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
611 ceph_put_page_vector(pages, num_pages, false); 763 if (!ret)
612 else if (file->f_flags & O_SYNC) 764 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
613 ceph_release_page_vector(pages, num_pages);
614 765
615out: 766out:
616 ceph_osdc_put_request(req); 767 ceph_osdc_put_request(req);
617 if (ret == 0) { 768 if (ret == 0) {
618 pos += len; 769 pos += len;
619 written += len; 770 written += len;
620 left -= len; 771
621 data += len; 772 if (pos > i_size_read(inode)) {
622 if (left) 773 check_caps = ceph_inode_set_size(inode, pos);
623 goto more; 774 if (check_caps)
775 ceph_check_caps(ceph_inode(inode),
776 CHECK_CAPS_AUTHONLY,
777 NULL);
778 }
779 } else
780 break;
781 }
624 782
783 if (ret != -EOLDSNAPC && written > 0) {
625 ret = written; 784 ret = written;
626 *ppos = pos; 785 iocb->ki_pos = pos;
627 if (pos > i_size_read(inode))
628 check_caps = ceph_inode_set_size(inode, pos);
629 if (check_caps)
630 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
631 NULL);
632 } else if (ret != -EOLDSNAPC && written > 0) {
633 ret = written;
634 } 786 }
635 return ret; 787 return ret;
636} 788}
@@ -647,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
647{ 799{
648 struct file *filp = iocb->ki_filp; 800 struct file *filp = iocb->ki_filp;
649 struct ceph_file_info *fi = filp->private_data; 801 struct ceph_file_info *fi = filp->private_data;
650 loff_t *ppos = &iocb->ki_pos; 802 size_t len = iocb->ki_nbytes;
651 size_t len = iov->iov_len;
652 struct inode *inode = file_inode(filp); 803 struct inode *inode = file_inode(filp);
653 struct ceph_inode_info *ci = ceph_inode(inode); 804 struct ceph_inode_info *ci = ceph_inode(inode);
654 void __user *base = iov->iov_base;
655 ssize_t ret; 805 ssize_t ret;
656 int want, got = 0; 806 int want, got = 0;
657 int checkeof = 0, read = 0; 807 int checkeof = 0, read = 0;
658 808
659 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
660 inode, ceph_vinop(inode), pos, (unsigned)len, inode);
661again: 809again:
810 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
811 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
812
662 if (fi->fmode & CEPH_FILE_MODE_LAZY) 813 if (fi->fmode & CEPH_FILE_MODE_LAZY)
663 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 814 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
664 else 815 else
665 want = CEPH_CAP_FILE_CACHE; 816 want = CEPH_CAP_FILE_CACHE;
666 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 817 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
667 if (ret < 0) 818 if (ret < 0)
668 goto out; 819 return ret;
669 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
670 inode, ceph_vinop(inode), pos, (unsigned)len,
671 ceph_cap_string(got));
672 820
673 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 821 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
674 (iocb->ki_filp->f_flags & O_DIRECT) || 822 (iocb->ki_filp->f_flags & O_DIRECT) ||
675 (fi->flags & CEPH_F_SYNC)) 823 (fi->flags & CEPH_F_SYNC)) {
824 struct iov_iter i;
825
826 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got));
829
830 if (!read) {
831 ret = generic_segment_checks(iov, &nr_segs,
832 &len, VERIFY_WRITE);
833 if (ret)
834 goto out;
835 }
836
837 iov_iter_init(&i, iov, nr_segs, len, read);
838
676 /* hmm, this isn't really async... */ 839 /* hmm, this isn't really async... */
677 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 840 ret = ceph_sync_read(iocb, &i, &checkeof);
678 else 841 } else {
679 ret = generic_file_aio_read(iocb, iov, nr_segs, pos); 842 /*
843 * We can't modify the content of iov,
844 * so we only read from beginning.
845 */
846 if (read) {
847 iocb->ki_pos = pos;
848 len = iocb->ki_nbytes;
849 read = 0;
850 }
851 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
852 inode, ceph_vinop(inode), pos, (unsigned)len,
853 ceph_cap_string(got));
680 854
855 ret = generic_file_aio_read(iocb, iov, nr_segs, pos);
856 }
681out: 857out:
682 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 858 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
683 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 859 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
684 ceph_put_cap_refs(ci, got); 860 ceph_put_cap_refs(ci, got);
685 861
686 if (checkeof && ret >= 0) { 862 if (checkeof && ret >= 0) {
687 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); 863 int statret = ceph_do_getattr(inode,
864 CEPH_STAT_CAP_SIZE);
688 865
689 /* hit EOF or hole? */ 866 /* hit EOF or hole? */
690 if (statret == 0 && *ppos < inode->i_size) { 867 if (statret == 0 && iocb->ki_pos < inode->i_size &&
691 dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); 868 ret < len) {
869 dout("sync_read hit hole, ppos %lld < size %lld"
870 ", reading more\n", iocb->ki_pos,
871 inode->i_size);
872
692 read += ret; 873 read += ret;
693 base += ret;
694 len -= ret; 874 len -= ret;
695 checkeof = 0; 875 checkeof = 0;
696 goto again; 876 goto again;
697 } 877 }
698 } 878 }
879
699 if (ret >= 0) 880 if (ret >= 0)
700 ret += read; 881 ret += read;
701 882
@@ -772,11 +953,13 @@ retry_snap:
772 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 953 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
773 954
774 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 955 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
775 (iocb->ki_filp->f_flags & O_DIRECT) || 956 (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
776 (fi->flags & CEPH_F_SYNC)) {
777 mutex_unlock(&inode->i_mutex); 957 mutex_unlock(&inode->i_mutex);
778 written = ceph_sync_write(file, iov->iov_base, count, 958 if (file->f_flags & O_DIRECT)
779 pos, &iocb->ki_pos); 959 written = ceph_sync_direct_write(iocb, iov,
960 nr_segs, count);
961 else
962 written = ceph_sync_write(iocb, iov, nr_segs, count);
780 if (written == -EOLDSNAPC) { 963 if (written == -EOLDSNAPC) {
781 dout("aio_write %p %llx.%llx %llu~%u" 964 dout("aio_write %p %llx.%llx %llu~%u"
782 "got EOLDSNAPC, retrying\n", 965 "got EOLDSNAPC, retrying\n",
@@ -1018,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode,
1018 loff_t offset, loff_t length) 1201 loff_t offset, loff_t length)
1019{ 1202{
1020 struct ceph_file_info *fi = file->private_data; 1203 struct ceph_file_info *fi = file->private_data;
1021 struct inode *inode = file->f_dentry->d_inode; 1204 struct inode *inode = file_inode(file);
1022 struct ceph_inode_info *ci = ceph_inode(inode); 1205 struct ceph_inode_info *ci = ceph_inode(inode);
1023 struct ceph_osd_client *osdc = 1206 struct ceph_osd_client *osdc =
1024 &ceph_inode_to_client(inode)->client->osdc; 1207 &ceph_inode_to_client(inode)->client->osdc;