aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-03-26 18:53:16 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2016-03-26 18:53:16 -0400
commitd5a38f6e4668b3110a66cd96ce2096184bf66def (patch)
treebd2209de25a343e7b01d143abce7bf774122227f
parent698f415cf5756e320623bdb015a600945743377c (diff)
parent5ee61e95b6b33c82f6fa1382585faed66aa01245 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "There is quite a bit here, including some overdue refactoring and cleanup on the mon_client and osd_client code from Ilya, scattered writeback support for CephFS and a pile of bug fixes from Zheng, and a few random cleanups and fixes from others" [ I already decided not to pull this because of it having been rebased recently, but ended up changing my mind after all. Next time I'll really hold people to it. Oh well. - Linus ] * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (34 commits) libceph: use KMEM_CACHE macro ceph: use kmem_cache_zalloc rbd: use KMEM_CACHE macro ceph: use lookup request to revalidate dentry ceph: kill ceph_get_dentry_parent_inode() ceph: fix security xattr deadlock ceph: don't request vxattrs from MDS ceph: fix mounting same fs multiple times ceph: remove unnecessary NULL check ceph: avoid updating directory inode's i_size accidentally ceph: fix race during filling readdir cache libceph: use sizeof_footer() more ceph: kill ceph_empty_snapc ceph: fix a wrong comparison ceph: replace CURRENT_TIME by current_fs_time() ceph: scattered page writeback libceph: add helper that duplicates last extent operation libceph: enable large, variable-sized OSD requests libceph: osdc->req_mempool should be backed by a slab pool libceph: make r_request msg_size calculation clearer ...
-rw-r--r--drivers/block/rbd.c14
-rw-r--r--fs/ceph/addr.c324
-rw-r--r--fs/ceph/caps.c11
-rw-r--r--fs/ceph/dir.c69
-rw-r--r--fs/ceph/export.c13
-rw-r--r--fs/ceph/file.c15
-rw-r--r--fs/ceph/inode.c34
-rw-r--r--fs/ceph/mds_client.c7
-rw-r--r--fs/ceph/snap.c16
-rw-r--r--fs/ceph/super.c47
-rw-r--r--fs/ceph/super.h23
-rw-r--r--fs/ceph/xattr.c78
-rw-r--r--include/linux/ceph/ceph_features.h2
-rw-r--r--include/linux/ceph/ceph_fs.h7
-rw-r--r--include/linux/ceph/libceph.h8
-rw-r--r--include/linux/ceph/mon_client.h31
-rw-r--r--include/linux/ceph/osd_client.h15
-rw-r--r--net/ceph/ceph_common.c4
-rw-r--r--net/ceph/debugfs.c17
-rw-r--r--net/ceph/messenger.c29
-rw-r--r--net/ceph/mon_client.c457
-rw-r--r--net/ceph/osd_client.c109
22 files changed, 811 insertions, 519 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4a876785b68c..9c6234428607 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1847,14 +1847,12 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1847 if (osd_req->r_result < 0) 1847 if (osd_req->r_result < 0)
1848 obj_request->result = osd_req->r_result; 1848 obj_request->result = osd_req->r_result;
1849 1849
1850 rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1851
1852 /* 1850 /*
1853 * We support a 64-bit length, but ultimately it has to be 1851 * We support a 64-bit length, but ultimately it has to be
1854 * passed to the block layer, which just supports a 32-bit 1852 * passed to the block layer, which just supports a 32-bit
1855 * length field. 1853 * length field.
1856 */ 1854 */
1857 obj_request->xferred = osd_req->r_reply_op_len[0]; 1855 obj_request->xferred = osd_req->r_ops[0].outdata_len;
1858 rbd_assert(obj_request->xferred < (u64)UINT_MAX); 1856 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1859 1857
1860 opcode = osd_req->r_ops[0].op; 1858 opcode = osd_req->r_ops[0].op;
@@ -5643,18 +5641,12 @@ static void rbd_sysfs_cleanup(void)
5643static int rbd_slab_init(void) 5641static int rbd_slab_init(void)
5644{ 5642{
5645 rbd_assert(!rbd_img_request_cache); 5643 rbd_assert(!rbd_img_request_cache);
5646 rbd_img_request_cache = kmem_cache_create("rbd_img_request", 5644 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
5647 sizeof (struct rbd_img_request),
5648 __alignof__(struct rbd_img_request),
5649 0, NULL);
5650 if (!rbd_img_request_cache) 5645 if (!rbd_img_request_cache)
5651 return -ENOMEM; 5646 return -ENOMEM;
5652 5647
5653 rbd_assert(!rbd_obj_request_cache); 5648 rbd_assert(!rbd_obj_request_cache);
5654 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request", 5649 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
5655 sizeof (struct rbd_obj_request),
5656 __alignof__(struct rbd_obj_request),
5657 0, NULL);
5658 if (!rbd_obj_request_cache) 5650 if (!rbd_obj_request_cache)
5659 goto out_err; 5651 goto out_err;
5660 5652
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 19adeb0ef82a..fc5cae2a0db2 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,8 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
175 175
176static int ceph_releasepage(struct page *page, gfp_t g) 176static int ceph_releasepage(struct page *page, gfp_t g)
177{ 177{
178 struct inode *inode = page->mapping ? page->mapping->host : NULL; 178 dout("%p releasepage %p idx %lu\n", page->mapping->host,
179 dout("%p releasepage %p idx %lu\n", inode, page, page->index); 179 page, page->index);
180 WARN_ON(PageDirty(page)); 180 WARN_ON(PageDirty(page));
181 181
182 /* Can we release the page from the cache? */ 182 /* Can we release the page from the cache? */
@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
276 for (i = 0; i < num_pages; i++) { 276 for (i = 0; i < num_pages; i++) {
277 struct page *page = osd_data->pages[i]; 277 struct page *page = osd_data->pages[i];
278 278
279 if (rc < 0 && rc != ENOENT) 279 if (rc < 0 && rc != -ENOENT)
280 goto unlock; 280 goto unlock;
281 if (bytes < (int)PAGE_CACHE_SIZE) { 281 if (bytes < (int)PAGE_CACHE_SIZE) {
282 /* zero (remainder of) page */ 282 /* zero (remainder of) page */
@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
606 struct inode *inode = req->r_inode; 606 struct inode *inode = req->r_inode;
607 struct ceph_inode_info *ci = ceph_inode(inode); 607 struct ceph_inode_info *ci = ceph_inode(inode);
608 struct ceph_osd_data *osd_data; 608 struct ceph_osd_data *osd_data;
609 unsigned wrote;
610 struct page *page; 609 struct page *page;
611 int num_pages; 610 int num_pages, total_pages = 0;
612 int i; 611 int i, j;
612 int rc = req->r_result;
613 struct ceph_snap_context *snapc = req->r_snapc; 613 struct ceph_snap_context *snapc = req->r_snapc;
614 struct address_space *mapping = inode->i_mapping; 614 struct address_space *mapping = inode->i_mapping;
615 int rc = req->r_result;
616 u64 bytes = req->r_ops[0].extent.length;
617 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 615 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
618 long writeback_stat; 616 bool remove_page;
619 unsigned issued = ceph_caps_issued(ci);
620 617
621 osd_data = osd_req_op_extent_osd_data(req, 0); 618
622 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 619 dout("writepages_finish %p rc %d\n", inode, rc);
623 num_pages = calc_pages_for((u64)osd_data->alignment, 620 if (rc < 0)
624 (u64)osd_data->length);
625 if (rc >= 0) {
626 /*
627 * Assume we wrote the pages we originally sent. The
628 * osd might reply with fewer pages if our writeback
629 * raced with a truncation and was adjusted at the osd,
630 * so don't believe the reply.
631 */
632 wrote = num_pages;
633 } else {
634 wrote = 0;
635 mapping_set_error(mapping, rc); 621 mapping_set_error(mapping, rc);
636 }
637 dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
638 inode, rc, bytes, wrote);
639 622
640 /* clean all pages */ 623 /*
641 for (i = 0; i < num_pages; i++) { 624 * We lost the cache cap, need to truncate the page before
642 page = osd_data->pages[i]; 625 * it is unlocked, otherwise we'd truncate it later in the
643 BUG_ON(!page); 626 * page truncation thread, possibly losing some data that
644 WARN_ON(!PageUptodate(page)); 627 * raced its way in
628 */
629 remove_page = !(ceph_caps_issued(ci) &
630 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
645 631
646 writeback_stat = 632 /* clean all pages */
647 atomic_long_dec_return(&fsc->writeback_count); 633 for (i = 0; i < req->r_num_ops; i++) {
648 if (writeback_stat < 634 if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
649 CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb)) 635 break;
650 clear_bdi_congested(&fsc->backing_dev_info,
651 BLK_RW_ASYNC);
652 636
653 ceph_put_snap_context(page_snap_context(page)); 637 osd_data = osd_req_op_extent_osd_data(req, i);
654 page->private = 0; 638 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
655 ClearPagePrivate(page); 639 num_pages = calc_pages_for((u64)osd_data->alignment,
656 dout("unlocking %d %p\n", i, page); 640 (u64)osd_data->length);
657 end_page_writeback(page); 641 total_pages += num_pages;
642 for (j = 0; j < num_pages; j++) {
643 page = osd_data->pages[j];
644 BUG_ON(!page);
645 WARN_ON(!PageUptodate(page));
646
647 if (atomic_long_dec_return(&fsc->writeback_count) <
648 CONGESTION_OFF_THRESH(
649 fsc->mount_options->congestion_kb))
650 clear_bdi_congested(&fsc->backing_dev_info,
651 BLK_RW_ASYNC);
652
653 ceph_put_snap_context(page_snap_context(page));
654 page->private = 0;
655 ClearPagePrivate(page);
656 dout("unlocking %p\n", page);
657 end_page_writeback(page);
658
659 if (remove_page)
660 generic_error_remove_page(inode->i_mapping,
661 page);
658 662
659 /* 663 unlock_page(page);
660 * We lost the cache cap, need to truncate the page before 664 }
661 * it is unlocked, otherwise we'd truncate it later in the 665 dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
662 * page truncation thread, possibly losing some data that 666 inode, osd_data->length, rc >= 0 ? num_pages : 0);
663 * raced its way in
664 */
665 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
666 generic_error_remove_page(inode->i_mapping, page);
667 667
668 unlock_page(page); 668 ceph_release_pages(osd_data->pages, num_pages);
669 } 669 }
670 dout("%p wrote+cleaned %d pages\n", inode, wrote);
671 ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
672 670
673 ceph_release_pages(osd_data->pages, num_pages); 671 ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
672
673 osd_data = osd_req_op_extent_osd_data(req, 0);
674 if (osd_data->pages_from_pool) 674 if (osd_data->pages_from_pool)
675 mempool_free(osd_data->pages, 675 mempool_free(osd_data->pages,
676 ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); 676 ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
@@ -778,17 +778,15 @@ retry:
778 while (!done && index <= end) { 778 while (!done && index <= end) {
779 unsigned i; 779 unsigned i;
780 int first; 780 int first;
781 pgoff_t next; 781 pgoff_t strip_unit_end = 0;
782 int pvec_pages, locked_pages; 782 int num_ops = 0, op_idx;
783 struct page **pages = NULL; 783 int pvec_pages, locked_pages = 0;
784 struct page **pages = NULL, **data_pages;
784 mempool_t *pool = NULL; /* Becomes non-null if mempool used */ 785 mempool_t *pool = NULL; /* Becomes non-null if mempool used */
785 struct page *page; 786 struct page *page;
786 int want; 787 int want;
787 u64 offset, len; 788 u64 offset = 0, len = 0;
788 long writeback_stat;
789 789
790 next = 0;
791 locked_pages = 0;
792 max_pages = max_pages_ever; 790 max_pages = max_pages_ever;
793 791
794get_more_pages: 792get_more_pages:
@@ -824,8 +822,8 @@ get_more_pages:
824 unlock_page(page); 822 unlock_page(page);
825 break; 823 break;
826 } 824 }
827 if (next && (page->index != next)) { 825 if (strip_unit_end && (page->index > strip_unit_end)) {
828 dout("not consecutive %p\n", page); 826 dout("end of strip unit %p\n", page);
829 unlock_page(page); 827 unlock_page(page);
830 break; 828 break;
831 } 829 }
@@ -867,36 +865,31 @@ get_more_pages:
867 /* 865 /*
868 * We have something to write. If this is 866 * We have something to write. If this is
869 * the first locked page this time through, 867 * the first locked page this time through,
870 * allocate an osd request and a page array 868 * calculate max possible write size and
871 * that it will use. 869 * allocate a page array
872 */ 870 */
873 if (locked_pages == 0) { 871 if (locked_pages == 0) {
874 BUG_ON(pages); 872 u64 objnum;
873 u64 objoff;
874
875 /* prepare async write request */ 875 /* prepare async write request */
876 offset = (u64)page_offset(page); 876 offset = (u64)page_offset(page);
877 len = wsize; 877 len = wsize;
878 req = ceph_osdc_new_request(&fsc->client->osdc, 878
879 &ci->i_layout, vino, 879 rc = ceph_calc_file_object_mapping(&ci->i_layout,
880 offset, &len, 0, 880 offset, len,
881 do_sync ? 2 : 1, 881 &objnum, &objoff,
882 CEPH_OSD_OP_WRITE, 882 &len);
883 CEPH_OSD_FLAG_WRITE | 883 if (rc < 0) {
884 CEPH_OSD_FLAG_ONDISK,
885 snapc, truncate_seq,
886 truncate_size, true);
887 if (IS_ERR(req)) {
888 rc = PTR_ERR(req);
889 unlock_page(page); 884 unlock_page(page);
890 break; 885 break;
891 } 886 }
892 887
893 if (do_sync) 888 num_ops = 1 + do_sync;
894 osd_req_op_init(req, 1, 889 strip_unit_end = page->index +
895 CEPH_OSD_OP_STARTSYNC, 0); 890 ((len - 1) >> PAGE_CACHE_SHIFT);
896
897 req->r_callback = writepages_finish;
898 req->r_inode = inode;
899 891
892 BUG_ON(pages);
900 max_pages = calc_pages_for(0, (u64)len); 893 max_pages = calc_pages_for(0, (u64)len);
901 pages = kmalloc(max_pages * sizeof (*pages), 894 pages = kmalloc(max_pages * sizeof (*pages),
902 GFP_NOFS); 895 GFP_NOFS);
@@ -905,6 +898,20 @@ get_more_pages:
905 pages = mempool_alloc(pool, GFP_NOFS); 898 pages = mempool_alloc(pool, GFP_NOFS);
906 BUG_ON(!pages); 899 BUG_ON(!pages);
907 } 900 }
901
902 len = 0;
903 } else if (page->index !=
904 (offset + len) >> PAGE_CACHE_SHIFT) {
905 if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
906 CEPH_OSD_MAX_OPS)) {
907 redirty_page_for_writepage(wbc, page);
908 unlock_page(page);
909 break;
910 }
911
912 num_ops++;
913 offset = (u64)page_offset(page);
914 len = 0;
908 } 915 }
909 916
910 /* note position of first page in pvec */ 917 /* note position of first page in pvec */
@@ -913,18 +920,16 @@ get_more_pages:
913 dout("%p will write page %p idx %lu\n", 920 dout("%p will write page %p idx %lu\n",
914 inode, page, page->index); 921 inode, page, page->index);
915 922
916 writeback_stat = 923 if (atomic_long_inc_return(&fsc->writeback_count) >
917 atomic_long_inc_return(&fsc->writeback_count); 924 CONGESTION_ON_THRESH(
918 if (writeback_stat > CONGESTION_ON_THRESH(
919 fsc->mount_options->congestion_kb)) { 925 fsc->mount_options->congestion_kb)) {
920 set_bdi_congested(&fsc->backing_dev_info, 926 set_bdi_congested(&fsc->backing_dev_info,
921 BLK_RW_ASYNC); 927 BLK_RW_ASYNC);
922 } 928 }
923 929
924 set_page_writeback(page);
925 pages[locked_pages] = page; 930 pages[locked_pages] = page;
926 locked_pages++; 931 locked_pages++;
927 next = page->index + 1; 932 len += PAGE_CACHE_SIZE;
928 } 933 }
929 934
930 /* did we get anything? */ 935 /* did we get anything? */
@@ -944,38 +949,119 @@ get_more_pages:
944 /* shift unused pages over in the pvec... we 949 /* shift unused pages over in the pvec... we
945 * will need to release them below. */ 950 * will need to release them below. */
946 for (j = i; j < pvec_pages; j++) { 951 for (j = i; j < pvec_pages; j++) {
947 dout(" pvec leftover page %p\n", 952 dout(" pvec leftover page %p\n", pvec.pages[j]);
948 pvec.pages[j]);
949 pvec.pages[j-i+first] = pvec.pages[j]; 953 pvec.pages[j-i+first] = pvec.pages[j];
950 } 954 }
951 pvec.nr -= i-first; 955 pvec.nr -= i-first;
952 } 956 }
953 957
954 /* Format the osd request message and submit the write */ 958new_request:
955 offset = page_offset(pages[0]); 959 offset = page_offset(pages[0]);
956 len = (u64)locked_pages << PAGE_CACHE_SHIFT; 960 len = wsize;
957 if (snap_size == -1) { 961
958 len = min(len, (u64)i_size_read(inode) - offset); 962 req = ceph_osdc_new_request(&fsc->client->osdc,
959 /* writepages_finish() clears writeback pages 963 &ci->i_layout, vino,
960 * according to the data length, so make sure 964 offset, &len, 0, num_ops,
961 * data length covers all locked pages */ 965 CEPH_OSD_OP_WRITE,
962 len = max(len, 1 + 966 CEPH_OSD_FLAG_WRITE |
963 ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT)); 967 CEPH_OSD_FLAG_ONDISK,
964 } else { 968 snapc, truncate_seq,
965 len = min(len, snap_size - offset); 969 truncate_size, false);
970 if (IS_ERR(req)) {
971 req = ceph_osdc_new_request(&fsc->client->osdc,
972 &ci->i_layout, vino,
973 offset, &len, 0,
974 min(num_ops,
975 CEPH_OSD_SLAB_OPS),
976 CEPH_OSD_OP_WRITE,
977 CEPH_OSD_FLAG_WRITE |
978 CEPH_OSD_FLAG_ONDISK,
979 snapc, truncate_seq,
980 truncate_size, true);
981 BUG_ON(IS_ERR(req));
966 } 982 }
967 dout("writepages got %d pages at %llu~%llu\n", 983 BUG_ON(len < page_offset(pages[locked_pages - 1]) +
968 locked_pages, offset, len); 984 PAGE_CACHE_SIZE - offset);
985
986 req->r_callback = writepages_finish;
987 req->r_inode = inode;
969 988
970 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, 989 /* Format the osd request message and submit the write */
990 len = 0;
991 data_pages = pages;
992 op_idx = 0;
993 for (i = 0; i < locked_pages; i++) {
994 u64 cur_offset = page_offset(pages[i]);
995 if (offset + len != cur_offset) {
996 if (op_idx + do_sync + 1 == req->r_num_ops)
997 break;
998 osd_req_op_extent_dup_last(req, op_idx,
999 cur_offset - offset);
1000 dout("writepages got pages at %llu~%llu\n",
1001 offset, len);
1002 osd_req_op_extent_osd_data_pages(req, op_idx,
1003 data_pages, len, 0,
971 !!pool, false); 1004 !!pool, false);
1005 osd_req_op_extent_update(req, op_idx, len);
972 1006
973 pages = NULL; /* request message now owns the pages array */ 1007 len = 0;
974 pool = NULL; 1008 offset = cur_offset;
1009 data_pages = pages + i;
1010 op_idx++;
1011 }
975 1012
976 /* Update the write op length in case we changed it */ 1013 set_page_writeback(pages[i]);
1014 len += PAGE_CACHE_SIZE;
1015 }
1016
1017 if (snap_size != -1) {
1018 len = min(len, snap_size - offset);
1019 } else if (i == locked_pages) {
1020 /* writepages_finish() clears writeback pages
1021 * according to the data length, so make sure
1022 * data length covers all locked pages */
1023 u64 min_len = len + 1 - PAGE_CACHE_SIZE;
1024 len = min(len, (u64)i_size_read(inode) - offset);
1025 len = max(len, min_len);
1026 }
1027 dout("writepages got pages at %llu~%llu\n", offset, len);
1028
1029 osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
1030 0, !!pool, false);
1031 osd_req_op_extent_update(req, op_idx, len);
977 1032
978 osd_req_op_extent_update(req, 0, len); 1033 if (do_sync) {
1034 op_idx++;
1035 osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
1036 }
1037 BUG_ON(op_idx + 1 != req->r_num_ops);
1038
1039 pool = NULL;
1040 if (i < locked_pages) {
1041 BUG_ON(num_ops <= req->r_num_ops);
1042 num_ops -= req->r_num_ops;
1043 num_ops += do_sync;
1044 locked_pages -= i;
1045
1046 /* allocate new pages array for next request */
1047 data_pages = pages;
1048 pages = kmalloc(locked_pages * sizeof (*pages),
1049 GFP_NOFS);
1050 if (!pages) {
1051 pool = fsc->wb_pagevec_pool;
1052 pages = mempool_alloc(pool, GFP_NOFS);
1053 BUG_ON(!pages);
1054 }
1055 memcpy(pages, data_pages + i,
1056 locked_pages * sizeof(*pages));
1057 memset(data_pages + i, 0,
1058 locked_pages * sizeof(*pages));
1059 } else {
1060 BUG_ON(num_ops != req->r_num_ops);
1061 index = pages[i - 1]->index + 1;
1062 /* request message now owns the pages array */
1063 pages = NULL;
1064 }
979 1065
980 vino = ceph_vino(inode); 1066 vino = ceph_vino(inode);
981 ceph_osdc_build_request(req, offset, snapc, vino.snap, 1067 ceph_osdc_build_request(req, offset, snapc, vino.snap,
@@ -985,9 +1071,10 @@ get_more_pages:
985 BUG_ON(rc); 1071 BUG_ON(rc);
986 req = NULL; 1072 req = NULL;
987 1073
988 /* continue? */ 1074 wbc->nr_to_write -= i;
989 index = next; 1075 if (pages)
990 wbc->nr_to_write -= locked_pages; 1076 goto new_request;
1077
991 if (wbc->nr_to_write <= 0) 1078 if (wbc->nr_to_write <= 0)
992 done = 1; 1079 done = 1;
993 1080
@@ -1522,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1522 ceph_vino(inode), 0, &len, 0, 1, 1609 ceph_vino(inode), 0, &len, 0, 1,
1523 CEPH_OSD_OP_CREATE, 1610 CEPH_OSD_OP_CREATE,
1524 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 1611 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1525 ceph_empty_snapc, 0, 0, false); 1612 NULL, 0, 0, false);
1526 if (IS_ERR(req)) { 1613 if (IS_ERR(req)) {
1527 err = PTR_ERR(req); 1614 err = PTR_ERR(req);
1528 goto out; 1615 goto out;
@@ -1540,9 +1627,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1540 ceph_vino(inode), 0, &len, 1, 3, 1627 ceph_vino(inode), 0, &len, 1, 3,
1541 CEPH_OSD_OP_WRITE, 1628 CEPH_OSD_OP_WRITE,
1542 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 1629 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1543 ceph_empty_snapc, 1630 NULL, ci->i_truncate_seq,
1544 ci->i_truncate_seq, ci->i_truncate_size, 1631 ci->i_truncate_size, false);
1545 false);
1546 if (IS_ERR(req)) { 1632 if (IS_ERR(req)) {
1547 err = PTR_ERR(req); 1633 err = PTR_ERR(req);
1548 goto out; 1634 goto out;
@@ -1663,8 +1749,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1663 goto out; 1749 goto out;
1664 } 1750 }
1665 1751
1666 rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, 1752 rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1667 ceph_empty_snapc,
1668 1, false, GFP_NOFS); 1753 1, false, GFP_NOFS);
1669 if (!rd_req) { 1754 if (!rd_req) {
1670 err = -ENOMEM; 1755 err = -ENOMEM;
@@ -1678,8 +1763,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1678 "%llx.00000000", ci->i_vino.ino); 1763 "%llx.00000000", ci->i_vino.ino);
1679 rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); 1764 rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
1680 1765
1681 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, 1766 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1682 ceph_empty_snapc,
1683 1, false, GFP_NOFS); 1767 1, false, GFP_NOFS);
1684 if (!wr_req) { 1768 if (!wr_req) {
1685 err = -ENOMEM; 1769 err = -ENOMEM;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6fe0ad26a7df..de17bb232ff8 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -991,7 +991,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
991 u32 seq, u64 flush_tid, u64 oldest_flush_tid, 991 u32 seq, u64 flush_tid, u64 oldest_flush_tid,
992 u32 issue_seq, u32 mseq, u64 size, u64 max_size, 992 u32 issue_seq, u32 mseq, u64 size, u64 max_size,
993 struct timespec *mtime, struct timespec *atime, 993 struct timespec *mtime, struct timespec *atime,
994 u64 time_warp_seq, 994 struct timespec *ctime, u64 time_warp_seq,
995 kuid_t uid, kgid_t gid, umode_t mode, 995 kuid_t uid, kgid_t gid, umode_t mode,
996 u64 xattr_version, 996 u64 xattr_version,
997 struct ceph_buffer *xattrs_buf, 997 struct ceph_buffer *xattrs_buf,
@@ -1042,6 +1042,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
1042 ceph_encode_timespec(&fc->mtime, mtime); 1042 ceph_encode_timespec(&fc->mtime, mtime);
1043 if (atime) 1043 if (atime)
1044 ceph_encode_timespec(&fc->atime, atime); 1044 ceph_encode_timespec(&fc->atime, atime);
1045 if (ctime)
1046 ceph_encode_timespec(&fc->ctime, ctime);
1045 fc->time_warp_seq = cpu_to_le32(time_warp_seq); 1047 fc->time_warp_seq = cpu_to_le32(time_warp_seq);
1046 1048
1047 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid)); 1049 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
@@ -1116,7 +1118,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1116 int held, revoking, dropping, keep; 1118 int held, revoking, dropping, keep;
1117 u64 seq, issue_seq, mseq, time_warp_seq, follows; 1119 u64 seq, issue_seq, mseq, time_warp_seq, follows;
1118 u64 size, max_size; 1120 u64 size, max_size;
1119 struct timespec mtime, atime; 1121 struct timespec mtime, atime, ctime;
1120 int wake = 0; 1122 int wake = 0;
1121 umode_t mode; 1123 umode_t mode;
1122 kuid_t uid; 1124 kuid_t uid;
@@ -1180,6 +1182,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1180 ci->i_requested_max_size = max_size; 1182 ci->i_requested_max_size = max_size;
1181 mtime = inode->i_mtime; 1183 mtime = inode->i_mtime;
1182 atime = inode->i_atime; 1184 atime = inode->i_atime;
1185 ctime = inode->i_ctime;
1183 time_warp_seq = ci->i_time_warp_seq; 1186 time_warp_seq = ci->i_time_warp_seq;
1184 uid = inode->i_uid; 1187 uid = inode->i_uid;
1185 gid = inode->i_gid; 1188 gid = inode->i_gid;
@@ -1198,7 +1201,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1198 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, 1201 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1199 op, keep, want, flushing, seq, 1202 op, keep, want, flushing, seq,
1200 flush_tid, oldest_flush_tid, issue_seq, mseq, 1203 flush_tid, oldest_flush_tid, issue_seq, mseq,
1201 size, max_size, &mtime, &atime, time_warp_seq, 1204 size, max_size, &mtime, &atime, &ctime, time_warp_seq,
1202 uid, gid, mode, xattr_version, xattr_blob, 1205 uid, gid, mode, xattr_version, xattr_blob,
1203 follows, inline_data); 1206 follows, inline_data);
1204 if (ret < 0) { 1207 if (ret < 0) {
@@ -1320,7 +1323,7 @@ retry:
1320 capsnap->dirty, 0, capsnap->flush_tid, 0, 1323 capsnap->dirty, 0, capsnap->flush_tid, 0,
1321 0, mseq, capsnap->size, 0, 1324 0, mseq, capsnap->size, 0,
1322 &capsnap->mtime, &capsnap->atime, 1325 &capsnap->mtime, &capsnap->atime,
1323 capsnap->time_warp_seq, 1326 &capsnap->ctime, capsnap->time_warp_seq,
1324 capsnap->uid, capsnap->gid, capsnap->mode, 1327 capsnap->uid, capsnap->gid, capsnap->mode,
1325 capsnap->xattr_version, capsnap->xattr_blob, 1328 capsnap->xattr_version, capsnap->xattr_blob,
1326 capsnap->follows, capsnap->inline_data); 1329 capsnap->follows, capsnap->inline_data);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fd11fb231a2e..fadc243dfb28 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
38 if (dentry->d_fsdata) 38 if (dentry->d_fsdata)
39 return 0; 39 return 0;
40 40
41 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO); 41 di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
42 if (!di) 42 if (!di)
43 return -ENOMEM; /* oh well */ 43 return -ENOMEM; /* oh well */
44 44
@@ -68,23 +68,6 @@ out_unlock:
68 return 0; 68 return 0;
69} 69}
70 70
71struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
72{
73 struct inode *inode = NULL;
74
75 if (!dentry)
76 return NULL;
77
78 spin_lock(&dentry->d_lock);
79 if (!IS_ROOT(dentry)) {
80 inode = d_inode(dentry->d_parent);
81 ihold(inode);
82 }
83 spin_unlock(&dentry->d_lock);
84 return inode;
85}
86
87
88/* 71/*
89 * for readdir, we encode the directory frag and offset within that 72 * for readdir, we encode the directory frag and offset within that
90 * frag into f_pos. 73 * frag into f_pos.
@@ -624,6 +607,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
624 struct ceph_mds_client *mdsc = fsc->mdsc; 607 struct ceph_mds_client *mdsc = fsc->mdsc;
625 struct ceph_mds_request *req; 608 struct ceph_mds_request *req;
626 int op; 609 int op;
610 int mask;
627 int err; 611 int err;
628 612
629 dout("lookup %p dentry %p '%pd'\n", 613 dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +650,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
666 return ERR_CAST(req); 650 return ERR_CAST(req);
667 req->r_dentry = dget(dentry); 651 req->r_dentry = dget(dentry);
668 req->r_num_caps = 2; 652 req->r_num_caps = 2;
669 /* we only need inode linkage */ 653
670 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); 654 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
655 if (ceph_security_xattr_wanted(dir))
656 mask |= CEPH_CAP_XATTR_SHARED;
657 req->r_args.getattr.mask = cpu_to_le32(mask);
658
671 req->r_locked_dir = dir; 659 req->r_locked_dir = dir;
672 err = ceph_mdsc_do_request(mdsc, NULL, req); 660 err = ceph_mdsc_do_request(mdsc, NULL, req);
673 err = ceph_handle_snapdir(req, dentry, err); 661 err = ceph_handle_snapdir(req, dentry, err);
@@ -1095,6 +1083,7 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
1095static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) 1083static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1096{ 1084{
1097 int valid = 0; 1085 int valid = 0;
1086 struct dentry *parent;
1098 struct inode *dir; 1087 struct inode *dir;
1099 1088
1100 if (flags & LOOKUP_RCU) 1089 if (flags & LOOKUP_RCU)
@@ -1103,7 +1092,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1103 dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry, 1092 dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
1104 dentry, d_inode(dentry), ceph_dentry(dentry)->offset); 1093 dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
1105 1094
1106 dir = ceph_get_dentry_parent_inode(dentry); 1095 parent = dget_parent(dentry);
1096 dir = d_inode(parent);
1107 1097
1108 /* always trust cached snapped dentries, snapdir dentry */ 1098 /* always trust cached snapped dentries, snapdir dentry */
1109 if (ceph_snap(dir) != CEPH_NOSNAP) { 1099 if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1121,13 +1111,48 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1121 valid = 1; 1111 valid = 1;
1122 } 1112 }
1123 1113
1114 if (!valid) {
1115 struct ceph_mds_client *mdsc =
1116 ceph_sb_to_client(dir->i_sb)->mdsc;
1117 struct ceph_mds_request *req;
1118 int op, mask, err;
1119
1120 op = ceph_snap(dir) == CEPH_SNAPDIR ?
1121 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
1122 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
1123 if (!IS_ERR(req)) {
1124 req->r_dentry = dget(dentry);
1125 req->r_num_caps = 2;
1126
1127 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
1128 if (ceph_security_xattr_wanted(dir))
1129 mask |= CEPH_CAP_XATTR_SHARED;
1130 req->r_args.getattr.mask = mask;
1131
1132 req->r_locked_dir = dir;
1133 err = ceph_mdsc_do_request(mdsc, NULL, req);
1134 if (err == 0 || err == -ENOENT) {
1135 if (dentry == req->r_dentry) {
1136 valid = !d_unhashed(dentry);
1137 } else {
1138 d_invalidate(req->r_dentry);
1139 err = -EAGAIN;
1140 }
1141 }
1142 ceph_mdsc_put_request(req);
1143 dout("d_revalidate %p lookup result=%d\n",
1144 dentry, err);
1145 }
1146 }
1147
1124 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1148 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1125 if (valid) { 1149 if (valid) {
1126 ceph_dentry_lru_touch(dentry); 1150 ceph_dentry_lru_touch(dentry);
1127 } else { 1151 } else {
1128 ceph_dir_clear_complete(dir); 1152 ceph_dir_clear_complete(dir);
1129 } 1153 }
1130 iput(dir); 1154
1155 dput(parent);
1131 return valid; 1156 return valid;
1132} 1157}
1133 1158
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3b3172357326..6e72c98162d5 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -71,12 +71,18 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
71 inode = ceph_find_inode(sb, vino); 71 inode = ceph_find_inode(sb, vino);
72 if (!inode) { 72 if (!inode) {
73 struct ceph_mds_request *req; 73 struct ceph_mds_request *req;
74 int mask;
74 75
75 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO, 76 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
76 USE_ANY_MDS); 77 USE_ANY_MDS);
77 if (IS_ERR(req)) 78 if (IS_ERR(req))
78 return ERR_CAST(req); 79 return ERR_CAST(req);
79 80
81 mask = CEPH_STAT_CAP_INODE;
82 if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
83 mask |= CEPH_CAP_XATTR_SHARED;
84 req->r_args.getattr.mask = cpu_to_le32(mask);
85
80 req->r_ino1 = vino; 86 req->r_ino1 = vino;
81 req->r_num_caps = 1; 87 req->r_num_caps = 1;
82 err = ceph_mdsc_do_request(mdsc, NULL, req); 88 err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@ static struct dentry *__get_parent(struct super_block *sb,
128 struct ceph_mds_request *req; 134 struct ceph_mds_request *req;
129 struct inode *inode; 135 struct inode *inode;
130 struct dentry *dentry; 136 struct dentry *dentry;
137 int mask;
131 int err; 138 int err;
132 139
133 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT, 140 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@ static struct dentry *__get_parent(struct super_block *sb,
144 .snap = CEPH_NOSNAP, 151 .snap = CEPH_NOSNAP,
145 }; 152 };
146 } 153 }
154
155 mask = CEPH_STAT_CAP_INODE;
156 if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
157 mask |= CEPH_CAP_XATTR_SHARED;
158 req->r_args.getattr.mask = cpu_to_le32(mask);
159
147 req->r_num_caps = 1; 160 req->r_num_caps = 1;
148 err = ceph_mdsc_do_request(mdsc, NULL, req); 161 err = ceph_mdsc_do_request(mdsc, NULL, req);
149 inode = req->r_target_inode; 162 inode = req->r_target_inode;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index eb9028e8cfc5..ef38f01c1795 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -157,7 +157,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
157 case S_IFDIR: 157 case S_IFDIR:
158 dout("init_file %p %p 0%o (regular)\n", inode, file, 158 dout("init_file %p %p 0%o (regular)\n", inode, file,
159 inode->i_mode); 159 inode->i_mode);
160 cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO); 160 cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
161 if (cf == NULL) { 161 if (cf == NULL) {
162 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ 162 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
163 return -ENOMEM; 163 return -ENOMEM;
@@ -300,6 +300,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
300 struct ceph_mds_request *req; 300 struct ceph_mds_request *req;
301 struct dentry *dn; 301 struct dentry *dn;
302 struct ceph_acls_info acls = {}; 302 struct ceph_acls_info acls = {};
303 int mask;
303 int err; 304 int err;
304 305
305 dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n", 306 dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
335 acls.pagelist = NULL; 336 acls.pagelist = NULL;
336 } 337 }
337 } 338 }
339
340 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
341 if (ceph_security_xattr_wanted(dir))
342 mask |= CEPH_CAP_XATTR_SHARED;
343 req->r_args.open.mask = cpu_to_le32(mask);
344
338 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 345 req->r_locked_dir = dir; /* caller holds dir->i_mutex */
339 err = ceph_mdsc_do_request(mdsc, 346 err = ceph_mdsc_do_request(mdsc,
340 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 347 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -725,7 +732,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
725 ret = ceph_osdc_start_request(req->r_osdc, req, false); 732 ret = ceph_osdc_start_request(req->r_osdc, req, false);
726out: 733out:
727 if (ret < 0) { 734 if (ret < 0) {
728 BUG_ON(ret == -EOLDSNAPC);
729 req->r_result = ret; 735 req->r_result = ret;
730 ceph_aio_complete_req(req, NULL); 736 ceph_aio_complete_req(req, NULL);
731 } 737 }
@@ -783,7 +789,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
783 int num_pages = 0; 789 int num_pages = 0;
784 int flags; 790 int flags;
785 int ret; 791 int ret;
786 struct timespec mtime = CURRENT_TIME; 792 struct timespec mtime = current_fs_time(inode->i_sb);
787 size_t count = iov_iter_count(iter); 793 size_t count = iov_iter_count(iter);
788 loff_t pos = iocb->ki_pos; 794 loff_t pos = iocb->ki_pos;
789 bool write = iov_iter_rw(iter) == WRITE; 795 bool write = iov_iter_rw(iter) == WRITE;
@@ -949,7 +955,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
949 ret = ceph_osdc_start_request(req->r_osdc, 955 ret = ceph_osdc_start_request(req->r_osdc,
950 req, false); 956 req, false);
951 if (ret < 0) { 957 if (ret < 0) {
952 BUG_ON(ret == -EOLDSNAPC);
953 req->r_result = ret; 958 req->r_result = ret;
954 ceph_aio_complete_req(req, NULL); 959 ceph_aio_complete_req(req, NULL);
955 } 960 }
@@ -988,7 +993,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
988 int flags; 993 int flags;
989 int check_caps = 0; 994 int check_caps = 0;
990 int ret; 995 int ret;
991 struct timespec mtime = CURRENT_TIME; 996 struct timespec mtime = current_fs_time(inode->i_sb);
992 size_t count = iov_iter_count(from); 997 size_t count = iov_iter_count(from);
993 998
994 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 999 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e48fd8b23257..ed58b168904a 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -549,6 +549,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,
549 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 || 549 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
550 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) { 550 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
551 dout("size %lld -> %llu\n", inode->i_size, size); 551 dout("size %lld -> %llu\n", inode->i_size, size);
552 if (size > 0 && S_ISDIR(inode->i_mode)) {
553 pr_err("fill_file_size non-zero size for directory\n");
554 size = 0;
555 }
552 i_size_write(inode, size); 556 i_size_write(inode, size);
553 inode->i_blocks = (size + (1<<9) - 1) >> 9; 557 inode->i_blocks = (size + (1<<9) - 1) >> 9;
554 ci->i_reported_size = size; 558 ci->i_reported_size = size;
@@ -1261,6 +1265,7 @@ retry_lookup:
1261 dout(" %p links to %p %llx.%llx, not %llx.%llx\n", 1265 dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
1262 dn, d_inode(dn), ceph_vinop(d_inode(dn)), 1266 dn, d_inode(dn), ceph_vinop(d_inode(dn)),
1263 ceph_vinop(in)); 1267 ceph_vinop(in));
1268 d_invalidate(dn);
1264 have_lease = false; 1269 have_lease = false;
1265 } 1270 }
1266 1271
@@ -1349,15 +1354,20 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
1349 1354
1350 if (!ctl->page || pgoff != page_index(ctl->page)) { 1355 if (!ctl->page || pgoff != page_index(ctl->page)) {
1351 ceph_readdir_cache_release(ctl); 1356 ceph_readdir_cache_release(ctl);
1352 ctl->page = grab_cache_page(&dir->i_data, pgoff); 1357 if (idx == 0)
1358 ctl->page = grab_cache_page(&dir->i_data, pgoff);
1359 else
1360 ctl->page = find_lock_page(&dir->i_data, pgoff);
1353 if (!ctl->page) { 1361 if (!ctl->page) {
1354 ctl->index = -1; 1362 ctl->index = -1;
1355 return -ENOMEM; 1363 return idx == 0 ? -ENOMEM : 0;
1356 } 1364 }
1357 /* reading/filling the cache are serialized by 1365 /* reading/filling the cache are serialized by
1358 * i_mutex, no need to use page lock */ 1366 * i_mutex, no need to use page lock */
1359 unlock_page(ctl->page); 1367 unlock_page(ctl->page);
1360 ctl->dentries = kmap(ctl->page); 1368 ctl->dentries = kmap(ctl->page);
1369 if (idx == 0)
1370 memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
1361 } 1371 }
1362 1372
1363 if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) && 1373 if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1380,7 +1390,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1380 struct qstr dname; 1390 struct qstr dname;
1381 struct dentry *dn; 1391 struct dentry *dn;
1382 struct inode *in; 1392 struct inode *in;
1383 int err = 0, ret, i; 1393 int err = 0, skipped = 0, ret, i;
1384 struct inode *snapdir = NULL; 1394 struct inode *snapdir = NULL;
1385 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; 1395 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
1386 struct ceph_dentry_info *di; 1396 struct ceph_dentry_info *di;
@@ -1492,7 +1502,17 @@ retry_lookup:
1492 } 1502 }
1493 1503
1494 if (d_really_is_negative(dn)) { 1504 if (d_really_is_negative(dn)) {
1495 struct dentry *realdn = splice_dentry(dn, in); 1505 struct dentry *realdn;
1506
1507 if (ceph_security_xattr_deadlock(in)) {
1508 dout(" skip splicing dn %p to inode %p"
1509 " (security xattr deadlock)\n", dn, in);
1510 iput(in);
1511 skipped++;
1512 goto next_item;
1513 }
1514
1515 realdn = splice_dentry(dn, in);
1496 if (IS_ERR(realdn)) { 1516 if (IS_ERR(realdn)) {
1497 err = PTR_ERR(realdn); 1517 err = PTR_ERR(realdn);
1498 d_drop(dn); 1518 d_drop(dn);
@@ -1509,7 +1529,7 @@ retry_lookup:
1509 req->r_session, 1529 req->r_session,
1510 req->r_request_started); 1530 req->r_request_started);
1511 1531
1512 if (err == 0 && cache_ctl.index >= 0) { 1532 if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
1513 ret = fill_readdir_cache(d_inode(parent), dn, 1533 ret = fill_readdir_cache(d_inode(parent), dn,
1514 &cache_ctl, req); 1534 &cache_ctl, req);
1515 if (ret < 0) 1535 if (ret < 0)
@@ -1520,7 +1540,7 @@ next_item:
1520 dput(dn); 1540 dput(dn);
1521 } 1541 }
1522out: 1542out:
1523 if (err == 0) { 1543 if (err == 0 && skipped == 0) {
1524 req->r_did_prepopulate = true; 1544 req->r_did_prepopulate = true;
1525 req->r_readdir_cache_idx = cache_ctl.index; 1545 req->r_readdir_cache_idx = cache_ctl.index;
1526 } 1546 }
@@ -1950,7 +1970,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1950 if (dirtied) { 1970 if (dirtied) {
1951 inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, 1971 inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
1952 &prealloc_cf); 1972 &prealloc_cf);
1953 inode->i_ctime = CURRENT_TIME; 1973 inode->i_ctime = current_fs_time(inode->i_sb);
1954 } 1974 }
1955 1975
1956 release &= issued; 1976 release &= issued;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 911d64d865f1..44852c3ae531 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1729,7 +1729,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1729 init_completion(&req->r_safe_completion); 1729 init_completion(&req->r_safe_completion);
1730 INIT_LIST_HEAD(&req->r_unsafe_item); 1730 INIT_LIST_HEAD(&req->r_unsafe_item);
1731 1731
1732 req->r_stamp = CURRENT_TIME; 1732 req->r_stamp = current_fs_time(mdsc->fsc->sb);
1733 1733
1734 req->r_op = op; 1734 req->r_op = op;
1735 req->r_direct_mode = mode; 1735 req->r_direct_mode = mode;
@@ -2540,6 +2540,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2540 2540
2541 /* insert trace into our cache */ 2541 /* insert trace into our cache */
2542 mutex_lock(&req->r_fill_mutex); 2542 mutex_lock(&req->r_fill_mutex);
2543 current->journal_info = req;
2543 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2544 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2544 if (err == 0) { 2545 if (err == 0) {
2545 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2546 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2547 ceph_readdir_prepopulate(req, req->r_session); 2548 ceph_readdir_prepopulate(req, req->r_session);
2548 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2549 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2549 } 2550 }
2551 current->journal_info = NULL;
2550 mutex_unlock(&req->r_fill_mutex); 2552 mutex_unlock(&req->r_fill_mutex);
2551 2553
2552 up_read(&mdsc->snap_rwsem); 2554 up_read(&mdsc->snap_rwsem);
@@ -3764,7 +3766,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3764 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3766 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3765 3767
3766 /* do we need it? */ 3768 /* do we need it? */
3767 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3768 mutex_lock(&mdsc->mutex); 3769 mutex_lock(&mdsc->mutex);
3769 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3770 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3770 dout("handle_map epoch %u <= our %u\n", 3771 dout("handle_map epoch %u <= our %u\n",
@@ -3791,6 +3792,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3791 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3792 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3792 3793
3793 __wake_requests(mdsc, &mdsc->waiting_for_map); 3794 __wake_requests(mdsc, &mdsc->waiting_for_map);
3795 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3796 mdsc->mdsmap->m_epoch);
3794 3797
3795 mutex_unlock(&mdsc->mutex); 3798 mutex_unlock(&mdsc->mutex);
3796 schedule_delayed(mdsc); 3799 schedule_delayed(mdsc);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4aa7122a8d38..9caaa7ffc93f 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,6 @@ static int cmpu64_rev(const void *a, const void *b)
296} 296}
297 297
298 298
299struct ceph_snap_context *ceph_empty_snapc;
300
301/* 299/*
302 * build the snap context for a given realm. 300 * build the snap context for a given realm.
303 */ 301 */
@@ -987,17 +985,3 @@ out:
987 up_write(&mdsc->snap_rwsem); 985 up_write(&mdsc->snap_rwsem);
988 return; 986 return;
989} 987}
990
991int __init ceph_snap_init(void)
992{
993 ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
994 if (!ceph_empty_snapc)
995 return -ENOMEM;
996 ceph_empty_snapc->seq = 1;
997 return 0;
998}
999
1000void ceph_snap_exit(void)
1001{
1002 ceph_put_snap_context(ceph_empty_snapc);
1003}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ca4d5e8457f1..c973043deb0e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -439,8 +439,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
439 439
440 if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT) 440 if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
441 seq_puts(m, ",dirstat"); 441 seq_puts(m, ",dirstat");
442 if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0) 442 if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
443 seq_puts(m, ",norbytes"); 443 seq_puts(m, ",rbytes");
444 if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR) 444 if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
445 seq_puts(m, ",noasyncreaddir"); 445 seq_puts(m, ",noasyncreaddir");
446 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0) 446 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
530 goto fail; 530 goto fail;
531 } 531 }
532 fsc->client->extra_mon_dispatch = extra_mon_dispatch; 532 fsc->client->extra_mon_dispatch = extra_mon_dispatch;
533 fsc->client->monc.want_mdsmap = 1; 533 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
534 534
535 fsc->mount_options = fsopt; 535 fsc->mount_options = fsopt;
536 536
@@ -793,22 +793,20 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
793 struct dentry *root; 793 struct dentry *root;
794 int first = 0; /* first vfsmount for this super_block */ 794 int first = 0; /* first vfsmount for this super_block */
795 795
796 dout("mount start\n"); 796 dout("mount start %p\n", fsc);
797 mutex_lock(&fsc->client->mount_mutex); 797 mutex_lock(&fsc->client->mount_mutex);
798 798
799 err = __ceph_open_session(fsc->client, started); 799 if (!fsc->sb->s_root) {
800 if (err < 0) 800 err = __ceph_open_session(fsc->client, started);
801 goto out; 801 if (err < 0)
802 goto out;
802 803
803 dout("mount opening root\n"); 804 dout("mount opening root\n");
804 root = open_root_dentry(fsc, "", started); 805 root = open_root_dentry(fsc, "", started);
805 if (IS_ERR(root)) { 806 if (IS_ERR(root)) {
806 err = PTR_ERR(root); 807 err = PTR_ERR(root);
807 goto out; 808 goto out;
808 } 809 }
809 if (fsc->sb->s_root) {
810 dput(root);
811 } else {
812 fsc->sb->s_root = root; 810 fsc->sb->s_root = root;
813 first = 1; 811 first = 1;
814 812
@@ -818,6 +816,7 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
818 } 816 }
819 817
820 if (path[0] == 0) { 818 if (path[0] == 0) {
819 root = fsc->sb->s_root;
821 dget(root); 820 dget(root);
822 } else { 821 } else {
823 dout("mount opening base mountpoint\n"); 822 dout("mount opening base mountpoint\n");
@@ -833,16 +832,14 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
833 mutex_unlock(&fsc->client->mount_mutex); 832 mutex_unlock(&fsc->client->mount_mutex);
834 return root; 833 return root;
835 834
836out:
837 mutex_unlock(&fsc->client->mount_mutex);
838 return ERR_PTR(err);
839
840fail: 835fail:
841 if (first) { 836 if (first) {
842 dput(fsc->sb->s_root); 837 dput(fsc->sb->s_root);
843 fsc->sb->s_root = NULL; 838 fsc->sb->s_root = NULL;
844 } 839 }
845 goto out; 840out:
841 mutex_unlock(&fsc->client->mount_mutex);
842 return ERR_PTR(err);
846} 843}
847 844
848static int ceph_set_super(struct super_block *s, void *data) 845static int ceph_set_super(struct super_block *s, void *data)
@@ -1042,19 +1039,14 @@ static int __init init_ceph(void)
1042 1039
1043 ceph_flock_init(); 1040 ceph_flock_init();
1044 ceph_xattr_init(); 1041 ceph_xattr_init();
1045 ret = ceph_snap_init();
1046 if (ret)
1047 goto out_xattr;
1048 ret = register_filesystem(&ceph_fs_type); 1042 ret = register_filesystem(&ceph_fs_type);
1049 if (ret) 1043 if (ret)
1050 goto out_snap; 1044 goto out_xattr;
1051 1045
1052 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); 1046 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
1053 1047
1054 return 0; 1048 return 0;
1055 1049
1056out_snap:
1057 ceph_snap_exit();
1058out_xattr: 1050out_xattr:
1059 ceph_xattr_exit(); 1051 ceph_xattr_exit();
1060 destroy_caches(); 1052 destroy_caches();
@@ -1066,7 +1058,6 @@ static void __exit exit_ceph(void)
1066{ 1058{
1067 dout("exit_ceph\n"); 1059 dout("exit_ceph\n");
1068 unregister_filesystem(&ceph_fs_type); 1060 unregister_filesystem(&ceph_fs_type);
1069 ceph_snap_exit();
1070 ceph_xattr_exit(); 1061 ceph_xattr_exit();
1071 destroy_caches(); 1062 destroy_caches();
1072} 1063}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 9c458eb52245..e705c4d612d7 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -37,8 +37,7 @@
37#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ 37#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
38#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ 38#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */
39 39
40#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES | \ 40#define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE
41 CEPH_MOUNT_OPT_DCACHE)
42 41
43#define ceph_set_mount_opt(fsc, opt) \ 42#define ceph_set_mount_opt(fsc, opt) \
44 (fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt; 43 (fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
@@ -469,7 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
469#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */ 468#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */
470#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */ 469#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
471#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ 470#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
472 471#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
473 472
474static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, 473static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
475 long long release_count, 474 long long release_count,
@@ -721,7 +720,6 @@ static inline int default_congestion_kb(void)
721 720
722 721
723/* snap.c */ 722/* snap.c */
724extern struct ceph_snap_context *ceph_empty_snapc;
725struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 723struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
726 u64 ino); 724 u64 ino);
727extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc, 725extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -738,8 +736,6 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
738extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, 736extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
739 struct ceph_cap_snap *capsnap); 737 struct ceph_cap_snap *capsnap);
740extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); 738extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
741extern int ceph_snap_init(void);
742extern void ceph_snap_exit(void);
743 739
744/* 740/*
745 * a cap_snap is "pending" if it is still awaiting an in-progress 741 * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -808,6 +804,20 @@ extern void __init ceph_xattr_init(void);
808extern void ceph_xattr_exit(void); 804extern void ceph_xattr_exit(void);
809extern const struct xattr_handler *ceph_xattr_handlers[]; 805extern const struct xattr_handler *ceph_xattr_handlers[];
810 806
807#ifdef CONFIG_SECURITY
808extern bool ceph_security_xattr_deadlock(struct inode *in);
809extern bool ceph_security_xattr_wanted(struct inode *in);
810#else
811static inline bool ceph_security_xattr_deadlock(struct inode *in)
812{
813 return false;
814}
815static inline bool ceph_security_xattr_wanted(struct inode *in)
816{
817 return false;
818}
819#endif
820
811/* acl.c */ 821/* acl.c */
812struct ceph_acls_info { 822struct ceph_acls_info {
813 void *default_acl; 823 void *default_acl;
@@ -947,7 +957,6 @@ extern void ceph_dentry_lru_touch(struct dentry *dn);
947extern void ceph_dentry_lru_del(struct dentry *dn); 957extern void ceph_dentry_lru_del(struct dentry *dn);
948extern void ceph_invalidate_dentry_lease(struct dentry *dentry); 958extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
949extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); 959extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
950extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
951extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl); 960extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
952 961
953/* 962/*
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 819163d8313b..9410abdef3ce 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -714,31 +714,62 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
714 } 714 }
715} 715}
716 716
717static inline int __get_request_mask(struct inode *in) {
718 struct ceph_mds_request *req = current->journal_info;
719 int mask = 0;
720 if (req && req->r_target_inode == in) {
721 if (req->r_op == CEPH_MDS_OP_LOOKUP ||
722 req->r_op == CEPH_MDS_OP_LOOKUPINO ||
723 req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
724 req->r_op == CEPH_MDS_OP_GETATTR) {
725 mask = le32_to_cpu(req->r_args.getattr.mask);
726 } else if (req->r_op == CEPH_MDS_OP_OPEN ||
727 req->r_op == CEPH_MDS_OP_CREATE) {
728 mask = le32_to_cpu(req->r_args.open.mask);
729 }
730 }
731 return mask;
732}
733
717ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, 734ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
718 size_t size) 735 size_t size)
719{ 736{
720 struct ceph_inode_info *ci = ceph_inode(inode); 737 struct ceph_inode_info *ci = ceph_inode(inode);
721 int err;
722 struct ceph_inode_xattr *xattr; 738 struct ceph_inode_xattr *xattr;
723 struct ceph_vxattr *vxattr = NULL; 739 struct ceph_vxattr *vxattr = NULL;
740 int req_mask;
741 int err;
724 742
725 if (!ceph_is_valid_xattr(name)) 743 if (!ceph_is_valid_xattr(name))
726 return -ENODATA; 744 return -ENODATA;
727 745
728 /* let's see if a virtual xattr was requested */ 746 /* let's see if a virtual xattr was requested */
729 vxattr = ceph_match_vxattr(inode, name); 747 vxattr = ceph_match_vxattr(inode, name);
730 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { 748 if (vxattr) {
731 err = vxattr->getxattr_cb(ci, value, size); 749 err = -ENODATA;
750 if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
751 err = vxattr->getxattr_cb(ci, value, size);
732 return err; 752 return err;
733 } 753 }
734 754
755 req_mask = __get_request_mask(inode);
756
735 spin_lock(&ci->i_ceph_lock); 757 spin_lock(&ci->i_ceph_lock);
736 dout("getxattr %p ver=%lld index_ver=%lld\n", inode, 758 dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
737 ci->i_xattrs.version, ci->i_xattrs.index_version); 759 ci->i_xattrs.version, ci->i_xattrs.index_version);
738 760
739 if (ci->i_xattrs.version == 0 || 761 if (ci->i_xattrs.version == 0 ||
740 !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) { 762 !((req_mask & CEPH_CAP_XATTR_SHARED) ||
763 __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
741 spin_unlock(&ci->i_ceph_lock); 764 spin_unlock(&ci->i_ceph_lock);
765
766 /* security module gets xattr while filling trace */
767 if (current->journal_info != NULL) {
768 pr_warn_ratelimited("sync getxattr %p "
769 "during filling trace\n", inode);
770 return -EBUSY;
771 }
772
742 /* get xattrs from mds (if we don't already have them) */ 773 /* get xattrs from mds (if we don't already have them) */
743 err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true); 774 err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
744 if (err) 775 if (err)
@@ -765,6 +796,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
765 796
766 memcpy(value, xattr->val, xattr->val_len); 797 memcpy(value, xattr->val, xattr->val_len);
767 798
799 if (current->journal_info != NULL &&
800 !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
801 ci->i_ceph_flags |= CEPH_I_SEC_INITED;
768out: 802out:
769 spin_unlock(&ci->i_ceph_lock); 803 spin_unlock(&ci->i_ceph_lock);
770 return err; 804 return err;
@@ -999,7 +1033,7 @@ retry:
999 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, 1033 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
1000 &prealloc_cf); 1034 &prealloc_cf);
1001 ci->i_xattrs.dirty = true; 1035 ci->i_xattrs.dirty = true;
1002 inode->i_ctime = CURRENT_TIME; 1036 inode->i_ctime = current_fs_time(inode->i_sb);
1003 } 1037 }
1004 1038
1005 spin_unlock(&ci->i_ceph_lock); 1039 spin_unlock(&ci->i_ceph_lock);
@@ -1015,7 +1049,15 @@ do_sync:
1015do_sync_unlocked: 1049do_sync_unlocked:
1016 if (lock_snap_rwsem) 1050 if (lock_snap_rwsem)
1017 up_read(&mdsc->snap_rwsem); 1051 up_read(&mdsc->snap_rwsem);
1018 err = ceph_sync_setxattr(dentry, name, value, size, flags); 1052
1053 /* security module set xattr while filling trace */
1054 if (current->journal_info != NULL) {
1055 pr_warn_ratelimited("sync setxattr %p "
1056 "during filling trace\n", inode);
1057 err = -EBUSY;
1058 } else {
1059 err = ceph_sync_setxattr(dentry, name, value, size, flags);
1060 }
1019out: 1061out:
1020 ceph_free_cap_flush(prealloc_cf); 1062 ceph_free_cap_flush(prealloc_cf);
1021 kfree(newname); 1063 kfree(newname);
@@ -1136,7 +1178,7 @@ retry:
1136 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, 1178 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
1137 &prealloc_cf); 1179 &prealloc_cf);
1138 ci->i_xattrs.dirty = true; 1180 ci->i_xattrs.dirty = true;
1139 inode->i_ctime = CURRENT_TIME; 1181 inode->i_ctime = current_fs_time(inode->i_sb);
1140 spin_unlock(&ci->i_ceph_lock); 1182 spin_unlock(&ci->i_ceph_lock);
1141 if (lock_snap_rwsem) 1183 if (lock_snap_rwsem)
1142 up_read(&mdsc->snap_rwsem); 1184 up_read(&mdsc->snap_rwsem);
@@ -1164,3 +1206,25 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
1164 1206
1165 return __ceph_removexattr(dentry, name); 1207 return __ceph_removexattr(dentry, name);
1166} 1208}
1209
1210#ifdef CONFIG_SECURITY
1211bool ceph_security_xattr_wanted(struct inode *in)
1212{
1213 return in->i_security != NULL;
1214}
1215
1216bool ceph_security_xattr_deadlock(struct inode *in)
1217{
1218 struct ceph_inode_info *ci;
1219 bool ret;
1220 if (in->i_security == NULL)
1221 return false;
1222 ci = ceph_inode(in);
1223 spin_lock(&ci->i_ceph_lock);
1224 ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
1225 !(ci->i_xattrs.version > 0 &&
1226 __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
1227 spin_unlock(&ci->i_ceph_lock);
1228 return ret;
1229}
1230#endif
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 15151f3c4120..ae2f66833762 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -105,6 +105,7 @@ static inline u64 ceph_sanitize_features(u64 features)
105 */ 105 */
106#define CEPH_FEATURES_SUPPORTED_DEFAULT \ 106#define CEPH_FEATURES_SUPPORTED_DEFAULT \
107 (CEPH_FEATURE_NOSRCADDR | \ 107 (CEPH_FEATURE_NOSRCADDR | \
108 CEPH_FEATURE_SUBSCRIBE2 | \
108 CEPH_FEATURE_RECONNECT_SEQ | \ 109 CEPH_FEATURE_RECONNECT_SEQ | \
109 CEPH_FEATURE_PGID64 | \ 110 CEPH_FEATURE_PGID64 | \
110 CEPH_FEATURE_PGPOOL3 | \ 111 CEPH_FEATURE_PGPOOL3 | \
@@ -127,6 +128,7 @@ static inline u64 ceph_sanitize_features(u64 features)
127 128
128#define CEPH_FEATURES_REQUIRED_DEFAULT \ 129#define CEPH_FEATURES_REQUIRED_DEFAULT \
129 (CEPH_FEATURE_NOSRCADDR | \ 130 (CEPH_FEATURE_NOSRCADDR | \
131 CEPH_FEATURE_SUBSCRIBE2 | \
130 CEPH_FEATURE_RECONNECT_SEQ | \ 132 CEPH_FEATURE_RECONNECT_SEQ | \
131 CEPH_FEATURE_PGID64 | \ 133 CEPH_FEATURE_PGID64 | \
132 CEPH_FEATURE_PGPOOL3 | \ 134 CEPH_FEATURE_PGPOOL3 | \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index d7d072a25c27..37f28bf55ce4 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -198,8 +198,8 @@ struct ceph_client_mount {
198#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */ 198#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */
199 199
200struct ceph_mon_subscribe_item { 200struct ceph_mon_subscribe_item {
201 __le64 have_version; __le64 have; 201 __le64 start;
202 __u8 onetime; 202 __u8 flags;
203} __attribute__ ((packed)); 203} __attribute__ ((packed));
204 204
205struct ceph_mon_subscribe_ack { 205struct ceph_mon_subscribe_ack {
@@ -376,7 +376,8 @@ union ceph_mds_request_args {
376 __le32 stripe_count; /* ... */ 376 __le32 stripe_count; /* ... */
377 __le32 object_size; 377 __le32 object_size;
378 __le32 file_replication; 378 __le32 file_replication;
379 __le32 unused; /* used to be preferred osd */ 379 __le32 mask; /* CEPH_CAP_* */
380 __le32 old_size;
380 } __attribute__ ((packed)) open; 381 } __attribute__ ((packed)) open;
381 struct { 382 struct {
382 __le32 flags; 383 __le32 flags;
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 3e3799cdc6e6..e7975e4681e1 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -47,7 +47,6 @@ struct ceph_options {
47 unsigned long mount_timeout; /* jiffies */ 47 unsigned long mount_timeout; /* jiffies */
48 unsigned long osd_idle_ttl; /* jiffies */ 48 unsigned long osd_idle_ttl; /* jiffies */
49 unsigned long osd_keepalive_timeout; /* jiffies */ 49 unsigned long osd_keepalive_timeout; /* jiffies */
50 unsigned long monc_ping_timeout; /* jiffies */
51 50
52 /* 51 /*
53 * any type that can't be simply compared or doesn't need need 52 * any type that can't be simply compared or doesn't need need
@@ -68,7 +67,12 @@ struct ceph_options {
68#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000) 67#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
69#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000) 68#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000)
70#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000) 69#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000)
71#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000) 70
71#define CEPH_MONC_HUNT_INTERVAL msecs_to_jiffies(3 * 1000)
72#define CEPH_MONC_PING_INTERVAL msecs_to_jiffies(10 * 1000)
73#define CEPH_MONC_PING_TIMEOUT msecs_to_jiffies(30 * 1000)
74#define CEPH_MONC_HUNT_BACKOFF 2
75#define CEPH_MONC_HUNT_MAX_MULT 10
72 76
73#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) 77#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
74#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) 78#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 81810dc21f06..e230e7ed60d3 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -68,18 +68,24 @@ struct ceph_mon_client {
68 68
69 bool hunting; 69 bool hunting;
70 int cur_mon; /* last monitor i contacted */ 70 int cur_mon; /* last monitor i contacted */
71 unsigned long sub_sent, sub_renew_after; 71 unsigned long sub_renew_after;
72 unsigned long sub_renew_sent;
72 struct ceph_connection con; 73 struct ceph_connection con;
73 74
75 bool had_a_connection;
76 int hunt_mult; /* [1..CEPH_MONC_HUNT_MAX_MULT] */
77
74 /* pending generic requests */ 78 /* pending generic requests */
75 struct rb_root generic_request_tree; 79 struct rb_root generic_request_tree;
76 int num_generic_requests; 80 int num_generic_requests;
77 u64 last_tid; 81 u64 last_tid;
78 82
79 /* mds/osd map */ 83 /* subs, indexed with CEPH_SUB_* */
80 int want_mdsmap; 84 struct {
81 int want_next_osdmap; /* 1 = want, 2 = want+asked */ 85 struct ceph_mon_subscribe_item item;
82 u32 have_osdmap, have_mdsmap; 86 bool want;
87 u32 have; /* epoch */
88 } subs[3];
83 89
84#ifdef CONFIG_DEBUG_FS 90#ifdef CONFIG_DEBUG_FS
85 struct dentry *debugfs_file; 91 struct dentry *debugfs_file;
@@ -93,14 +99,23 @@ extern int ceph_monmap_contains(struct ceph_monmap *m,
93extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl); 99extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
94extern void ceph_monc_stop(struct ceph_mon_client *monc); 100extern void ceph_monc_stop(struct ceph_mon_client *monc);
95 101
102enum {
103 CEPH_SUB_MDSMAP = 0,
104 CEPH_SUB_MONMAP,
105 CEPH_SUB_OSDMAP,
106};
107
108extern const char *ceph_sub_str[];
109
96/* 110/*
97 * The model here is to indicate that we need a new map of at least 111 * The model here is to indicate that we need a new map of at least
98 * epoch @want, and also call in when we receive a map. We will 112 * epoch @epoch, and also call in when we receive a map. We will
99 * periodically rerequest the map from the monitor cluster until we 113 * periodically rerequest the map from the monitor cluster until we
100 * get what we want. 114 * get what we want.
101 */ 115 */
102extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); 116bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
103extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); 117 bool continuous);
118void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
104 119
105extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); 120extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
106extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 121extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b485bb6d..4343df806710 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,8 @@ struct ceph_osd {
43}; 43};
44 44
45 45
46#define CEPH_OSD_MAX_OP 3 46#define CEPH_OSD_SLAB_OPS 2
47#define CEPH_OSD_MAX_OPS 16
47 48
48enum ceph_osd_data_type { 49enum ceph_osd_data_type {
49 CEPH_OSD_DATA_TYPE_NONE = 0, 50 CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -77,7 +78,10 @@ struct ceph_osd_data {
77struct ceph_osd_req_op { 78struct ceph_osd_req_op {
78 u16 op; /* CEPH_OSD_OP_* */ 79 u16 op; /* CEPH_OSD_OP_* */
79 u32 flags; /* CEPH_OSD_OP_FLAG_* */ 80 u32 flags; /* CEPH_OSD_OP_FLAG_* */
80 u32 payload_len; 81 u32 indata_len; /* request */
82 u32 outdata_len; /* reply */
83 s32 rval;
84
81 union { 85 union {
82 struct ceph_osd_data raw_data_in; 86 struct ceph_osd_data raw_data_in;
83 struct { 87 struct {
@@ -136,7 +140,6 @@ struct ceph_osd_request {
136 140
137 /* request osd ops array */ 141 /* request osd ops array */
138 unsigned int r_num_ops; 142 unsigned int r_num_ops;
139 struct ceph_osd_req_op r_ops[CEPH_OSD_MAX_OP];
140 143
141 /* these are updated on each send */ 144 /* these are updated on each send */
142 __le32 *r_request_osdmap_epoch; 145 __le32 *r_request_osdmap_epoch;
@@ -148,8 +151,6 @@ struct ceph_osd_request {
148 struct ceph_eversion *r_request_reassert_version; 151 struct ceph_eversion *r_request_reassert_version;
149 152
150 int r_result; 153 int r_result;
151 int r_reply_op_len[CEPH_OSD_MAX_OP];
152 s32 r_reply_op_result[CEPH_OSD_MAX_OP];
153 int r_got_reply; 154 int r_got_reply;
154 int r_linger; 155 int r_linger;
155 156
@@ -174,6 +175,8 @@ struct ceph_osd_request {
174 unsigned long r_stamp; /* send OR check time */ 175 unsigned long r_stamp; /* send OR check time */
175 176
176 struct ceph_snap_context *r_snapc; /* snap context for writes */ 177 struct ceph_snap_context *r_snapc; /* snap context for writes */
178
179 struct ceph_osd_req_op r_ops[];
177}; 180};
178 181
179struct ceph_request_redirect { 182struct ceph_request_redirect {
@@ -263,6 +266,8 @@ extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
263 u64 truncate_size, u32 truncate_seq); 266 u64 truncate_size, u32 truncate_seq);
264extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 267extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
265 unsigned int which, u64 length); 268 unsigned int which, u64 length);
269extern void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
270 unsigned int which, u64 offset_inc);
266 271
267extern struct ceph_osd_data *osd_req_op_extent_osd_data( 272extern struct ceph_osd_data *osd_req_op_extent_osd_data(
268 struct ceph_osd_request *osd_req, 273 struct ceph_osd_request *osd_req,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bcbec33c6a14..dcc18c6f7cf9 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -361,7 +361,6 @@ ceph_parse_options(char *options, const char *dev_name,
361 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; 361 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
362 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; 362 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
363 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; 363 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
364 opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
365 364
366 /* get mon ip(s) */ 365 /* get mon ip(s) */
367 /* ip1[:port1][,ip2[:port2]...] */ 366 /* ip1[:port1][,ip2[:port2]...] */
@@ -686,6 +685,9 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
686 return client->auth_err; 685 return client->auth_err;
687 } 686 }
688 687
688 pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
689 ceph_debugfs_client_init(client);
690
689 return 0; 691 return 0;
690} 692}
691EXPORT_SYMBOL(__ceph_open_session); 693EXPORT_SYMBOL(__ceph_open_session);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 593dc2eabcc8..b902fbc7863e 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p)
112 struct ceph_mon_generic_request *req; 112 struct ceph_mon_generic_request *req;
113 struct ceph_mon_client *monc = &client->monc; 113 struct ceph_mon_client *monc = &client->monc;
114 struct rb_node *rp; 114 struct rb_node *rp;
115 int i;
115 116
116 mutex_lock(&monc->mutex); 117 mutex_lock(&monc->mutex);
117 118
118 if (monc->have_mdsmap) 119 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
119 seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap); 120 seq_printf(s, "have %s %u", ceph_sub_str[i],
120 if (monc->have_osdmap) 121 monc->subs[i].have);
121 seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap); 122 if (monc->subs[i].want)
122 if (monc->want_next_osdmap) 123 seq_printf(s, " want %llu%s",
123 seq_printf(s, "want next osdmap\n"); 124 le64_to_cpu(monc->subs[i].item.start),
125 (monc->subs[i].item.flags &
126 CEPH_SUBSCRIBE_ONETIME ? "" : "+"));
127 seq_putc(s, '\n');
128 }
124 129
125 for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) { 130 for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
126 __u16 op; 131 __u16 op;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9382619a405b..1831f6353622 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -235,18 +235,12 @@ static struct workqueue_struct *ceph_msgr_wq;
235static int ceph_msgr_slab_init(void) 235static int ceph_msgr_slab_init(void)
236{ 236{
237 BUG_ON(ceph_msg_cache); 237 BUG_ON(ceph_msg_cache);
238 ceph_msg_cache = kmem_cache_create("ceph_msg", 238 ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
239 sizeof (struct ceph_msg),
240 __alignof__(struct ceph_msg), 0, NULL);
241
242 if (!ceph_msg_cache) 239 if (!ceph_msg_cache)
243 return -ENOMEM; 240 return -ENOMEM;
244 241
245 BUG_ON(ceph_msg_data_cache); 242 BUG_ON(ceph_msg_data_cache);
246 ceph_msg_data_cache = kmem_cache_create("ceph_msg_data", 243 ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
247 sizeof (struct ceph_msg_data),
248 __alignof__(struct ceph_msg_data),
249 0, NULL);
250 if (ceph_msg_data_cache) 244 if (ceph_msg_data_cache)
251 return 0; 245 return 0;
252 246
@@ -1221,25 +1215,19 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1221static void prepare_write_message_footer(struct ceph_connection *con) 1215static void prepare_write_message_footer(struct ceph_connection *con)
1222{ 1216{
1223 struct ceph_msg *m = con->out_msg; 1217 struct ceph_msg *m = con->out_msg;
1224 int v = con->out_kvec_left;
1225 1218
1226 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 1219 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
1227 1220
1228 dout("prepare_write_message_footer %p\n", con); 1221 dout("prepare_write_message_footer %p\n", con);
1229 con->out_kvec[v].iov_base = &m->footer; 1222 con_out_kvec_add(con, sizeof_footer(con), &m->footer);
1230 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 1223 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1231 if (con->ops->sign_message) 1224 if (con->ops->sign_message)
1232 con->ops->sign_message(m); 1225 con->ops->sign_message(m);
1233 else 1226 else
1234 m->footer.sig = 0; 1227 m->footer.sig = 0;
1235 con->out_kvec[v].iov_len = sizeof(m->footer);
1236 con->out_kvec_bytes += sizeof(m->footer);
1237 } else { 1228 } else {
1238 m->old_footer.flags = m->footer.flags; 1229 m->old_footer.flags = m->footer.flags;
1239 con->out_kvec[v].iov_len = sizeof(m->old_footer);
1240 con->out_kvec_bytes += sizeof(m->old_footer);
1241 } 1230 }
1242 con->out_kvec_left++;
1243 con->out_more = m->more_to_follow; 1231 con->out_more = m->more_to_follow;
1244 con->out_msg_done = true; 1232 con->out_msg_done = true;
1245} 1233}
@@ -2409,11 +2397,7 @@ static int read_partial_message(struct ceph_connection *con)
2409 } 2397 }
2410 2398
2411 /* footer */ 2399 /* footer */
2412 if (need_sign) 2400 size = sizeof_footer(con);
2413 size = sizeof(m->footer);
2414 else
2415 size = sizeof(m->old_footer);
2416
2417 end += size; 2401 end += size;
2418 ret = read_partial(con, end, size, &m->footer); 2402 ret = read_partial(con, end, size, &m->footer);
2419 if (ret <= 0) 2403 if (ret <= 0)
@@ -3089,10 +3073,7 @@ void ceph_msg_revoke(struct ceph_msg *msg)
3089 con->out_skip += con_out_kvec_skip(con); 3073 con->out_skip += con_out_kvec_skip(con);
3090 } else { 3074 } else {
3091 BUG_ON(!msg->data_length); 3075 BUG_ON(!msg->data_length);
3092 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) 3076 con->out_skip += sizeof_footer(con);
3093 con->out_skip += sizeof(msg->footer);
3094 else
3095 con->out_skip += sizeof(msg->old_footer);
3096 } 3077 }
3097 /* data, middle, front */ 3078 /* data, middle, front */
3098 if (msg->data_length) 3079 if (msg->data_length)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index de85dddc3dc0..cf638c009cfa 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -122,51 +122,91 @@ static void __close_session(struct ceph_mon_client *monc)
122 ceph_msg_revoke(monc->m_subscribe); 122 ceph_msg_revoke(monc->m_subscribe);
123 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 123 ceph_msg_revoke_incoming(monc->m_subscribe_ack);
124 ceph_con_close(&monc->con); 124 ceph_con_close(&monc->con);
125 monc->cur_mon = -1; 125
126 monc->pending_auth = 0; 126 monc->pending_auth = 0;
127 ceph_auth_reset(monc->auth); 127 ceph_auth_reset(monc->auth);
128} 128}
129 129
130/* 130/*
131 * Open a session with a (new) monitor. 131 * Pick a new monitor at random and set cur_mon. If we are repicking
132 * (i.e. cur_mon is already set), be sure to pick a different one.
132 */ 133 */
133static int __open_session(struct ceph_mon_client *monc) 134static void pick_new_mon(struct ceph_mon_client *monc)
134{ 135{
135 char r; 136 int old_mon = monc->cur_mon;
136 int ret;
137 137
138 if (monc->cur_mon < 0) { 138 BUG_ON(monc->monmap->num_mon < 1);
139 get_random_bytes(&r, 1); 139
140 monc->cur_mon = r % monc->monmap->num_mon; 140 if (monc->monmap->num_mon == 1) {
141 dout("open_session num=%d r=%d -> mon%d\n", 141 monc->cur_mon = 0;
142 monc->monmap->num_mon, r, monc->cur_mon);
143 monc->sub_sent = 0;
144 monc->sub_renew_after = jiffies; /* i.e., expired */
145 monc->want_next_osdmap = !!monc->want_next_osdmap;
146
147 dout("open_session mon%d opening\n", monc->cur_mon);
148 ceph_con_open(&monc->con,
149 CEPH_ENTITY_TYPE_MON, monc->cur_mon,
150 &monc->monmap->mon_inst[monc->cur_mon].addr);
151
152 /* send an initial keepalive to ensure our timestamp is
153 * valid by the time we are in an OPENED state */
154 ceph_con_keepalive(&monc->con);
155
156 /* initiatiate authentication handshake */
157 ret = ceph_auth_build_hello(monc->auth,
158 monc->m_auth->front.iov_base,
159 monc->m_auth->front_alloc_len);
160 __send_prepared_auth_request(monc, ret);
161 } else { 142 } else {
162 dout("open_session mon%d already open\n", monc->cur_mon); 143 int max = monc->monmap->num_mon;
144 int o = -1;
145 int n;
146
147 if (monc->cur_mon >= 0) {
148 if (monc->cur_mon < monc->monmap->num_mon)
149 o = monc->cur_mon;
150 if (o >= 0)
151 max--;
152 }
153
154 n = prandom_u32() % max;
155 if (o >= 0 && n >= o)
156 n++;
157
158 monc->cur_mon = n;
163 } 159 }
164 return 0; 160
161 dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
162 monc->cur_mon, monc->monmap->num_mon);
163}
164
165/*
166 * Open a session with a new monitor.
167 */
168static void __open_session(struct ceph_mon_client *monc)
169{
170 int ret;
171
172 pick_new_mon(monc);
173
174 monc->hunting = true;
175 if (monc->had_a_connection) {
176 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
177 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
178 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
179 }
180
181 monc->sub_renew_after = jiffies; /* i.e., expired */
182 monc->sub_renew_sent = 0;
183
184 dout("%s opening mon%d\n", __func__, monc->cur_mon);
185 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
186 &monc->monmap->mon_inst[monc->cur_mon].addr);
187
188 /*
189 * send an initial keepalive to ensure our timestamp is valid
190 * by the time we are in an OPENED state
191 */
192 ceph_con_keepalive(&monc->con);
193
194 /* initiate authentication handshake */
195 ret = ceph_auth_build_hello(monc->auth,
196 monc->m_auth->front.iov_base,
197 monc->m_auth->front_alloc_len);
198 BUG_ON(ret <= 0);
199 __send_prepared_auth_request(monc, ret);
165} 200}
166 201
167static bool __sub_expired(struct ceph_mon_client *monc) 202static void reopen_session(struct ceph_mon_client *monc)
168{ 203{
169 return time_after_eq(jiffies, monc->sub_renew_after); 204 if (!monc->hunting)
205 pr_info("mon%d %s session lost, hunting for new mon\n",
206 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
207
208 __close_session(monc);
209 __open_session(monc);
170} 210}
171 211
172/* 212/*
@@ -174,74 +214,70 @@ static bool __sub_expired(struct ceph_mon_client *monc)
174 */ 214 */
175static void __schedule_delayed(struct ceph_mon_client *monc) 215static void __schedule_delayed(struct ceph_mon_client *monc)
176{ 216{
177 struct ceph_options *opt = monc->client->options;
178 unsigned long delay; 217 unsigned long delay;
179 218
180 if (monc->cur_mon < 0 || __sub_expired(monc)) { 219 if (monc->hunting)
181 delay = 10 * HZ; 220 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
182 } else { 221 else
183 delay = 20 * HZ; 222 delay = CEPH_MONC_PING_INTERVAL;
184 if (opt->monc_ping_timeout > 0) 223
185 delay = min(delay, opt->monc_ping_timeout / 3);
186 }
187 dout("__schedule_delayed after %lu\n", delay); 224 dout("__schedule_delayed after %lu\n", delay);
188 schedule_delayed_work(&monc->delayed_work, 225 mod_delayed_work(system_wq, &monc->delayed_work,
189 round_jiffies_relative(delay)); 226 round_jiffies_relative(delay));
190} 227}
191 228
229const char *ceph_sub_str[] = {
230 [CEPH_SUB_MDSMAP] = "mdsmap",
231 [CEPH_SUB_MONMAP] = "monmap",
232 [CEPH_SUB_OSDMAP] = "osdmap",
233};
234
192/* 235/*
193 * Send subscribe request for mdsmap and/or osdmap. 236 * Send subscribe request for one or more maps, according to
237 * monc->subs.
194 */ 238 */
195static void __send_subscribe(struct ceph_mon_client *monc) 239static void __send_subscribe(struct ceph_mon_client *monc)
196{ 240{
197 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", 241 struct ceph_msg *msg = monc->m_subscribe;
198 (unsigned int)monc->sub_sent, __sub_expired(monc), 242 void *p = msg->front.iov_base;
199 monc->want_next_osdmap); 243 void *const end = p + msg->front_alloc_len;
200 if ((__sub_expired(monc) && !monc->sub_sent) || 244 int num = 0;
201 monc->want_next_osdmap == 1) { 245 int i;
202 struct ceph_msg *msg = monc->m_subscribe; 246
203 struct ceph_mon_subscribe_item *i; 247 dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
204 void *p, *end; 248
205 int num; 249 BUG_ON(monc->cur_mon < 0);
206 250
207 p = msg->front.iov_base; 251 if (!monc->sub_renew_sent)
208 end = p + msg->front_alloc_len; 252 monc->sub_renew_sent = jiffies | 1; /* never 0 */
209 253
210 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 254 msg->hdr.version = cpu_to_le16(2);
211 ceph_encode_32(&p, num); 255
212 256 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
213 if (monc->want_next_osdmap) { 257 if (monc->subs[i].want)
214 dout("__send_subscribe to 'osdmap' %u\n", 258 num++;
215 (unsigned int)monc->have_osdmap);
216 ceph_encode_string(&p, end, "osdmap", 6);
217 i = p;
218 i->have = cpu_to_le64(monc->have_osdmap);
219 i->onetime = 1;
220 p += sizeof(*i);
221 monc->want_next_osdmap = 2; /* requested */
222 }
223 if (monc->want_mdsmap) {
224 dout("__send_subscribe to 'mdsmap' %u+\n",
225 (unsigned int)monc->have_mdsmap);
226 ceph_encode_string(&p, end, "mdsmap", 6);
227 i = p;
228 i->have = cpu_to_le64(monc->have_mdsmap);
229 i->onetime = 0;
230 p += sizeof(*i);
231 }
232 ceph_encode_string(&p, end, "monmap", 6);
233 i = p;
234 i->have = 0;
235 i->onetime = 0;
236 p += sizeof(*i);
237
238 msg->front.iov_len = p - msg->front.iov_base;
239 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
240 ceph_msg_revoke(msg);
241 ceph_con_send(&monc->con, ceph_msg_get(msg));
242
243 monc->sub_sent = jiffies | 1; /* never 0 */
244 } 259 }
260 BUG_ON(num < 1); /* monmap sub is always there */
261 ceph_encode_32(&p, num);
262 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
263 const char *s = ceph_sub_str[i];
264
265 if (!monc->subs[i].want)
266 continue;
267
268 dout("%s %s start %llu flags 0x%x\n", __func__, s,
269 le64_to_cpu(monc->subs[i].item.start),
270 monc->subs[i].item.flags);
271 ceph_encode_string(&p, end, s, strlen(s));
272 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
273 p += sizeof(monc->subs[i].item);
274 }
275
276 BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
277 msg->front.iov_len = p - msg->front.iov_base;
278 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
279 ceph_msg_revoke(msg);
280 ceph_con_send(&monc->con, ceph_msg_get(msg));
245} 281}
246 282
247static void handle_subscribe_ack(struct ceph_mon_client *monc, 283static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,15 +291,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
255 seconds = le32_to_cpu(h->duration); 291 seconds = le32_to_cpu(h->duration);
256 292
257 mutex_lock(&monc->mutex); 293 mutex_lock(&monc->mutex);
258 if (monc->hunting) { 294 if (monc->sub_renew_sent) {
259 pr_info("mon%d %s session established\n", 295 monc->sub_renew_after = monc->sub_renew_sent +
260 monc->cur_mon, 296 (seconds >> 1) * HZ - 1;
261 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 297 dout("%s sent %lu duration %d renew after %lu\n", __func__,
262 monc->hunting = false; 298 monc->sub_renew_sent, seconds, monc->sub_renew_after);
299 monc->sub_renew_sent = 0;
300 } else {
301 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
302 monc->sub_renew_sent, monc->sub_renew_after);
263 } 303 }
264 dout("handle_subscribe_ack after %d seconds\n", seconds);
265 monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
266 monc->sub_sent = 0;
267 mutex_unlock(&monc->mutex); 304 mutex_unlock(&monc->mutex);
268 return; 305 return;
269bad: 306bad:
@@ -272,36 +309,82 @@ bad:
272} 309}
273 310
274/* 311/*
275 * Keep track of which maps we have 312 * Register interest in a map
313 *
314 * @sub: one of CEPH_SUB_*
315 * @epoch: X for "every map since X", or 0 for "just the latest"
276 */ 316 */
277int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) 317static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
318 u32 epoch, bool continuous)
319{
320 __le64 start = cpu_to_le64(epoch);
321 u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
322
323 dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
324 epoch, continuous);
325
326 if (monc->subs[sub].want &&
327 monc->subs[sub].item.start == start &&
328 monc->subs[sub].item.flags == flags)
329 return false;
330
331 monc->subs[sub].item.start = start;
332 monc->subs[sub].item.flags = flags;
333 monc->subs[sub].want = true;
334
335 return true;
336}
337
338bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
339 bool continuous)
278{ 340{
341 bool need_request;
342
279 mutex_lock(&monc->mutex); 343 mutex_lock(&monc->mutex);
280 monc->have_mdsmap = got; 344 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
281 mutex_unlock(&monc->mutex); 345 mutex_unlock(&monc->mutex);
282 return 0; 346
347 return need_request;
283} 348}
284EXPORT_SYMBOL(ceph_monc_got_mdsmap); 349EXPORT_SYMBOL(ceph_monc_want_map);
285 350
286int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) 351/*
352 * Keep track of which maps we have
353 *
354 * @sub: one of CEPH_SUB_*
355 */
356static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
357 u32 epoch)
358{
359 dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
360
361 if (monc->subs[sub].want) {
362 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
363 monc->subs[sub].want = false;
364 else
365 monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
366 }
367
368 monc->subs[sub].have = epoch;
369}
370
371void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
287{ 372{
288 mutex_lock(&monc->mutex); 373 mutex_lock(&monc->mutex);
289 monc->have_osdmap = got; 374 __ceph_monc_got_map(monc, sub, epoch);
290 monc->want_next_osdmap = 0;
291 mutex_unlock(&monc->mutex); 375 mutex_unlock(&monc->mutex);
292 return 0;
293} 376}
377EXPORT_SYMBOL(ceph_monc_got_map);
294 378
295/* 379/*
296 * Register interest in the next osdmap 380 * Register interest in the next osdmap
297 */ 381 */
298void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) 382void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
299{ 383{
300 dout("request_next_osdmap have %u\n", monc->have_osdmap); 384 dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
301 mutex_lock(&monc->mutex); 385 mutex_lock(&monc->mutex);
302 if (!monc->want_next_osdmap) 386 if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
303 monc->want_next_osdmap = 1; 387 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
304 if (monc->want_next_osdmap < 2)
305 __send_subscribe(monc); 388 __send_subscribe(monc);
306 mutex_unlock(&monc->mutex); 389 mutex_unlock(&monc->mutex);
307} 390}
@@ -320,15 +403,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
320 long ret; 403 long ret;
321 404
322 mutex_lock(&monc->mutex); 405 mutex_lock(&monc->mutex);
323 while (monc->have_osdmap < epoch) { 406 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
324 mutex_unlock(&monc->mutex); 407 mutex_unlock(&monc->mutex);
325 408
326 if (timeout && time_after_eq(jiffies, started + timeout)) 409 if (timeout && time_after_eq(jiffies, started + timeout))
327 return -ETIMEDOUT; 410 return -ETIMEDOUT;
328 411
329 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 412 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
330 monc->have_osdmap >= epoch, 413 monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
331 ceph_timeout_jiffies(timeout)); 414 ceph_timeout_jiffies(timeout));
332 if (ret < 0) 415 if (ret < 0)
333 return ret; 416 return ret;
334 417
@@ -341,11 +424,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
341EXPORT_SYMBOL(ceph_monc_wait_osdmap); 424EXPORT_SYMBOL(ceph_monc_wait_osdmap);
342 425
343/* 426/*
344 * 427 * Open a session with a random monitor. Request monmap and osdmap,
428 * which are waited upon in __ceph_open_session().
345 */ 429 */
346int ceph_monc_open_session(struct ceph_mon_client *monc) 430int ceph_monc_open_session(struct ceph_mon_client *monc)
347{ 431{
348 mutex_lock(&monc->mutex); 432 mutex_lock(&monc->mutex);
433 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
434 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
349 __open_session(monc); 435 __open_session(monc);
350 __schedule_delayed(monc); 436 __schedule_delayed(monc);
351 mutex_unlock(&monc->mutex); 437 mutex_unlock(&monc->mutex);
@@ -353,29 +439,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
353} 439}
354EXPORT_SYMBOL(ceph_monc_open_session); 440EXPORT_SYMBOL(ceph_monc_open_session);
355 441
356/*
357 * We require the fsid and global_id in order to initialize our
358 * debugfs dir.
359 */
360static bool have_debugfs_info(struct ceph_mon_client *monc)
361{
362 dout("have_debugfs_info fsid %d globalid %lld\n",
363 (int)monc->client->have_fsid, monc->auth->global_id);
364 return monc->client->have_fsid && monc->auth->global_id > 0;
365}
366
367static void ceph_monc_handle_map(struct ceph_mon_client *monc, 442static void ceph_monc_handle_map(struct ceph_mon_client *monc,
368 struct ceph_msg *msg) 443 struct ceph_msg *msg)
369{ 444{
370 struct ceph_client *client = monc->client; 445 struct ceph_client *client = monc->client;
371 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 446 struct ceph_monmap *monmap = NULL, *old = monc->monmap;
372 void *p, *end; 447 void *p, *end;
373 int had_debugfs_info, init_debugfs = 0;
374 448
375 mutex_lock(&monc->mutex); 449 mutex_lock(&monc->mutex);
376 450
377 had_debugfs_info = have_debugfs_info(monc);
378
379 dout("handle_monmap\n"); 451 dout("handle_monmap\n");
380 p = msg->front.iov_base; 452 p = msg->front.iov_base;
381 end = p + msg->front.iov_len; 453 end = p + msg->front.iov_len;
@@ -395,29 +467,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
395 client->monc.monmap = monmap; 467 client->monc.monmap = monmap;
396 kfree(old); 468 kfree(old);
397 469
398 if (!client->have_fsid) { 470 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
399 client->have_fsid = true; 471 client->have_fsid = true;
400 if (!had_debugfs_info && have_debugfs_info(monc)) {
401 pr_info("client%lld fsid %pU\n",
402 ceph_client_id(monc->client),
403 &monc->client->fsid);
404 init_debugfs = 1;
405 }
406 mutex_unlock(&monc->mutex);
407
408 if (init_debugfs) {
409 /*
410 * do debugfs initialization without mutex to avoid
411 * creating a locking dependency
412 */
413 ceph_debugfs_client_init(monc->client);
414 }
415 472
416 goto out_unlocked;
417 }
418out: 473out:
419 mutex_unlock(&monc->mutex); 474 mutex_unlock(&monc->mutex);
420out_unlocked:
421 wake_up_all(&client->auth_wq); 475 wake_up_all(&client->auth_wq);
422} 476}
423 477
@@ -745,18 +799,15 @@ static void delayed_work(struct work_struct *work)
745 dout("monc delayed_work\n"); 799 dout("monc delayed_work\n");
746 mutex_lock(&monc->mutex); 800 mutex_lock(&monc->mutex);
747 if (monc->hunting) { 801 if (monc->hunting) {
748 __close_session(monc); 802 dout("%s continuing hunt\n", __func__);
749 __open_session(monc); /* continue hunting */ 803 reopen_session(monc);
750 } else { 804 } else {
751 struct ceph_options *opt = monc->client->options;
752 int is_auth = ceph_auth_is_authenticated(monc->auth); 805 int is_auth = ceph_auth_is_authenticated(monc->auth);
753 if (ceph_con_keepalive_expired(&monc->con, 806 if (ceph_con_keepalive_expired(&monc->con,
754 opt->monc_ping_timeout)) { 807 CEPH_MONC_PING_TIMEOUT)) {
755 dout("monc keepalive timeout\n"); 808 dout("monc keepalive timeout\n");
756 is_auth = 0; 809 is_auth = 0;
757 __close_session(monc); 810 reopen_session(monc);
758 monc->hunting = true;
759 __open_session(monc);
760 } 811 }
761 812
762 if (!monc->hunting) { 813 if (!monc->hunting) {
@@ -764,8 +815,14 @@ static void delayed_work(struct work_struct *work)
764 __validate_auth(monc); 815 __validate_auth(monc);
765 } 816 }
766 817
767 if (is_auth) 818 if (is_auth) {
768 __send_subscribe(monc); 819 unsigned long now = jiffies;
820
821 dout("%s renew subs? now %lu renew after %lu\n",
822 __func__, now, monc->sub_renew_after);
823 if (time_after_eq(now, monc->sub_renew_after))
824 __send_subscribe(monc);
825 }
769 } 826 }
770 __schedule_delayed(monc); 827 __schedule_delayed(monc);
771 mutex_unlock(&monc->mutex); 828 mutex_unlock(&monc->mutex);
@@ -852,18 +909,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
852 &monc->client->msgr); 909 &monc->client->msgr);
853 910
854 monc->cur_mon = -1; 911 monc->cur_mon = -1;
855 monc->hunting = true; 912 monc->had_a_connection = false;
856 monc->sub_renew_after = jiffies; 913 monc->hunt_mult = 1;
857 monc->sub_sent = 0;
858 914
859 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 915 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
860 monc->generic_request_tree = RB_ROOT; 916 monc->generic_request_tree = RB_ROOT;
861 monc->num_generic_requests = 0; 917 monc->num_generic_requests = 0;
862 monc->last_tid = 0; 918 monc->last_tid = 0;
863 919
864 monc->have_mdsmap = 0;
865 monc->have_osdmap = 0;
866 monc->want_next_osdmap = 1;
867 return 0; 920 return 0;
868 921
869out_auth_reply: 922out_auth_reply:
@@ -888,7 +941,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
888 941
889 mutex_lock(&monc->mutex); 942 mutex_lock(&monc->mutex);
890 __close_session(monc); 943 __close_session(monc);
891 944 monc->cur_mon = -1;
892 mutex_unlock(&monc->mutex); 945 mutex_unlock(&monc->mutex);
893 946
894 /* 947 /*
@@ -910,26 +963,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
910} 963}
911EXPORT_SYMBOL(ceph_monc_stop); 964EXPORT_SYMBOL(ceph_monc_stop);
912 965
966static void finish_hunting(struct ceph_mon_client *monc)
967{
968 if (monc->hunting) {
969 dout("%s found mon%d\n", __func__, monc->cur_mon);
970 monc->hunting = false;
971 monc->had_a_connection = true;
972 monc->hunt_mult /= 2; /* reduce by 50% */
973 if (monc->hunt_mult < 1)
974 monc->hunt_mult = 1;
975 }
976}
977
913static void handle_auth_reply(struct ceph_mon_client *monc, 978static void handle_auth_reply(struct ceph_mon_client *monc,
914 struct ceph_msg *msg) 979 struct ceph_msg *msg)
915{ 980{
916 int ret; 981 int ret;
917 int was_auth = 0; 982 int was_auth = 0;
918 int had_debugfs_info, init_debugfs = 0;
919 983
920 mutex_lock(&monc->mutex); 984 mutex_lock(&monc->mutex);
921 had_debugfs_info = have_debugfs_info(monc);
922 was_auth = ceph_auth_is_authenticated(monc->auth); 985 was_auth = ceph_auth_is_authenticated(monc->auth);
923 monc->pending_auth = 0; 986 monc->pending_auth = 0;
924 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 987 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
925 msg->front.iov_len, 988 msg->front.iov_len,
926 monc->m_auth->front.iov_base, 989 monc->m_auth->front.iov_base,
927 monc->m_auth->front_alloc_len); 990 monc->m_auth->front_alloc_len);
991 if (ret > 0) {
992 __send_prepared_auth_request(monc, ret);
993 goto out;
994 }
995
996 finish_hunting(monc);
997
928 if (ret < 0) { 998 if (ret < 0) {
929 monc->client->auth_err = ret; 999 monc->client->auth_err = ret;
930 wake_up_all(&monc->client->auth_wq);
931 } else if (ret > 0) {
932 __send_prepared_auth_request(monc, ret);
933 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { 1000 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
934 dout("authenticated, starting session\n"); 1001 dout("authenticated, starting session\n");
935 1002
@@ -939,23 +1006,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
939 1006
940 __send_subscribe(monc); 1007 __send_subscribe(monc);
941 __resend_generic_request(monc); 1008 __resend_generic_request(monc);
942 }
943 1009
944 if (!had_debugfs_info && have_debugfs_info(monc)) { 1010 pr_info("mon%d %s session established\n", monc->cur_mon,
945 pr_info("client%lld fsid %pU\n", 1011 ceph_pr_addr(&monc->con.peer_addr.in_addr));
946 ceph_client_id(monc->client),
947 &monc->client->fsid);
948 init_debugfs = 1;
949 } 1012 }
950 mutex_unlock(&monc->mutex);
951 1013
952 if (init_debugfs) { 1014out:
953 /* 1015 mutex_unlock(&monc->mutex);
954 * do debugfs initialization without mutex to avoid 1016 if (monc->client->auth_err < 0)
955 * creating a locking dependency 1017 wake_up_all(&monc->client->auth_wq);
956 */
957 ceph_debugfs_client_init(monc->client);
958 }
959} 1018}
960 1019
961static int __validate_auth(struct ceph_mon_client *monc) 1020static int __validate_auth(struct ceph_mon_client *monc)
@@ -1096,29 +1155,17 @@ static void mon_fault(struct ceph_connection *con)
1096{ 1155{
1097 struct ceph_mon_client *monc = con->private; 1156 struct ceph_mon_client *monc = con->private;
1098 1157
1099 if (!monc)
1100 return;
1101
1102 dout("mon_fault\n");
1103 mutex_lock(&monc->mutex); 1158 mutex_lock(&monc->mutex);
1104 if (!con->private) 1159 dout("%s mon%d\n", __func__, monc->cur_mon);
1105 goto out; 1160 if (monc->cur_mon >= 0) {
1106 1161 if (!monc->hunting) {
1107 if (!monc->hunting) 1162 dout("%s hunting for new mon\n", __func__);
1108 pr_info("mon%d %s session lost, " 1163 reopen_session(monc);
1109 "hunting for new mon\n", monc->cur_mon, 1164 __schedule_delayed(monc);
1110 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1165 } else {
1111 1166 dout("%s already hunting\n", __func__);
1112 __close_session(monc); 1167 }
1113 if (!monc->hunting) {
1114 /* start hunting */
1115 monc->hunting = true;
1116 __open_session(monc);
1117 } else {
1118 /* already hunting, let's wait a bit */
1119 __schedule_delayed(monc);
1120 } 1168 }
1121out:
1122 mutex_unlock(&monc->mutex); 1169 mutex_unlock(&monc->mutex);
1123} 1170}
1124 1171
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5bc053778fed..32355d9d0103 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref)
338 ceph_put_snap_context(req->r_snapc); 338 ceph_put_snap_context(req->r_snapc);
339 if (req->r_mempool) 339 if (req->r_mempool)
340 mempool_free(req, req->r_osdc->req_mempool); 340 mempool_free(req, req->r_osdc->req_mempool);
341 else 341 else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
342 kmem_cache_free(ceph_osd_request_cache, req); 342 kmem_cache_free(ceph_osd_request_cache, req);
343 343 else
344 kfree(req);
344} 345}
345 346
346void ceph_osdc_get_request(struct ceph_osd_request *req) 347void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,28 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
369 struct ceph_msg *msg; 370 struct ceph_msg *msg;
370 size_t msg_size; 371 size_t msg_size;
371 372
372 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
373 BUG_ON(num_ops > CEPH_OSD_MAX_OP);
374
375 msg_size = 4 + 4 + 8 + 8 + 4+8;
376 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
377 msg_size += 1 + 8 + 4 + 4; /* pg_t */
378 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
379 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
380 msg_size += 8; /* snapid */
381 msg_size += 8; /* snap_seq */
382 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
383 msg_size += 4;
384
385 if (use_mempool) { 373 if (use_mempool) {
374 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
386 req = mempool_alloc(osdc->req_mempool, gfp_flags); 375 req = mempool_alloc(osdc->req_mempool, gfp_flags);
387 memset(req, 0, sizeof(*req)); 376 } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
377 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
388 } else { 378 } else {
389 req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); 379 BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
380 req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
381 gfp_flags);
390 } 382 }
391 if (req == NULL) 383 if (unlikely(!req))
392 return NULL; 384 return NULL;
393 385
386 /* req only, each op is zeroed in _osd_req_op_init() */
387 memset(req, 0, sizeof(*req));
388
394 req->r_osdc = osdc; 389 req->r_osdc = osdc;
395 req->r_mempool = use_mempool; 390 req->r_mempool = use_mempool;
396 req->r_num_ops = num_ops; 391 req->r_num_ops = num_ops;
@@ -408,18 +403,36 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
408 req->r_base_oloc.pool = -1; 403 req->r_base_oloc.pool = -1;
409 req->r_target_oloc.pool = -1; 404 req->r_target_oloc.pool = -1;
410 405
406 msg_size = OSD_OPREPLY_FRONT_LEN;
407 if (num_ops > CEPH_OSD_SLAB_OPS) {
408 /* ceph_osd_op and rval */
409 msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
410 (sizeof(struct ceph_osd_op) + 4);
411 }
412
411 /* create reply message */ 413 /* create reply message */
412 if (use_mempool) 414 if (use_mempool)
413 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 415 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
414 else 416 else
415 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 417 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
416 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 418 gfp_flags, true);
417 if (!msg) { 419 if (!msg) {
418 ceph_osdc_put_request(req); 420 ceph_osdc_put_request(req);
419 return NULL; 421 return NULL;
420 } 422 }
421 req->r_reply = msg; 423 req->r_reply = msg;
422 424
425 msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
426 msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
427 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
428 msg_size += 1 + 8 + 4 + 4; /* pgid */
429 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
430 msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
431 msg_size += 8; /* snapid */
432 msg_size += 8; /* snap_seq */
433 msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
434 msg_size += 4; /* retry_attempt */
435
423 /* create request message; allow space for oid */ 436 /* create request message; allow space for oid */
424 if (use_mempool) 437 if (use_mempool)
425 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 438 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -498,7 +511,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
498 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 511 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
499 payload_len += length; 512 payload_len += length;
500 513
501 op->payload_len = payload_len; 514 op->indata_len = payload_len;
502} 515}
503EXPORT_SYMBOL(osd_req_op_extent_init); 516EXPORT_SYMBOL(osd_req_op_extent_init);
504 517
@@ -517,10 +530,32 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
517 BUG_ON(length > previous); 530 BUG_ON(length > previous);
518 531
519 op->extent.length = length; 532 op->extent.length = length;
520 op->payload_len -= previous - length; 533 op->indata_len -= previous - length;
521} 534}
522EXPORT_SYMBOL(osd_req_op_extent_update); 535EXPORT_SYMBOL(osd_req_op_extent_update);
523 536
537void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
538 unsigned int which, u64 offset_inc)
539{
540 struct ceph_osd_req_op *op, *prev_op;
541
542 BUG_ON(which + 1 >= osd_req->r_num_ops);
543
544 prev_op = &osd_req->r_ops[which];
545 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
546 /* dup previous one */
547 op->indata_len = prev_op->indata_len;
548 op->outdata_len = prev_op->outdata_len;
549 op->extent = prev_op->extent;
550 /* adjust offset */
551 op->extent.offset += offset_inc;
552 op->extent.length -= offset_inc;
553
554 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
555 op->indata_len -= offset_inc;
556}
557EXPORT_SYMBOL(osd_req_op_extent_dup_last);
558
524void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 559void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
525 u16 opcode, const char *class, const char *method) 560 u16 opcode, const char *class, const char *method)
526{ 561{
@@ -554,7 +589,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
554 589
555 op->cls.argc = 0; /* currently unused */ 590 op->cls.argc = 0; /* currently unused */
556 591
557 op->payload_len = payload_len; 592 op->indata_len = payload_len;
558} 593}
559EXPORT_SYMBOL(osd_req_op_cls_init); 594EXPORT_SYMBOL(osd_req_op_cls_init);
560 595
@@ -587,7 +622,7 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
587 op->xattr.cmp_mode = cmp_mode; 622 op->xattr.cmp_mode = cmp_mode;
588 623
589 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 624 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
590 op->payload_len = payload_len; 625 op->indata_len = payload_len;
591 return 0; 626 return 0;
592} 627}
593EXPORT_SYMBOL(osd_req_op_xattr_init); 628EXPORT_SYMBOL(osd_req_op_xattr_init);
@@ -707,7 +742,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
707 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 742 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
708 dst->cls.indata_len = cpu_to_le32(data_length); 743 dst->cls.indata_len = cpu_to_le32(data_length);
709 ceph_osdc_msg_data_add(req->r_request, osd_data); 744 ceph_osdc_msg_data_add(req->r_request, osd_data);
710 src->payload_len += data_length; 745 src->indata_len += data_length;
711 request_data_len += data_length; 746 request_data_len += data_length;
712 } 747 }
713 osd_data = &src->cls.response_data; 748 osd_data = &src->cls.response_data;
@@ -750,7 +785,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
750 785
751 dst->op = cpu_to_le16(src->op); 786 dst->op = cpu_to_le16(src->op);
752 dst->flags = cpu_to_le32(src->flags); 787 dst->flags = cpu_to_le32(src->flags);
753 dst->payload_len = cpu_to_le32(src->payload_len); 788 dst->payload_len = cpu_to_le32(src->indata_len);
754 789
755 return request_data_len; 790 return request_data_len;
756} 791}
@@ -1810,7 +1845,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1810 1845
1811 ceph_decode_need(&p, end, 4, bad_put); 1846 ceph_decode_need(&p, end, 4, bad_put);
1812 numops = ceph_decode_32(&p); 1847 numops = ceph_decode_32(&p);
1813 if (numops > CEPH_OSD_MAX_OP) 1848 if (numops > CEPH_OSD_MAX_OPS)
1814 goto bad_put; 1849 goto bad_put;
1815 if (numops != req->r_num_ops) 1850 if (numops != req->r_num_ops)
1816 goto bad_put; 1851 goto bad_put;
@@ -1821,7 +1856,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1821 int len; 1856 int len;
1822 1857
1823 len = le32_to_cpu(op->payload_len); 1858 len = le32_to_cpu(op->payload_len);
1824 req->r_reply_op_len[i] = len; 1859 req->r_ops[i].outdata_len = len;
1825 dout(" op %d has %d bytes\n", i, len); 1860 dout(" op %d has %d bytes\n", i, len);
1826 payload_len += len; 1861 payload_len += len;
1827 p += sizeof(*op); 1862 p += sizeof(*op);
@@ -1836,7 +1871,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1836 ceph_decode_need(&p, end, 4 + numops * 4, bad_put); 1871 ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
1837 retry_attempt = ceph_decode_32(&p); 1872 retry_attempt = ceph_decode_32(&p);
1838 for (i = 0; i < numops; i++) 1873 for (i = 0; i < numops; i++)
1839 req->r_reply_op_result[i] = ceph_decode_32(&p); 1874 req->r_ops[i].rval = ceph_decode_32(&p);
1840 1875
1841 if (le16_to_cpu(msg->hdr.version) >= 6) { 1876 if (le16_to_cpu(msg->hdr.version) >= 6) {
1842 p += 8 + 4; /* skip replay_version */ 1877 p += 8 + 4; /* skip replay_version */
@@ -2187,7 +2222,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2187 goto bad; 2222 goto bad;
2188done: 2223done:
2189 downgrade_write(&osdc->map_sem); 2224 downgrade_write(&osdc->map_sem);
2190 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 2225 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2226 osdc->osdmap->epoch);
2191 2227
2192 /* 2228 /*
2193 * subscribe to subsequent osdmap updates if full to ensure 2229 * subscribe to subsequent osdmap updates if full to ensure
@@ -2646,8 +2682,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2646 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 2682 round_jiffies_relative(osdc->client->options->osd_idle_ttl));
2647 2683
2648 err = -ENOMEM; 2684 err = -ENOMEM;
2649 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2685 osdc->req_mempool = mempool_create_slab_pool(10,
2650 sizeof(struct ceph_osd_request)); 2686 ceph_osd_request_cache);
2651 if (!osdc->req_mempool) 2687 if (!osdc->req_mempool)
2652 goto out; 2688 goto out;
2653 2689
@@ -2782,11 +2818,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages);
2782 2818
2783int ceph_osdc_setup(void) 2819int ceph_osdc_setup(void)
2784{ 2820{
2821 size_t size = sizeof(struct ceph_osd_request) +
2822 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
2823
2785 BUG_ON(ceph_osd_request_cache); 2824 BUG_ON(ceph_osd_request_cache);
2786 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", 2825 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
2787 sizeof (struct ceph_osd_request), 2826 0, 0, NULL);
2788 __alignof__(struct ceph_osd_request),
2789 0, NULL);
2790 2827
2791 return ceph_osd_request_cache ? 0 : -ENOMEM; 2828 return ceph_osd_request_cache ? 0 : -ENOMEM;
2792} 2829}