aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/acl.c14
-rw-r--r--fs/ceph/addr.c19
-rw-r--r--fs/ceph/caps.c127
-rw-r--r--fs/ceph/dir.c33
-rw-r--r--fs/ceph/file.c37
-rw-r--r--fs/ceph/inode.c41
-rw-r--r--fs/ceph/mds_client.c127
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/snap.c54
-rw-r--r--fs/ceph/super.c4
-rw-r--r--fs/ceph/super.h5
11 files changed, 296 insertions, 167 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 5bd853ba44ff..64fa248343f6 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode,
40 spin_unlock(&ci->i_ceph_lock); 40 spin_unlock(&ci->i_ceph_lock);
41} 41}
42 42
43static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
44 int type)
45{
46 struct ceph_inode_info *ci = ceph_inode(inode);
47 struct posix_acl *acl = ACL_NOT_CACHED;
48
49 spin_lock(&ci->i_ceph_lock);
50 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
51 acl = get_cached_acl(inode, type);
52 spin_unlock(&ci->i_ceph_lock);
53
54 return acl;
55}
56
57struct posix_acl *ceph_get_acl(struct inode *inode, int type) 43struct posix_acl *ceph_get_acl(struct inode *inode, int type)
58{ 44{
59 int size; 45 int size;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 24be059fd1f8..fd5599d32362 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page)
196 u64 len = PAGE_CACHE_SIZE; 196 u64 len = PAGE_CACHE_SIZE;
197 197
198 if (off >= i_size_read(inode)) { 198 if (off >= i_size_read(inode)) {
199 zero_user_segment(page, err, PAGE_CACHE_SIZE); 199 zero_user_segment(page, 0, PAGE_CACHE_SIZE);
200 SetPageUptodate(page); 200 SetPageUptodate(page);
201 return 0; 201 return 0;
202 } 202 }
203 203
204 /* 204 if (ci->i_inline_version != CEPH_INLINE_NONE) {
205 * Uptodate inline data should have been added into page cache 205 /*
206 * while getting Fcr caps. 206 * Uptodate inline data should have been added
207 */ 207 * into page cache while getting Fcr caps.
208 if (ci->i_inline_version != CEPH_INLINE_NONE) 208 */
209 return -EINVAL; 209 if (off == 0)
210 return -EINVAL;
211 zero_user_segment(page, 0, PAGE_CACHE_SIZE);
212 SetPageUptodate(page);
213 return 0;
214 }
210 215
211 err = ceph_readpage_from_fscache(inode, page); 216 err = ceph_readpage_from_fscache(inode, page);
212 if (err == 0) 217 if (err == 0)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b93c631c6c87..8172775428a0 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
577 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, 577 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
578 realmino); 578 realmino);
579 if (realm) { 579 if (realm) {
580 ceph_get_snap_realm(mdsc, realm);
581 spin_lock(&realm->inodes_with_caps_lock); 580 spin_lock(&realm->inodes_with_caps_lock);
582 ci->i_snap_realm = realm; 581 ci->i_snap_realm = realm;
583 list_add(&ci->i_snap_realm_item, 582 list_add(&ci->i_snap_realm_item,
@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode,
1451 spin_lock(&mdsc->cap_dirty_lock); 1450 spin_lock(&mdsc->cap_dirty_lock);
1452 list_del_init(&ci->i_dirty_item); 1451 list_del_init(&ci->i_dirty_item);
1453 1452
1454 ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
1455 if (list_empty(&ci->i_flushing_item)) { 1453 if (list_empty(&ci->i_flushing_item)) {
1454 ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
1456 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); 1455 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
1457 mdsc->num_cap_flushing++; 1456 mdsc->num_cap_flushing++;
1458 dout(" inode %p now flushing seq %lld\n", inode, 1457 dout(" inode %p now flushing seq %lld\n", inode,
@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
2073 * requested from the MDS. 2072 * requested from the MDS.
2074 */ 2073 */
2075static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2074static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2076 loff_t endoff, int *got, struct page **pinned_page, 2075 loff_t endoff, int *got, int *check_max, int *err)
2077 int *check_max, int *err)
2078{ 2076{
2079 struct inode *inode = &ci->vfs_inode; 2077 struct inode *inode = &ci->vfs_inode;
2080 int ret = 0; 2078 int ret = 0;
2081 int have, implemented, _got = 0; 2079 int have, implemented;
2082 int file_wanted; 2080 int file_wanted;
2083 2081
2084 dout("get_cap_refs %p need %s want %s\n", inode, 2082 dout("get_cap_refs %p need %s want %s\n", inode,
2085 ceph_cap_string(need), ceph_cap_string(want)); 2083 ceph_cap_string(need), ceph_cap_string(want));
2086again: 2084
2087 spin_lock(&ci->i_ceph_lock); 2085 spin_lock(&ci->i_ceph_lock);
2088 2086
2089 /* make sure file is actually open */ 2087 /* make sure file is actually open */
@@ -2138,50 +2136,34 @@ again:
2138 inode, ceph_cap_string(have), ceph_cap_string(not), 2136 inode, ceph_cap_string(have), ceph_cap_string(not),
2139 ceph_cap_string(revoking)); 2137 ceph_cap_string(revoking));
2140 if ((revoking & not) == 0) { 2138 if ((revoking & not) == 0) {
2141 _got = need | (have & want); 2139 *got = need | (have & want);
2142 __take_cap_refs(ci, _got); 2140 __take_cap_refs(ci, *got);
2143 ret = 1; 2141 ret = 1;
2144 } 2142 }
2145 } else { 2143 } else {
2144 int session_readonly = false;
2145 if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
2146 struct ceph_mds_session *s = ci->i_auth_cap->session;
2147 spin_lock(&s->s_cap_lock);
2148 session_readonly = s->s_readonly;
2149 spin_unlock(&s->s_cap_lock);
2150 }
2151 if (session_readonly) {
2152 dout("get_cap_refs %p needed %s but mds%d readonly\n",
2153 inode, ceph_cap_string(need), ci->i_auth_cap->mds);
2154 *err = -EROFS;
2155 ret = 1;
2156 goto out_unlock;
2157 }
2158
2146 dout("get_cap_refs %p have %s needed %s\n", inode, 2159 dout("get_cap_refs %p have %s needed %s\n", inode,
2147 ceph_cap_string(have), ceph_cap_string(need)); 2160 ceph_cap_string(have), ceph_cap_string(need));
2148 } 2161 }
2149out_unlock: 2162out_unlock:
2150 spin_unlock(&ci->i_ceph_lock); 2163 spin_unlock(&ci->i_ceph_lock);
2151 2164
2152 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2153 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
2154 i_size_read(inode) > 0) {
2155 int ret1;
2156 struct page *page = find_get_page(inode->i_mapping, 0);
2157 if (page) {
2158 if (PageUptodate(page)) {
2159 *pinned_page = page;
2160 goto out;
2161 }
2162 page_cache_release(page);
2163 }
2164 /*
2165 * drop cap refs first because getattr while holding
2166 * caps refs can cause deadlock.
2167 */
2168 ceph_put_cap_refs(ci, _got);
2169 _got = 0;
2170
2171 /* getattr request will bring inline data into page cache */
2172 ret1 = __ceph_do_getattr(inode, NULL,
2173 CEPH_STAT_CAP_INLINE_DATA, true);
2174 if (ret1 >= 0) {
2175 ret = 0;
2176 goto again;
2177 }
2178 *err = ret1;
2179 ret = 1;
2180 }
2181out:
2182 dout("get_cap_refs %p ret %d got %s\n", inode, 2165 dout("get_cap_refs %p ret %d got %s\n", inode,
2183 ret, ceph_cap_string(_got)); 2166 ret, ceph_cap_string(*got));
2184 *got = _got;
2185 return ret; 2167 return ret;
2186} 2168}
2187 2169
@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff)
2221int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 2203int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2222 loff_t endoff, int *got, struct page **pinned_page) 2204 loff_t endoff, int *got, struct page **pinned_page)
2223{ 2205{
2224 int check_max, ret, err; 2206 int _got, check_max, ret, err = 0;
2225 2207
2226retry: 2208retry:
2227 if (endoff > 0) 2209 if (endoff > 0)
2228 check_max_size(&ci->vfs_inode, endoff); 2210 check_max_size(&ci->vfs_inode, endoff);
2211 _got = 0;
2229 check_max = 0; 2212 check_max = 0;
2230 err = 0;
2231 ret = wait_event_interruptible(ci->i_cap_wq, 2213 ret = wait_event_interruptible(ci->i_cap_wq,
2232 try_get_cap_refs(ci, need, want, endoff, 2214 try_get_cap_refs(ci, need, want, endoff,
2233 got, pinned_page, 2215 &_got, &check_max, &err));
2234 &check_max, &err));
2235 if (err) 2216 if (err)
2236 ret = err; 2217 ret = err;
2218 if (ret < 0)
2219 return ret;
2220
2237 if (check_max) 2221 if (check_max)
2238 goto retry; 2222 goto retry;
2239 return ret; 2223
2224 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2225 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
2226 i_size_read(&ci->vfs_inode) > 0) {
2227 struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
2228 if (page) {
2229 if (PageUptodate(page)) {
2230 *pinned_page = page;
2231 goto out;
2232 }
2233 page_cache_release(page);
2234 }
2235 /*
2236 * drop cap refs first because getattr while holding
2237 * caps refs can cause deadlock.
2238 */
2239 ceph_put_cap_refs(ci, _got);
2240 _got = 0;
2241
2242 /* getattr request will bring inline data into page cache */
2243 ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
2244 CEPH_STAT_CAP_INLINE_DATA, true);
2245 if (ret < 0)
2246 return ret;
2247 goto retry;
2248 }
2249out:
2250 *got = _got;
2251 return 0;
2240} 2252}
2241 2253
2242/* 2254/*
@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode)
2432 */ 2444 */
2433static void handle_cap_grant(struct ceph_mds_client *mdsc, 2445static void handle_cap_grant(struct ceph_mds_client *mdsc,
2434 struct inode *inode, struct ceph_mds_caps *grant, 2446 struct inode *inode, struct ceph_mds_caps *grant,
2435 void *snaptrace, int snaptrace_len,
2436 u64 inline_version, 2447 u64 inline_version,
2437 void *inline_data, int inline_len, 2448 void *inline_data, int inline_len,
2438 struct ceph_buffer *xattr_buf, 2449 struct ceph_buffer *xattr_buf,
2439 struct ceph_mds_session *session, 2450 struct ceph_mds_session *session,
2440 struct ceph_cap *cap, int issued) 2451 struct ceph_cap *cap, int issued)
2441 __releases(ci->i_ceph_lock) 2452 __releases(ci->i_ceph_lock)
2453 __releases(mdsc->snap_rwsem)
2442{ 2454{
2443 struct ceph_inode_info *ci = ceph_inode(inode); 2455 struct ceph_inode_info *ci = ceph_inode(inode);
2444 int mds = session->s_mds; 2456 int mds = session->s_mds;
@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2639 spin_unlock(&ci->i_ceph_lock); 2651 spin_unlock(&ci->i_ceph_lock);
2640 2652
2641 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 2653 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
2642 down_write(&mdsc->snap_rwsem);
2643 ceph_update_snap_trace(mdsc, snaptrace,
2644 snaptrace + snaptrace_len, false);
2645 downgrade_write(&mdsc->snap_rwsem);
2646 kick_flushing_inode_caps(mdsc, session, inode); 2654 kick_flushing_inode_caps(mdsc, session, inode);
2647 up_read(&mdsc->snap_rwsem); 2655 up_read(&mdsc->snap_rwsem);
2648 if (newcaps & ~issued) 2656 if (newcaps & ~issued)
@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3052 struct ceph_cap *cap; 3060 struct ceph_cap *cap;
3053 struct ceph_mds_caps *h; 3061 struct ceph_mds_caps *h;
3054 struct ceph_mds_cap_peer *peer = NULL; 3062 struct ceph_mds_cap_peer *peer = NULL;
3063 struct ceph_snap_realm *realm;
3055 int mds = session->s_mds; 3064 int mds = session->s_mds;
3056 int op, issued; 3065 int op, issued;
3057 u32 seq, mseq; 3066 u32 seq, mseq;
@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3153 goto done_unlocked; 3162 goto done_unlocked;
3154 3163
3155 case CEPH_CAP_OP_IMPORT: 3164 case CEPH_CAP_OP_IMPORT:
3165 realm = NULL;
3166 if (snaptrace_len) {
3167 down_write(&mdsc->snap_rwsem);
3168 ceph_update_snap_trace(mdsc, snaptrace,
3169 snaptrace + snaptrace_len,
3170 false, &realm);
3171 downgrade_write(&mdsc->snap_rwsem);
3172 } else {
3173 down_read(&mdsc->snap_rwsem);
3174 }
3156 handle_cap_import(mdsc, inode, h, peer, session, 3175 handle_cap_import(mdsc, inode, h, peer, session,
3157 &cap, &issued); 3176 &cap, &issued);
3158 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, 3177 handle_cap_grant(mdsc, inode, h,
3159 inline_version, inline_data, inline_len, 3178 inline_version, inline_data, inline_len,
3160 msg->middle, session, cap, issued); 3179 msg->middle, session, cap, issued);
3180 if (realm)
3181 ceph_put_snap_realm(mdsc, realm);
3161 goto done_unlocked; 3182 goto done_unlocked;
3162 } 3183 }
3163 3184
@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3177 case CEPH_CAP_OP_GRANT: 3198 case CEPH_CAP_OP_GRANT:
3178 __ceph_caps_issued(ci, &issued); 3199 __ceph_caps_issued(ci, &issued);
3179 issued |= __ceph_caps_dirty(ci); 3200 issued |= __ceph_caps_dirty(ci);
3180 handle_cap_grant(mdsc, inode, h, NULL, 0, 3201 handle_cap_grant(mdsc, inode, h,
3181 inline_version, inline_data, inline_len, 3202 inline_version, inline_data, inline_len,
3182 msg->middle, session, cap, issued); 3203 msg->middle, session, cap, issued);
3183 goto done_unlocked; 3204 goto done_unlocked;
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index c241603764fd..0411dbb15815 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -26,8 +26,6 @@
26 * point by name. 26 * point by name.
27 */ 27 */
28 28
29const struct inode_operations ceph_dir_iops;
30const struct file_operations ceph_dir_fops;
31const struct dentry_operations ceph_dentry_ops; 29const struct dentry_operations ceph_dentry_ops;
32 30
33/* 31/*
@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
672 /* 670 /*
673 * We created the item, then did a lookup, and found 671 * We created the item, then did a lookup, and found
674 * it was already linked to another inode we already 672 * it was already linked to another inode we already
675 * had in our cache (and thus got spliced). Link our 673 * had in our cache (and thus got spliced). To not
676 * dentry to that inode, but don't hash it, just in 674 * confuse VFS (especially when inode is a directory),
677 * case the VFS wants to dereference it. 675 * we don't link our dentry to that inode, return an
676 * error instead.
677 *
678 * This event should be rare and it happens only when
679 * we talk to old MDS. Recent MDS does not send traceless
680 * reply for request that creates new inode.
678 */ 681 */
679 BUG_ON(!result->d_inode); 682 d_drop(result);
680 d_instantiate(dentry, result->d_inode); 683 return -ESTALE;
681 return 0;
682 } 684 }
683 return PTR_ERR(result); 685 return PTR_ERR(result);
684} 686}
@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = {
1335 .fsync = ceph_dir_fsync, 1337 .fsync = ceph_dir_fsync,
1336}; 1338};
1337 1339
1340const struct file_operations ceph_snapdir_fops = {
1341 .iterate = ceph_readdir,
1342 .llseek = ceph_dir_llseek,
1343 .open = ceph_open,
1344 .release = ceph_release,
1345};
1346
1338const struct inode_operations ceph_dir_iops = { 1347const struct inode_operations ceph_dir_iops = {
1339 .lookup = ceph_lookup, 1348 .lookup = ceph_lookup,
1340 .permission = ceph_permission, 1349 .permission = ceph_permission,
@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = {
1357 .atomic_open = ceph_atomic_open, 1366 .atomic_open = ceph_atomic_open,
1358}; 1367};
1359 1368
1369const struct inode_operations ceph_snapdir_iops = {
1370 .lookup = ceph_lookup,
1371 .permission = ceph_permission,
1372 .getattr = ceph_getattr,
1373 .mkdir = ceph_mkdir,
1374 .rmdir = ceph_unlink,
1375};
1376
1360const struct dentry_operations ceph_dentry_ops = { 1377const struct dentry_operations ceph_dentry_ops = {
1361 .d_revalidate = ceph_d_revalidate, 1378 .d_revalidate = ceph_d_revalidate,
1362 .d_release = ceph_d_release, 1379 .d_release = ceph_d_release,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 905986dd4c3c..a3d774b35149 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
275 err = ceph_mdsc_do_request(mdsc, 275 err = ceph_mdsc_do_request(mdsc,
276 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 276 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
277 req); 277 req);
278 err = ceph_handle_snapdir(req, dentry, err);
278 if (err) 279 if (err)
279 goto out_req; 280 goto out_req;
280 281
281 err = ceph_handle_snapdir(req, dentry, err);
282 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 282 if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
283 err = ceph_handle_notrace_create(dir, dentry); 283 err = ceph_handle_notrace_create(dir, dentry);
284 284
@@ -392,13 +392,14 @@ more:
392 if (ret >= 0) { 392 if (ret >= 0) {
393 int didpages; 393 int didpages;
394 if (was_short && (pos + ret < inode->i_size)) { 394 if (was_short && (pos + ret < inode->i_size)) {
395 u64 tmp = min(this_len - ret, 395 int zlen = min(this_len - ret,
396 inode->i_size - pos - ret); 396 inode->i_size - pos - ret);
397 int zoff = (o_direct ? buf_align : io_align) +
398 read + ret;
397 dout(" zero gap %llu to %llu\n", 399 dout(" zero gap %llu to %llu\n",
398 pos + ret, pos + ret + tmp); 400 pos + ret, pos + ret + zlen);
399 ceph_zero_page_vector_range(page_align + read + ret, 401 ceph_zero_page_vector_range(zoff, zlen, pages);
400 tmp, pages); 402 ret += zlen;
401 ret += tmp;
402 } 403 }
403 404
404 didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; 405 didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
@@ -878,28 +879,34 @@ again:
878 879
879 i_size = i_size_read(inode); 880 i_size = i_size_read(inode);
880 if (retry_op == READ_INLINE) { 881 if (retry_op == READ_INLINE) {
881 /* does not support inline data > PAGE_SIZE */ 882 BUG_ON(ret > 0 || read > 0);
882 if (i_size > PAGE_CACHE_SIZE) { 883 if (iocb->ki_pos < i_size &&
883 ret = -EIO; 884 iocb->ki_pos < PAGE_CACHE_SIZE) {
884 } else if (iocb->ki_pos < i_size) {
885 loff_t end = min_t(loff_t, i_size, 885 loff_t end = min_t(loff_t, i_size,
886 iocb->ki_pos + len); 886 iocb->ki_pos + len);
887 end = min_t(loff_t, end, PAGE_CACHE_SIZE);
887 if (statret < end) 888 if (statret < end)
888 zero_user_segment(page, statret, end); 889 zero_user_segment(page, statret, end);
889 ret = copy_page_to_iter(page, 890 ret = copy_page_to_iter(page,
890 iocb->ki_pos & ~PAGE_MASK, 891 iocb->ki_pos & ~PAGE_MASK,
891 end - iocb->ki_pos, to); 892 end - iocb->ki_pos, to);
892 iocb->ki_pos += ret; 893 iocb->ki_pos += ret;
893 } else { 894 read += ret;
894 ret = 0; 895 }
896 if (iocb->ki_pos < i_size && read < len) {
897 size_t zlen = min_t(size_t, len - read,
898 i_size - iocb->ki_pos);
899 ret = iov_iter_zero(zlen, to);
900 iocb->ki_pos += ret;
901 read += ret;
895 } 902 }
896 __free_pages(page, 0); 903 __free_pages(page, 0);
897 return ret; 904 return read;
898 } 905 }
899 906
900 /* hit EOF or hole? */ 907 /* hit EOF or hole? */
901 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && 908 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
902 ret < len) { 909 ret < len) {
903 dout("sync_read hit hole, ppos %lld < size %lld" 910 dout("sync_read hit hole, ppos %lld < size %lld"
904 ", reading more\n", iocb->ki_pos, 911 ", reading more\n", iocb->ki_pos,
905 inode->i_size); 912 inode->i_size);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 6b5173605154..119c43c80638 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent)
82 inode->i_mode = parent->i_mode; 82 inode->i_mode = parent->i_mode;
83 inode->i_uid = parent->i_uid; 83 inode->i_uid = parent->i_uid;
84 inode->i_gid = parent->i_gid; 84 inode->i_gid = parent->i_gid;
85 inode->i_op = &ceph_dir_iops; 85 inode->i_op = &ceph_snapdir_iops;
86 inode->i_fop = &ceph_dir_fops; 86 inode->i_fop = &ceph_snapdir_fops;
87 ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */ 87 ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
88 ci->i_rbytes = 0; 88 ci->i_rbytes = 0;
89 return inode; 89 return inode;
@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
838 ceph_vinop(inode), inode->i_mode); 838 ceph_vinop(inode), inode->i_mode);
839 } 839 }
840 840
841 /* set dir completion flag? */
842 if (S_ISDIR(inode->i_mode) &&
843 ci->i_files == 0 && ci->i_subdirs == 0 &&
844 ceph_snap(inode) == CEPH_NOSNAP &&
845 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
846 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
847 !__ceph_dir_is_complete(ci)) {
848 dout(" marking %p complete (empty)\n", inode);
849 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
850 ci->i_ordered_count);
851 }
852
853 /* were we issued a capability? */ 841 /* were we issued a capability? */
854 if (info->cap.caps) { 842 if (info->cap.caps) {
855 if (ceph_snap(inode) == CEPH_NOSNAP) { 843 if (ceph_snap(inode) == CEPH_NOSNAP) {
844 unsigned caps = le32_to_cpu(info->cap.caps);
856 ceph_add_cap(inode, session, 845 ceph_add_cap(inode, session,
857 le64_to_cpu(info->cap.cap_id), 846 le64_to_cpu(info->cap.cap_id),
858 cap_fmode, 847 cap_fmode, caps,
859 le32_to_cpu(info->cap.caps),
860 le32_to_cpu(info->cap.wanted), 848 le32_to_cpu(info->cap.wanted),
861 le32_to_cpu(info->cap.seq), 849 le32_to_cpu(info->cap.seq),
862 le32_to_cpu(info->cap.mseq), 850 le32_to_cpu(info->cap.mseq),
863 le64_to_cpu(info->cap.realm), 851 le64_to_cpu(info->cap.realm),
864 info->cap.flags, &new_cap); 852 info->cap.flags, &new_cap);
853
854 /* set dir completion flag? */
855 if (S_ISDIR(inode->i_mode) &&
856 ci->i_files == 0 && ci->i_subdirs == 0 &&
857 (caps & CEPH_CAP_FILE_SHARED) &&
858 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
859 !__ceph_dir_is_complete(ci)) {
860 dout(" marking %p complete (empty)\n", inode);
861 __ceph_dir_set_complete(ci,
862 atomic_read(&ci->i_release_count),
863 ci->i_ordered_count);
864 }
865
865 wake = true; 866 wake = true;
866 } else { 867 } else {
867 dout(" %p got snap_caps %s\n", inode, 868 dout(" %p got snap_caps %s\n", inode,
@@ -1446,12 +1447,14 @@ retry_lookup:
1446 } 1447 }
1447 1448
1448 if (!dn->d_inode) { 1449 if (!dn->d_inode) {
1449 dn = splice_dentry(dn, in, NULL); 1450 struct dentry *realdn = splice_dentry(dn, in, NULL);
1450 if (IS_ERR(dn)) { 1451 if (IS_ERR(realdn)) {
1451 err = PTR_ERR(dn); 1452 err = PTR_ERR(realdn);
1453 d_drop(dn);
1452 dn = NULL; 1454 dn = NULL;
1453 goto next_item; 1455 goto next_item;
1454 } 1456 }
1457 dn = realdn;
1455 } 1458 }
1456 1459
1457 di = dn->d_fsdata; 1460 di = dn->d_fsdata;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5f62fb7a5d0a..71c073f38e54 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
480 mdsc->max_sessions = newmax; 480 mdsc->max_sessions = newmax;
481 } 481 }
482 mdsc->sessions[mds] = s; 482 mdsc->sessions[mds] = s;
483 atomic_inc(&mdsc->num_sessions);
483 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 484 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
484 485
485 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 486 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
503 mdsc->sessions[s->s_mds] = NULL; 504 mdsc->sessions[s->s_mds] = NULL;
504 ceph_con_close(&s->s_con); 505 ceph_con_close(&s->s_con);
505 ceph_put_mds_session(s); 506 ceph_put_mds_session(s);
507 atomic_dec(&mdsc->num_sessions);
506} 508}
507 509
508/* 510/*
@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
842 struct ceph_options *opt = mdsc->fsc->client->options; 844 struct ceph_options *opt = mdsc->fsc->client->options;
843 void *p; 845 void *p;
844 846
845 const char* metadata[3][2] = { 847 const char* metadata[][2] = {
846 {"hostname", utsname()->nodename}, 848 {"hostname", utsname()->nodename},
849 {"kernel_version", utsname()->release},
847 {"entity_id", opt->name ? opt->name : ""}, 850 {"entity_id", opt->name ? opt->name : ""},
848 {NULL, NULL} 851 {NULL, NULL}
849 }; 852 };
@@ -1464,19 +1467,33 @@ out_unlocked:
1464 return err; 1467 return err;
1465} 1468}
1466 1469
1470static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
1471{
1472 struct ceph_inode_info *ci = ceph_inode(inode);
1473 int ret;
1474 spin_lock(&ci->i_ceph_lock);
1475 if (ci->i_flushing_caps)
1476 ret = ci->i_cap_flush_seq >= want_flush_seq;
1477 else
1478 ret = 1;
1479 spin_unlock(&ci->i_ceph_lock);
1480 return ret;
1481}
1482
1467/* 1483/*
1468 * flush all dirty inode data to disk. 1484 * flush all dirty inode data to disk.
1469 * 1485 *
1470 * returns true if we've flushed through want_flush_seq 1486 * returns true if we've flushed through want_flush_seq
1471 */ 1487 */
1472static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) 1488static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1473{ 1489{
1474 int mds, ret = 1; 1490 int mds;
1475 1491
1476 dout("check_cap_flush want %lld\n", want_flush_seq); 1492 dout("check_cap_flush want %lld\n", want_flush_seq);
1477 mutex_lock(&mdsc->mutex); 1493 mutex_lock(&mdsc->mutex);
1478 for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { 1494 for (mds = 0; mds < mdsc->max_sessions; mds++) {
1479 struct ceph_mds_session *session = mdsc->sessions[mds]; 1495 struct ceph_mds_session *session = mdsc->sessions[mds];
1496 struct inode *inode = NULL;
1480 1497
1481 if (!session) 1498 if (!session)
1482 continue; 1499 continue;
@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1489 list_entry(session->s_cap_flushing.next, 1506 list_entry(session->s_cap_flushing.next,
1490 struct ceph_inode_info, 1507 struct ceph_inode_info,
1491 i_flushing_item); 1508 i_flushing_item);
1492 struct inode *inode = &ci->vfs_inode;
1493 1509
1494 spin_lock(&ci->i_ceph_lock); 1510 if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
1495 if (ci->i_cap_flush_seq <= want_flush_seq) {
1496 dout("check_cap_flush still flushing %p " 1511 dout("check_cap_flush still flushing %p "
1497 "seq %lld <= %lld to mds%d\n", inode, 1512 "seq %lld <= %lld to mds%d\n",
1498 ci->i_cap_flush_seq, want_flush_seq, 1513 &ci->vfs_inode, ci->i_cap_flush_seq,
1499 session->s_mds); 1514 want_flush_seq, session->s_mds);
1500 ret = 0; 1515 inode = igrab(&ci->vfs_inode);
1501 } 1516 }
1502 spin_unlock(&ci->i_ceph_lock);
1503 } 1517 }
1504 mutex_unlock(&session->s_mutex); 1518 mutex_unlock(&session->s_mutex);
1505 ceph_put_mds_session(session); 1519 ceph_put_mds_session(session);
1506 1520
1507 if (!ret) 1521 if (inode) {
1508 return ret; 1522 wait_event(mdsc->cap_flushing_wq,
1523 check_cap_flush(inode, want_flush_seq));
1524 iput(inode);
1525 }
1526
1509 mutex_lock(&mdsc->mutex); 1527 mutex_lock(&mdsc->mutex);
1510 } 1528 }
1511 1529
1512 mutex_unlock(&mdsc->mutex); 1530 mutex_unlock(&mdsc->mutex);
1513 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); 1531 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1514 return ret;
1515} 1532}
1516 1533
1517/* 1534/*
@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1923 head->num_releases = cpu_to_le16(releases); 1940 head->num_releases = cpu_to_le16(releases);
1924 1941
1925 /* time stamp */ 1942 /* time stamp */
1926 ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp)); 1943 {
1944 struct ceph_timespec ts;
1945 ceph_encode_timespec(&ts, &req->r_stamp);
1946 ceph_encode_copy(&p, &ts, sizeof(ts));
1947 }
1927 1948
1928 BUG_ON(p > end); 1949 BUG_ON(p > end);
1929 msg->front.iov_len = p - msg->front.iov_base; 1950 msg->front.iov_len = p - msg->front.iov_base;
@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
2012 2033
2013 /* time stamp */ 2034 /* time stamp */
2014 p = msg->front.iov_base + req->r_request_release_offset; 2035 p = msg->front.iov_base + req->r_request_release_offset;
2015 ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp)); 2036 {
2037 struct ceph_timespec ts;
2038 ceph_encode_timespec(&ts, &req->r_stamp);
2039 ceph_encode_copy(&p, &ts, sizeof(ts));
2040 }
2016 2041
2017 msg->front.iov_len = p - msg->front.iov_base; 2042 msg->front.iov_len = p - msg->front.iov_base;
2018 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2043 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2159 p = rb_next(p); 2184 p = rb_next(p);
2160 if (req->r_got_unsafe) 2185 if (req->r_got_unsafe)
2161 continue; 2186 continue;
2187 if (req->r_attempts > 0)
2188 continue; /* only new requests */
2162 if (req->r_session && 2189 if (req->r_session &&
2163 req->r_session->s_mds == mds) { 2190 req->r_session->s_mds == mds) {
2164 dout(" kicking tid %llu\n", req->r_tid); 2191 dout(" kicking tid %llu\n", req->r_tid);
@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2286 struct ceph_mds_request *req; 2313 struct ceph_mds_request *req;
2287 struct ceph_mds_reply_head *head = msg->front.iov_base; 2314 struct ceph_mds_reply_head *head = msg->front.iov_base;
2288 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2315 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
2316 struct ceph_snap_realm *realm;
2289 u64 tid; 2317 u64 tid;
2290 int err, result; 2318 int err, result;
2291 int mds = session->s_mds; 2319 int mds = session->s_mds;
@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2401 } 2429 }
2402 2430
2403 /* snap trace */ 2431 /* snap trace */
2432 realm = NULL;
2404 if (rinfo->snapblob_len) { 2433 if (rinfo->snapblob_len) {
2405 down_write(&mdsc->snap_rwsem); 2434 down_write(&mdsc->snap_rwsem);
2406 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2435 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2407 rinfo->snapblob + rinfo->snapblob_len, 2436 rinfo->snapblob + rinfo->snapblob_len,
2408 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2437 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2438 &realm);
2409 downgrade_write(&mdsc->snap_rwsem); 2439 downgrade_write(&mdsc->snap_rwsem);
2410 } else { 2440 } else {
2411 down_read(&mdsc->snap_rwsem); 2441 down_read(&mdsc->snap_rwsem);
@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2423 mutex_unlock(&req->r_fill_mutex); 2453 mutex_unlock(&req->r_fill_mutex);
2424 2454
2425 up_read(&mdsc->snap_rwsem); 2455 up_read(&mdsc->snap_rwsem);
2456 if (realm)
2457 ceph_put_snap_realm(mdsc, realm);
2426out_err: 2458out_err:
2427 mutex_lock(&mdsc->mutex); 2459 mutex_lock(&mdsc->mutex);
2428 if (!req->r_aborted) { 2460 if (!req->r_aborted) {
@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
2487 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2519 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2488 BUG_ON(req->r_err); 2520 BUG_ON(req->r_err);
2489 BUG_ON(req->r_got_result); 2521 BUG_ON(req->r_got_result);
2522 req->r_attempts = 0;
2490 req->r_num_fwd = fwd_seq; 2523 req->r_num_fwd = fwd_seq;
2491 req->r_resend_mds = next_mds; 2524 req->r_resend_mds = next_mds;
2492 put_request_session(req); 2525 put_request_session(req);
@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session,
2580 send_flushmsg_ack(mdsc, session, seq); 2613 send_flushmsg_ack(mdsc, session, seq);
2581 break; 2614 break;
2582 2615
2616 case CEPH_SESSION_FORCE_RO:
2617 dout("force_session_readonly %p\n", session);
2618 spin_lock(&session->s_cap_lock);
2619 session->s_readonly = true;
2620 spin_unlock(&session->s_cap_lock);
2621 wake_up_session_caps(session, 0);
2622 break;
2623
2583 default: 2624 default:
2584 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2625 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2585 WARN_ON(1); 2626 WARN_ON(1);
@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2610 struct ceph_mds_session *session) 2651 struct ceph_mds_session *session)
2611{ 2652{
2612 struct ceph_mds_request *req, *nreq; 2653 struct ceph_mds_request *req, *nreq;
2654 struct rb_node *p;
2613 int err; 2655 int err;
2614 2656
2615 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2657 dout("replay_unsafe_requests mds%d\n", session->s_mds);
@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2622 ceph_con_send(&session->s_con, req->r_request); 2664 ceph_con_send(&session->s_con, req->r_request);
2623 } 2665 }
2624 } 2666 }
2667
2668 /*
2669 * also re-send old requests when MDS enters reconnect stage. So that MDS
2670 * can process completed request in clientreplay stage.
2671 */
2672 p = rb_first(&mdsc->request_tree);
2673 while (p) {
2674 req = rb_entry(p, struct ceph_mds_request, r_node);
2675 p = rb_next(p);
2676 if (req->r_got_unsafe)
2677 continue;
2678 if (req->r_attempts == 0)
2679 continue; /* only old requests */
2680 if (req->r_session &&
2681 req->r_session->s_mds == session->s_mds) {
2682 err = __prepare_send_request(mdsc, req, session->s_mds);
2683 if (!err) {
2684 ceph_msg_get(req->r_request);
2685 ceph_con_send(&session->s_con, req->r_request);
2686 }
2687 }
2688 }
2625 mutex_unlock(&mdsc->mutex); 2689 mutex_unlock(&mdsc->mutex);
2626} 2690}
2627 2691
@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2787 spin_unlock(&session->s_gen_ttl_lock); 2851 spin_unlock(&session->s_gen_ttl_lock);
2788 2852
2789 spin_lock(&session->s_cap_lock); 2853 spin_lock(&session->s_cap_lock);
2854 /* don't know if session is readonly */
2855 session->s_readonly = 0;
2790 /* 2856 /*
2791 * notify __ceph_remove_cap() that we are composing cap reconnect. 2857 * notify __ceph_remove_cap() that we are composing cap reconnect.
2792 * If a cap get released before being added to the cap reconnect, 2858 * If a cap get released before being added to the cap reconnect,
@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2933 mutex_unlock(&s->s_mutex); 2999 mutex_unlock(&s->s_mutex);
2934 s->s_state = CEPH_MDS_SESSION_RESTARTING; 3000 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2935 } 3001 }
2936
2937 /* kick any requests waiting on the recovering mds */
2938 kick_requests(mdsc, i);
2939 } else if (oldstate == newstate) { 3002 } else if (oldstate == newstate) {
2940 continue; /* nothing new with this mds */ 3003 continue; /* nothing new with this mds */
2941 } 3004 }
@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
3295 init_waitqueue_head(&mdsc->session_close_wq); 3358 init_waitqueue_head(&mdsc->session_close_wq);
3296 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3359 INIT_LIST_HEAD(&mdsc->waiting_for_map);
3297 mdsc->sessions = NULL; 3360 mdsc->sessions = NULL;
3361 atomic_set(&mdsc->num_sessions, 0);
3298 mdsc->max_sessions = 0; 3362 mdsc->max_sessions = 0;
3299 mdsc->stopping = 0; 3363 mdsc->stopping = 0;
3300 init_rwsem(&mdsc->snap_rwsem); 3364 init_rwsem(&mdsc->snap_rwsem);
@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3428 dout("sync\n"); 3492 dout("sync\n");
3429 mutex_lock(&mdsc->mutex); 3493 mutex_lock(&mdsc->mutex);
3430 want_tid = mdsc->last_tid; 3494 want_tid = mdsc->last_tid;
3431 want_flush = mdsc->cap_flush_seq;
3432 mutex_unlock(&mdsc->mutex); 3495 mutex_unlock(&mdsc->mutex);
3433 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3434 3496
3435 ceph_flush_dirty_caps(mdsc); 3497 ceph_flush_dirty_caps(mdsc);
3498 spin_lock(&mdsc->cap_dirty_lock);
3499 want_flush = mdsc->cap_flush_seq;
3500 spin_unlock(&mdsc->cap_dirty_lock);
3501
3502 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3436 3503
3437 wait_unsafe_requests(mdsc, want_tid); 3504 wait_unsafe_requests(mdsc, want_tid);
3438 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3505 wait_caps_flush(mdsc, want_flush);
3439} 3506}
3440 3507
3441/* 3508/*
@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3443 */ 3510 */
3444static bool done_closing_sessions(struct ceph_mds_client *mdsc) 3511static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3445{ 3512{
3446 int i, n = 0;
3447
3448 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3513 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3449 return true; 3514 return true;
3450 3515 return atomic_read(&mdsc->num_sessions) == 0;
3451 mutex_lock(&mdsc->mutex);
3452 for (i = 0; i < mdsc->max_sessions; i++)
3453 if (mdsc->sessions[i])
3454 n++;
3455 mutex_unlock(&mdsc->mutex);
3456 return n == 0;
3457} 3516}
3458 3517
3459/* 3518/*
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e2817d00f7d9..1875b5d985c6 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -137,6 +137,7 @@ struct ceph_mds_session {
137 int s_nr_caps, s_trim_caps; 137 int s_nr_caps, s_trim_caps;
138 int s_num_cap_releases; 138 int s_num_cap_releases;
139 int s_cap_reconnect; 139 int s_cap_reconnect;
140 int s_readonly;
140 struct list_head s_cap_releases; /* waiting cap_release messages */ 141 struct list_head s_cap_releases; /* waiting cap_release messages */
141 struct list_head s_cap_releases_done; /* ready to send */ 142 struct list_head s_cap_releases_done; /* ready to send */
142 struct ceph_cap *s_cap_iterator; 143 struct ceph_cap *s_cap_iterator;
@@ -272,6 +273,7 @@ struct ceph_mds_client {
272 struct list_head waiting_for_map; 273 struct list_head waiting_for_map;
273 274
274 struct ceph_mds_session **sessions; /* NULL for mds if no session */ 275 struct ceph_mds_session **sessions; /* NULL for mds if no session */
276 atomic_t num_sessions;
275 int max_sessions; /* len of s_mds_sessions */ 277 int max_sessions; /* len of s_mds_sessions */
276 int stopping; /* true if shutting down */ 278 int stopping; /* true if shutting down */
277 279
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index ce35fbd4ba5d..a97e39f09ba6 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
70 * safe. we do need to protect against concurrent empty list 70 * safe. we do need to protect against concurrent empty list
71 * additions, however. 71 * additions, however.
72 */ 72 */
73 if (atomic_read(&realm->nref) == 0) { 73 if (atomic_inc_return(&realm->nref) == 1) {
74 spin_lock(&mdsc->snap_empty_lock); 74 spin_lock(&mdsc->snap_empty_lock);
75 list_del_init(&realm->empty_item); 75 list_del_init(&realm->empty_item);
76 spin_unlock(&mdsc->snap_empty_lock); 76 spin_unlock(&mdsc->snap_empty_lock);
77 } 77 }
78
79 atomic_inc(&realm->nref);
80} 78}
81 79
82static void __insert_snap_realm(struct rb_root *root, 80static void __insert_snap_realm(struct rb_root *root,
@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
116 if (!realm) 114 if (!realm)
117 return ERR_PTR(-ENOMEM); 115 return ERR_PTR(-ENOMEM);
118 116
119 atomic_set(&realm->nref, 0); /* tree does not take a ref */ 117 atomic_set(&realm->nref, 1); /* for caller */
120 realm->ino = ino; 118 realm->ino = ino;
121 INIT_LIST_HEAD(&realm->children); 119 INIT_LIST_HEAD(&realm->children);
122 INIT_LIST_HEAD(&realm->child_item); 120 INIT_LIST_HEAD(&realm->child_item);
@@ -134,8 +132,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
134 * 132 *
135 * caller must hold snap_rwsem for write. 133 * caller must hold snap_rwsem for write.
136 */ 134 */
137struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 135static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
138 u64 ino) 136 u64 ino)
139{ 137{
140 struct rb_node *n = mdsc->snap_realms.rb_node; 138 struct rb_node *n = mdsc->snap_realms.rb_node;
141 struct ceph_snap_realm *r; 139 struct ceph_snap_realm *r;
@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
154 return NULL; 152 return NULL;
155} 153}
156 154
155struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
156 u64 ino)
157{
158 struct ceph_snap_realm *r;
159 r = __lookup_snap_realm(mdsc, ino);
160 if (r)
161 ceph_get_snap_realm(mdsc, r);
162 return r;
163}
164
157static void __put_snap_realm(struct ceph_mds_client *mdsc, 165static void __put_snap_realm(struct ceph_mds_client *mdsc,
158 struct ceph_snap_realm *realm); 166 struct ceph_snap_realm *realm);
159 167
@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
273 } 281 }
274 realm->parent_ino = parentino; 282 realm->parent_ino = parentino;
275 realm->parent = parent; 283 realm->parent = parent;
276 ceph_get_snap_realm(mdsc, parent);
277 list_add(&realm->child_item, &parent->children); 284 list_add(&realm->child_item, &parent->children);
278 return 1; 285 return 1;
279} 286}
@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
631 * Caller must hold snap_rwsem for write. 638 * Caller must hold snap_rwsem for write.
632 */ 639 */
633int ceph_update_snap_trace(struct ceph_mds_client *mdsc, 640int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
634 void *p, void *e, bool deletion) 641 void *p, void *e, bool deletion,
642 struct ceph_snap_realm **realm_ret)
635{ 643{
636 struct ceph_mds_snap_realm *ri; /* encoded */ 644 struct ceph_mds_snap_realm *ri; /* encoded */
637 __le64 *snaps; /* encoded */ 645 __le64 *snaps; /* encoded */
638 __le64 *prior_parent_snaps; /* encoded */ 646 __le64 *prior_parent_snaps; /* encoded */
639 struct ceph_snap_realm *realm; 647 struct ceph_snap_realm *realm = NULL;
648 struct ceph_snap_realm *first_realm = NULL;
640 int invalidate = 0; 649 int invalidate = 0;
641 int err = -ENOMEM; 650 int err = -ENOMEM;
642 LIST_HEAD(dirty_realms); 651 LIST_HEAD(dirty_realms);
@@ -704,13 +713,18 @@ more:
704 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, 713 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
705 realm, invalidate, p, e); 714 realm, invalidate, p, e);
706 715
707 if (p < e)
708 goto more;
709
710 /* invalidate when we reach the _end_ (root) of the trace */ 716 /* invalidate when we reach the _end_ (root) of the trace */
711 if (invalidate) 717 if (invalidate && p >= e)
712 rebuild_snap_realms(realm); 718 rebuild_snap_realms(realm);
713 719
720 if (!first_realm)
721 first_realm = realm;
722 else
723 ceph_put_snap_realm(mdsc, realm);
724
725 if (p < e)
726 goto more;
727
714 /* 728 /*
715 * queue cap snaps _after_ we've built the new snap contexts, 729 * queue cap snaps _after_ we've built the new snap contexts,
716 * so that i_head_snapc can be set appropriately. 730 * so that i_head_snapc can be set appropriately.
@@ -721,12 +735,21 @@ more:
721 queue_realm_cap_snaps(realm); 735 queue_realm_cap_snaps(realm);
722 } 736 }
723 737
738 if (realm_ret)
739 *realm_ret = first_realm;
740 else
741 ceph_put_snap_realm(mdsc, first_realm);
742
724 __cleanup_empty_realms(mdsc); 743 __cleanup_empty_realms(mdsc);
725 return 0; 744 return 0;
726 745
727bad: 746bad:
728 err = -EINVAL; 747 err = -EINVAL;
729fail: 748fail:
749 if (realm && !IS_ERR(realm))
750 ceph_put_snap_realm(mdsc, realm);
751 if (first_realm)
752 ceph_put_snap_realm(mdsc, first_realm);
730 pr_err("update_snap_trace error %d\n", err); 753 pr_err("update_snap_trace error %d\n", err);
731 return err; 754 return err;
732} 755}
@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
844 if (IS_ERR(realm)) 867 if (IS_ERR(realm))
845 goto out; 868 goto out;
846 } 869 }
847 ceph_get_snap_realm(mdsc, realm);
848 870
849 dout("splitting snap_realm %llx %p\n", realm->ino, realm); 871 dout("splitting snap_realm %llx %p\n", realm->ino, realm);
850 for (i = 0; i < num_split_inos; i++) { 872 for (i = 0; i < num_split_inos; i++) {
@@ -905,7 +927,7 @@ skip_inode:
905 /* we may have taken some of the old realm's children. */ 927 /* we may have taken some of the old realm's children. */
906 for (i = 0; i < num_split_realms; i++) { 928 for (i = 0; i < num_split_realms; i++) {
907 struct ceph_snap_realm *child = 929 struct ceph_snap_realm *child =
908 ceph_lookup_snap_realm(mdsc, 930 __lookup_snap_realm(mdsc,
909 le64_to_cpu(split_realms[i])); 931 le64_to_cpu(split_realms[i]));
910 if (!child) 932 if (!child)
911 continue; 933 continue;
@@ -918,7 +940,7 @@ skip_inode:
918 * snap, we can avoid queueing cap_snaps. 940 * snap, we can avoid queueing cap_snaps.
919 */ 941 */
920 ceph_update_snap_trace(mdsc, p, e, 942 ceph_update_snap_trace(mdsc, p, e,
921 op == CEPH_SNAP_OP_DESTROY); 943 op == CEPH_SNAP_OP_DESTROY, NULL);
922 944
923 if (op == CEPH_SNAP_OP_SPLIT) 945 if (op == CEPH_SNAP_OP_SPLIT)
924 /* we took a reference when we created the realm, above */ 946 /* we took a reference when we created the realm, above */
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 5ae62587a71d..a63997b8bcff 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -414,6 +414,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
414 seq_puts(m, ",noshare"); 414 seq_puts(m, ",noshare");
415 if (opt->flags & CEPH_OPT_NOCRC) 415 if (opt->flags & CEPH_OPT_NOCRC)
416 seq_puts(m, ",nocrc"); 416 seq_puts(m, ",nocrc");
417 if (opt->flags & CEPH_OPT_NOMSGAUTH)
418 seq_puts(m, ",nocephx_require_signatures");
419 if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
420 seq_puts(m, ",notcp_nodelay");
417 421
418 if (opt->name) 422 if (opt->name)
419 seq_printf(m, ",name=%s", opt->name); 423 seq_printf(m, ",name=%s", opt->name);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e1aa32d0759d..04c8124ed30e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, 693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
694 struct ceph_snap_realm *realm); 694 struct ceph_snap_realm *realm);
695extern int ceph_update_snap_trace(struct ceph_mds_client *m, 695extern int ceph_update_snap_trace(struct ceph_mds_client *m,
696 void *p, void *e, bool deletion); 696 void *p, void *e, bool deletion,
697 struct ceph_snap_realm **realm_ret);
697extern void ceph_handle_snap(struct ceph_mds_client *mdsc, 698extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
698 struct ceph_mds_session *session, 699 struct ceph_mds_session *session,
699 struct ceph_msg *msg); 700 struct ceph_msg *msg);
@@ -892,7 +893,9 @@ extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
892int ceph_uninline_data(struct file *filp, struct page *locked_page); 893int ceph_uninline_data(struct file *filp, struct page *locked_page);
893/* dir.c */ 894/* dir.c */
894extern const struct file_operations ceph_dir_fops; 895extern const struct file_operations ceph_dir_fops;
896extern const struct file_operations ceph_snapdir_fops;
895extern const struct inode_operations ceph_dir_iops; 897extern const struct inode_operations ceph_dir_iops;
898extern const struct inode_operations ceph_snapdir_iops;
896extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, 899extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
897 ceph_snapdir_dentry_ops; 900 ceph_snapdir_dentry_ops;
898 901