author    Linus Torvalds <torvalds@linux-foundation.org>  2014-06-13 02:06:23 -0400
committer Linus Torvalds <torvalds@linux-foundation.org>  2014-06-13 02:06:23 -0400
commit    6d87c225f5d82d29243dc124f1ffcbb0e14ec358
tree      7d72e2e6a77ec0911e86911d2ddae62c1b4161cf
parent    338c09a94b14c449dd53227e9bea44816668c6a5
parent    22001f619f29ddf66582d834223dcff4c0b74595
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "This has a mix of bug fixes and cleanups.

  Alex's patch fixes a rare race in RBD.  Ilya's patches fix an ENOENT
  check when a second rbd image is mapped and a couple memory leaks.
  Zheng fixes several issues with fragmented directories and multiple
  MDSs.  Josh fixes a spin/sleep issue, and Josh and Guangliang's
  patches fix setting and unsetting RBD images read-only.

  Naturally there are several other cleanups mixed in for good measure"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  rbd: only set disk to read-only once
  rbd: move calls that may sleep out of spin lock range
  rbd: add ioctl for rbd
  ceph: use truncate_pagecache() instead of truncate_inode_pages()
  ceph: include time stamp in every MDS request
  rbd: fix ida/idr memory leak
  rbd: use reference counts for image requests
  rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
  rbd: make sure we have latest osdmap on 'rbd map'
  libceph: add ceph_monc_wait_osdmap()
  libceph: mon_get_version request infrastructure
  libceph: recognize poolop requests in debugfs
  ceph: refactor readpage_nounlock() to make the logic clearer
  mds: check cap ID when handling cap export message
  ceph: remember subtree root dirfrag's auth MDS
  ceph: introduce ceph_fill_fragtree()
  ceph: handle cap import atomically
  ceph: pre-allocate ceph_cap struct for ceph_add_cap()
  ceph: update inode fields according to issued caps
  rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
  ...
-rw-r--r--  drivers/block/rbd.c              | 242
-rw-r--r--  fs/ceph/acl.c                    |   6
-rw-r--r--  fs/ceph/addr.c                   |  17
-rw-r--r--  fs/ceph/caps.c                   | 246
-rw-r--r--  fs/ceph/export.c                 |   2
-rw-r--r--  fs/ceph/inode.c                  | 247
-rw-r--r--  fs/ceph/mds_client.c             |   9
-rw-r--r--  fs/ceph/mds_client.h             |   1
-rw-r--r--  fs/ceph/super.h                  |  13
-rw-r--r--  include/linux/ceph/ceph_fs.h     |   2
-rw-r--r--  include/linux/ceph/mon_client.h  |  11
-rw-r--r--  net/ceph/ceph_common.c           |   2
-rw-r--r--  net/ceph/debugfs.c               |   8
-rw-r--r--  net/ceph/mon_client.c            | 150
14 files changed, 670 insertions(+), 286 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4c95b503b09e..bbeb404b3a07 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
                return -ENOENT;
 
        (void) get_device(&rbd_dev->dev);
-       set_device_ro(bdev, rbd_dev->mapping.read_only);
 
        return 0;
 }
@@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
        put_device(&rbd_dev->dev);
 }
 
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+       int ret = 0;
+       int val;
+       bool ro;
+       bool ro_changed = false;
+
+       /* get_user() may sleep, so call it before taking rbd_dev->lock */
+       if (get_user(val, (int __user *)(arg)))
+               return -EFAULT;
+
+       ro = val ? true : false;
+       /* Snapshot doesn't allow to write*/
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+               return -EROFS;
+
+       spin_lock_irq(&rbd_dev->lock);
+       /* prevent others open this device */
+       if (rbd_dev->open_count > 1) {
+               ret = -EBUSY;
+               goto out;
+       }
+
+       if (rbd_dev->mapping.read_only != ro) {
+               rbd_dev->mapping.read_only = ro;
+               ro_changed = true;
+       }
+
+out:
+       spin_unlock_irq(&rbd_dev->lock);
+       /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+       if (ret == 0 && ro_changed)
+               set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+       return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+                    unsigned int cmd, unsigned long arg)
+{
+       struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+       int ret = 0;
+
+       switch (cmd) {
+       case BLKROSET:
+               ret = rbd_ioctl_set_ro(rbd_dev, arg);
+               break;
+       default:
+               ret = -ENOTTY;
+       }
+
+       return ret;
+}
+
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+                           unsigned int cmd, unsigned long arg)
+{
+       return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
+
 static const struct block_device_operations rbd_bd_ops = {
        .owner          = THIS_MODULE,
        .open           = rbd_open,
        .release        = rbd_release,
+       .ioctl          = rbd_ioctl,
+#ifdef CONFIG_COMPAT
+       .compat_ioctl   = rbd_compat_ioctl,
+#endif
 };
 
 /*
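The new rbd_ioctl() only understands BLKROSET, the stock block-layer read-only toggle (BLKROSET/BLKROGET come from <linux/fs.h>; "blockdev --setro" issues the same ioctl). A minimal userspace sketch of exercising it — the /dev/rbd0 path and error handling are illustrative:

        /* build: cc -o rbdro rbdro.c; run against a mapped rbd device */
        #include <fcntl.h>
        #include <stdio.h>
        #include <unistd.h>
        #include <sys/ioctl.h>
        #include <linux/fs.h>           /* BLKROSET, BLKROGET */

        int main(void)
        {
                int ro = 1;             /* nonzero => read-only */
                int fd = open("/dev/rbd0", O_RDONLY);

                if (fd < 0) {
                        perror("open");
                        return 1;
                }
                /* lands in rbd_ioctl_set_ro(); fails with EBUSY if the
                 * device is open anywhere else */
                if (ioctl(fd, BLKROSET, &ro) < 0)
                        perror("BLKROSET");
                if (ioctl(fd, BLKROGET, &ro) == 0)
                        printf("read-only: %d\n", ro);
                close(fd);
                return 0;
        }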
@@ -1382,6 +1447,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+            atomic_read(&img_request->kref.refcount));
+       kref_get(&img_request->kref);
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request);
 static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
@@ -2142,6 +2214,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
        img_request->next_completion = which;
 out:
        spin_unlock_irq(&img_request->completion_lock);
+       rbd_img_request_put(img_request);
 
        if (!more)
                rbd_img_request_complete(img_request);
@@ -2242,6 +2315,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                goto out_unwind;
        obj_request->osd_req = osd_req;
        obj_request->callback = rbd_img_obj_callback;
+       rbd_img_request_get(img_request);
 
        if (write_request) {
                osd_req_op_alloc_hint_init(osd_req, which,
@@ -2872,56 +2946,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 }
 
 /*
- * Request sync osd watch/unwatch.  The value of "start" determines
- * whether a watch request is being initiated or torn down.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;
 
-       rbd_assert(start ^ !!rbd_dev->watch_event);
-       rbd_assert(start ^ !!rbd_dev->watch_request);
+       rbd_assert(!rbd_dev->watch_event);
+       rbd_assert(!rbd_dev->watch_request);
 
-       if (start) {
-               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-                                            &rbd_dev->watch_event);
-               if (ret < 0)
-                       return ret;
-               rbd_assert(rbd_dev->watch_event != NULL);
-       }
+       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+                                    &rbd_dev->watch_event);
+       if (ret < 0)
+               return ret;
+
+       rbd_assert(rbd_dev->watch_event);
 
-       ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                             OBJ_REQUEST_NODATA);
-       if (!obj_request)
+       if (!obj_request) {
+               ret = -ENOMEM;
                goto out_cancel;
+       }
 
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
                                                  obj_request);
-       if (!obj_request->osd_req)
-               goto out_cancel;
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out_put;
+       }
 
-       if (start)
-               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-       else
-               ceph_osdc_unregister_linger_request(osdc,
-                                       rbd_dev->watch_request->osd_req);
+       ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                             rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
+                             rbd_dev->watch_event->cookie, 0, 1);
        rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
-               goto out_cancel;
+               goto out_linger;
+
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
-               goto out_cancel;
+               goto out_linger;
+
        ret = obj_request->result;
        if (ret)
-               goto out_cancel;
+               goto out_linger;
 
        /*
         * A watch request is set to linger, so the underlying osd
@@ -2931,36 +3004,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
         * it.  We'll drop that reference (below) after we've
         * unregistered it.
         */
-       if (start) {
-               rbd_dev->watch_request = obj_request;
+       rbd_dev->watch_request = obj_request;
 
-               return 0;
+       return 0;
+
+out_linger:
+       ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
+out_put:
+       rbd_obj_request_put(obj_request);
+out_cancel:
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
+
+       return ret;
+}
+
+/*
+ * Tear down a watch request, synchronously.
+ */
+static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
+       int ret;
+
+       rbd_assert(rbd_dev->watch_event);
+       rbd_assert(rbd_dev->watch_request);
+
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                            OBJ_REQUEST_NODATA);
+       if (!obj_request) {
+               ret = -ENOMEM;
+               goto out_cancel;
+       }
+
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+                                                 obj_request);
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out_put;
        }
 
+       osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+                             rbd_dev->watch_event->cookie, 0, 0);
+       rbd_osd_req_format_write(obj_request);
+
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out_put;
+
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out_put;
+
+       ret = obj_request->result;
+       if (ret)
+               goto out_put;
+
        /* We have successfully torn down the watch request */
 
+       ceph_osdc_unregister_linger_request(osdc,
+                                           rbd_dev->watch_request->osd_req);
        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
+
+out_put:
+       rbd_obj_request_put(obj_request);
 out_cancel:
-       /* Cancel the event if we're tearing down, or on error */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
-       if (obj_request)
-               rbd_obj_request_put(obj_request);
 
        return ret;
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
-{
-       return __rbd_dev_header_watch_sync(rbd_dev, true);
-}
-
 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       ret = __rbd_dev_header_watch_sync(rbd_dev, false);
+       ret = __rbd_dev_header_unwatch_sync(rbd_dev);
        if (ret) {
                rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
                         ret);
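The out_linger/out_put/out_cancel labels above follow the usual kernel unwind idiom: each exit label releases only what was acquired before the failing step, in reverse order of acquisition. A standalone toy of the same shape (heap allocations stand in for the event and request objects; all names are illustrative):

        #include <stdio.h>
        #include <stdlib.h>

        static int setup(int fail_step)
        {
                int ret = 0;
                char *event, *request;

                event = malloc(16);                 /* step 1: create event */
                if (!event)
                        return -1;

                request = malloc(16);               /* step 2: create request */
                if (!request) {
                        ret = -1;
                        goto out_cancel;
                }

                if (fail_step == 3) {               /* step 3: submit */
                        ret = -1;
                        goto out_put;
                }

                printf("setup ok\n");               /* success keeps both */
                free(request);
                free(event);
                return 0;

        out_put:
                free(request);                      /* undo step 2 */
        out_cancel:
                free(event);                        /* undo step 1 */
                return ret;
        }

        int main(void)
        {
                return setup(3) == -1 ? 0 : 1;      /* exercise the unwind */
        }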
@@ -3058,7 +3179,6 @@ static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;
 
@@ -3094,7 +3214,7 @@ static void rbd_request_fn(struct request_queue *q)
 
                if (write_request) {
                        result = -EROFS;
-                       if (read_only)
+                       if (rbd_dev->mapping.read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }
@@ -4683,6 +4803,38 @@ out_err:
 }
 
 /*
+ * Return pool id (>= 0) or a negative error code.
+ */
+static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
+{
+       u64 newest_epoch;
+       unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
+       int tries = 0;
+       int ret;
+
+again:
+       ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
+       if (ret == -ENOENT && tries++ < 1) {
+               ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
+                                              &newest_epoch);
+               if (ret < 0)
+                       return ret;
+
+               if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
+                       ceph_monc_request_next_osdmap(&rbdc->client->monc);
+                       (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
+                                                    newest_epoch, timeout);
+                       goto again;
+               } else {
+                       /* the osdmap we have is new enough */
+                       return -ENOENT;
+               }
+       }
+
+       return ret;
+}
+
+/*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
  * what's used to specify the names of objects related to the image.
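rbd_add_get_pool_id() retries a failed name lookup exactly once, after pulling the monitor's newest osdmap epoch, so "rbd map" can see a pool created moments earlier. A standalone toy of that retry-once-after-refresh shape (the stubbed lookup and epoch variables are illustrative, not the libceph calls):

        #include <errno.h>
        #include <stdio.h>

        static int map_epoch = 1;            /* osdmap we currently hold */
        static const int newest_epoch = 2;   /* what the monitor reports */

        static int lookup_pool(const char *name)
        {
                (void)name;                  /* stub: pool exists from epoch 2 */
                return map_epoch >= 2 ? 7 : -ENOENT;
        }

        static int get_pool_id(const char *name)
        {
                int tries = 0;
                int ret;
        again:
                ret = lookup_pool(name);
                if (ret == -ENOENT && tries++ < 1) {
                        if (map_epoch < newest_epoch) {
                                map_epoch = newest_epoch; /* fetch newer map */
                                goto again;
                        }
                        return -ENOENT;  /* map new enough: pool really gone */
                }
                return ret;
        }

        int main(void)
        {
                printf("pool id: %d\n", get_pool_id("rbd"));
                return 0;
        }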
@@ -4752,7 +4904,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 
                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
-               ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+               ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
@@ -4907,6 +5059,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
@@ -5053,7 +5206,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
-       struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;
 
@@ -5075,8 +5227,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        }
 
        /* pick the pool */
-       osdc = &rbdc->client->osdc;
-       rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+       rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;
@@ -5387,6 +5538,7 @@ err_out_slab:
 
 static void __exit rbd_exit(void)
 {
+       ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 21887d63dad5..469f2e8657e8 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -104,12 +104,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
        umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
        struct dentry *dentry;
 
-       if (acl) {
-               ret = posix_acl_valid(acl);
-               if (ret < 0)
-                       goto out;
-       }
-
        switch (type) {
        case ACL_TYPE_ACCESS:
                name = POSIX_ACL_XATTR_ACCESS;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 4f3f69079f36..90b3954d48ed 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -211,18 +211,15 @@ static int readpage_nounlock(struct file *filp, struct page *page)
                SetPageError(page);
                ceph_fscache_readpage_cancel(inode, page);
                goto out;
-       } else {
-               if (err < PAGE_CACHE_SIZE) {
-                       /* zero fill remainder of page */
-                       zero_user_segment(page, err, PAGE_CACHE_SIZE);
-               } else {
-                       flush_dcache_page(page);
-               }
        }
-       SetPageUptodate(page);
+       if (err < PAGE_CACHE_SIZE)
+               /* zero fill remainder of page */
+               zero_user_segment(page, err, PAGE_CACHE_SIZE);
+       else
+               flush_dcache_page(page);
 
-       if (err >= 0)
-               ceph_readpage_to_fscache(inode, page);
+       SetPageUptodate(page);
+       ceph_readpage_to_fscache(inode, page);
 
 out:
        return err < 0 ? err : 0;
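After the refactor the function reads top to bottom: bail on error, zero-fill a short read, then mark the page up to date unconditionally. A userspace analogue of the tail-zeroing step (memset standing in for zero_user_segment; the 4096-byte page size is illustrative):

        #include <stdio.h>
        #include <string.h>

        #define PAGE_CACHE_SIZE 4096

        /* err is the byte count the read returned; negative errors
         * never reach this point */
        static void finish_page(char *page, int err)
        {
                if (err < PAGE_CACHE_SIZE)
                        /* zero fill remainder of page */
                        memset(page + err, 0, PAGE_CACHE_SIZE - err);
                /* else: a full page; nothing to zero */
        }

        int main(void)
        {
                char page[PAGE_CACHE_SIZE];

                memset(page, 'x', sizeof(page));
                finish_page(page, 100);          /* short read of 100 bytes */
                printf("page[99]='%c' page[100]=%d\n", page[99], page[100]);
                return 0;
        }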
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index c561b628ebce..1fde164b74b5 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
        return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
-                               struct ceph_cap_reservation *ctx)
+struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+                             struct ceph_cap_reservation *ctx)
 {
        struct ceph_cap *cap = NULL;
 
@@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
  * it is < 0.  (This is so we can atomically add the cap and add an
  * open file reference to it.)
  */
-int ceph_add_cap(struct inode *inode,
-                struct ceph_mds_session *session, u64 cap_id,
-                int fmode, unsigned issued, unsigned wanted,
-                unsigned seq, unsigned mseq, u64 realmino, int flags,
-                struct ceph_cap_reservation *caps_reservation)
+void ceph_add_cap(struct inode *inode,
+                 struct ceph_mds_session *session, u64 cap_id,
+                 int fmode, unsigned issued, unsigned wanted,
+                 unsigned seq, unsigned mseq, u64 realmino, int flags,
+                 struct ceph_cap **new_cap)
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *new_cap = NULL;
        struct ceph_cap *cap;
        int mds = session->s_mds;
        int actual_wanted;
@@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,
        if (fmode >= 0)
                wanted |= ceph_caps_for_mode(fmode);
 
-retry:
-       spin_lock(&ci->i_ceph_lock);
        cap = __get_cap_for_mds(ci, mds);
        if (!cap) {
-               if (new_cap) {
-                       cap = new_cap;
-                       new_cap = NULL;
-               } else {
-                       spin_unlock(&ci->i_ceph_lock);
-                       new_cap = get_cap(mdsc, caps_reservation);
-                       if (new_cap == NULL)
-                               return -ENOMEM;
-                       goto retry;
-               }
+               cap = *new_cap;
+               *new_cap = NULL;
 
                cap->issued = 0;
                cap->implemented = 0;
@@ -562,9 +551,6 @@ retry:
                session->s_nr_caps++;
                spin_unlock(&session->s_cap_lock);
        } else {
-               if (new_cap)
-                       ceph_put_cap(mdsc, new_cap);
-
                /*
                 * auth mds of the inode changed. we received the cap export
                 * message, but still haven't received the cap import message.
@@ -626,7 +612,6 @@ retry:
                        ci->i_auth_cap = cap;
                        cap->mds_wanted = wanted;
                }
-               ci->i_cap_exporting_issued = 0;
        } else {
                WARN_ON(ci->i_auth_cap == cap);
        }
@@ -648,9 +633,6 @@ retry:
 
        if (fmode >= 0)
                __ceph_get_fmode(ci, fmode);
-       spin_unlock(&ci->i_ceph_lock);
-       wake_up_all(&ci->i_cap_wq);
-       return 0;
 }
 
 /*
@@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-       int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
+       int have = ci->i_snap_caps;
        struct ceph_cap *cap;
        struct rb_node *p;
 
@@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
  */
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
-       return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
+       return !RB_EMPTY_ROOT(&ci->i_caps);
 }
 
 int ceph_is_any_caps(struct inode *inode)
@@ -2397,32 +2379,30 @@ static void invalidate_aliases(struct inode *inode)
  * actually be a revocation if it specifies a smaller cap set.)
  *
  * caller holds s_mutex and i_ceph_lock, we drop both.
- *
- * return value:
- *  0 - ok
- *  1 - check_caps on auth cap only (writeback)
- *  2 - check_caps (ack revoke)
  */
-static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+static void handle_cap_grant(struct ceph_mds_client *mdsc,
+                            struct inode *inode, struct ceph_mds_caps *grant,
+                            void *snaptrace, int snaptrace_len,
+                            struct ceph_buffer *xattr_buf,
                             struct ceph_mds_session *session,
-                            struct ceph_cap *cap,
-                            struct ceph_buffer *xattr_buf)
+                            struct ceph_cap *cap, int issued)
        __releases(ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
        int seq = le32_to_cpu(grant->seq);
        int newcaps = le32_to_cpu(grant->caps);
-       int issued, implemented, used, wanted, dirty;
+       int used, wanted, dirty;
        u64 size = le64_to_cpu(grant->size);
        u64 max_size = le64_to_cpu(grant->max_size);
        struct timespec mtime, atime, ctime;
        int check_caps = 0;
-       int wake = 0;
-       int writeback = 0;
-       int queue_invalidate = 0;
-       int deleted_inode = 0;
-       int queue_revalidate = 0;
+       bool wake = 0;
+       bool writeback = 0;
+       bool queue_trunc = 0;
+       bool queue_invalidate = 0;
+       bool queue_revalidate = 0;
+       bool deleted_inode = 0;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2466,16 +2446,13 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        }
 
        /* side effects now are allowed */
-
-       issued = __ceph_caps_issued(ci, &implemented);
-       issued |= implemented | __ceph_caps_dirty(ci);
-
        cap->cap_gen = session->s_cap_gen;
        cap->seq = seq;
 
        __check_cap_issue(ci, cap, newcaps);
 
-       if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+       if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+           (issued & CEPH_CAP_AUTH_EXCL) == 0) {
                inode->i_mode = le32_to_cpu(grant->mode);
                inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
                inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
@@ -2484,7 +2461,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                     from_kgid(&init_user_ns, inode->i_gid));
        }
 
-       if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+       if ((newcaps & CEPH_CAP_LINK_SHARED) &&
+           (issued & CEPH_CAP_LINK_EXCL) == 0) {
                set_nlink(inode, le32_to_cpu(grant->nlink));
                if (inode->i_nlink == 0 &&
                    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
@@ -2511,30 +2489,35 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
                queue_revalidate = 1;
 
-       /* size/ctime/mtime/atime? */
-       ceph_fill_file_size(inode, issued,
-                           le32_to_cpu(grant->truncate_seq),
-                           le64_to_cpu(grant->truncate_size), size);
-       ceph_decode_timespec(&mtime, &grant->mtime);
-       ceph_decode_timespec(&atime, &grant->atime);
-       ceph_decode_timespec(&ctime, &grant->ctime);
-       ceph_fill_file_time(inode, issued,
-                           le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
-                           &atime);
-
-
-       /* file layout may have changed */
-       ci->i_layout = grant->layout;
-
-       /* max size increase? */
-       if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
-               dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
-               ci->i_max_size = max_size;
-               if (max_size >= ci->i_wanted_max_size) {
-                       ci->i_wanted_max_size = 0;  /* reset */
-                       ci->i_requested_max_size = 0;
+       if (newcaps & CEPH_CAP_ANY_RD) {
+               /* ctime/mtime/atime? */
+               ceph_decode_timespec(&mtime, &grant->mtime);
+               ceph_decode_timespec(&atime, &grant->atime);
+               ceph_decode_timespec(&ctime, &grant->ctime);
+               ceph_fill_file_time(inode, issued,
+                                   le32_to_cpu(grant->time_warp_seq),
+                                   &ctime, &mtime, &atime);
+       }
+
+       if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
+               /* file layout may have changed */
+               ci->i_layout = grant->layout;
+               /* size/truncate_seq? */
+               queue_trunc = ceph_fill_file_size(inode, issued,
+                                       le32_to_cpu(grant->truncate_seq),
+                                       le64_to_cpu(grant->truncate_size),
+                                       size);
+               /* max size increase? */
+               if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
+                       dout("max_size %lld -> %llu\n",
+                            ci->i_max_size, max_size);
+                       ci->i_max_size = max_size;
+                       if (max_size >= ci->i_wanted_max_size) {
+                               ci->i_wanted_max_size = 0;  /* reset */
+                               ci->i_requested_max_size = 0;
+                       }
+                       wake = 1;
                }
-               wake = 1;
        }
 
        /* check cap bits */
@@ -2595,6 +2578,23 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 
        spin_unlock(&ci->i_ceph_lock);
 
+       if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+               down_write(&mdsc->snap_rwsem);
+               ceph_update_snap_trace(mdsc, snaptrace,
+                                      snaptrace + snaptrace_len, false);
+               downgrade_write(&mdsc->snap_rwsem);
+               kick_flushing_inode_caps(mdsc, session, inode);
+               up_read(&mdsc->snap_rwsem);
+               if (newcaps & ~issued)
+                       wake = 1;
+       }
+
+       if (queue_trunc) {
+               ceph_queue_vmtruncate(inode);
+               ceph_queue_revalidate(inode);
+       } else if (queue_revalidate)
+               ceph_queue_revalidate(inode);
+
        if (writeback)
                /*
                 * queue inode for writeback: we can't actually call
@@ -2606,8 +2606,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                ceph_queue_invalidate(inode);
        if (deleted_inode)
                invalidate_aliases(inode);
-       if (queue_revalidate)
-               ceph_queue_revalidate(inode);
        if (wake)
                wake_up_all(&ci->i_cap_wq);
 
@@ -2784,7 +2782,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_session *tsession = NULL;
-       struct ceph_cap *cap, *tcap;
+       struct ceph_cap *cap, *tcap, *new_cap = NULL;
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 t_cap_id;
        unsigned mseq = le32_to_cpu(ex->migrate_seq);
@@ -2807,7 +2805,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 retry:
        spin_lock(&ci->i_ceph_lock);
        cap = __get_cap_for_mds(ci, mds);
-       if (!cap)
+       if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
                goto out_unlock;
 
        if (target < 0) {
@@ -2846,15 +2844,14 @@ retry:
                }
                __ceph_remove_cap(cap, false);
                goto out_unlock;
-       }
-
-       if (tsession) {
-               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
-               spin_unlock(&ci->i_ceph_lock);
+       } else if (tsession) {
                /* add placeholder for the export tagert */
+               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
                ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
-                            t_seq - 1, t_mseq, (u64)-1, flag, NULL);
-               goto retry;
+                            t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
+
+               __ceph_remove_cap(cap, false);
+               goto out_unlock;
        }
 
        spin_unlock(&ci->i_ceph_lock);
@@ -2873,6 +2870,7 @@ retry:
                                          SINGLE_DEPTH_NESTING);
        }
        ceph_add_cap_releases(mdsc, tsession);
+       new_cap = ceph_get_cap(mdsc, NULL);
        } else {
                WARN_ON(1);
                tsession = NULL;
@@ -2887,24 +2885,27 @@ out_unlock:
                mutex_unlock(&tsession->s_mutex);
                ceph_put_mds_session(tsession);
        }
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
 }
 
 /*
- * Handle cap IMPORT.  If there are temp bits from an older EXPORT,
- * clean them up.
+ * Handle cap IMPORT.
  *
- * caller holds s_mutex.
+ * caller holds s_mutex. acquires i_ceph_lock
  */
 static void handle_cap_import(struct ceph_mds_client *mdsc,
                              struct inode *inode, struct ceph_mds_caps *im,
                              struct ceph_mds_cap_peer *ph,
                              struct ceph_mds_session *session,
-                             void *snaptrace, int snaptrace_len)
+                             struct ceph_cap **target_cap, int *old_issued)
+       __acquires(ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *cap;
+       struct ceph_cap *cap, *ocap, *new_cap = NULL;
        int mds = session->s_mds;
-       unsigned issued = le32_to_cpu(im->caps);
+       int issued;
+       unsigned caps = le32_to_cpu(im->caps);
        unsigned wanted = le32_to_cpu(im->wanted);
        unsigned seq = le32_to_cpu(im->seq);
        unsigned mseq = le32_to_cpu(im->migrate_seq);
@@ -2924,40 +2925,52 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
             inode, ci, mds, mseq, peer);
 
+retry:
        spin_lock(&ci->i_ceph_lock);
-       cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
-       if (cap && cap->cap_id == p_cap_id) {
+       cap = __get_cap_for_mds(ci, mds);
+       if (!cap) {
+               if (!new_cap) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       new_cap = ceph_get_cap(mdsc, NULL);
+                       goto retry;
+               }
+               cap = new_cap;
+       } else {
+               if (new_cap) {
+                       ceph_put_cap(mdsc, new_cap);
+                       new_cap = NULL;
+               }
+       }
+
+       __ceph_caps_issued(ci, &issued);
+       issued |= __ceph_caps_dirty(ci);
+
+       ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+                    realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
+
+       ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
+       if (ocap && ocap->cap_id == p_cap_id) {
                dout(" remove export cap %p mds%d flags %d\n",
-                    cap, peer, ph->flags);
+                    ocap, peer, ph->flags);
                if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
-                   (cap->seq != le32_to_cpu(ph->seq) ||
-                    cap->mseq != le32_to_cpu(ph->mseq))) {
+                   (ocap->seq != le32_to_cpu(ph->seq) ||
+                    ocap->mseq != le32_to_cpu(ph->mseq))) {
                        pr_err("handle_cap_import: mismatched seq/mseq: "
                               "ino (%llx.%llx) mds%d seq %d mseq %d "
                               "importer mds%d has peer seq %d mseq %d\n",
-                              ceph_vinop(inode), peer, cap->seq,
-                              cap->mseq, mds, le32_to_cpu(ph->seq),
+                              ceph_vinop(inode), peer, ocap->seq,
+                              ocap->mseq, mds, le32_to_cpu(ph->seq),
                               le32_to_cpu(ph->mseq));
                }
-               ci->i_cap_exporting_issued = cap->issued;
-               __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
+               __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
 
        /* make sure we re-request max_size, if necessary */
        ci->i_wanted_max_size = 0;
        ci->i_requested_max_size = 0;
-       spin_unlock(&ci->i_ceph_lock);
-
-       down_write(&mdsc->snap_rwsem);
-       ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
-                              false);
-       downgrade_write(&mdsc->snap_rwsem);
-       ceph_add_cap(inode, session, cap_id, -1,
-                    issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
-                    NULL /* no caps context */);
-       kick_flushing_inode_caps(mdsc, session, inode);
-       up_read(&mdsc->snap_rwsem);
 
+       *old_issued = issued;
+       *target_cap = cap;
 }
 
 /*
@@ -2977,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct ceph_mds_caps *h;
        struct ceph_mds_cap_peer *peer = NULL;
        int mds = session->s_mds;
-       int op;
+       int op, issued;
        u32 seq, mseq;
        struct ceph_vino vino;
        u64 cap_id;
@@ -3069,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, peer, session,
-                                 snaptrace, snaptrace_len);
+                                 &cap, &issued);
+               handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
+                                msg->middle, session, cap, issued);
+               goto done_unlocked;
        }
 
        /* the rest require a cap */
@@ -3086,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        switch (op) {
        case CEPH_CAP_OP_REVOKE:
        case CEPH_CAP_OP_GRANT:
-       case CEPH_CAP_OP_IMPORT:
-               handle_cap_grant(inode, h, session, cap, msg->middle);
+               __ceph_caps_issued(ci, &issued);
+               issued |= __ceph_caps_dirty(ci);
+               handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
+                                session, cap, issued);
                goto done_unlocked;
 
        case CEPH_CAP_OP_FLUSH_ACK:
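The restructured IMPORT path is an exercise in lock hand-off: handle_cap_import() returns with i_ceph_lock held (the __acquires annotation) and handle_cap_grant() drops it (__releases), so installing the imported cap and applying the grant's side effects happen under one critical section. A userspace sketch of that shape with pthreads (all names illustrative; build with -lpthread):

        #include <pthread.h>
        #include <stdio.h>

        static pthread_mutex_t i_ceph_lock = PTHREAD_MUTEX_INITIALIZER;
        static int caps;

        static void import(int *old_issued)   /* returns with lock held */
        {
                pthread_mutex_lock(&i_ceph_lock);
                *old_issued = caps;
                caps |= 0x1;                  /* install the imported cap */
        }

        static void grant(int old_issued)     /* consumes and drops the lock */
        {
                caps |= 0x2;                  /* apply grant side effects */
                pthread_mutex_unlock(&i_ceph_lock);
                printf("issued %#x -> %#x\n", old_issued, caps);
        }

        int main(void)
        {
                int old;

                import(&old);   /* lock taken here ...                  */
                grant(old);     /* ... released here, so both steps are
                                 * covered by one critical section      */
                return 0;
        }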
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 00d6af6a32ec..8d7d782f4382 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -169,7 +169,7 @@ static struct dentry *__get_parent(struct super_block *sb,
        return dentry;
 }
 
-struct dentry *ceph_get_parent(struct dentry *child)
+static struct dentry *ceph_get_parent(struct dentry *child)
 {
        /* don't re-export snaps */
        if (ceph_snap(child->d_inode) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e4fff9ff1c27..04c89c266cec 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -10,6 +10,7 @@
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
 #include <linux/posix_acl.h>
+#include <linux/random.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -179,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
  * specified, copy the frag delegation info to the caller if
  * it is present.
  */
-u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
-                    struct ceph_inode_frag *pfrag,
-                    int *found)
+static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                             struct ceph_inode_frag *pfrag, int *found)
 {
        u32 t = ceph_frag_make(0, 0);
        struct ceph_inode_frag *frag;
@@ -191,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
        if (found)
                *found = 0;
 
-       mutex_lock(&ci->i_fragtree_mutex);
        while (1) {
                WARN_ON(!ceph_frag_contains_value(t, v));
                frag = __ceph_find_frag(ci, t);
@@ -220,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
        }
        dout("choose_frag(%x) = %x\n", v, t);
 
-       mutex_unlock(&ci->i_fragtree_mutex);
        return t;
 }
 
+u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                    struct ceph_inode_frag *pfrag, int *found)
+{
+       u32 ret;
+       mutex_lock(&ci->i_fragtree_mutex);
+       ret = __ceph_choose_frag(ci, v, pfrag, found);
+       mutex_unlock(&ci->i_fragtree_mutex);
+       return ret;
+}
+
 /*
  * Process dirfrag (delegation) info from the mds.  Include leaf
  * fragment in tree ONLY if ndist > 0.  Otherwise, only
@@ -237,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode,
        u32 id = le32_to_cpu(dirinfo->frag);
        int mds = le32_to_cpu(dirinfo->auth);
        int ndist = le32_to_cpu(dirinfo->ndist);
+       int diri_auth = -1;
        int i;
        int err = 0;
 
+       spin_lock(&ci->i_ceph_lock);
+       if (ci->i_auth_cap)
+               diri_auth = ci->i_auth_cap->mds;
+       spin_unlock(&ci->i_ceph_lock);
+
        mutex_lock(&ci->i_fragtree_mutex);
-       if (ndist == 0) {
+       if (ndist == 0 && mds == diri_auth) {
                /* no delegation info needed. */
                frag = __ceph_find_frag(ci, id);
                if (!frag)
@@ -286,6 +300,75 @@ out:
        return err;
 }
 
+static int ceph_fill_fragtree(struct inode *inode,
+                             struct ceph_frag_tree_head *fragtree,
+                             struct ceph_mds_reply_dirfrag *dirinfo)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_frag *frag;
+       struct rb_node *rb_node;
+       int i;
+       u32 id, nsplits;
+       bool update = false;
+
+       mutex_lock(&ci->i_fragtree_mutex);
+       nsplits = le32_to_cpu(fragtree->nsplits);
+       if (nsplits) {
+               i = prandom_u32() % nsplits;
+               id = le32_to_cpu(fragtree->splits[i].frag);
+               if (!__ceph_find_frag(ci, id))
+                       update = true;
+       } else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
+               rb_node = rb_first(&ci->i_fragtree);
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
+                       update = true;
+       }
+       if (!update && dirinfo) {
+               id = le32_to_cpu(dirinfo->frag);
+               if (id != __ceph_choose_frag(ci, id, NULL, NULL))
+                       update = true;
+       }
+       if (!update)
+               goto out_unlock;
+
+       dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
+       rb_node = rb_first(&ci->i_fragtree);
+       for (i = 0; i < nsplits; i++) {
+               id = le32_to_cpu(fragtree->splits[i].frag);
+               frag = NULL;
+               while (rb_node) {
+                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+                       if (ceph_frag_compare(frag->frag, id) >= 0) {
+                               if (frag->frag != id)
+                                       frag = NULL;
+                               else
+                                       rb_node = rb_next(rb_node);
+                               break;
+                       }
+                       rb_node = rb_next(rb_node);
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       kfree(frag);
+                       frag = NULL;
+               }
+               if (!frag) {
+                       frag = __get_or_create_frag(ci, id);
+                       if (IS_ERR(frag))
+                               continue;
+               }
+               frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+               dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+       }
+       while (rb_node) {
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               rb_node = rb_next(rb_node);
+               rb_erase(&frag->node, &ci->i_fragtree);
+               kfree(frag);
+       }
+out_unlock:
+       mutex_unlock(&ci->i_fragtree_mutex);
+       return 0;
+}
 
 /*
  * initialize a newly allocated inode.
@@ -341,7 +424,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_cap_snaps);
        ci->i_head_snapc = NULL;
        ci->i_snap_caps = 0;
-       ci->i_cap_exporting_issued = 0;
 
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
@@ -407,7 +489,7 @@ void ceph_destroy_inode(struct inode *inode)
 
        /*
         * we may still have a snap_realm reference if there are stray
-        * caps in i_cap_exporting_issued or i_snap_caps.
+        * caps in i_snap_caps.
         */
        if (ci->i_snap_realm) {
                struct ceph_mds_client *mdsc =
@@ -582,22 +664,26 @@ static int fill_inode(struct inode *inode,
                         unsigned long ttl_from, int cap_fmode,
                         struct ceph_cap_reservation *caps_reservation)
 {
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_reply_inode *info = iinfo->in;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int i;
-       int issued = 0, implemented;
+       int issued = 0, implemented, new_issued;
        struct timespec mtime, atime, ctime;
-       u32 nsplits;
-       struct ceph_inode_frag *frag;
-       struct rb_node *rb_node;
        struct ceph_buffer *xattr_blob = NULL;
+       struct ceph_cap *new_cap = NULL;
        int err = 0;
-       int queue_trunc = 0;
+       bool wake = false;
+       bool queue_trunc = false;
+       bool new_version = false;
 
        dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
             inode, ceph_vinop(inode), le64_to_cpu(info->version),
             ci->i_version);
 
+       /* prealloc new cap struct */
+       if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+               new_cap = ceph_get_cap(mdsc, caps_reservation);
+
        /*
         * prealloc xattr data, if it looks like we'll need it.  only
         * if len > 4 (meaning there are actually xattrs; the first 4
@@ -623,19 +709,23 @@ static int fill_inode(struct inode *inode,
         *   3    2     skip
         *   3    3     update
         */
-       if (le64_to_cpu(info->version) > 0 &&
-           (ci->i_version & ~1) >= le64_to_cpu(info->version))
-               goto no_change;
-
+       if (ci->i_version == 0 ||
+           ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+            le64_to_cpu(info->version) > (ci->i_version & ~1)))
+               new_version = true;
+
        issued = __ceph_caps_issued(ci, &implemented);
        issued |= implemented | __ceph_caps_dirty(ci);
+       new_issued = ~issued & le32_to_cpu(info->cap.caps);
 
        /* update inode */
        ci->i_version = le64_to_cpu(info->version);
        inode->i_version++;
        inode->i_rdev = le32_to_cpu(info->rdev);
+       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
-       if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+       if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+           (issued & CEPH_CAP_AUTH_EXCL) == 0) {
                inode->i_mode = le32_to_cpu(info->mode);
                inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
                inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
@@ -644,23 +734,35 @@ static int fill_inode(struct inode *inode,
                     from_kgid(&init_user_ns, inode->i_gid));
        }
 
-       if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+       if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+           (issued & CEPH_CAP_LINK_EXCL) == 0)
                set_nlink(inode, le32_to_cpu(info->nlink));
 
-       /* be careful with mtime, atime, size */
-       ceph_decode_timespec(&atime, &info->atime);
-       ceph_decode_timespec(&mtime, &info->mtime);
-       ceph_decode_timespec(&ctime, &info->ctime);
-       queue_trunc = ceph_fill_file_size(inode, issued,
-                                         le32_to_cpu(info->truncate_seq),
-                                         le64_to_cpu(info->truncate_size),
-                                         le64_to_cpu(info->size));
-       ceph_fill_file_time(inode, issued,
-                           le32_to_cpu(info->time_warp_seq),
-                           &ctime, &mtime, &atime);
-
-       ci->i_layout = info->layout;
-       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+       if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+               /* be careful with mtime, atime, size */
+               ceph_decode_timespec(&atime, &info->atime);
+               ceph_decode_timespec(&mtime, &info->mtime);
+               ceph_decode_timespec(&ctime, &info->ctime);
+               ceph_fill_file_time(inode, issued,
+                                   le32_to_cpu(info->time_warp_seq),
+                                   &ctime, &mtime, &atime);
+       }
+
+       if (new_version ||
+           (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+               ci->i_layout = info->layout;
+               queue_trunc = ceph_fill_file_size(inode, issued,
+                                       le32_to_cpu(info->truncate_seq),
+                                       le64_to_cpu(info->truncate_size),
+                                       le64_to_cpu(info->size));
+               /* only update max_size on auth cap */
+               if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+                   ci->i_max_size != le64_to_cpu(info->max_size)) {
+                       dout("max_size %lld -> %llu\n", ci->i_max_size,
+                            le64_to_cpu(info->max_size));
+                       ci->i_max_size = le64_to_cpu(info->max_size);
+               }
+       }
 
        /* xattrs */
        /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
@@ -745,58 +847,6 @@ static int fill_inode(struct inode *inode,
                dout(" marking %p complete (empty)\n", inode);
                __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
        }
-no_change:
-       /* only update max_size on auth cap */
-       if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
-           ci->i_max_size != le64_to_cpu(info->max_size)) {
-               dout("max_size %lld -> %llu\n", ci->i_max_size,
-                    le64_to_cpu(info->max_size));
-               ci->i_max_size = le64_to_cpu(info->max_size);
-       }
-
-       spin_unlock(&ci->i_ceph_lock);
-
-       /* queue truncate if we saw i_size decrease */
-       if (queue_trunc)
-               ceph_queue_vmtruncate(inode);
-
-       /* populate frag tree */
-       /* FIXME: move me up, if/when version reflects fragtree changes */
-       nsplits = le32_to_cpu(info->fragtree.nsplits);
-       mutex_lock(&ci->i_fragtree_mutex);
-       rb_node = rb_first(&ci->i_fragtree);
-       for (i = 0; i < nsplits; i++) {
-               u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-               frag = NULL;
-               while (rb_node) {
-                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-                       if (ceph_frag_compare(frag->frag, id) >= 0) {
-                               if (frag->frag != id)
-                                       frag = NULL;
-                               else
-                                       rb_node = rb_next(rb_node);
-                               break;
-                       }
-                       rb_node = rb_next(rb_node);
-                       rb_erase(&frag->node, &ci->i_fragtree);
-                       kfree(frag);
-                       frag = NULL;
-               }
-               if (!frag) {
-                       frag = __get_or_create_frag(ci, id);
-                       if (IS_ERR(frag))
-                               continue;
-               }
-               frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
-               dout(" frag %x split by %d\n", frag->frag, frag->split_by);
-       }
-       while (rb_node) {
-               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-               rb_node = rb_next(rb_node);
-               rb_erase(&frag->node, &ci->i_fragtree);
-               kfree(frag);
-       }
-       mutex_unlock(&ci->i_fragtree_mutex);
 
        /* were we issued a capability? */
        if (info->cap.caps) {
@@ -809,30 +859,41 @@ no_change:
                                     le32_to_cpu(info->cap.seq),
                                     le32_to_cpu(info->cap.mseq),
                                     le64_to_cpu(info->cap.realm),
-                                    info->cap.flags,
-                                    caps_reservation);
+                                    info->cap.flags, &new_cap);
+                       wake = true;
                } else {
-                       spin_lock(&ci->i_ceph_lock);
                        dout(" %p got snap_caps %s\n", inode,
                             ceph_cap_string(le32_to_cpu(info->cap.caps)));
                        ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
                        if (cap_fmode >= 0)
                                __ceph_get_fmode(ci, cap_fmode);
-                       spin_unlock(&ci->i_ceph_lock);
                }
        } else if (cap_fmode >= 0) {
                pr_warn("mds issued no caps on %llx.%llx\n",
                        ceph_vinop(inode));
                __ceph_get_fmode(ci, cap_fmode);
        }
+       spin_unlock(&ci->i_ceph_lock);
+
+       if (wake)
+               wake_up_all(&ci->i_cap_wq);
+
+       /* queue truncate if we saw i_size decrease */
+       if (queue_trunc)
+               ceph_queue_vmtruncate(inode);
+
+       /* populate frag tree */
+       if (S_ISDIR(inode->i_mode))
+               ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
 
        /* update delegation info? */
        if (dirinfo)
                ceph_fill_dirfrag(inode, dirinfo);
 
        err = 0;
-
 out:
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
        if (xattr_blob)
                ceph_buffer_put(xattr_blob);
        return err;
@@ -1485,7 +1546,7 @@ static void ceph_invalidate_work(struct work_struct *work)
1485 orig_gen = ci->i_rdcache_gen; 1546 orig_gen = ci->i_rdcache_gen;
1486 spin_unlock(&ci->i_ceph_lock); 1547 spin_unlock(&ci->i_ceph_lock);
1487 1548
1488 truncate_inode_pages(inode->i_mapping, 0); 1549 truncate_pagecache(inode, 0);
1489 1550
1490 spin_lock(&ci->i_ceph_lock); 1551 spin_lock(&ci->i_ceph_lock);
1491 if (orig_gen == ci->i_rdcache_gen && 1552 if (orig_gen == ci->i_rdcache_gen &&
@@ -1588,7 +1649,7 @@ retry:
1588 ci->i_truncate_pending, to); 1649 ci->i_truncate_pending, to);
1589 spin_unlock(&ci->i_ceph_lock); 1650 spin_unlock(&ci->i_ceph_lock);
1590 1651
1591 truncate_inode_pages(inode->i_mapping, to); 1652 truncate_pagecache(inode, to);
1592 1653
1593 spin_lock(&ci->i_ceph_lock); 1654 spin_lock(&ci->i_ceph_lock);
1594 if (to == ci->i_truncate_size) { 1655 if (to == ci->i_truncate_size) {
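Both truncate call sites in this file move from truncate_inode_pages() to truncate_pagecache(). The practical difference is that the latter also zaps any userspace mmaps of the truncated range; roughly, as of this era (a sketch of the mm helper from memory, not a verbatim quote):

void truncate_pagecache(struct inode *inode, loff_t newsize)
{
        struct address_space *mapping = inode->i_mapping;
        loff_t holebegin = round_up(newsize, PAGE_SIZE);

        /* unmap twice around the page purge to close races with faults */
        unmap_mapping_range(mapping, holebegin, 0, 1);
        truncate_inode_pages(mapping, newsize);
        unmap_mapping_range(mapping, holebegin, 0, 1);
}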
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9a33b98cb000..92a2548278fc 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1558,6 +1558,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1558 init_completion(&req->r_safe_completion); 1558 init_completion(&req->r_safe_completion);
1559 INIT_LIST_HEAD(&req->r_unsafe_item); 1559 INIT_LIST_HEAD(&req->r_unsafe_item);
1560 1560
1561 req->r_stamp = CURRENT_TIME;
1562
1561 req->r_op = op; 1563 req->r_op = op;
1562 req->r_direct_mode = mode; 1564 req->r_direct_mode = mode;
1563 return req; 1565 return req;
@@ -1783,7 +1785,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1783 } 1785 }
1784 1786
1785 len = sizeof(*head) + 1787 len = sizeof(*head) +
1786 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); 1788 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1789 sizeof(struct timespec);
1787 1790
1788 /* calculate (max) length for cap releases */ 1791 /* calculate (max) length for cap releases */
1789 len += sizeof(struct ceph_mds_request_release) * 1792 len += sizeof(struct ceph_mds_request_release) *
@@ -1800,6 +1803,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1800 goto out_free2; 1803 goto out_free2;
1801 } 1804 }
1802 1805
1806 msg->hdr.version = 2;
1803 msg->hdr.tid = cpu_to_le64(req->r_tid); 1807 msg->hdr.tid = cpu_to_le64(req->r_tid);
1804 1808
1805 head = msg->front.iov_base; 1809 head = msg->front.iov_base;
@@ -1836,6 +1840,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1836 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 1840 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1837 head->num_releases = cpu_to_le16(releases); 1841 head->num_releases = cpu_to_le16(releases);
1838 1842
1843 /* time stamp */
1844 ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
1845
1839 BUG_ON(p > end); 1846 BUG_ON(p > end);
1840 msg->front.iov_len = p - msg->front.iov_base; 1847 msg->front.iov_len = p - msg->front.iov_base;
1841 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1848 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
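Taken together, these hunks bump the request to version 2 and append the timestamp after the cap releases. The front of a v2 MDS request, as read off the encode calls above (a sketch, not a header from the tree):

/*
 * CEPH_MSG_CLIENT_REQUEST front, hdr.version = 2:
 *
 *      struct ceph_mds_request_head head;
 *      filepath1, filepath2  (each: u8 version, __le64 ino,
 *                             __le32 len, path bytes)
 *      struct ceph_mds_request_release releases[num_releases];
 *      struct timespec r_stamp;   (new in v2, raw ceph_encode_copy)
 */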
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e90cfccf93bd..e00737cf523c 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -194,6 +194,7 @@ struct ceph_mds_request {
194 int r_fmode; /* file mode, if expecting cap */ 194 int r_fmode; /* file mode, if expecting cap */
195 kuid_t r_uid; 195 kuid_t r_uid;
196 kgid_t r_gid; 196 kgid_t r_gid;
197 struct timespec r_stamp;
197 198
198 /* for choosing which mds to send this request to */ 199 /* for choosing which mds to send this request to */
199 int r_direct_mode; 200 int r_direct_mode;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ead05cc1f447..12b20744e386 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -292,7 +292,6 @@ struct ceph_inode_info {
292 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or 292 struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
293 dirty|flushing caps */ 293 dirty|flushing caps */
294 unsigned i_snap_caps; /* cap bits for snapped files */ 294 unsigned i_snap_caps; /* cap bits for snapped files */
295 unsigned i_cap_exporting_issued;
296 295
297 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ 296 int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
298 297
@@ -775,11 +774,13 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
775extern const char *ceph_cap_string(int c); 774extern const char *ceph_cap_string(int c);
776extern void ceph_handle_caps(struct ceph_mds_session *session, 775extern void ceph_handle_caps(struct ceph_mds_session *session,
777 struct ceph_msg *msg); 776 struct ceph_msg *msg);
778extern int ceph_add_cap(struct inode *inode, 777extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
779 struct ceph_mds_session *session, u64 cap_id, 778 struct ceph_cap_reservation *ctx);
780 int fmode, unsigned issued, unsigned wanted, 779extern void ceph_add_cap(struct inode *inode,
781 unsigned cap, unsigned seq, u64 realmino, int flags, 780 struct ceph_mds_session *session, u64 cap_id,
782 struct ceph_cap_reservation *caps_reservation); 781 int fmode, unsigned issued, unsigned wanted,
782 unsigned cap, unsigned seq, u64 realmino, int flags,
783 struct ceph_cap **new_cap);
783extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); 784extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
784extern void ceph_put_cap(struct ceph_mds_client *mdsc, 785extern void ceph_put_cap(struct ceph_mds_client *mdsc,
785 struct ceph_cap *cap); 786 struct ceph_cap *cap);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 5f6db18d72e8..3c97d5e9b951 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -625,6 +625,8 @@ int ceph_flags_to_mode(int flags);
625 CEPH_CAP_LINK_EXCL | \ 625 CEPH_CAP_LINK_EXCL | \
626 CEPH_CAP_XATTR_EXCL | \ 626 CEPH_CAP_XATTR_EXCL | \
627 CEPH_CAP_FILE_EXCL) 627 CEPH_CAP_FILE_EXCL)
628#define CEPH_CAP_ANY_FILE_RD (CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE | \
629 CEPH_CAP_FILE_SHARED)
628#define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | \ 630#define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | \
629 CEPH_CAP_FILE_EXCL) 631 CEPH_CAP_FILE_EXCL)
630#define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR) 632#define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index a486f390dfbe..deb47e45ac7c 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -40,9 +40,9 @@ struct ceph_mon_request {
40}; 40};
41 41
42/* 42/*
43 * ceph_mon_generic_request is being used for the statfs and poolop requests 43 * ceph_mon_generic_request is being used for the statfs, poolop and
44 * which are bening done a bit differently because we need to get data back 44 * mon_get_version requests which are being done a bit differently
45 * to the caller 45 * because we need to get data back to the caller
46 */ 46 */
47struct ceph_mon_generic_request { 47struct ceph_mon_generic_request {
48 struct kref kref; 48 struct kref kref;
@@ -104,10 +104,15 @@ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
104extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); 104extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
105 105
106extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); 106extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
107extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
108 unsigned long timeout);
107 109
108extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, 110extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
109 struct ceph_statfs *buf); 111 struct ceph_statfs *buf);
110 112
113extern int ceph_monc_do_get_version(struct ceph_mon_client *monc,
114 const char *what, u64 *newest);
115
111extern int ceph_monc_open_session(struct ceph_mon_client *monc); 116extern int ceph_monc_open_session(struct ceph_mon_client *monc);
112 117
113extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); 118extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
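A plausible consumer of the two exports added above, sketched under assumed names (this is the pattern 'rbd map' wants: fetch the newest epoch from the monitor, then subscribe and wait until the local osdmap catches up):

/* Hypothetical helper; the name and locals are illustrative, and it
 * assumes an initial osdmap has already been received. */
static int wait_for_latest_osdmap(struct ceph_client *client,
                                  unsigned long timeout)
{
        u64 newest_epoch;
        int ret;

        ret = ceph_monc_do_get_version(&client->monc, "osdmap",
                                       &newest_epoch);
        if (ret)
                return ret;

        if (client->osdc.osdmap->epoch >= newest_epoch)
                return 0;  /* already current */

        ceph_monc_request_next_osdmap(&client->monc);
        return ceph_monc_wait_osdmap(&client->monc, newest_epoch, timeout);
}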
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 67d7721d237e..1675021d8c12 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -72,6 +72,8 @@ const char *ceph_msg_type_name(int type)
72 case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack"; 72 case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
73 case CEPH_MSG_STATFS: return "statfs"; 73 case CEPH_MSG_STATFS: return "statfs";
74 case CEPH_MSG_STATFS_REPLY: return "statfs_reply"; 74 case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
75 case CEPH_MSG_MON_GET_VERSION: return "mon_get_version";
76 case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply";
75 case CEPH_MSG_MDS_MAP: return "mds_map"; 77 case CEPH_MSG_MDS_MAP: return "mds_map";
76 case CEPH_MSG_CLIENT_SESSION: return "client_session"; 78 case CEPH_MSG_CLIENT_SESSION: return "client_session";
77 case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect"; 79 case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect";
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 10421a4b76f8..d1a62c69a9f4 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -126,9 +126,13 @@ static int monc_show(struct seq_file *s, void *p)
126 req = rb_entry(rp, struct ceph_mon_generic_request, node); 126 req = rb_entry(rp, struct ceph_mon_generic_request, node);
127 op = le16_to_cpu(req->request->hdr.type); 127 op = le16_to_cpu(req->request->hdr.type);
128 if (op == CEPH_MSG_STATFS) 128 if (op == CEPH_MSG_STATFS)
129 seq_printf(s, "%lld statfs\n", req->tid); 129 seq_printf(s, "%llu statfs\n", req->tid);
130 else if (op == CEPH_MSG_POOLOP)
131 seq_printf(s, "%llu poolop\n", req->tid);
132 else if (op == CEPH_MSG_MON_GET_VERSION)
133 seq_printf(s, "%llu mon_get_version\n", req->tid);
130 else 134 else
131 seq_printf(s, "%lld unknown\n", req->tid); 135 seq_printf(s, "%llu unknown\n", req->tid);
132 } 136 }
133 137
134 mutex_unlock(&monc->mutex); 138 mutex_unlock(&monc->mutex);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 2ac9ef35110b..067d3af2eaf6 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -296,6 +296,33 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
296 __send_subscribe(monc); 296 __send_subscribe(monc);
297 mutex_unlock(&monc->mutex); 297 mutex_unlock(&monc->mutex);
298} 298}
299EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
300
301int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
302 unsigned long timeout)
303{
304 unsigned long started = jiffies;
305 int ret;
306
307 mutex_lock(&monc->mutex);
308 while (monc->have_osdmap < epoch) {
309 mutex_unlock(&monc->mutex);
310
311 if (timeout != 0 && time_after_eq(jiffies, started + timeout))
312 return -ETIMEDOUT;
313
314 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
315 monc->have_osdmap >= epoch, timeout);
316 if (ret < 0)
317 return ret;
318
319 mutex_lock(&monc->mutex);
320 }
321
322 mutex_unlock(&monc->mutex);
323 return 0;
324}
325EXPORT_SYMBOL(ceph_monc_wait_osdmap);
299 326
300/* 327/*
301 * 328 *
@@ -477,14 +504,13 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
477 return m; 504 return m;
478} 505}
479 506
480static int do_generic_request(struct ceph_mon_client *monc, 507static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
481 struct ceph_mon_generic_request *req) 508 struct ceph_mon_generic_request *req)
482{ 509{
483 int err; 510 int err;
484 511
485 /* register request */ 512 /* register request */
486 mutex_lock(&monc->mutex); 513 req->tid = tid != 0 ? tid : ++monc->last_tid;
487 req->tid = ++monc->last_tid;
488 req->request->hdr.tid = cpu_to_le64(req->tid); 514 req->request->hdr.tid = cpu_to_le64(req->tid);
489 __insert_generic_request(monc, req); 515 __insert_generic_request(monc, req);
490 monc->num_generic_requests++; 516 monc->num_generic_requests++;
@@ -496,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc,
496 mutex_lock(&monc->mutex); 522 mutex_lock(&monc->mutex);
497 rb_erase(&req->node, &monc->generic_request_tree); 523 rb_erase(&req->node, &monc->generic_request_tree);
498 monc->num_generic_requests--; 524 monc->num_generic_requests--;
499 mutex_unlock(&monc->mutex);
500 525
501 if (!err) 526 if (!err)
502 err = req->result; 527 err = req->result;
503 return err; 528 return err;
504} 529}
505 530
531static int do_generic_request(struct ceph_mon_client *monc,
532 struct ceph_mon_generic_request *req)
533{
534 int err;
535
536 mutex_lock(&monc->mutex);
537 err = __do_generic_request(monc, 0, req);
538 mutex_unlock(&monc->mutex);
539
540 return err;
541}
542
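The point of the split shows up below: ceph_monc_do_get_version() must draw a tid and encode it into the request body as the handle under the same mutex hold that registers the request. In summary (comment form, not code from the tree):

/*
 * do_generic_request(monc, req)        - takes monc->mutex itself;
 *                                        tid assigned internally (passes 0).
 * __do_generic_request(monc, tid, req) - caller holds monc->mutex and may
 *                                        pass a tid it already encoded into
 *                                        the request payload; the mutex is
 *                                        still dropped internally around the
 *                                        send-and-wait.
 */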
506/* 543/*
507 * statfs 544 * statfs
508 */ 545 */
@@ -579,6 +616,96 @@ out:
579} 616}
580EXPORT_SYMBOL(ceph_monc_do_statfs); 617EXPORT_SYMBOL(ceph_monc_do_statfs);
581 618
619static void handle_get_version_reply(struct ceph_mon_client *monc,
620 struct ceph_msg *msg)
621{
622 struct ceph_mon_generic_request *req;
623 u64 tid = le64_to_cpu(msg->hdr.tid);
624 void *p = msg->front.iov_base;
625 void *end = p + msg->front_alloc_len;
626 u64 handle;
627
628 dout("%s %p tid %llu\n", __func__, msg, tid);
629
630 ceph_decode_need(&p, end, 2*sizeof(u64), bad);
631 handle = ceph_decode_64(&p);
632 if (tid != 0 && tid != handle)
633 goto bad;
634
635 mutex_lock(&monc->mutex);
636 req = __lookup_generic_req(monc, handle);
637 if (req) {
638 *(u64 *)req->buf = ceph_decode_64(&p);
639 req->result = 0;
640 get_generic_request(req);
641 }
642 mutex_unlock(&monc->mutex);
643 if (req) {
644 complete_all(&req->completion);
645 put_generic_request(req);
646 }
647
648 return;
649bad:
650 pr_err("corrupt mon_get_version reply\n");
651 ceph_msg_dump(msg);
652}
653
654/*
655 * Send MMonGetVersion and wait for the reply.
656 *
657 * @what: one of "mdsmap", "osdmap" or "monmap"
658 */
659int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
660 u64 *newest)
661{
662 struct ceph_mon_generic_request *req;
663 void *p, *end;
664 u64 tid;
665 int err;
666
667 req = kzalloc(sizeof(*req), GFP_NOFS);
668 if (!req)
669 return -ENOMEM;
670
671 kref_init(&req->kref);
672 req->buf = newest;
673 req->buf_len = sizeof(*newest);
674 init_completion(&req->completion);
675
676 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
677 sizeof(u64) + sizeof(u32) + strlen(what),
678 GFP_NOFS, true);
679 if (!req->request) {
680 err = -ENOMEM;
681 goto out;
682 }
683
684 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
685 GFP_NOFS, true);
686 if (!req->reply) {
687 err = -ENOMEM;
688 goto out;
689 }
690
691 p = req->request->front.iov_base;
692 end = p + req->request->front_alloc_len;
693
694 /* fill out request */
695 mutex_lock(&monc->mutex);
696 tid = ++monc->last_tid;
697 ceph_encode_64(&p, tid); /* handle */
698 ceph_encode_string(&p, end, what, strlen(what));
699
700 err = __do_generic_request(monc, tid, req);
701
702 mutex_unlock(&monc->mutex);
703out:
704 kref_put(&req->kref, release_generic_request);
705 return err;
706}
707EXPORT_SYMBOL(ceph_monc_do_get_version);
708
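For reference, the wire shapes implied by the encode and decode above (derived from this hunk, not copied from a header):

/*
 * CEPH_MSG_MON_GET_VERSION request front:
 *      __le64 handle;    - the tid, echoed back by the monitor
 *      string what;      - __le32 len + bytes: "osdmap", "mdsmap", "monmap"
 *
 * CEPH_MSG_MON_GET_VERSION_REPLY front:
 *      __le64 handle;    - matched against pending generic requests
 *      __le64 version;   - newest map version, stored into req->buf
 */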
582/* 709/*
583 * pool ops 710 * pool ops
584 */ 711 */
@@ -981,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
981 handle_statfs_reply(monc, msg); 1108 handle_statfs_reply(monc, msg);
982 break; 1109 break;
983 1110
1111 case CEPH_MSG_MON_GET_VERSION_REPLY:
1112 handle_get_version_reply(monc, msg);
1113 break;
1114
984 case CEPH_MSG_POOLOP_REPLY: 1115 case CEPH_MSG_POOLOP_REPLY:
985 handle_poolop_reply(monc, msg); 1116 handle_poolop_reply(monc, msg);
986 break; 1117 break;
@@ -1029,6 +1160,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1029 case CEPH_MSG_AUTH_REPLY: 1160 case CEPH_MSG_AUTH_REPLY:
1030 m = ceph_msg_get(monc->m_auth_reply); 1161 m = ceph_msg_get(monc->m_auth_reply);
1031 break; 1162 break;
1163 case CEPH_MSG_MON_GET_VERSION_REPLY:
1164 if (le64_to_cpu(hdr->tid) != 0)
1165 return get_generic_reply(con, hdr, skip);
1166
1167 /*
1168 * Older OSDs don't set reply tid even if the original
1169 * request had a non-zero tid. Work around this weirdness
1170 * by falling through to the allocate case.
1171 */
1032 case CEPH_MSG_MON_MAP: 1172 case CEPH_MSG_MON_MAP:
1033 case CEPH_MSG_MDS_MAP: 1173 case CEPH_MSG_MDS_MAP:
1034 case CEPH_MSG_OSD_MAP: 1174 case CEPH_MSG_OSD_MAP: