aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/rbd.c14
-rw-r--r--fs/ceph/addr.c324
-rw-r--r--fs/ceph/caps.c11
-rw-r--r--fs/ceph/dir.c69
-rw-r--r--fs/ceph/export.c13
-rw-r--r--fs/ceph/file.c15
-rw-r--r--fs/ceph/inode.c34
-rw-r--r--fs/ceph/mds_client.c7
-rw-r--r--fs/ceph/snap.c16
-rw-r--r--fs/ceph/super.c47
-rw-r--r--fs/ceph/super.h23
-rw-r--r--fs/ceph/xattr.c78
-rw-r--r--include/linux/ceph/ceph_features.h2
-rw-r--r--include/linux/ceph/ceph_fs.h7
-rw-r--r--include/linux/ceph/libceph.h8
-rw-r--r--include/linux/ceph/mon_client.h31
-rw-r--r--include/linux/ceph/osd_client.h15
-rw-r--r--net/ceph/ceph_common.c4
-rw-r--r--net/ceph/debugfs.c17
-rw-r--r--net/ceph/messenger.c29
-rw-r--r--net/ceph/mon_client.c457
-rw-r--r--net/ceph/osd_client.c109
22 files changed, 811 insertions, 519 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4a876785b68c..9c6234428607 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1847,14 +1847,12 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1847 if (osd_req->r_result < 0) 1847 if (osd_req->r_result < 0)
1848 obj_request->result = osd_req->r_result; 1848 obj_request->result = osd_req->r_result;
1849 1849
1850 rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1851
1852 /* 1850 /*
1853 * We support a 64-bit length, but ultimately it has to be 1851 * We support a 64-bit length, but ultimately it has to be
1854 * passed to the block layer, which just supports a 32-bit 1852 * passed to the block layer, which just supports a 32-bit
1855 * length field. 1853 * length field.
1856 */ 1854 */
1857 obj_request->xferred = osd_req->r_reply_op_len[0]; 1855 obj_request->xferred = osd_req->r_ops[0].outdata_len;
1858 rbd_assert(obj_request->xferred < (u64)UINT_MAX); 1856 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1859 1857
1860 opcode = osd_req->r_ops[0].op; 1858 opcode = osd_req->r_ops[0].op;
@@ -5643,18 +5641,12 @@ static void rbd_sysfs_cleanup(void)
5643static int rbd_slab_init(void) 5641static int rbd_slab_init(void)
5644{ 5642{
5645 rbd_assert(!rbd_img_request_cache); 5643 rbd_assert(!rbd_img_request_cache);
5646 rbd_img_request_cache = kmem_cache_create("rbd_img_request", 5644 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
5647 sizeof (struct rbd_img_request),
5648 __alignof__(struct rbd_img_request),
5649 0, NULL);
5650 if (!rbd_img_request_cache) 5645 if (!rbd_img_request_cache)
5651 return -ENOMEM; 5646 return -ENOMEM;
5652 5647
5653 rbd_assert(!rbd_obj_request_cache); 5648 rbd_assert(!rbd_obj_request_cache);
5654 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request", 5649 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
5655 sizeof (struct rbd_obj_request),
5656 __alignof__(struct rbd_obj_request),
5657 0, NULL);
5658 if (!rbd_obj_request_cache) 5650 if (!rbd_obj_request_cache)
5659 goto out_err; 5651 goto out_err;
5660 5652
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 19adeb0ef82a..fc5cae2a0db2 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,8 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
175 175
176static int ceph_releasepage(struct page *page, gfp_t g) 176static int ceph_releasepage(struct page *page, gfp_t g)
177{ 177{
178 struct inode *inode = page->mapping ? page->mapping->host : NULL; 178 dout("%p releasepage %p idx %lu\n", page->mapping->host,
179 dout("%p releasepage %p idx %lu\n", inode, page, page->index); 179 page, page->index);
180 WARN_ON(PageDirty(page)); 180 WARN_ON(PageDirty(page));
181 181
182 /* Can we release the page from the cache? */ 182 /* Can we release the page from the cache? */
@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
276 for (i = 0; i < num_pages; i++) { 276 for (i = 0; i < num_pages; i++) {
277 struct page *page = osd_data->pages[i]; 277 struct page *page = osd_data->pages[i];
278 278
279 if (rc < 0 && rc != ENOENT) 279 if (rc < 0 && rc != -ENOENT)
280 goto unlock; 280 goto unlock;
281 if (bytes < (int)PAGE_CACHE_SIZE) { 281 if (bytes < (int)PAGE_CACHE_SIZE) {
282 /* zero (remainder of) page */ 282 /* zero (remainder of) page */
@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
606 struct inode *inode = req->r_inode; 606 struct inode *inode = req->r_inode;
607 struct ceph_inode_info *ci = ceph_inode(inode); 607 struct ceph_inode_info *ci = ceph_inode(inode);
608 struct ceph_osd_data *osd_data; 608 struct ceph_osd_data *osd_data;
609 unsigned wrote;
610 struct page *page; 609 struct page *page;
611 int num_pages; 610 int num_pages, total_pages = 0;
612 int i; 611 int i, j;
612 int rc = req->r_result;
613 struct ceph_snap_context *snapc = req->r_snapc; 613 struct ceph_snap_context *snapc = req->r_snapc;
614 struct address_space *mapping = inode->i_mapping; 614 struct address_space *mapping = inode->i_mapping;
615 int rc = req->r_result;
616 u64 bytes = req->r_ops[0].extent.length;
617 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 615 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
618 long writeback_stat; 616 bool remove_page;
619 unsigned issued = ceph_caps_issued(ci);
620 617
621 osd_data = osd_req_op_extent_osd_data(req, 0); 618
622 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 619 dout("writepages_finish %p rc %d\n", inode, rc);
623 num_pages = calc_pages_for((u64)osd_data->alignment, 620 if (rc < 0)
624 (u64)osd_data->length);
625 if (rc >= 0) {
626 /*
627 * Assume we wrote the pages we originally sent. The
628 * osd might reply with fewer pages if our writeback
629 * raced with a truncation and was adjusted at the osd,
630 * so don't believe the reply.
631 */
632 wrote = num_pages;
633 } else {
634 wrote = 0;
635 mapping_set_error(mapping, rc); 621 mapping_set_error(mapping, rc);
636 }
637 dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
638 inode, rc, bytes, wrote);
639 622
640 /* clean all pages */ 623 /*
641 for (i = 0; i < num_pages; i++) { 624 * We lost the cache cap, need to truncate the page before
642 page = osd_data->pages[i]; 625 * it is unlocked, otherwise we'd truncate it later in the
643 BUG_ON(!page); 626 * page truncation thread, possibly losing some data that
644 WARN_ON(!PageUptodate(page)); 627 * raced its way in
628 */
629 remove_page = !(ceph_caps_issued(ci) &
630 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
645 631
646 writeback_stat = 632 /* clean all pages */
647 atomic_long_dec_return(&fsc->writeback_count); 633 for (i = 0; i < req->r_num_ops; i++) {
648 if (writeback_stat < 634 if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
649 CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb)) 635 break;
650 clear_bdi_congested(&fsc->backing_dev_info,
651 BLK_RW_ASYNC);
652 636
653 ceph_put_snap_context(page_snap_context(page)); 637 osd_data = osd_req_op_extent_osd_data(req, i);
654 page->private = 0; 638 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
655 ClearPagePrivate(page); 639 num_pages = calc_pages_for((u64)osd_data->alignment,
656 dout("unlocking %d %p\n", i, page); 640 (u64)osd_data->length);
657 end_page_writeback(page); 641 total_pages += num_pages;
642 for (j = 0; j < num_pages; j++) {
643 page = osd_data->pages[j];
644 BUG_ON(!page);
645 WARN_ON(!PageUptodate(page));
646
647 if (atomic_long_dec_return(&fsc->writeback_count) <
648 CONGESTION_OFF_THRESH(
649 fsc->mount_options->congestion_kb))
650 clear_bdi_congested(&fsc->backing_dev_info,
651 BLK_RW_ASYNC);
652
653 ceph_put_snap_context(page_snap_context(page));
654 page->private = 0;
655 ClearPagePrivate(page);
656 dout("unlocking %p\n", page);
657 end_page_writeback(page);
658
659 if (remove_page)
660 generic_error_remove_page(inode->i_mapping,
661 page);
658 662
659 /* 663 unlock_page(page);
660 * We lost the cache cap, need to truncate the page before 664 }
661 * it is unlocked, otherwise we'd truncate it later in the 665 dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
662 * page truncation thread, possibly losing some data that 666 inode, osd_data->length, rc >= 0 ? num_pages : 0);
663 * raced its way in
664 */
665 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
666 generic_error_remove_page(inode->i_mapping, page);
667 667
668 unlock_page(page); 668 ceph_release_pages(osd_data->pages, num_pages);
669 } 669 }
670 dout("%p wrote+cleaned %d pages\n", inode, wrote);
671 ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
672 670
673 ceph_release_pages(osd_data->pages, num_pages); 671 ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
672
673 osd_data = osd_req_op_extent_osd_data(req, 0);
674 if (osd_data->pages_from_pool) 674 if (osd_data->pages_from_pool)
675 mempool_free(osd_data->pages, 675 mempool_free(osd_data->pages,
676 ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); 676 ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
@@ -778,17 +778,15 @@ retry:
778 while (!done && index <= end) { 778 while (!done && index <= end) {
779 unsigned i; 779 unsigned i;
780 int first; 780 int first;
781 pgoff_t next; 781 pgoff_t strip_unit_end = 0;
782 int pvec_pages, locked_pages; 782 int num_ops = 0, op_idx;
783 struct page **pages = NULL; 783 int pvec_pages, locked_pages = 0;
784 struct page **pages = NULL, **data_pages;
784 mempool_t *pool = NULL; /* Becomes non-null if mempool used */ 785 mempool_t *pool = NULL; /* Becomes non-null if mempool used */
785 struct page *page; 786 struct page *page;
786 int want; 787 int want;
787 u64 offset, len; 788 u64 offset = 0, len = 0;
788 long writeback_stat;
789 789
790 next = 0;
791 locked_pages = 0;
792 max_pages = max_pages_ever; 790 max_pages = max_pages_ever;
793 791
794get_more_pages: 792get_more_pages:
@@ -824,8 +822,8 @@ get_more_pages:
824 unlock_page(page); 822 unlock_page(page);
825 break; 823 break;
826 } 824 }
827 if (next && (page->index != next)) { 825 if (strip_unit_end && (page->index > strip_unit_end)) {
828 dout("not consecutive %p\n", page); 826 dout("end of strip unit %p\n", page);
829 unlock_page(page); 827 unlock_page(page);
830 break; 828 break;
831 } 829 }
@@ -867,36 +865,31 @@ get_more_pages:
867 /* 865 /*
868 * We have something to write. If this is 866 * We have something to write. If this is
869 * the first locked page this time through, 867 * the first locked page this time through,
870 allocate an osd request and a page array 868 * calculate max possible write size and
871 * that it will use. 869 * allocate a page array
872 */ 870 */
873 if (locked_pages == 0) { 871 if (locked_pages == 0) {
874 BUG_ON(pages); 872 u64 objnum;
873 u64 objoff;
874
875 /* prepare async write request */ 875 /* prepare async write request */
876 offset = (u64)page_offset(page); 876 offset = (u64)page_offset(page);
877 len = wsize; 877 len = wsize;
878 req = ceph_osdc_new_request(&fsc->client->osdc, 878
879 &ci->i_layout, vino, 879 rc = ceph_calc_file_object_mapping(&ci->i_layout,
880 offset, &len, 0, 880 offset, len,
881 do_sync ? 2 : 1, 881 &objnum, &objoff,
882 CEPH_OSD_OP_WRITE, 882 &len);
883 CEPH_OSD_FLAG_WRITE | 883 if (rc < 0) {
884 CEPH_OSD_FLAG_ONDISK,
885 snapc, truncate_seq,
886 truncate_size, true);
887 if (IS_ERR(req)) {
888 rc = PTR_ERR(req);
889 unlock_page(page); 884 unlock_page(page);
890 break; 885 break;
891 } 886 }
892 887
893 if (do_sync) 888 num_ops = 1 + do_sync;
894 osd_req_op_init(req, 1, 889 strip_unit_end = page->index +
895 CEPH_OSD_OP_STARTSYNC, 0); 890 ((len - 1) >> PAGE_CACHE_SHIFT);
896
897 req->r_callback = writepages_finish;
898 req->r_inode = inode;
899 891
892 BUG_ON(pages);
900 max_pages = calc_pages_for(0, (u64)len); 893 max_pages = calc_pages_for(0, (u64)len);
901 pages = kmalloc(max_pages * sizeof (*pages), 894 pages = kmalloc(max_pages * sizeof (*pages),
902 GFP_NOFS); 895 GFP_NOFS);
@@ -905,6 +898,20 @@ get_more_pages:
905 pages = mempool_alloc(pool, GFP_NOFS); 898 pages = mempool_alloc(pool, GFP_NOFS);
906 BUG_ON(!pages); 899 BUG_ON(!pages);
907 } 900 }
901
902 len = 0;
903 } else if (page->index !=
904 (offset + len) >> PAGE_CACHE_SHIFT) {
905 if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
906 CEPH_OSD_MAX_OPS)) {
907 redirty_page_for_writepage(wbc, page);
908 unlock_page(page);
909 break;
910 }
911
912 num_ops++;
913 offset = (u64)page_offset(page);
914 len = 0;
908 } 915 }
909 916
910 /* note position of first page in pvec */ 917 /* note position of first page in pvec */
@@ -913,18 +920,16 @@ get_more_pages:
913 dout("%p will write page %p idx %lu\n", 920 dout("%p will write page %p idx %lu\n",
914 inode, page, page->index); 921 inode, page, page->index);
915 922
916 writeback_stat = 923 if (atomic_long_inc_return(&fsc->writeback_count) >
917 atomic_long_inc_return(&fsc->writeback_count); 924 CONGESTION_ON_THRESH(
918 if (writeback_stat > CONGESTION_ON_THRESH(
919 fsc->mount_options->congestion_kb)) { 925 fsc->mount_options->congestion_kb)) {
920 set_bdi_congested(&fsc->backing_dev_info, 926 set_bdi_congested(&fsc->backing_dev_info,
921 BLK_RW_ASYNC); 927 BLK_RW_ASYNC);
922 } 928 }
923 929
924 set_page_writeback(page);
925 pages[locked_pages] = page; 930 pages[locked_pages] = page;
926 locked_pages++; 931 locked_pages++;
927 next = page->index + 1; 932 len += PAGE_CACHE_SIZE;
928 } 933 }
929 934
930 /* did we get anything? */ 935 /* did we get anything? */
@@ -944,38 +949,119 @@ get_more_pages:
944 /* shift unused pages over in the pvec... we 949 /* shift unused pages over in the pvec... we
945 * will need to release them below. */ 950 * will need to release them below. */
946 for (j = i; j < pvec_pages; j++) { 951 for (j = i; j < pvec_pages; j++) {
947 dout(" pvec leftover page %p\n", 952 dout(" pvec leftover page %p\n", pvec.pages[j]);
948 pvec.pages[j]);
949 pvec.pages[j-i+first] = pvec.pages[j]; 953 pvec.pages[j-i+first] = pvec.pages[j];
950 } 954 }
951 pvec.nr -= i-first; 955 pvec.nr -= i-first;
952 } 956 }
953 957
954 /* Format the osd request message and submit the write */ 958new_request:
955 offset = page_offset(pages[0]); 959 offset = page_offset(pages[0]);
956 len = (u64)locked_pages << PAGE_CACHE_SHIFT; 960 len = wsize;
957 if (snap_size == -1) { 961
958 len = min(len, (u64)i_size_read(inode) - offset); 962 req = ceph_osdc_new_request(&fsc->client->osdc,
959 /* writepages_finish() clears writeback pages 963 &ci->i_layout, vino,
960 * according to the data length, so make sure 964 offset, &len, 0, num_ops,
961 * data length covers all locked pages */ 965 CEPH_OSD_OP_WRITE,
962 len = max(len, 1 + 966 CEPH_OSD_FLAG_WRITE |
963 ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT)); 967 CEPH_OSD_FLAG_ONDISK,
964 } else { 968 snapc, truncate_seq,
965 len = min(len, snap_size - offset); 969 truncate_size, false);
970 if (IS_ERR(req)) {
971 req = ceph_osdc_new_request(&fsc->client->osdc,
972 &ci->i_layout, vino,
973 offset, &len, 0,
974 min(num_ops,
975 CEPH_OSD_SLAB_OPS),
976 CEPH_OSD_OP_WRITE,
977 CEPH_OSD_FLAG_WRITE |
978 CEPH_OSD_FLAG_ONDISK,
979 snapc, truncate_seq,
980 truncate_size, true);
981 BUG_ON(IS_ERR(req));
966 } 982 }
967 dout("writepages got %d pages at %llu~%llu\n", 983 BUG_ON(len < page_offset(pages[locked_pages - 1]) +
968 locked_pages, offset, len); 984 PAGE_CACHE_SIZE - offset);
985
986 req->r_callback = writepages_finish;
987 req->r_inode = inode;
969 988
970 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, 989 /* Format the osd request message and submit the write */
990 len = 0;
991 data_pages = pages;
992 op_idx = 0;
993 for (i = 0; i < locked_pages; i++) {
994 u64 cur_offset = page_offset(pages[i]);
995 if (offset + len != cur_offset) {
996 if (op_idx + do_sync + 1 == req->r_num_ops)
997 break;
998 osd_req_op_extent_dup_last(req, op_idx,
999 cur_offset - offset);
1000 dout("writepages got pages at %llu~%llu\n",
1001 offset, len);
1002 osd_req_op_extent_osd_data_pages(req, op_idx,
1003 data_pages, len, 0,
971 !!pool, false); 1004 !!pool, false);
1005 osd_req_op_extent_update(req, op_idx, len);
972 1006
973 pages = NULL; /* request message now owns the pages array */ 1007 len = 0;
974 pool = NULL; 1008 offset = cur_offset;
1009 data_pages = pages + i;
1010 op_idx++;
1011 }
975 1012
976 /* Update the write op length in case we changed it */ 1013 set_page_writeback(pages[i]);
1014 len += PAGE_CACHE_SIZE;
1015 }
1016
1017 if (snap_size != -1) {
1018 len = min(len, snap_size - offset);
1019 } else if (i == locked_pages) {
1020 /* writepages_finish() clears writeback pages
1021 * according to the data length, so make sure
1022 * data length covers all locked pages */
1023 u64 min_len = len + 1 - PAGE_CACHE_SIZE;
1024 len = min(len, (u64)i_size_read(inode) - offset);
1025 len = max(len, min_len);
1026 }
1027 dout("writepages got pages at %llu~%llu\n", offset, len);
1028
1029 osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
1030 0, !!pool, false);
1031 osd_req_op_extent_update(req, op_idx, len);
977 1032
978 osd_req_op_extent_update(req, 0, len); 1033 if (do_sync) {
1034 op_idx++;
1035 osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
1036 }
1037 BUG_ON(op_idx + 1 != req->r_num_ops);
1038
1039 pool = NULL;
1040 if (i < locked_pages) {
1041 BUG_ON(num_ops <= req->r_num_ops);
1042 num_ops -= req->r_num_ops;
1043 num_ops += do_sync;
1044 locked_pages -= i;
1045
1046 /* allocate new pages array for next request */
1047 data_pages = pages;
1048 pages = kmalloc(locked_pages * sizeof (*pages),
1049 GFP_NOFS);
1050 if (!pages) {
1051 pool = fsc->wb_pagevec_pool;
1052 pages = mempool_alloc(pool, GFP_NOFS);
1053 BUG_ON(!pages);
1054 }
1055 memcpy(pages, data_pages + i,
1056 locked_pages * sizeof(*pages));
1057 memset(data_pages + i, 0,
1058 locked_pages * sizeof(*pages));
1059 } else {
1060 BUG_ON(num_ops != req->r_num_ops);
1061 index = pages[i - 1]->index + 1;
1062 /* request message now owns the pages array */
1063 pages = NULL;
1064 }
979 1065
980 vino = ceph_vino(inode); 1066 vino = ceph_vino(inode);
981 ceph_osdc_build_request(req, offset, snapc, vino.snap, 1067 ceph_osdc_build_request(req, offset, snapc, vino.snap,
@@ -985,9 +1071,10 @@ get_more_pages:
985 BUG_ON(rc); 1071 BUG_ON(rc);
986 req = NULL; 1072 req = NULL;
987 1073
988 /* continue? */ 1074 wbc->nr_to_write -= i;
989 index = next; 1075 if (pages)
990 wbc->nr_to_write -= locked_pages; 1076 goto new_request;
1077
991 if (wbc->nr_to_write <= 0) 1078 if (wbc->nr_to_write <= 0)
992 done = 1; 1079 done = 1;
993 1080
@@ -1522,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1522 ceph_vino(inode), 0, &len, 0, 1, 1609 ceph_vino(inode), 0, &len, 0, 1,
1523 CEPH_OSD_OP_CREATE, 1610 CEPH_OSD_OP_CREATE,
1524 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 1611 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1525 ceph_empty_snapc, 0, 0, false); 1612 NULL, 0, 0, false);
1526 if (IS_ERR(req)) { 1613 if (IS_ERR(req)) {
1527 err = PTR_ERR(req); 1614 err = PTR_ERR(req);
1528 goto out; 1615 goto out;
@@ -1540,9 +1627,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
1540 ceph_vino(inode), 0, &len, 1, 3, 1627 ceph_vino(inode), 0, &len, 1, 3,
1541 CEPH_OSD_OP_WRITE, 1628 CEPH_OSD_OP_WRITE,
1542 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 1629 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1543 ceph_empty_snapc, 1630 NULL, ci->i_truncate_seq,
1544 ci->i_truncate_seq, ci->i_truncate_size, 1631 ci->i_truncate_size, false);
1545 false);
1546 if (IS_ERR(req)) { 1632 if (IS_ERR(req)) {
1547 err = PTR_ERR(req); 1633 err = PTR_ERR(req);
1548 goto out; 1634 goto out;
@@ -1663,8 +1749,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1663 goto out; 1749 goto out;
1664 } 1750 }
1665 1751
1666 rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, 1752 rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1667 ceph_empty_snapc,
1668 1, false, GFP_NOFS); 1753 1, false, GFP_NOFS);
1669 if (!rd_req) { 1754 if (!rd_req) {
1670 err = -ENOMEM; 1755 err = -ENOMEM;
@@ -1678,8 +1763,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
1678 "%llx.00000000", ci->i_vino.ino); 1763 "%llx.00000000", ci->i_vino.ino);
1679 rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); 1764 rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
1680 1765
1681 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, 1766 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1682 ceph_empty_snapc,
1683 1, false, GFP_NOFS); 1767 1, false, GFP_NOFS);
1684 if (!wr_req) { 1768 if (!wr_req) {
1685 err = -ENOMEM; 1769 err = -ENOMEM;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6fe0ad26a7df..de17bb232ff8 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -991,7 +991,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
991 u32 seq, u64 flush_tid, u64 oldest_flush_tid, 991 u32 seq, u64 flush_tid, u64 oldest_flush_tid,
992 u32 issue_seq, u32 mseq, u64 size, u64 max_size, 992 u32 issue_seq, u32 mseq, u64 size, u64 max_size,
993 struct timespec *mtime, struct timespec *atime, 993 struct timespec *mtime, struct timespec *atime,
994 u64 time_warp_seq, 994 struct timespec *ctime, u64 time_warp_seq,
995 kuid_t uid, kgid_t gid, umode_t mode, 995 kuid_t uid, kgid_t gid, umode_t mode,
996 u64 xattr_version, 996 u64 xattr_version,
997 struct ceph_buffer *xattrs_buf, 997 struct ceph_buffer *xattrs_buf,
@@ -1042,6 +1042,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
1042 ceph_encode_timespec(&fc->mtime, mtime); 1042 ceph_encode_timespec(&fc->mtime, mtime);
1043 if (atime) 1043 if (atime)
1044 ceph_encode_timespec(&fc->atime, atime); 1044 ceph_encode_timespec(&fc->atime, atime);
1045 if (ctime)
1046 ceph_encode_timespec(&fc->ctime, ctime);
1045 fc->time_warp_seq = cpu_to_le32(time_warp_seq); 1047 fc->time_warp_seq = cpu_to_le32(time_warp_seq);
1046 1048
1047 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid)); 1049 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
@@ -1116,7 +1118,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1116 int held, revoking, dropping, keep; 1118 int held, revoking, dropping, keep;
1117 u64 seq, issue_seq, mseq, time_warp_seq, follows; 1119 u64 seq, issue_seq, mseq, time_warp_seq, follows;
1118 u64 size, max_size; 1120 u64 size, max_size;
1119 struct timespec mtime, atime; 1121 struct timespec mtime, atime, ctime;
1120 int wake = 0; 1122 int wake = 0;
1121 umode_t mode; 1123 umode_t mode;
1122 kuid_t uid; 1124 kuid_t uid;
@@ -1180,6 +1182,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1180 ci->i_requested_max_size = max_size; 1182 ci->i_requested_max_size = max_size;
1181 mtime = inode->i_mtime; 1183 mtime = inode->i_mtime;
1182 atime = inode->i_atime; 1184 atime = inode->i_atime;
1185 ctime = inode->i_ctime;
1183 time_warp_seq = ci->i_time_warp_seq; 1186 time_warp_seq = ci->i_time_warp_seq;
1184 uid = inode->i_uid; 1187 uid = inode->i_uid;
1185 gid = inode->i_gid; 1188 gid = inode->i_gid;
@@ -1198,7 +1201,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1198 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, 1201 ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1199 op, keep, want, flushing, seq, 1202 op, keep, want, flushing, seq,
1200 flush_tid, oldest_flush_tid, issue_seq, mseq, 1203 flush_tid, oldest_flush_tid, issue_seq, mseq,
1201 size, max_size, &mtime, &atime, time_warp_seq, 1204 size, max_size, &mtime, &atime, &ctime, time_warp_seq,
1202 uid, gid, mode, xattr_version, xattr_blob, 1205 uid, gid, mode, xattr_version, xattr_blob,
1203 follows, inline_data); 1206 follows, inline_data);
1204 if (ret < 0) { 1207 if (ret < 0) {
@@ -1320,7 +1323,7 @@ retry:
1320 capsnap->dirty, 0, capsnap->flush_tid, 0, 1323 capsnap->dirty, 0, capsnap->flush_tid, 0,
1321 0, mseq, capsnap->size, 0, 1324 0, mseq, capsnap->size, 0,
1322 &capsnap->mtime, &capsnap->atime, 1325 &capsnap->mtime, &capsnap->atime,
1323 capsnap->time_warp_seq, 1326 &capsnap->ctime, capsnap->time_warp_seq,
1324 capsnap->uid, capsnap->gid, capsnap->mode, 1327 capsnap->uid, capsnap->gid, capsnap->mode,
1325 capsnap->xattr_version, capsnap->xattr_blob, 1328 capsnap->xattr_version, capsnap->xattr_blob,
1326 capsnap->follows, capsnap->inline_data); 1329 capsnap->follows, capsnap->inline_data);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fd11fb231a2e..fadc243dfb28 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
38 if (dentry->d_fsdata) 38 if (dentry->d_fsdata)
39 return 0; 39 return 0;
40 40
41 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO); 41 di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
42 if (!di) 42 if (!di)
43 return -ENOMEM; /* oh well */ 43 return -ENOMEM; /* oh well */
44 44
@@ -68,23 +68,6 @@ out_unlock:
68 return 0; 68 return 0;
69} 69}
70 70
71struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
72{
73 struct inode *inode = NULL;
74
75 if (!dentry)
76 return NULL;
77
78 spin_lock(&dentry->d_lock);
79 if (!IS_ROOT(dentry)) {
80 inode = d_inode(dentry->d_parent);
81 ihold(inode);
82 }
83 spin_unlock(&dentry->d_lock);
84 return inode;
85}
86
87
88/* 71/*
89 * for readdir, we encode the directory frag and offset within that 72 * for readdir, we encode the directory frag and offset within that
90 * frag into f_pos. 73 * frag into f_pos.
@@ -624,6 +607,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
624 struct ceph_mds_client *mdsc = fsc->mdsc; 607 struct ceph_mds_client *mdsc = fsc->mdsc;
625 struct ceph_mds_request *req; 608 struct ceph_mds_request *req;
626 int op; 609 int op;
610 int mask;
627 int err; 611 int err;
628 612
629 dout("lookup %p dentry %p '%pd'\n", 613 dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +650,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
666 return ERR_CAST(req); 650 return ERR_CAST(req);
667 req->r_dentry = dget(dentry); 651 req->r_dentry = dget(dentry);
668 req->r_num_caps = 2; 652 req->r_num_caps = 2;
669 /* we only need inode linkage */ 653
670 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); 654 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
655 if (ceph_security_xattr_wanted(dir))
656 mask |= CEPH_CAP_XATTR_SHARED;
657 req->r_args.getattr.mask = cpu_to_le32(mask);
658
671 req->r_locked_dir = dir; 659 req->r_locked_dir = dir;
672 err = ceph_mdsc_do_request(mdsc, NULL, req); 660 err = ceph_mdsc_do_request(mdsc, NULL, req);
673 err = ceph_handle_snapdir(req, dentry, err); 661 err = ceph_handle_snapdir(req, dentry, err);
@@ -1095,6 +1083,7 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
1095static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) 1083static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1096{ 1084{
1097 int valid = 0; 1085 int valid = 0;
1086 struct dentry *parent;
1098 struct inode *dir; 1087 struct inode *dir;
1099 1088
1100 if (flags & LOOKUP_RCU) 1089 if (flags & LOOKUP_RCU)
@@ -1103,7 +1092,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1103 dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry, 1092 dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
1104 dentry, d_inode(dentry), ceph_dentry(dentry)->offset); 1093 dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
1105 1094
1106 dir = ceph_get_dentry_parent_inode(dentry); 1095 parent = dget_parent(dentry);
1096 dir = d_inode(parent);
1107 1097
1108 /* always trust cached snapped dentries, snapdir dentry */ 1098 /* always trust cached snapped dentries, snapdir dentry */
1109 if (ceph_snap(dir) != CEPH_NOSNAP) { 1099 if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1121,13 +1111,48 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
1121 valid = 1; 1111 valid = 1;
1122 } 1112 }
1123 1113
1114 if (!valid) {
1115 struct ceph_mds_client *mdsc =
1116 ceph_sb_to_client(dir->i_sb)->mdsc;
1117 struct ceph_mds_request *req;
1118 int op, mask, err;
1119
1120 op = ceph_snap(dir) == CEPH_SNAPDIR ?
1121 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
1122 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
1123 if (!IS_ERR(req)) {
1124 req->r_dentry = dget(dentry);
1125 req->r_num_caps = 2;
1126
1127 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
1128 if (ceph_security_xattr_wanted(dir))
1129 mask |= CEPH_CAP_XATTR_SHARED;
1130 req->r_args.getattr.mask = mask;
1131
1132 req->r_locked_dir = dir;
1133 err = ceph_mdsc_do_request(mdsc, NULL, req);
1134 if (err == 0 || err == -ENOENT) {
1135 if (dentry == req->r_dentry) {
1136 valid = !d_unhashed(dentry);
1137 } else {
1138 d_invalidate(req->r_dentry);
1139 err = -EAGAIN;
1140 }
1141 }
1142 ceph_mdsc_put_request(req);
1143 dout("d_revalidate %p lookup result=%d\n",
1144 dentry, err);
1145 }
1146 }
1147
1124 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1148 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
1125 if (valid) { 1149 if (valid) {
1126 ceph_dentry_lru_touch(dentry); 1150 ceph_dentry_lru_touch(dentry);
1127 } else { 1151 } else {
1128 ceph_dir_clear_complete(dir); 1152 ceph_dir_clear_complete(dir);
1129 } 1153 }
1130 iput(dir); 1154
1155 dput(parent);
1131 return valid; 1156 return valid;
1132} 1157}
1133 1158
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3b3172357326..6e72c98162d5 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -71,12 +71,18 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
71 inode = ceph_find_inode(sb, vino); 71 inode = ceph_find_inode(sb, vino);
72 if (!inode) { 72 if (!inode) {
73 struct ceph_mds_request *req; 73 struct ceph_mds_request *req;
74 int mask;
74 75
75 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO, 76 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
76 USE_ANY_MDS); 77 USE_ANY_MDS);
77 if (IS_ERR(req)) 78 if (IS_ERR(req))
78 return ERR_CAST(req); 79 return ERR_CAST(req);
79 80
81 mask = CEPH_STAT_CAP_INODE;
82 if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
83 mask |= CEPH_CAP_XATTR_SHARED;
84 req->r_args.getattr.mask = cpu_to_le32(mask);
85
80 req->r_ino1 = vino; 86 req->r_ino1 = vino;
81 req->r_num_caps = 1; 87 req->r_num_caps = 1;
82 err = ceph_mdsc_do_request(mdsc, NULL, req); 88 err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@ static struct dentry *__get_parent(struct super_block *sb,
128 struct ceph_mds_request *req; 134 struct ceph_mds_request *req;
129 struct inode *inode; 135 struct inode *inode;
130 struct dentry *dentry; 136 struct dentry *dentry;
137 int mask;
131 int err; 138 int err;
132 139
133 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT, 140 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@ static struct dentry *__get_parent(struct super_block *sb,
144 .snap = CEPH_NOSNAP, 151 .snap = CEPH_NOSNAP,
145 }; 152 };
146 } 153 }
154
155 mask = CEPH_STAT_CAP_INODE;
156 if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
157 mask |= CEPH_CAP_XATTR_SHARED;
158 req->r_args.getattr.mask = cpu_to_le32(mask);
159
147 req->r_num_caps = 1; 160 req->r_num_caps = 1;
148 err = ceph_mdsc_do_request(mdsc, NULL, req); 161 err = ceph_mdsc_do_request(mdsc, NULL, req);
149 inode = req->r_target_inode; 162 inode = req->r_target_inode;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index eb9028e8cfc5..ef38f01c1795 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -157,7 +157,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
157 case S_IFDIR: 157 case S_IFDIR:
158 dout("init_file %p %p 0%o (regular)\n", inode, file, 158 dout("init_file %p %p 0%o (regular)\n", inode, file,
159 inode->i_mode); 159 inode->i_mode);
160 cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO); 160 cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
161 if (cf == NULL) { 161 if (cf == NULL) {
162 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ 162 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
163 return -ENOMEM; 163 return -ENOMEM;
@@ -300,6 +300,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
300 struct ceph_mds_request *req; 300 struct ceph_mds_request *req;
301 struct dentry *dn; 301 struct dentry *dn;
302 struct ceph_acls_info acls = {}; 302 struct ceph_acls_info acls = {};
303 int mask;
303 int err; 304 int err;
304 305
305 dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n", 306 dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
335 acls.pagelist = NULL; 336 acls.pagelist = NULL;
336 } 337 }
337 } 338 }
339
340 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
341 if (ceph_security_xattr_wanted(dir))
342 mask |= CEPH_CAP_XATTR_SHARED;
343 req->r_args.open.mask = cpu_to_le32(mask);
344
338 req->r_locked_dir = dir; /* caller holds dir->i_mutex */ 345 req->r_locked_dir = dir; /* caller holds dir->i_mutex */
339 err = ceph_mdsc_do_request(mdsc, 346 err = ceph_mdsc_do_request(mdsc,
340 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 347 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -725,7 +732,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
725 ret = ceph_osdc_start_request(req->r_osdc, req, false); 732 ret = ceph_osdc_start_request(req->r_osdc, req, false);
726out: 733out:
727 if (ret < 0) { 734 if (ret < 0) {
728 BUG_ON(ret == -EOLDSNAPC);
729 req->r_result = ret; 735 req->r_result = ret;
730 ceph_aio_complete_req(req, NULL); 736 ceph_aio_complete_req(req, NULL);
731 } 737 }
@@ -783,7 +789,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
783 int num_pages = 0; 789 int num_pages = 0;
784 int flags; 790 int flags;
785 int ret; 791 int ret;
786 struct timespec mtime = CURRENT_TIME; 792 struct timespec mtime = current_fs_time(inode->i_sb);
787 size_t count = iov_iter_count(iter); 793 size_t count = iov_iter_count(iter);
788 loff_t pos = iocb->ki_pos; 794 loff_t pos = iocb->ki_pos;
789 bool write = iov_iter_rw(iter) == WRITE; 795 bool write = iov_iter_rw(iter) == WRITE;
@@ -949,7 +955,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
949 ret = ceph_osdc_start_request(req->r_osdc, 955 ret = ceph_osdc_start_request(req->r_osdc,
950 req, false); 956 req, false);
951 if (ret < 0) { 957 if (ret < 0) {
952 BUG_ON(ret == -EOLDSNAPC);
953 req->r_result = ret; 958 req->r_result = ret;
954 ceph_aio_complete_req(req, NULL); 959 ceph_aio_complete_req(req, NULL);
955 } 960 }
@@ -988,7 +993,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
988 int flags; 993 int flags;
989 int check_caps = 0; 994 int check_caps = 0;
990 int ret; 995 int ret;
991 struct timespec mtime = CURRENT_TIME; 996 struct timespec mtime = current_fs_time(inode->i_sb);
992 size_t count = iov_iter_count(from); 997 size_t count = iov_iter_count(from);
993 998
994 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 999 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e48fd8b23257..ed58b168904a 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -549,6 +549,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,
549 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 || 549 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
550 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) { 550 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
551 dout("size %lld -> %llu\n", inode->i_size, size); 551 dout("size %lld -> %llu\n", inode->i_size, size);
552 if (size > 0 && S_ISDIR(inode->i_mode)) {
553 pr_err("fill_file_size non-zero size for directory\n");
554 size = 0;
555 }
552 i_size_write(inode, size); 556 i_size_write(inode, size);
553 inode->i_blocks = (size + (1<<9) - 1) >> 9; 557 inode->i_blocks = (size + (1<<9) - 1) >> 9;
554 ci->i_reported_size = size; 558 ci->i_reported_size = size;
@@ -1261,6 +1265,7 @@ retry_lookup:
1261 dout(" %p links to %p %llx.%llx, not %llx.%llx\n", 1265 dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
1262 dn, d_inode(dn), ceph_vinop(d_inode(dn)), 1266 dn, d_inode(dn), ceph_vinop(d_inode(dn)),
1263 ceph_vinop(in)); 1267 ceph_vinop(in));
1268 d_invalidate(dn);
1264 have_lease = false; 1269 have_lease = false;
1265 } 1270 }
1266 1271
@@ -1349,15 +1354,20 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
1349 1354
1350 if (!ctl->page || pgoff != page_index(ctl->page)) { 1355 if (!ctl->page || pgoff != page_index(ctl->page)) {
1351 ceph_readdir_cache_release(ctl); 1356 ceph_readdir_cache_release(ctl);
1352 ctl->page = grab_cache_page(&dir->i_data, pgoff); 1357 if (idx == 0)
1358 ctl->page = grab_cache_page(&dir->i_data, pgoff);
1359 else
1360 ctl->page = find_lock_page(&dir->i_data, pgoff);
1353 if (!ctl->page) { 1361 if (!ctl->page) {
1354 ctl->index = -1; 1362 ctl->index = -1;
1355 return -ENOMEM; 1363 return idx == 0 ? -ENOMEM : 0;
1356 } 1364 }
1357 /* reading/filling the cache are serialized by 1365 /* reading/filling the cache are serialized by
1358 * i_mutex, no need to use page lock */ 1366 * i_mutex, no need to use page lock */
1359 unlock_page(ctl->page); 1367 unlock_page(ctl->page);
1360 ctl->dentries = kmap(ctl->page); 1368 ctl->dentries = kmap(ctl->page);
1369 if (idx == 0)
1370 memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
1361 } 1371 }
1362 1372
1363 if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) && 1373 if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1380,7 +1390,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1380 struct qstr dname; 1390 struct qstr dname;
1381 struct dentry *dn; 1391 struct dentry *dn;
1382 struct inode *in; 1392 struct inode *in;
1383 int err = 0, ret, i; 1393 int err = 0, skipped = 0, ret, i;
1384 struct inode *snapdir = NULL; 1394 struct inode *snapdir = NULL;
1385 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; 1395 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
1386 struct ceph_dentry_info *di; 1396 struct ceph_dentry_info *di;
@@ -1492,7 +1502,17 @@ retry_lookup:
1492 } 1502 }
1493 1503
1494 if (d_really_is_negative(dn)) { 1504 if (d_really_is_negative(dn)) {
1495 struct dentry *realdn = splice_dentry(dn, in); 1505 struct dentry *realdn;
1506
1507 if (ceph_security_xattr_deadlock(in)) {
1508 dout(" skip splicing dn %p to inode %p"
1509 " (security xattr deadlock)\n", dn, in);
1510 iput(in);
1511 skipped++;
1512 goto next_item;
1513 }
1514
1515 realdn = splice_dentry(dn, in);
1496 if (IS_ERR(realdn)) { 1516 if (IS_ERR(realdn)) {
1497 err = PTR_ERR(realdn); 1517 err = PTR_ERR(realdn);
1498 d_drop(dn); 1518 d_drop(dn);
@@ -1509,7 +1529,7 @@ retry_lookup:
1509 req->r_session, 1529 req->r_session,
1510 req->r_request_started); 1530 req->r_request_started);
1511 1531
1512 if (err == 0 && cache_ctl.index >= 0) { 1532 if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
1513 ret = fill_readdir_cache(d_inode(parent), dn, 1533 ret = fill_readdir_cache(d_inode(parent), dn,
1514 &cache_ctl, req); 1534 &cache_ctl, req);
1515 if (ret < 0) 1535 if (ret < 0)
@@ -1520,7 +1540,7 @@ next_item:
1520 dput(dn); 1540 dput(dn);
1521 } 1541 }
1522out: 1542out:
1523 if (err == 0) { 1543 if (err == 0 && skipped == 0) {
1524 req->r_did_prepopulate = true; 1544 req->r_did_prepopulate = true;
1525 req->r_readdir_cache_idx = cache_ctl.index; 1545 req->r_readdir_cache_idx = cache_ctl.index;
1526 } 1546 }
@@ -1950,7 +1970,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1950 if (dirtied) { 1970 if (dirtied) {
1951 inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, 1971 inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
1952 &prealloc_cf); 1972 &prealloc_cf);
1953 inode->i_ctime = CURRENT_TIME; 1973 inode->i_ctime = current_fs_time(inode->i_sb);
1954 } 1974 }
1955 1975
1956 release &= issued; 1976 release &= issued;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 911d64d865f1..44852c3ae531 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1729,7 +1729,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1729 init_completion(&req->r_safe_completion); 1729 init_completion(&req->r_safe_completion);
1730 INIT_LIST_HEAD(&req->r_unsafe_item); 1730 INIT_LIST_HEAD(&req->r_unsafe_item);
1731 1731
1732 req->r_stamp = CURRENT_TIME; 1732 req->r_stamp = current_fs_time(mdsc->fsc->sb);
1733 1733
1734 req->r_op = op; 1734 req->r_op = op;
1735 req->r_direct_mode = mode; 1735 req->r_direct_mode = mode;
@@ -2540,6 +2540,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2540 2540
2541 /* insert trace into our cache */ 2541 /* insert trace into our cache */
2542 mutex_lock(&req->r_fill_mutex); 2542 mutex_lock(&req->r_fill_mutex);
2543 current->journal_info = req;
2543 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2544 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2544 if (err == 0) { 2545 if (err == 0) {
2545 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2546 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2547 ceph_readdir_prepopulate(req, req->r_session); 2548 ceph_readdir_prepopulate(req, req->r_session);
2548 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2549 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2549 } 2550 }
2551 current->journal_info = NULL;
2550 mutex_unlock(&req->r_fill_mutex); 2552 mutex_unlock(&req->r_fill_mutex);
2551 2553
2552 up_read(&mdsc->snap_rwsem); 2554 up_read(&mdsc->snap_rwsem);
@@ -3764,7 +3766,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3764 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3766 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3765 3767
3766 /* do we need it? */ 3768 /* do we need it? */
3767 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3768 mutex_lock(&mdsc->mutex); 3769 mutex_lock(&mdsc->mutex);
3769 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3770 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3770 dout("handle_map epoch %u <= our %u\n", 3771 dout("handle_map epoch %u <= our %u\n",
@@ -3791,6 +3792,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3791 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3792 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3792 3793
3793 __wake_requests(mdsc, &mdsc->waiting_for_map); 3794 __wake_requests(mdsc, &mdsc->waiting_for_map);
3795 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3796 mdsc->mdsmap->m_epoch);
3794 3797
3795 mutex_unlock(&mdsc->mutex); 3798 mutex_unlock(&mdsc->mutex);
3796 schedule_delayed(mdsc); 3799 schedule_delayed(mdsc);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4aa7122a8d38..9caaa7ffc93f 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,6 @@ static int cmpu64_rev(const void *a, const void *b)
296} 296}
297 297
298 298
299struct ceph_snap_context *ceph_empty_snapc;
300
301/* 299/*
302 * build the snap context for a given realm. 300 * build the snap context for a given realm.
303 */ 301 */
@@ -987,17 +985,3 @@ out:
987 up_write(&mdsc->snap_rwsem); 985 up_write(&mdsc->snap_rwsem);
988 return; 986 return;
989} 987}
990
991int __init ceph_snap_init(void)
992{
993 ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
994 if (!ceph_empty_snapc)
995 return -ENOMEM;
996 ceph_empty_snapc->seq = 1;
997 return 0;
998}
999
1000void ceph_snap_exit(void)
1001{
1002 ceph_put_snap_context(ceph_empty_snapc);
1003}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ca4d5e8457f1..c973043deb0e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -439,8 +439,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
439 439
440 if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT) 440 if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
441 seq_puts(m, ",dirstat"); 441 seq_puts(m, ",dirstat");
442 if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0) 442 if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
443 seq_puts(m, ",norbytes"); 443 seq_puts(m, ",rbytes");
444 if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR) 444 if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
445 seq_puts(m, ",noasyncreaddir"); 445 seq_puts(m, ",noasyncreaddir");
446 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0) 446 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
530 goto fail; 530 goto fail;
531 } 531 }
532 fsc->client->extra_mon_dispatch = extra_mon_dispatch; 532 fsc->client->extra_mon_dispatch = extra_mon_dispatch;
533 fsc->client->monc.want_mdsmap = 1; 533 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
534 534
535 fsc->mount_options = fsopt; 535 fsc->mount_options = fsopt;
536 536
@@ -793,22 +793,20 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
793 struct dentry *root; 793 struct dentry *root;
794 int first = 0; /* first vfsmount for this super_block */ 794 int first = 0; /* first vfsmount for this super_block */
795 795
796 dout("mount start\n"); 796 dout("mount start %p\n", fsc);
797 mutex_lock(&fsc->client->mount_mutex); 797 mutex_lock(&fsc->client->mount_mutex);
798 798
799 err = __ceph_open_session(fsc->client, started); 799 if (!fsc->sb->s_root) {
800 if (err < 0) 800 err = __ceph_open_session(fsc->client, started);
801 goto out; 801 if (err < 0)
802 goto out;
802 803
803 dout("mount opening root\n"); 804 dout("mount opening root\n");
804 root = open_root_dentry(fsc, "", started); 805 root = open_root_dentry(fsc, "", started);
805 if (IS_ERR(root)) { 806 if (IS_ERR(root)) {
806 err = PTR_ERR(root); 807 err = PTR_ERR(root);
807 goto out; 808 goto out;
808 } 809 }
809 if (fsc->sb->s_root) {
810 dput(root);
811 } else {
812 fsc->sb->s_root = root; 810 fsc->sb->s_root = root;
813 first = 1; 811 first = 1;
814 812
@@ -818,6 +816,7 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
818 } 816 }
819 817
820 if (path[0] == 0) { 818 if (path[0] == 0) {
819 root = fsc->sb->s_root;
821 dget(root); 820 dget(root);
822 } else { 821 } else {
823 dout("mount opening base mountpoint\n"); 822 dout("mount opening base mountpoint\n");
@@ -833,16 +832,14 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
833 mutex_unlock(&fsc->client->mount_mutex); 832 mutex_unlock(&fsc->client->mount_mutex);
834 return root; 833 return root;
835 834
836out:
837 mutex_unlock(&fsc->client->mount_mutex);
838 return ERR_PTR(err);
839
840fail: 835fail:
841 if (first) { 836 if (first) {
842 dput(fsc->sb->s_root); 837 dput(fsc->sb->s_root);
843 fsc->sb->s_root = NULL; 838 fsc->sb->s_root = NULL;
844 } 839 }
845 goto out; 840out:
841 mutex_unlock(&fsc->client->mount_mutex);
842 return ERR_PTR(err);
846} 843}
847 844
848static int ceph_set_super(struct super_block *s, void *data) 845static int ceph_set_super(struct super_block *s, void *data)
@@ -1042,19 +1039,14 @@ static int __init init_ceph(void)
1042 1039
1043 ceph_flock_init(); 1040 ceph_flock_init();
1044 ceph_xattr_init(); 1041 ceph_xattr_init();
1045 ret = ceph_snap_init();
1046 if (ret)
1047 goto out_xattr;
1048 ret = register_filesystem(&ceph_fs_type); 1042 ret = register_filesystem(&ceph_fs_type);
1049 if (ret) 1043 if (ret)
1050 goto out_snap; 1044 goto out_xattr;
1051 1045
1052 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); 1046 pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
1053 1047
1054 return 0; 1048 return 0;
1055 1049
1056out_snap:
1057 ceph_snap_exit();
1058out_xattr: 1050out_xattr:
1059 ceph_xattr_exit(); 1051 ceph_xattr_exit();
1060 destroy_caches(); 1052 destroy_caches();
@@ -1066,7 +1058,6 @@ static void __exit exit_ceph(void)
1066{ 1058{
1067 dout("exit_ceph\n"); 1059 dout("exit_ceph\n");
1068 unregister_filesystem(&ceph_fs_type); 1060 unregister_filesystem(&ceph_fs_type);
1069 ceph_snap_exit();
1070 ceph_xattr_exit(); 1061 ceph_xattr_exit();
1071 destroy_caches(); 1062 destroy_caches();
1072} 1063}
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 9c458eb52245..e705c4d612d7 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -37,8 +37,7 @@
37#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ 37#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
38#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ 38#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */
39 39
40#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES | \ 40#define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE
41 CEPH_MOUNT_OPT_DCACHE)
42 41
43#define ceph_set_mount_opt(fsc, opt) \ 42#define ceph_set_mount_opt(fsc, opt) \
44 (fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt; 43 (fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
@@ -469,7 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
469#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */ 468#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */
470#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */ 469#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
471#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ 470#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
472 471#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
473 472
474static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, 473static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
475 long long release_count, 474 long long release_count,
@@ -721,7 +720,6 @@ static inline int default_congestion_kb(void)
721 720
722 721
723/* snap.c */ 722/* snap.c */
724extern struct ceph_snap_context *ceph_empty_snapc;
725struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 723struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
726 u64 ino); 724 u64 ino);
727extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc, 725extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -738,8 +736,6 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
738extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, 736extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
739 struct ceph_cap_snap *capsnap); 737 struct ceph_cap_snap *capsnap);
740extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); 738extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
741extern int ceph_snap_init(void);
742extern void ceph_snap_exit(void);
743 739
744/* 740/*
745 * a cap_snap is "pending" if it is still awaiting an in-progress 741 * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -808,6 +804,20 @@ extern void __init ceph_xattr_init(void);
808extern void ceph_xattr_exit(void); 804extern void ceph_xattr_exit(void);
809extern const struct xattr_handler *ceph_xattr_handlers[]; 805extern const struct xattr_handler *ceph_xattr_handlers[];
810 806
807#ifdef CONFIG_SECURITY
808extern bool ceph_security_xattr_deadlock(struct inode *in);
809extern bool ceph_security_xattr_wanted(struct inode *in);
810#else
811static inline bool ceph_security_xattr_deadlock(struct inode *in)
812{
813 return false;
814}
815static inline bool ceph_security_xattr_wanted(struct inode *in)
816{
817 return false;
818}
819#endif
820
811/* acl.c */ 821/* acl.c */
812struct ceph_acls_info { 822struct ceph_acls_info {
813 void *default_acl; 823 void *default_acl;
@@ -947,7 +957,6 @@ extern void ceph_dentry_lru_touch(struct dentry *dn);
947extern void ceph_dentry_lru_del(struct dentry *dn); 957extern void ceph_dentry_lru_del(struct dentry *dn);
948extern void ceph_invalidate_dentry_lease(struct dentry *dentry); 958extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
949extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); 959extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
950extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
951extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl); 960extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
952 961
953/* 962/*
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 819163d8313b..9410abdef3ce 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -714,31 +714,62 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
714 } 714 }
715} 715}
716 716
717static inline int __get_request_mask(struct inode *in) {
718 struct ceph_mds_request *req = current->journal_info;
719 int mask = 0;
720 if (req && req->r_target_inode == in) {
721 if (req->r_op == CEPH_MDS_OP_LOOKUP ||
722 req->r_op == CEPH_MDS_OP_LOOKUPINO ||
723 req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
724 req->r_op == CEPH_MDS_OP_GETATTR) {
725 mask = le32_to_cpu(req->r_args.getattr.mask);
726 } else if (req->r_op == CEPH_MDS_OP_OPEN ||
727 req->r_op == CEPH_MDS_OP_CREATE) {
728 mask = le32_to_cpu(req->r_args.open.mask);
729 }
730 }
731 return mask;
732}
733
717ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, 734ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
718 size_t size) 735 size_t size)
719{ 736{
720 struct ceph_inode_info *ci = ceph_inode(inode); 737 struct ceph_inode_info *ci = ceph_inode(inode);
721 int err;
722 struct ceph_inode_xattr *xattr; 738 struct ceph_inode_xattr *xattr;
723 struct ceph_vxattr *vxattr = NULL; 739 struct ceph_vxattr *vxattr = NULL;
740 int req_mask;
741 int err;
724 742
725 if (!ceph_is_valid_xattr(name)) 743 if (!ceph_is_valid_xattr(name))
726 return -ENODATA; 744 return -ENODATA;
727 745
728 /* let's see if a virtual xattr was requested */ 746 /* let's see if a virtual xattr was requested */
729 vxattr = ceph_match_vxattr(inode, name); 747 vxattr = ceph_match_vxattr(inode, name);
730 if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { 748 if (vxattr) {
731 err = vxattr->getxattr_cb(ci, value, size); 749 err = -ENODATA;
750 if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
751 err = vxattr->getxattr_cb(ci, value, size);
732 return err; 752 return err;
733 } 753 }
734 754
755 req_mask = __get_request_mask(inode);
756
735 spin_lock(&ci->i_ceph_lock); 757 spin_lock(&ci->i_ceph_lock);
736 dout("getxattr %p ver=%lld index_ver=%lld\n", inode, 758 dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
737 ci->i_xattrs.version, ci->i_xattrs.index_version); 759 ci->i_xattrs.version, ci->i_xattrs.index_version);
738 760
739 if (ci->i_xattrs.version == 0 || 761 if (ci->i_xattrs.version == 0 ||
740 !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) { 762 !((req_mask & CEPH_CAP_XATTR_SHARED) ||
763 __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
741 spin_unlock(&ci->i_ceph_lock); 764 spin_unlock(&ci->i_ceph_lock);
765
766 /* security module gets xattr while filling trace */
767 if (current->journal_info != NULL) {
768 pr_warn_ratelimited("sync getxattr %p "
769 "during filling trace\n", inode);
770 return -EBUSY;
771 }
772
742 /* get xattrs from mds (if we don't already have them) */ 773 /* get xattrs from mds (if we don't already have them) */
743 err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true); 774 err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
744 if (err) 775 if (err)
@@ -765,6 +796,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
765 796
766 memcpy(value, xattr->val, xattr->val_len); 797 memcpy(value, xattr->val, xattr->val_len);
767 798
799 if (current->journal_info != NULL &&
800 !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
801 ci->i_ceph_flags |= CEPH_I_SEC_INITED;
768out: 802out:
769 spin_unlock(&ci->i_ceph_lock); 803 spin_unlock(&ci->i_ceph_lock);
770 return err; 804 return err;
@@ -999,7 +1033,7 @@ retry:
999 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, 1033 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
1000 &prealloc_cf); 1034 &prealloc_cf);
1001 ci->i_xattrs.dirty = true; 1035 ci->i_xattrs.dirty = true;
1002 inode->i_ctime = CURRENT_TIME; 1036 inode->i_ctime = current_fs_time(inode->i_sb);
1003 } 1037 }
1004 1038
1005 spin_unlock(&ci->i_ceph_lock); 1039 spin_unlock(&ci->i_ceph_lock);
@@ -1015,7 +1049,15 @@ do_sync:
1015do_sync_unlocked: 1049do_sync_unlocked:
1016 if (lock_snap_rwsem) 1050 if (lock_snap_rwsem)
1017 up_read(&mdsc->snap_rwsem); 1051 up_read(&mdsc->snap_rwsem);
1018 err = ceph_sync_setxattr(dentry, name, value, size, flags); 1052
1053 /* security module set xattr while filling trace */
1054 if (current->journal_info != NULL) {
1055 pr_warn_ratelimited("sync setxattr %p "
1056 "during filling trace\n", inode);
1057 err = -EBUSY;
1058 } else {
1059 err = ceph_sync_setxattr(dentry, name, value, size, flags);
1060 }
1019out: 1061out:
1020 ceph_free_cap_flush(prealloc_cf); 1062 ceph_free_cap_flush(prealloc_cf);
1021 kfree(newname); 1063 kfree(newname);
@@ -1136,7 +1178,7 @@ retry:
1136 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, 1178 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
1137 &prealloc_cf); 1179 &prealloc_cf);
1138 ci->i_xattrs.dirty = true; 1180 ci->i_xattrs.dirty = true;
1139 inode->i_ctime = CURRENT_TIME; 1181 inode->i_ctime = current_fs_time(inode->i_sb);
1140 spin_unlock(&ci->i_ceph_lock); 1182 spin_unlock(&ci->i_ceph_lock);
1141 if (lock_snap_rwsem) 1183 if (lock_snap_rwsem)
1142 up_read(&mdsc->snap_rwsem); 1184 up_read(&mdsc->snap_rwsem);
@@ -1164,3 +1206,25 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
1164 1206
1165 return __ceph_removexattr(dentry, name); 1207 return __ceph_removexattr(dentry, name);
1166} 1208}
1209
1210#ifdef CONFIG_SECURITY
1211bool ceph_security_xattr_wanted(struct inode *in)
1212{
1213 return in->i_security != NULL;
1214}
1215
1216bool ceph_security_xattr_deadlock(struct inode *in)
1217{
1218 struct ceph_inode_info *ci;
1219 bool ret;
1220 if (in->i_security == NULL)
1221 return false;
1222 ci = ceph_inode(in);
1223 spin_lock(&ci->i_ceph_lock);
1224 ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
1225 !(ci->i_xattrs.version > 0 &&
1226 __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
1227 spin_unlock(&ci->i_ceph_lock);
1228 return ret;
1229}
1230#endif
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 15151f3c4120..ae2f66833762 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -105,6 +105,7 @@ static inline u64 ceph_sanitize_features(u64 features)
105 */ 105 */
106#define CEPH_FEATURES_SUPPORTED_DEFAULT \ 106#define CEPH_FEATURES_SUPPORTED_DEFAULT \
107 (CEPH_FEATURE_NOSRCADDR | \ 107 (CEPH_FEATURE_NOSRCADDR | \
108 CEPH_FEATURE_SUBSCRIBE2 | \
108 CEPH_FEATURE_RECONNECT_SEQ | \ 109 CEPH_FEATURE_RECONNECT_SEQ | \
109 CEPH_FEATURE_PGID64 | \ 110 CEPH_FEATURE_PGID64 | \
110 CEPH_FEATURE_PGPOOL3 | \ 111 CEPH_FEATURE_PGPOOL3 | \
@@ -127,6 +128,7 @@ static inline u64 ceph_sanitize_features(u64 features)
127 128
128#define CEPH_FEATURES_REQUIRED_DEFAULT \ 129#define CEPH_FEATURES_REQUIRED_DEFAULT \
129 (CEPH_FEATURE_NOSRCADDR | \ 130 (CEPH_FEATURE_NOSRCADDR | \
131 CEPH_FEATURE_SUBSCRIBE2 | \
130 CEPH_FEATURE_RECONNECT_SEQ | \ 132 CEPH_FEATURE_RECONNECT_SEQ | \
131 CEPH_FEATURE_PGID64 | \ 133 CEPH_FEATURE_PGID64 | \
132 CEPH_FEATURE_PGPOOL3 | \ 134 CEPH_FEATURE_PGPOOL3 | \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index d7d072a25c27..37f28bf55ce4 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -198,8 +198,8 @@ struct ceph_client_mount {
198#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */ 198#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */
199 199
200struct ceph_mon_subscribe_item { 200struct ceph_mon_subscribe_item {
201 __le64 have_version; __le64 have; 201 __le64 start;
202 __u8 onetime; 202 __u8 flags;
203} __attribute__ ((packed)); 203} __attribute__ ((packed));
204 204
205struct ceph_mon_subscribe_ack { 205struct ceph_mon_subscribe_ack {
@@ -376,7 +376,8 @@ union ceph_mds_request_args {
376 __le32 stripe_count; /* ... */ 376 __le32 stripe_count; /* ... */
377 __le32 object_size; 377 __le32 object_size;
378 __le32 file_replication; 378 __le32 file_replication;
379 __le32 unused; /* used to be preferred osd */ 379 __le32 mask; /* CEPH_CAP_* */
380 __le32 old_size;
380 } __attribute__ ((packed)) open; 381 } __attribute__ ((packed)) open;
381 struct { 382 struct {
382 __le32 flags; 383 __le32 flags;
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 3e3799cdc6e6..e7975e4681e1 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -47,7 +47,6 @@ struct ceph_options {
47 unsigned long mount_timeout; /* jiffies */ 47 unsigned long mount_timeout; /* jiffies */
48 unsigned long osd_idle_ttl; /* jiffies */ 48 unsigned long osd_idle_ttl; /* jiffies */
49 unsigned long osd_keepalive_timeout; /* jiffies */ 49 unsigned long osd_keepalive_timeout; /* jiffies */
50 unsigned long monc_ping_timeout; /* jiffies */
51 50
52 /* 51 /*
53 * any type that can't be simply compared or doesn't need need 52 * any type that can't be simply compared or doesn't need need
@@ -68,7 +67,12 @@ struct ceph_options {
68#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000) 67#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
69#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000) 68#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000)
70#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000) 69#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000)
71#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000) 70
71#define CEPH_MONC_HUNT_INTERVAL msecs_to_jiffies(3 * 1000)
72#define CEPH_MONC_PING_INTERVAL msecs_to_jiffies(10 * 1000)
73#define CEPH_MONC_PING_TIMEOUT msecs_to_jiffies(30 * 1000)
74#define CEPH_MONC_HUNT_BACKOFF 2
75#define CEPH_MONC_HUNT_MAX_MULT 10
72 76
73#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) 77#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
74#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) 78#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 81810dc21f06..e230e7ed60d3 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -68,18 +68,24 @@ struct ceph_mon_client {
68 68
69 bool hunting; 69 bool hunting;
70 int cur_mon; /* last monitor i contacted */ 70 int cur_mon; /* last monitor i contacted */
71 unsigned long sub_sent, sub_renew_after; 71 unsigned long sub_renew_after;
72 unsigned long sub_renew_sent;
72 struct ceph_connection con; 73 struct ceph_connection con;
73 74
75 bool had_a_connection;
76 int hunt_mult; /* [1..CEPH_MONC_HUNT_MAX_MULT] */
77
74 /* pending generic requests */ 78 /* pending generic requests */
75 struct rb_root generic_request_tree; 79 struct rb_root generic_request_tree;
76 int num_generic_requests; 80 int num_generic_requests;
77 u64 last_tid; 81 u64 last_tid;
78 82
79 /* mds/osd map */ 83 /* subs, indexed with CEPH_SUB_* */
80 int want_mdsmap; 84 struct {
81 int want_next_osdmap; /* 1 = want, 2 = want+asked */ 85 struct ceph_mon_subscribe_item item;
82 u32 have_osdmap, have_mdsmap; 86 bool want;
87 u32 have; /* epoch */
88 } subs[3];
83 89
84#ifdef CONFIG_DEBUG_FS 90#ifdef CONFIG_DEBUG_FS
85 struct dentry *debugfs_file; 91 struct dentry *debugfs_file;
@@ -93,14 +99,23 @@ extern int ceph_monmap_contains(struct ceph_monmap *m,
93extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl); 99extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
94extern void ceph_monc_stop(struct ceph_mon_client *monc); 100extern void ceph_monc_stop(struct ceph_mon_client *monc);
95 101
102enum {
103 CEPH_SUB_MDSMAP = 0,
104 CEPH_SUB_MONMAP,
105 CEPH_SUB_OSDMAP,
106};
107
108extern const char *ceph_sub_str[];
109
96/* 110/*
97 * The model here is to indicate that we need a new map of at least 111 * The model here is to indicate that we need a new map of at least
98 * epoch @want, and also call in when we receive a map. We will 112 * epoch @epoch, and also call in when we receive a map. We will
99 * periodically rerequest the map from the monitor cluster until we 113 * periodically rerequest the map from the monitor cluster until we
100 * get what we want. 114 * get what we want.
101 */ 115 */
102extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); 116bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
103extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); 117 bool continuous);
118void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
104 119
105extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); 120extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
106extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 121extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b485bb6d..4343df806710 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,8 @@ struct ceph_osd {
43}; 43};
44 44
45 45
46#define CEPH_OSD_MAX_OP 3 46#define CEPH_OSD_SLAB_OPS 2
47#define CEPH_OSD_MAX_OPS 16
47 48
48enum ceph_osd_data_type { 49enum ceph_osd_data_type {
49 CEPH_OSD_DATA_TYPE_NONE = 0, 50 CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -77,7 +78,10 @@ struct ceph_osd_data {
77struct ceph_osd_req_op { 78struct ceph_osd_req_op {
78 u16 op; /* CEPH_OSD_OP_* */ 79 u16 op; /* CEPH_OSD_OP_* */
79 u32 flags; /* CEPH_OSD_OP_FLAG_* */ 80 u32 flags; /* CEPH_OSD_OP_FLAG_* */
80 u32 payload_len; 81 u32 indata_len; /* request */
82 u32 outdata_len; /* reply */
83 s32 rval;
84
81 union { 85 union {
82 struct ceph_osd_data raw_data_in; 86 struct ceph_osd_data raw_data_in;
83 struct { 87 struct {
@@ -136,7 +140,6 @@ struct ceph_osd_request {
136 140
137 /* request osd ops array */ 141 /* request osd ops array */
138 unsigned int r_num_ops; 142 unsigned int r_num_ops;
139 struct ceph_osd_req_op r_ops[CEPH_OSD_MAX_OP];
140 143
141 /* these are updated on each send */ 144 /* these are updated on each send */
142 __le32 *r_request_osdmap_epoch; 145 __le32 *r_request_osdmap_epoch;
@@ -148,8 +151,6 @@ struct ceph_osd_request {
148 struct ceph_eversion *r_request_reassert_version; 151 struct ceph_eversion *r_request_reassert_version;
149 152
150 int r_result; 153 int r_result;
151 int r_reply_op_len[CEPH_OSD_MAX_OP];
152 s32 r_reply_op_result[CEPH_OSD_MAX_OP];
153 int r_got_reply; 154 int r_got_reply;
154 int r_linger; 155 int r_linger;
155 156
@@ -174,6 +175,8 @@ struct ceph_osd_request {
174 unsigned long r_stamp; /* send OR check time */ 175 unsigned long r_stamp; /* send OR check time */
175 176
176 struct ceph_snap_context *r_snapc; /* snap context for writes */ 177 struct ceph_snap_context *r_snapc; /* snap context for writes */
178
179 struct ceph_osd_req_op r_ops[];
177}; 180};
178 181
179struct ceph_request_redirect { 182struct ceph_request_redirect {
@@ -263,6 +266,8 @@ extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
263 u64 truncate_size, u32 truncate_seq); 266 u64 truncate_size, u32 truncate_seq);
264extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 267extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
265 unsigned int which, u64 length); 268 unsigned int which, u64 length);
269extern void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
270 unsigned int which, u64 offset_inc);
266 271
267extern struct ceph_osd_data *osd_req_op_extent_osd_data( 272extern struct ceph_osd_data *osd_req_op_extent_osd_data(
268 struct ceph_osd_request *osd_req, 273 struct ceph_osd_request *osd_req,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bcbec33c6a14..dcc18c6f7cf9 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -361,7 +361,6 @@ ceph_parse_options(char *options, const char *dev_name,
361 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; 361 opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
362 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; 362 opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
363 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; 363 opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
364 opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
365 364
366 /* get mon ip(s) */ 365 /* get mon ip(s) */
367 /* ip1[:port1][,ip2[:port2]...] */ 366 /* ip1[:port1][,ip2[:port2]...] */
@@ -686,6 +685,9 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
686 return client->auth_err; 685 return client->auth_err;
687 } 686 }
688 687
688 pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
689 ceph_debugfs_client_init(client);
690
689 return 0; 691 return 0;
690} 692}
691EXPORT_SYMBOL(__ceph_open_session); 693EXPORT_SYMBOL(__ceph_open_session);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 593dc2eabcc8..b902fbc7863e 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p)
112 struct ceph_mon_generic_request *req; 112 struct ceph_mon_generic_request *req;
113 struct ceph_mon_client *monc = &client->monc; 113 struct ceph_mon_client *monc = &client->monc;
114 struct rb_node *rp; 114 struct rb_node *rp;
115 int i;
115 116
116 mutex_lock(&monc->mutex); 117 mutex_lock(&monc->mutex);
117 118
118 if (monc->have_mdsmap) 119 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
119 seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap); 120 seq_printf(s, "have %s %u", ceph_sub_str[i],
120 if (monc->have_osdmap) 121 monc->subs[i].have);
121 seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap); 122 if (monc->subs[i].want)
122 if (monc->want_next_osdmap) 123 seq_printf(s, " want %llu%s",
123 seq_printf(s, "want next osdmap\n"); 124 le64_to_cpu(monc->subs[i].item.start),
125 (monc->subs[i].item.flags &
126 CEPH_SUBSCRIBE_ONETIME ? "" : "+"));
127 seq_putc(s, '\n');
128 }
124 129
125 for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) { 130 for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
126 __u16 op; 131 __u16 op;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9382619a405b..1831f6353622 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -235,18 +235,12 @@ static struct workqueue_struct *ceph_msgr_wq;
235static int ceph_msgr_slab_init(void) 235static int ceph_msgr_slab_init(void)
236{ 236{
237 BUG_ON(ceph_msg_cache); 237 BUG_ON(ceph_msg_cache);
238 ceph_msg_cache = kmem_cache_create("ceph_msg", 238 ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
239 sizeof (struct ceph_msg),
240 __alignof__(struct ceph_msg), 0, NULL);
241
242 if (!ceph_msg_cache) 239 if (!ceph_msg_cache)
243 return -ENOMEM; 240 return -ENOMEM;
244 241
245 BUG_ON(ceph_msg_data_cache); 242 BUG_ON(ceph_msg_data_cache);
246 ceph_msg_data_cache = kmem_cache_create("ceph_msg_data", 243 ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
247 sizeof (struct ceph_msg_data),
248 __alignof__(struct ceph_msg_data),
249 0, NULL);
250 if (ceph_msg_data_cache) 244 if (ceph_msg_data_cache)
251 return 0; 245 return 0;
252 246
@@ -1221,25 +1215,19 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1221static void prepare_write_message_footer(struct ceph_connection *con) 1215static void prepare_write_message_footer(struct ceph_connection *con)
1222{ 1216{
1223 struct ceph_msg *m = con->out_msg; 1217 struct ceph_msg *m = con->out_msg;
1224 int v = con->out_kvec_left;
1225 1218
1226 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 1219 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
1227 1220
1228 dout("prepare_write_message_footer %p\n", con); 1221 dout("prepare_write_message_footer %p\n", con);
1229 con->out_kvec[v].iov_base = &m->footer; 1222 con_out_kvec_add(con, sizeof_footer(con), &m->footer);
1230 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 1223 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1231 if (con->ops->sign_message) 1224 if (con->ops->sign_message)
1232 con->ops->sign_message(m); 1225 con->ops->sign_message(m);
1233 else 1226 else
1234 m->footer.sig = 0; 1227 m->footer.sig = 0;
1235 con->out_kvec[v].iov_len = sizeof(m->footer);
1236 con->out_kvec_bytes += sizeof(m->footer);
1237 } else { 1228 } else {
1238 m->old_footer.flags = m->footer.flags; 1229 m->old_footer.flags = m->footer.flags;
1239 con->out_kvec[v].iov_len = sizeof(m->old_footer);
1240 con->out_kvec_bytes += sizeof(m->old_footer);
1241 } 1230 }
1242 con->out_kvec_left++;
1243 con->out_more = m->more_to_follow; 1231 con->out_more = m->more_to_follow;
1244 con->out_msg_done = true; 1232 con->out_msg_done = true;
1245} 1233}
@@ -2409,11 +2397,7 @@ static int read_partial_message(struct ceph_connection *con)
2409 } 2397 }
2410 2398
2411 /* footer */ 2399 /* footer */
2412 if (need_sign) 2400 size = sizeof_footer(con);
2413 size = sizeof(m->footer);
2414 else
2415 size = sizeof(m->old_footer);
2416
2417 end += size; 2401 end += size;
2418 ret = read_partial(con, end, size, &m->footer); 2402 ret = read_partial(con, end, size, &m->footer);
2419 if (ret <= 0) 2403 if (ret <= 0)
@@ -3089,10 +3073,7 @@ void ceph_msg_revoke(struct ceph_msg *msg)
3089 con->out_skip += con_out_kvec_skip(con); 3073 con->out_skip += con_out_kvec_skip(con);
3090 } else { 3074 } else {
3091 BUG_ON(!msg->data_length); 3075 BUG_ON(!msg->data_length);
3092 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) 3076 con->out_skip += sizeof_footer(con);
3093 con->out_skip += sizeof(msg->footer);
3094 else
3095 con->out_skip += sizeof(msg->old_footer);
3096 } 3077 }
3097 /* data, middle, front */ 3078 /* data, middle, front */
3098 if (msg->data_length) 3079 if (msg->data_length)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index de85dddc3dc0..cf638c009cfa 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -122,51 +122,91 @@ static void __close_session(struct ceph_mon_client *monc)
122 ceph_msg_revoke(monc->m_subscribe); 122 ceph_msg_revoke(monc->m_subscribe);
123 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 123 ceph_msg_revoke_incoming(monc->m_subscribe_ack);
124 ceph_con_close(&monc->con); 124 ceph_con_close(&monc->con);
125 monc->cur_mon = -1; 125
126 monc->pending_auth = 0; 126 monc->pending_auth = 0;
127 ceph_auth_reset(monc->auth); 127 ceph_auth_reset(monc->auth);
128} 128}
129 129
130/* 130/*
131 * Open a session with a (new) monitor. 131 * Pick a new monitor at random and set cur_mon. If we are repicking
132 * (i.e. cur_mon is already set), be sure to pick a different one.
132 */ 133 */
133static int __open_session(struct ceph_mon_client *monc) 134static void pick_new_mon(struct ceph_mon_client *monc)
134{ 135{
135 char r; 136 int old_mon = monc->cur_mon;
136 int ret;
137 137
138 if (monc->cur_mon < 0) { 138 BUG_ON(monc->monmap->num_mon < 1);
139 get_random_bytes(&r, 1); 139
140 monc->cur_mon = r % monc->monmap->num_mon; 140 if (monc->monmap->num_mon == 1) {
141 dout("open_session num=%d r=%d -> mon%d\n", 141 monc->cur_mon = 0;
142 monc->monmap->num_mon, r, monc->cur_mon);
143 monc->sub_sent = 0;
144 monc->sub_renew_after = jiffies; /* i.e., expired */
145 monc->want_next_osdmap = !!monc->want_next_osdmap;
146
147 dout("open_session mon%d opening\n", monc->cur_mon);
148 ceph_con_open(&monc->con,
149 CEPH_ENTITY_TYPE_MON, monc->cur_mon,
150 &monc->monmap->mon_inst[monc->cur_mon].addr);
151
152 /* send an initial keepalive to ensure our timestamp is
153 * valid by the time we are in an OPENED state */
154 ceph_con_keepalive(&monc->con);
155
156 /* initiatiate authentication handshake */
157 ret = ceph_auth_build_hello(monc->auth,
158 monc->m_auth->front.iov_base,
159 monc->m_auth->front_alloc_len);
160 __send_prepared_auth_request(monc, ret);
161 } else { 142 } else {
162 dout("open_session mon%d already open\n", monc->cur_mon); 143 int max = monc->monmap->num_mon;
144 int o = -1;
145 int n;
146
147 if (monc->cur_mon >= 0) {
148 if (monc->cur_mon < monc->monmap->num_mon)
149 o = monc->cur_mon;
150 if (o >= 0)
151 max--;
152 }
153
154 n = prandom_u32() % max;
155 if (o >= 0 && n >= o)
156 n++;
157
158 monc->cur_mon = n;
163 } 159 }
164 return 0; 160
161 dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
162 monc->cur_mon, monc->monmap->num_mon);
163}
164
165/*
166 * Open a session with a new monitor.
167 */
168static void __open_session(struct ceph_mon_client *monc)
169{
170 int ret;
171
172 pick_new_mon(monc);
173
174 monc->hunting = true;
175 if (monc->had_a_connection) {
176 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
177 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
178 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
179 }
180
181 monc->sub_renew_after = jiffies; /* i.e., expired */
182 monc->sub_renew_sent = 0;
183
184 dout("%s opening mon%d\n", __func__, monc->cur_mon);
185 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
186 &monc->monmap->mon_inst[monc->cur_mon].addr);
187
188 /*
189 * send an initial keepalive to ensure our timestamp is valid
190 * by the time we are in an OPENED state
191 */
192 ceph_con_keepalive(&monc->con);
193
194 /* initiate authentication handshake */
195 ret = ceph_auth_build_hello(monc->auth,
196 monc->m_auth->front.iov_base,
197 monc->m_auth->front_alloc_len);
198 BUG_ON(ret <= 0);
199 __send_prepared_auth_request(monc, ret);
165} 200}
166 201
167static bool __sub_expired(struct ceph_mon_client *monc) 202static void reopen_session(struct ceph_mon_client *monc)
168{ 203{
169 return time_after_eq(jiffies, monc->sub_renew_after); 204 if (!monc->hunting)
205 pr_info("mon%d %s session lost, hunting for new mon\n",
206 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
207
208 __close_session(monc);
209 __open_session(monc);
170} 210}
171 211
172/* 212/*
@@ -174,74 +214,70 @@ static bool __sub_expired(struct ceph_mon_client *monc)
174 */ 214 */
175static void __schedule_delayed(struct ceph_mon_client *monc) 215static void __schedule_delayed(struct ceph_mon_client *monc)
176{ 216{
177 struct ceph_options *opt = monc->client->options;
178 unsigned long delay; 217 unsigned long delay;
179 218
180 if (monc->cur_mon < 0 || __sub_expired(monc)) { 219 if (monc->hunting)
181 delay = 10 * HZ; 220 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
182 } else { 221 else
183 delay = 20 * HZ; 222 delay = CEPH_MONC_PING_INTERVAL;
184 if (opt->monc_ping_timeout > 0) 223
185 delay = min(delay, opt->monc_ping_timeout / 3);
186 }
187 dout("__schedule_delayed after %lu\n", delay); 224 dout("__schedule_delayed after %lu\n", delay);
188 schedule_delayed_work(&monc->delayed_work, 225 mod_delayed_work(system_wq, &monc->delayed_work,
189 round_jiffies_relative(delay)); 226 round_jiffies_relative(delay));
190} 227}
191 228
229const char *ceph_sub_str[] = {
230 [CEPH_SUB_MDSMAP] = "mdsmap",
231 [CEPH_SUB_MONMAP] = "monmap",
232 [CEPH_SUB_OSDMAP] = "osdmap",
233};
234
192/* 235/*
193 * Send subscribe request for mdsmap and/or osdmap. 236 * Send subscribe request for one or more maps, according to
237 * monc->subs.
194 */ 238 */
195static void __send_subscribe(struct ceph_mon_client *monc) 239static void __send_subscribe(struct ceph_mon_client *monc)
196{ 240{
197 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", 241 struct ceph_msg *msg = monc->m_subscribe;
198 (unsigned int)monc->sub_sent, __sub_expired(monc), 242 void *p = msg->front.iov_base;
199 monc->want_next_osdmap); 243 void *const end = p + msg->front_alloc_len;
200 if ((__sub_expired(monc) && !monc->sub_sent) || 244 int num = 0;
201 monc->want_next_osdmap == 1) { 245 int i;
202 struct ceph_msg *msg = monc->m_subscribe; 246
203 struct ceph_mon_subscribe_item *i; 247 dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
204 void *p, *end; 248
205 int num; 249 BUG_ON(monc->cur_mon < 0);
206 250
207 p = msg->front.iov_base; 251 if (!monc->sub_renew_sent)
208 end = p + msg->front_alloc_len; 252 monc->sub_renew_sent = jiffies | 1; /* never 0 */
209 253
210 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 254 msg->hdr.version = cpu_to_le16(2);
211 ceph_encode_32(&p, num); 255
212 256 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
213 if (monc->want_next_osdmap) { 257 if (monc->subs[i].want)
214 dout("__send_subscribe to 'osdmap' %u\n", 258 num++;
215 (unsigned int)monc->have_osdmap);
216 ceph_encode_string(&p, end, "osdmap", 6);
217 i = p;
218 i->have = cpu_to_le64(monc->have_osdmap);
219 i->onetime = 1;
220 p += sizeof(*i);
221 monc->want_next_osdmap = 2; /* requested */
222 }
223 if (monc->want_mdsmap) {
224 dout("__send_subscribe to 'mdsmap' %u+\n",
225 (unsigned int)monc->have_mdsmap);
226 ceph_encode_string(&p, end, "mdsmap", 6);
227 i = p;
228 i->have = cpu_to_le64(monc->have_mdsmap);
229 i->onetime = 0;
230 p += sizeof(*i);
231 }
232 ceph_encode_string(&p, end, "monmap", 6);
233 i = p;
234 i->have = 0;
235 i->onetime = 0;
236 p += sizeof(*i);
237
238 msg->front.iov_len = p - msg->front.iov_base;
239 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
240 ceph_msg_revoke(msg);
241 ceph_con_send(&monc->con, ceph_msg_get(msg));
242
243 monc->sub_sent = jiffies | 1; /* never 0 */
244 } 259 }
260 BUG_ON(num < 1); /* monmap sub is always there */
261 ceph_encode_32(&p, num);
262 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
263 const char *s = ceph_sub_str[i];
264
265 if (!monc->subs[i].want)
266 continue;
267
268 dout("%s %s start %llu flags 0x%x\n", __func__, s,
269 le64_to_cpu(monc->subs[i].item.start),
270 monc->subs[i].item.flags);
271 ceph_encode_string(&p, end, s, strlen(s));
272 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
273 p += sizeof(monc->subs[i].item);
274 }
275
276 BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
277 msg->front.iov_len = p - msg->front.iov_base;
278 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
279 ceph_msg_revoke(msg);
280 ceph_con_send(&monc->con, ceph_msg_get(msg));
245} 281}
246 282
247static void handle_subscribe_ack(struct ceph_mon_client *monc, 283static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,15 +291,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
255 seconds = le32_to_cpu(h->duration); 291 seconds = le32_to_cpu(h->duration);
256 292
257 mutex_lock(&monc->mutex); 293 mutex_lock(&monc->mutex);
258 if (monc->hunting) { 294 if (monc->sub_renew_sent) {
259 pr_info("mon%d %s session established\n", 295 monc->sub_renew_after = monc->sub_renew_sent +
260 monc->cur_mon, 296 (seconds >> 1) * HZ - 1;
261 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 297 dout("%s sent %lu duration %d renew after %lu\n", __func__,
262 monc->hunting = false; 298 monc->sub_renew_sent, seconds, monc->sub_renew_after);
299 monc->sub_renew_sent = 0;
300 } else {
301 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
302 monc->sub_renew_sent, monc->sub_renew_after);
263 } 303 }
264 dout("handle_subscribe_ack after %d seconds\n", seconds);
265 monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
266 monc->sub_sent = 0;
267 mutex_unlock(&monc->mutex); 304 mutex_unlock(&monc->mutex);
268 return; 305 return;
269bad: 306bad:
@@ -272,36 +309,82 @@ bad:
272} 309}
273 310
274/* 311/*
275 * Keep track of which maps we have 312 * Register interest in a map
313 *
314 * @sub: one of CEPH_SUB_*
315 * @epoch: X for "every map since X", or 0 for "just the latest"
276 */ 316 */
277int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) 317static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
318 u32 epoch, bool continuous)
319{
320 __le64 start = cpu_to_le64(epoch);
321 u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
322
323 dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
324 epoch, continuous);
325
326 if (monc->subs[sub].want &&
327 monc->subs[sub].item.start == start &&
328 monc->subs[sub].item.flags == flags)
329 return false;
330
331 monc->subs[sub].item.start = start;
332 monc->subs[sub].item.flags = flags;
333 monc->subs[sub].want = true;
334
335 return true;
336}
337
338bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
339 bool continuous)
278{ 340{
341 bool need_request;
342
279 mutex_lock(&monc->mutex); 343 mutex_lock(&monc->mutex);
280 monc->have_mdsmap = got; 344 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
281 mutex_unlock(&monc->mutex); 345 mutex_unlock(&monc->mutex);
282 return 0; 346
347 return need_request;
283} 348}
284EXPORT_SYMBOL(ceph_monc_got_mdsmap); 349EXPORT_SYMBOL(ceph_monc_want_map);
285 350
286int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) 351/*
352 * Keep track of which maps we have
353 *
354 * @sub: one of CEPH_SUB_*
355 */
356static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
357 u32 epoch)
358{
359 dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
360
361 if (monc->subs[sub].want) {
362 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
363 monc->subs[sub].want = false;
364 else
365 monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
366 }
367
368 monc->subs[sub].have = epoch;
369}
370
371void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
287{ 372{
288 mutex_lock(&monc->mutex); 373 mutex_lock(&monc->mutex);
289 monc->have_osdmap = got; 374 __ceph_monc_got_map(monc, sub, epoch);
290 monc->want_next_osdmap = 0;
291 mutex_unlock(&monc->mutex); 375 mutex_unlock(&monc->mutex);
292 return 0;
293} 376}
377EXPORT_SYMBOL(ceph_monc_got_map);
294 378
295/* 379/*
296 * Register interest in the next osdmap 380 * Register interest in the next osdmap
297 */ 381 */
298void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) 382void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
299{ 383{
300 dout("request_next_osdmap have %u\n", monc->have_osdmap); 384 dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
301 mutex_lock(&monc->mutex); 385 mutex_lock(&monc->mutex);
302 if (!monc->want_next_osdmap) 386 if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
303 monc->want_next_osdmap = 1; 387 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
304 if (monc->want_next_osdmap < 2)
305 __send_subscribe(monc); 388 __send_subscribe(monc);
306 mutex_unlock(&monc->mutex); 389 mutex_unlock(&monc->mutex);
307} 390}
@@ -320,15 +403,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
320 long ret; 403 long ret;
321 404
322 mutex_lock(&monc->mutex); 405 mutex_lock(&monc->mutex);
323 while (monc->have_osdmap < epoch) { 406 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
324 mutex_unlock(&monc->mutex); 407 mutex_unlock(&monc->mutex);
325 408
326 if (timeout && time_after_eq(jiffies, started + timeout)) 409 if (timeout && time_after_eq(jiffies, started + timeout))
327 return -ETIMEDOUT; 410 return -ETIMEDOUT;
328 411
329 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 412 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
330 monc->have_osdmap >= epoch, 413 monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
331 ceph_timeout_jiffies(timeout)); 414 ceph_timeout_jiffies(timeout));
332 if (ret < 0) 415 if (ret < 0)
333 return ret; 416 return ret;
334 417
@@ -341,11 +424,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
341EXPORT_SYMBOL(ceph_monc_wait_osdmap); 424EXPORT_SYMBOL(ceph_monc_wait_osdmap);
342 425
343/* 426/*
344 * 427 * Open a session with a random monitor. Request monmap and osdmap,
428 * which are waited upon in __ceph_open_session().
345 */ 429 */
346int ceph_monc_open_session(struct ceph_mon_client *monc) 430int ceph_monc_open_session(struct ceph_mon_client *monc)
347{ 431{
348 mutex_lock(&monc->mutex); 432 mutex_lock(&monc->mutex);
433 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
434 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
349 __open_session(monc); 435 __open_session(monc);
350 __schedule_delayed(monc); 436 __schedule_delayed(monc);
351 mutex_unlock(&monc->mutex); 437 mutex_unlock(&monc->mutex);
@@ -353,29 +439,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
353} 439}
354EXPORT_SYMBOL(ceph_monc_open_session); 440EXPORT_SYMBOL(ceph_monc_open_session);
355 441
356/*
357 * We require the fsid and global_id in order to initialize our
358 * debugfs dir.
359 */
360static bool have_debugfs_info(struct ceph_mon_client *monc)
361{
362 dout("have_debugfs_info fsid %d globalid %lld\n",
363 (int)monc->client->have_fsid, monc->auth->global_id);
364 return monc->client->have_fsid && monc->auth->global_id > 0;
365}
366
367static void ceph_monc_handle_map(struct ceph_mon_client *monc, 442static void ceph_monc_handle_map(struct ceph_mon_client *monc,
368 struct ceph_msg *msg) 443 struct ceph_msg *msg)
369{ 444{
370 struct ceph_client *client = monc->client; 445 struct ceph_client *client = monc->client;
371 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 446 struct ceph_monmap *monmap = NULL, *old = monc->monmap;
372 void *p, *end; 447 void *p, *end;
373 int had_debugfs_info, init_debugfs = 0;
374 448
375 mutex_lock(&monc->mutex); 449 mutex_lock(&monc->mutex);
376 450
377 had_debugfs_info = have_debugfs_info(monc);
378
379 dout("handle_monmap\n"); 451 dout("handle_monmap\n");
380 p = msg->front.iov_base; 452 p = msg->front.iov_base;
381 end = p + msg->front.iov_len; 453 end = p + msg->front.iov_len;
@@ -395,29 +467,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
395 client->monc.monmap = monmap; 467 client->monc.monmap = monmap;
396 kfree(old); 468 kfree(old);
397 469
398 if (!client->have_fsid) { 470 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
399 client->have_fsid = true; 471 client->have_fsid = true;
400 if (!had_debugfs_info && have_debugfs_info(monc)) {
401 pr_info("client%lld fsid %pU\n",
402 ceph_client_id(monc->client),
403 &monc->client->fsid);
404 init_debugfs = 1;
405 }
406 mutex_unlock(&monc->mutex);
407
408 if (init_debugfs) {
409 /*
410 * do debugfs initialization without mutex to avoid
411 * creating a locking dependency
412 */
413 ceph_debugfs_client_init(monc->client);
414 }
415 472
416 goto out_unlocked;
417 }
418out: 473out:
419 mutex_unlock(&monc->mutex); 474 mutex_unlock(&monc->mutex);
420out_unlocked:
421 wake_up_all(&client->auth_wq); 475 wake_up_all(&client->auth_wq);
422} 476}
423 477
@@ -745,18 +799,15 @@ static void delayed_work(struct work_struct *work)
745 dout("monc delayed_work\n"); 799 dout("monc delayed_work\n");
746 mutex_lock(&monc->mutex); 800 mutex_lock(&monc->mutex);
747 if (monc->hunting) { 801 if (monc->hunting) {
748 __close_session(monc); 802 dout("%s continuing hunt\n", __func__);
749 __open_session(monc); /* continue hunting */ 803 reopen_session(monc);
750 } else { 804 } else {
751 struct ceph_options *opt = monc->client->options;
752 int is_auth = ceph_auth_is_authenticated(monc->auth); 805 int is_auth = ceph_auth_is_authenticated(monc->auth);
753 if (ceph_con_keepalive_expired(&monc->con, 806 if (ceph_con_keepalive_expired(&monc->con,
754 opt->monc_ping_timeout)) { 807 CEPH_MONC_PING_TIMEOUT)) {
755 dout("monc keepalive timeout\n"); 808 dout("monc keepalive timeout\n");
756 is_auth = 0; 809 is_auth = 0;
757 __close_session(monc); 810 reopen_session(monc);
758 monc->hunting = true;
759 __open_session(monc);
760 } 811 }
761 812
762 if (!monc->hunting) { 813 if (!monc->hunting) {
@@ -764,8 +815,14 @@ static void delayed_work(struct work_struct *work)
764 __validate_auth(monc); 815 __validate_auth(monc);
765 } 816 }
766 817
767 if (is_auth) 818 if (is_auth) {
768 __send_subscribe(monc); 819 unsigned long now = jiffies;
820
821 dout("%s renew subs? now %lu renew after %lu\n",
822 __func__, now, monc->sub_renew_after);
823 if (time_after_eq(now, monc->sub_renew_after))
824 __send_subscribe(monc);
825 }
769 } 826 }
770 __schedule_delayed(monc); 827 __schedule_delayed(monc);
771 mutex_unlock(&monc->mutex); 828 mutex_unlock(&monc->mutex);
@@ -852,18 +909,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
852 &monc->client->msgr); 909 &monc->client->msgr);
853 910
854 monc->cur_mon = -1; 911 monc->cur_mon = -1;
855 monc->hunting = true; 912 monc->had_a_connection = false;
856 monc->sub_renew_after = jiffies; 913 monc->hunt_mult = 1;
857 monc->sub_sent = 0;
858 914
859 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 915 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
860 monc->generic_request_tree = RB_ROOT; 916 monc->generic_request_tree = RB_ROOT;
861 monc->num_generic_requests = 0; 917 monc->num_generic_requests = 0;
862 monc->last_tid = 0; 918 monc->last_tid = 0;
863 919
864 monc->have_mdsmap = 0;
865 monc->have_osdmap = 0;
866 monc->want_next_osdmap = 1;
867 return 0; 920 return 0;
868 921
869out_auth_reply: 922out_auth_reply:
@@ -888,7 +941,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
888 941
889 mutex_lock(&monc->mutex); 942 mutex_lock(&monc->mutex);
890 __close_session(monc); 943 __close_session(monc);
891 944 monc->cur_mon = -1;
892 mutex_unlock(&monc->mutex); 945 mutex_unlock(&monc->mutex);
893 946
894 /* 947 /*
@@ -910,26 +963,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
910} 963}
911EXPORT_SYMBOL(ceph_monc_stop); 964EXPORT_SYMBOL(ceph_monc_stop);
912 965
966static void finish_hunting(struct ceph_mon_client *monc)
967{
968 if (monc->hunting) {
969 dout("%s found mon%d\n", __func__, monc->cur_mon);
970 monc->hunting = false;
971 monc->had_a_connection = true;
972 monc->hunt_mult /= 2; /* reduce by 50% */
973 if (monc->hunt_mult < 1)
974 monc->hunt_mult = 1;
975 }
976}
977
913static void handle_auth_reply(struct ceph_mon_client *monc, 978static void handle_auth_reply(struct ceph_mon_client *monc,
914 struct ceph_msg *msg) 979 struct ceph_msg *msg)
915{ 980{
916 int ret; 981 int ret;
917 int was_auth = 0; 982 int was_auth = 0;
918 int had_debugfs_info, init_debugfs = 0;
919 983
920 mutex_lock(&monc->mutex); 984 mutex_lock(&monc->mutex);
921 had_debugfs_info = have_debugfs_info(monc);
922 was_auth = ceph_auth_is_authenticated(monc->auth); 985 was_auth = ceph_auth_is_authenticated(monc->auth);
923 monc->pending_auth = 0; 986 monc->pending_auth = 0;
924 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 987 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
925 msg->front.iov_len, 988 msg->front.iov_len,
926 monc->m_auth->front.iov_base, 989 monc->m_auth->front.iov_base,
927 monc->m_auth->front_alloc_len); 990 monc->m_auth->front_alloc_len);
991 if (ret > 0) {
992 __send_prepared_auth_request(monc, ret);
993 goto out;
994 }
995
996 finish_hunting(monc);
997
928 if (ret < 0) { 998 if (ret < 0) {
929 monc->client->auth_err = ret; 999 monc->client->auth_err = ret;
930 wake_up_all(&monc->client->auth_wq);
931 } else if (ret > 0) {
932 __send_prepared_auth_request(monc, ret);
933 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { 1000 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
934 dout("authenticated, starting session\n"); 1001 dout("authenticated, starting session\n");
935 1002
@@ -939,23 +1006,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
939 1006
940 __send_subscribe(monc); 1007 __send_subscribe(monc);
941 __resend_generic_request(monc); 1008 __resend_generic_request(monc);
942 }
943 1009
944 if (!had_debugfs_info && have_debugfs_info(monc)) { 1010 pr_info("mon%d %s session established\n", monc->cur_mon,
945 pr_info("client%lld fsid %pU\n", 1011 ceph_pr_addr(&monc->con.peer_addr.in_addr));
946 ceph_client_id(monc->client),
947 &monc->client->fsid);
948 init_debugfs = 1;
949 } 1012 }
950 mutex_unlock(&monc->mutex);
951 1013
952 if (init_debugfs) { 1014out:
953 /* 1015 mutex_unlock(&monc->mutex);
954 * do debugfs initialization without mutex to avoid 1016 if (monc->client->auth_err < 0)
955 * creating a locking dependency 1017 wake_up_all(&monc->client->auth_wq);
956 */
957 ceph_debugfs_client_init(monc->client);
958 }
959} 1018}
960 1019
961static int __validate_auth(struct ceph_mon_client *monc) 1020static int __validate_auth(struct ceph_mon_client *monc)
@@ -1096,29 +1155,17 @@ static void mon_fault(struct ceph_connection *con)
1096{ 1155{
1097 struct ceph_mon_client *monc = con->private; 1156 struct ceph_mon_client *monc = con->private;
1098 1157
1099 if (!monc)
1100 return;
1101
1102 dout("mon_fault\n");
1103 mutex_lock(&monc->mutex); 1158 mutex_lock(&monc->mutex);
1104 if (!con->private) 1159 dout("%s mon%d\n", __func__, monc->cur_mon);
1105 goto out; 1160 if (monc->cur_mon >= 0) {
1106 1161 if (!monc->hunting) {
1107 if (!monc->hunting) 1162 dout("%s hunting for new mon\n", __func__);
1108 pr_info("mon%d %s session lost, " 1163 reopen_session(monc);
1109 "hunting for new mon\n", monc->cur_mon, 1164 __schedule_delayed(monc);
1110 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1165 } else {
1111 1166 dout("%s already hunting\n", __func__);
1112 __close_session(monc); 1167 }
1113 if (!monc->hunting) {
1114 /* start hunting */
1115 monc->hunting = true;
1116 __open_session(monc);
1117 } else {
1118 /* already hunting, let's wait a bit */
1119 __schedule_delayed(monc);
1120 } 1168 }
1121out:
1122 mutex_unlock(&monc->mutex); 1169 mutex_unlock(&monc->mutex);
1123} 1170}
1124 1171
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5bc053778fed..32355d9d0103 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref)
338 ceph_put_snap_context(req->r_snapc); 338 ceph_put_snap_context(req->r_snapc);
339 if (req->r_mempool) 339 if (req->r_mempool)
340 mempool_free(req, req->r_osdc->req_mempool); 340 mempool_free(req, req->r_osdc->req_mempool);
341 else 341 else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
342 kmem_cache_free(ceph_osd_request_cache, req); 342 kmem_cache_free(ceph_osd_request_cache, req);
343 343 else
344 kfree(req);
344} 345}
345 346
346void ceph_osdc_get_request(struct ceph_osd_request *req) 347void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,28 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
369 struct ceph_msg *msg; 370 struct ceph_msg *msg;
370 size_t msg_size; 371 size_t msg_size;
371 372
372 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
373 BUG_ON(num_ops > CEPH_OSD_MAX_OP);
374
375 msg_size = 4 + 4 + 8 + 8 + 4+8;
376 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
377 msg_size += 1 + 8 + 4 + 4; /* pg_t */
378 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
379 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
380 msg_size += 8; /* snapid */
381 msg_size += 8; /* snap_seq */
382 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
383 msg_size += 4;
384
385 if (use_mempool) { 373 if (use_mempool) {
374 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
386 req = mempool_alloc(osdc->req_mempool, gfp_flags); 375 req = mempool_alloc(osdc->req_mempool, gfp_flags);
387 memset(req, 0, sizeof(*req)); 376 } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
377 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
388 } else { 378 } else {
389 req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); 379 BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
380 req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
381 gfp_flags);
390 } 382 }
391 if (req == NULL) 383 if (unlikely(!req))
392 return NULL; 384 return NULL;
393 385
386 /* req only, each op is zeroed in _osd_req_op_init() */
387 memset(req, 0, sizeof(*req));
388
394 req->r_osdc = osdc; 389 req->r_osdc = osdc;
395 req->r_mempool = use_mempool; 390 req->r_mempool = use_mempool;
396 req->r_num_ops = num_ops; 391 req->r_num_ops = num_ops;
@@ -408,18 +403,36 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
408 req->r_base_oloc.pool = -1; 403 req->r_base_oloc.pool = -1;
409 req->r_target_oloc.pool = -1; 404 req->r_target_oloc.pool = -1;
410 405
406 msg_size = OSD_OPREPLY_FRONT_LEN;
407 if (num_ops > CEPH_OSD_SLAB_OPS) {
408 /* ceph_osd_op and rval */
409 msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
410 (sizeof(struct ceph_osd_op) + 4);
411 }
412
411 /* create reply message */ 413 /* create reply message */
412 if (use_mempool) 414 if (use_mempool)
413 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 415 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
414 else 416 else
415 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 417 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
416 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 418 gfp_flags, true);
417 if (!msg) { 419 if (!msg) {
418 ceph_osdc_put_request(req); 420 ceph_osdc_put_request(req);
419 return NULL; 421 return NULL;
420 } 422 }
421 req->r_reply = msg; 423 req->r_reply = msg;
422 424
425 msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
426 msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
427 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
428 msg_size += 1 + 8 + 4 + 4; /* pgid */
429 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
430 msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
431 msg_size += 8; /* snapid */
432 msg_size += 8; /* snap_seq */
433 msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
434 msg_size += 4; /* retry_attempt */
435
423 /* create request message; allow space for oid */ 436 /* create request message; allow space for oid */
424 if (use_mempool) 437 if (use_mempool)
425 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 438 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -498,7 +511,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
498 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 511 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
499 payload_len += length; 512 payload_len += length;
500 513
501 op->payload_len = payload_len; 514 op->indata_len = payload_len;
502} 515}
503EXPORT_SYMBOL(osd_req_op_extent_init); 516EXPORT_SYMBOL(osd_req_op_extent_init);
504 517
@@ -517,10 +530,32 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
517 BUG_ON(length > previous); 530 BUG_ON(length > previous);
518 531
519 op->extent.length = length; 532 op->extent.length = length;
520 op->payload_len -= previous - length; 533 op->indata_len -= previous - length;
521} 534}
522EXPORT_SYMBOL(osd_req_op_extent_update); 535EXPORT_SYMBOL(osd_req_op_extent_update);
523 536
537void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
538 unsigned int which, u64 offset_inc)
539{
540 struct ceph_osd_req_op *op, *prev_op;
541
542 BUG_ON(which + 1 >= osd_req->r_num_ops);
543
544 prev_op = &osd_req->r_ops[which];
545 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
546 /* dup previous one */
547 op->indata_len = prev_op->indata_len;
548 op->outdata_len = prev_op->outdata_len;
549 op->extent = prev_op->extent;
550 /* adjust offset */
551 op->extent.offset += offset_inc;
552 op->extent.length -= offset_inc;
553
554 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
555 op->indata_len -= offset_inc;
556}
557EXPORT_SYMBOL(osd_req_op_extent_dup_last);
558
524void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 559void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
525 u16 opcode, const char *class, const char *method) 560 u16 opcode, const char *class, const char *method)
526{ 561{
@@ -554,7 +589,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
554 589
555 op->cls.argc = 0; /* currently unused */ 590 op->cls.argc = 0; /* currently unused */
556 591
557 op->payload_len = payload_len; 592 op->indata_len = payload_len;
558} 593}
559EXPORT_SYMBOL(osd_req_op_cls_init); 594EXPORT_SYMBOL(osd_req_op_cls_init);
560 595
@@ -587,7 +622,7 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
587 op->xattr.cmp_mode = cmp_mode; 622 op->xattr.cmp_mode = cmp_mode;
588 623
589 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 624 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
590 op->payload_len = payload_len; 625 op->indata_len = payload_len;
591 return 0; 626 return 0;
592} 627}
593EXPORT_SYMBOL(osd_req_op_xattr_init); 628EXPORT_SYMBOL(osd_req_op_xattr_init);
@@ -707,7 +742,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
707 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 742 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
708 dst->cls.indata_len = cpu_to_le32(data_length); 743 dst->cls.indata_len = cpu_to_le32(data_length);
709 ceph_osdc_msg_data_add(req->r_request, osd_data); 744 ceph_osdc_msg_data_add(req->r_request, osd_data);
710 src->payload_len += data_length; 745 src->indata_len += data_length;
711 request_data_len += data_length; 746 request_data_len += data_length;
712 } 747 }
713 osd_data = &src->cls.response_data; 748 osd_data = &src->cls.response_data;
@@ -750,7 +785,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
750 785
751 dst->op = cpu_to_le16(src->op); 786 dst->op = cpu_to_le16(src->op);
752 dst->flags = cpu_to_le32(src->flags); 787 dst->flags = cpu_to_le32(src->flags);
753 dst->payload_len = cpu_to_le32(src->payload_len); 788 dst->payload_len = cpu_to_le32(src->indata_len);
754 789
755 return request_data_len; 790 return request_data_len;
756} 791}
@@ -1810,7 +1845,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1810 1845
1811 ceph_decode_need(&p, end, 4, bad_put); 1846 ceph_decode_need(&p, end, 4, bad_put);
1812 numops = ceph_decode_32(&p); 1847 numops = ceph_decode_32(&p);
1813 if (numops > CEPH_OSD_MAX_OP) 1848 if (numops > CEPH_OSD_MAX_OPS)
1814 goto bad_put; 1849 goto bad_put;
1815 if (numops != req->r_num_ops) 1850 if (numops != req->r_num_ops)
1816 goto bad_put; 1851 goto bad_put;
@@ -1821,7 +1856,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1821 int len; 1856 int len;
1822 1857
1823 len = le32_to_cpu(op->payload_len); 1858 len = le32_to_cpu(op->payload_len);
1824 req->r_reply_op_len[i] = len; 1859 req->r_ops[i].outdata_len = len;
1825 dout(" op %d has %d bytes\n", i, len); 1860 dout(" op %d has %d bytes\n", i, len);
1826 payload_len += len; 1861 payload_len += len;
1827 p += sizeof(*op); 1862 p += sizeof(*op);
@@ -1836,7 +1871,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1836 ceph_decode_need(&p, end, 4 + numops * 4, bad_put); 1871 ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
1837 retry_attempt = ceph_decode_32(&p); 1872 retry_attempt = ceph_decode_32(&p);
1838 for (i = 0; i < numops; i++) 1873 for (i = 0; i < numops; i++)
1839 req->r_reply_op_result[i] = ceph_decode_32(&p); 1874 req->r_ops[i].rval = ceph_decode_32(&p);
1840 1875
1841 if (le16_to_cpu(msg->hdr.version) >= 6) { 1876 if (le16_to_cpu(msg->hdr.version) >= 6) {
1842 p += 8 + 4; /* skip replay_version */ 1877 p += 8 + 4; /* skip replay_version */
@@ -2187,7 +2222,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2187 goto bad; 2222 goto bad;
2188done: 2223done:
2189 downgrade_write(&osdc->map_sem); 2224 downgrade_write(&osdc->map_sem);
2190 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 2225 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2226 osdc->osdmap->epoch);
2191 2227
2192 /* 2228 /*
2193 * subscribe to subsequent osdmap updates if full to ensure 2229 * subscribe to subsequent osdmap updates if full to ensure
@@ -2646,8 +2682,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2646 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 2682 round_jiffies_relative(osdc->client->options->osd_idle_ttl));
2647 2683
2648 err = -ENOMEM; 2684 err = -ENOMEM;
2649 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2685 osdc->req_mempool = mempool_create_slab_pool(10,
2650 sizeof(struct ceph_osd_request)); 2686 ceph_osd_request_cache);
2651 if (!osdc->req_mempool) 2687 if (!osdc->req_mempool)
2652 goto out; 2688 goto out;
2653 2689
@@ -2782,11 +2818,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages);
2782 2818
2783int ceph_osdc_setup(void) 2819int ceph_osdc_setup(void)
2784{ 2820{
2821 size_t size = sizeof(struct ceph_osd_request) +
2822 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
2823
2785 BUG_ON(ceph_osd_request_cache); 2824 BUG_ON(ceph_osd_request_cache);
2786 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", 2825 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
2787 sizeof (struct ceph_osd_request), 2826 0, 0, NULL);
2788 __alignof__(struct ceph_osd_request),
2789 0, NULL);
2790 2827
2791 return ceph_osd_request_cache ? 0 : -ENOMEM; 2828 return ceph_osd_request_cache ? 0 : -ENOMEM;
2792} 2829}