aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorYan, Zheng <zheng.z.yan@intel.com>2013-03-13 07:44:32 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:17:07 -0400
commit2f276c511137d97e56b19e29865e1e6569315ccb (patch)
treef29a8088a6f312ec40f03b96d96b34e69154627a /fs
parent8a166d05369f6a0369bb194a795e6e3928ac6e34 (diff)
ceph: use i_release_count to indicate dir's completeness
The current ceph code tracks a directory's completeness in two places. ceph_readdir() checks i_release_count to decide whether it can set the I_COMPLETE flag in i_ceph_flags. All other places check the I_COMPLETE flag. This indirection introduces locking complexity. This patch adds a new variable, i_complete_count, to ceph_inode_info. When a directory is marked complete, i_release_count's value is stored in it. By comparing the two variables, we know whether a directory is complete. Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/caps.c4
-rw-r--r--fs/ceph/dir.c25
-rw-r--r--fs/ceph/inode.c13
-rw-r--r--fs/ceph/mds_client.c10
-rw-r--r--fs/ceph/super.h42
5 files changed, 45 insertions, 49 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index bc575a4a813e..f9563108d189 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -490,7 +490,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
490 ci->i_rdcache_gen++; 490 ci->i_rdcache_gen++;
491 491
492 /* 492 /*
493 * if we are newly issued FILE_SHARED, clear I_COMPLETE; we 493 * if we are newly issued FILE_SHARED, mark dir not complete; we
494 * don't know what happened to this directory while we didn't 494 * don't know what happened to this directory while we didn't
495 * have the cap. 495 * have the cap.
496 */ 496 */
@@ -499,7 +499,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
499 ci->i_shared_gen++; 499 ci->i_shared_gen++;
500 if (S_ISDIR(ci->vfs_inode.i_mode)) { 500 if (S_ISDIR(ci->vfs_inode.i_mode)) {
501 dout(" marking %p NOT complete\n", &ci->vfs_inode); 501 dout(" marking %p NOT complete\n", &ci->vfs_inode);
502 ci->i_ceph_flags &= ~CEPH_I_COMPLETE; 502 __ceph_dir_clear_complete(ci);
503 } 503 }
504 } 504 }
505} 505}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 0c369ac62c07..f02d82b7933e 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -107,7 +107,7 @@ static unsigned fpos_off(loff_t p)
107 * falling back to a "normal" sync readdir if any dentries in the dir 107 * falling back to a "normal" sync readdir if any dentries in the dir
108 * are dropped. 108 * are dropped.
109 * 109 *
110 * I_COMPLETE tells indicates we have all dentries in the dir. It is 110 * Complete dir indicates that we have all dentries in the dir. It is
111 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by 111 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
112 * the MDS if/when the directory is modified). 112 * the MDS if/when the directory is modified).
113 */ 113 */
@@ -198,8 +198,8 @@ more:
198 filp->f_pos++; 198 filp->f_pos++;
199 199
200 /* make sure a dentry wasn't dropped while we didn't have parent lock */ 200 /* make sure a dentry wasn't dropped while we didn't have parent lock */
201 if (!ceph_i_test(dir, CEPH_I_COMPLETE)) { 201 if (!ceph_dir_is_complete(dir)) {
202 dout(" lost I_COMPLETE on %p; falling back to mds\n", dir); 202 dout(" lost dir complete on %p; falling back to mds\n", dir);
203 err = -EAGAIN; 203 err = -EAGAIN;
204 goto out; 204 goto out;
205 } 205 }
@@ -258,7 +258,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
258 if (filp->f_pos == 0) { 258 if (filp->f_pos == 0) {
259 /* note dir version at start of readdir so we can tell 259 /* note dir version at start of readdir so we can tell
260 * if any dentries get dropped */ 260 * if any dentries get dropped */
261 fi->dir_release_count = ci->i_release_count; 261 fi->dir_release_count = atomic_read(&ci->i_release_count);
262 262
263 dout("readdir off 0 -> '.'\n"); 263 dout("readdir off 0 -> '.'\n");
264 if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0), 264 if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
@@ -284,7 +284,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
284 if ((filp->f_pos == 2 || fi->dentry) && 284 if ((filp->f_pos == 2 || fi->dentry) &&
285 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 285 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
286 ceph_snap(inode) != CEPH_SNAPDIR && 286 ceph_snap(inode) != CEPH_SNAPDIR &&
287 (ci->i_ceph_flags & CEPH_I_COMPLETE) && 287 __ceph_dir_is_complete(ci) &&
288 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 288 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
289 spin_unlock(&ci->i_ceph_lock); 289 spin_unlock(&ci->i_ceph_lock);
290 err = __dcache_readdir(filp, dirent, filldir); 290 err = __dcache_readdir(filp, dirent, filldir);
@@ -350,7 +350,8 @@ more:
350 350
351 if (!req->r_did_prepopulate) { 351 if (!req->r_did_prepopulate) {
352 dout("readdir !did_prepopulate"); 352 dout("readdir !did_prepopulate");
353 fi->dir_release_count--; /* preclude I_COMPLETE */ 353 /* preclude from marking dir complete */
354 fi->dir_release_count--;
354 } 355 }
355 356
356 /* note next offset and last dentry name */ 357 /* note next offset and last dentry name */
@@ -428,9 +429,9 @@ more:
428 * the complete dir contents in our cache. 429 * the complete dir contents in our cache.
429 */ 430 */
430 spin_lock(&ci->i_ceph_lock); 431 spin_lock(&ci->i_ceph_lock);
431 if (ci->i_release_count == fi->dir_release_count) { 432 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
432 dout(" marking %p complete\n", inode); 433 dout(" marking %p complete\n", inode);
433 ci->i_ceph_flags |= CEPH_I_COMPLETE; 434 __ceph_dir_set_complete(ci, fi->dir_release_count);
434 ci->i_max_offset = filp->f_pos; 435 ci->i_max_offset = filp->f_pos;
435 } 436 }
436 spin_unlock(&ci->i_ceph_lock); 437 spin_unlock(&ci->i_ceph_lock);
@@ -605,7 +606,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
605 fsc->mount_options->snapdir_name, 606 fsc->mount_options->snapdir_name,
606 dentry->d_name.len) && 607 dentry->d_name.len) &&
607 !is_root_ceph_dentry(dir, dentry) && 608 !is_root_ceph_dentry(dir, dentry) &&
608 (ci->i_ceph_flags & CEPH_I_COMPLETE) && 609 __ceph_dir_is_complete(ci) &&
609 (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { 610 (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
610 spin_unlock(&ci->i_ceph_lock); 611 spin_unlock(&ci->i_ceph_lock);
611 dout(" dir %p complete, -ENOENT\n", dir); 612 dout(" dir %p complete, -ENOENT\n", dir);
@@ -909,7 +910,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
909 */ 910 */
910 911
911 /* d_move screws up d_subdirs order */ 912 /* d_move screws up d_subdirs order */
912 ceph_i_clear(new_dir, CEPH_I_COMPLETE); 913 ceph_dir_clear_complete(new_dir);
913 914
914 d_move(old_dentry, new_dentry); 915 d_move(old_dentry, new_dentry);
915 916
@@ -1079,7 +1080,7 @@ static void ceph_d_prune(struct dentry *dentry)
1079 if (IS_ROOT(dentry)) 1080 if (IS_ROOT(dentry))
1080 return; 1081 return;
1081 1082
1082 /* if we are not hashed, we don't affect I_COMPLETE */ 1083 /* if we are not hashed, we don't affect dir's completeness */
1083 if (d_unhashed(dentry)) 1084 if (d_unhashed(dentry))
1084 return; 1085 return;
1085 1086
@@ -1087,7 +1088,7 @@ static void ceph_d_prune(struct dentry *dentry)
1087 * we hold d_lock, so d_parent is stable, and d_fsdata is never 1088 * we hold d_lock, so d_parent is stable, and d_fsdata is never
1088 * cleared until d_release 1089 * cleared until d_release
1089 */ 1090 */
1090 ceph_i_clear(dentry->d_parent->d_inode, CEPH_I_COMPLETE); 1091 ceph_dir_clear_complete(dentry->d_parent->d_inode);
1091} 1092}
1092 1093
1093/* 1094/*
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index eeac43dd04eb..1b173edc8083 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -302,7 +302,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
302 ci->i_version = 0; 302 ci->i_version = 0;
303 ci->i_time_warp_seq = 0; 303 ci->i_time_warp_seq = 0;
304 ci->i_ceph_flags = 0; 304 ci->i_ceph_flags = 0;
305 ci->i_release_count = 0; 305 atomic_set(&ci->i_release_count, 1);
306 atomic_set(&ci->i_complete_count, 0);
306 ci->i_symlink = NULL; 307 ci->i_symlink = NULL;
307 308
308 memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); 309 memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
@@ -721,9 +722,9 @@ static int fill_inode(struct inode *inode,
721 ceph_snap(inode) == CEPH_NOSNAP && 722 ceph_snap(inode) == CEPH_NOSNAP &&
722 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && 723 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
723 (issued & CEPH_CAP_FILE_EXCL) == 0 && 724 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
724 (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { 725 !__ceph_dir_is_complete(ci)) {
725 dout(" marking %p complete (empty)\n", inode); 726 dout(" marking %p complete (empty)\n", inode);
726 ci->i_ceph_flags |= CEPH_I_COMPLETE; 727 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
727 ci->i_max_offset = 2; 728 ci->i_max_offset = 2;
728 } 729 }
729no_change: 730no_change:
@@ -857,7 +858,7 @@ static void ceph_set_dentry_offset(struct dentry *dn)
857 di = ceph_dentry(dn); 858 di = ceph_dentry(dn);
858 859
859 spin_lock(&ci->i_ceph_lock); 860 spin_lock(&ci->i_ceph_lock);
860 if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) { 861 if (!__ceph_dir_is_complete(ci)) {
861 spin_unlock(&ci->i_ceph_lock); 862 spin_unlock(&ci->i_ceph_lock);
862 return; 863 return;
863 } 864 }
@@ -1061,8 +1062,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1061 /* 1062 /*
1062 * d_move() puts the renamed dentry at the end of 1063 * d_move() puts the renamed dentry at the end of
1063 * d_subdirs. We need to assign it an appropriate 1064 * d_subdirs. We need to assign it an appropriate
1064 * directory offset so we can behave when holding 1065 * directory offset so we can behave when dir is
1065 * I_COMPLETE. 1066 * complete.
1066 */ 1067 */
1067 ceph_set_dentry_offset(req->r_old_dentry); 1068 ceph_set_dentry_offset(req->r_old_dentry);
1068 dout("dn %p gets new offset %lld\n", req->r_old_dentry, 1069 dout("dn %p gets new offset %lld\n", req->r_old_dentry,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 600d770d70f7..0db6f5206d11 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2034,20 +2034,16 @@ out:
2034} 2034}
2035 2035
2036/* 2036/*
2037 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS 2037 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2038 * namespace request. 2038 * namespace request.
2039 */ 2039 */
2040void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2040void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2041{ 2041{
2042 struct inode *inode = req->r_locked_dir; 2042 struct inode *inode = req->r_locked_dir;
2043 struct ceph_inode_info *ci = ceph_inode(inode);
2044 2043
2045 dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); 2044 dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2046 spin_lock(&ci->i_ceph_lock);
2047 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
2048 ci->i_release_count++;
2049 spin_unlock(&ci->i_ceph_lock);
2050 2045
2046 ceph_dir_clear_complete(inode);
2051 if (req->r_dentry) 2047 if (req->r_dentry)
2052 ceph_invalidate_dentry_lease(req->r_dentry); 2048 ceph_invalidate_dentry_lease(req->r_dentry);
2053 if (req->r_old_dentry) 2049 if (req->r_old_dentry)
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index a04eda714df4..8696be2ff679 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -244,7 +244,8 @@ struct ceph_inode_info {
244 u32 i_time_warp_seq; 244 u32 i_time_warp_seq;
245 245
246 unsigned i_ceph_flags; 246 unsigned i_ceph_flags;
247 unsigned long i_release_count; 247 atomic_t i_release_count;
248 atomic_t i_complete_count;
248 249
249 struct ceph_dir_layout i_dir_layout; 250 struct ceph_dir_layout i_dir_layout;
250 struct ceph_file_layout i_layout; 251 struct ceph_file_layout i_layout;
@@ -254,7 +255,7 @@ struct ceph_inode_info {
254 struct timespec i_rctime; 255 struct timespec i_rctime;
255 u64 i_rbytes, i_rfiles, i_rsubdirs; 256 u64 i_rbytes, i_rfiles, i_rsubdirs;
256 u64 i_files, i_subdirs; 257 u64 i_files, i_subdirs;
257 u64 i_max_offset; /* largest readdir offset, set with I_COMPLETE */ 258 u64 i_max_offset; /* largest readdir offset, set with complete dir */
258 259
259 struct rb_root i_fragtree; 260 struct rb_root i_fragtree;
260 struct mutex i_fragtree_mutex; 261 struct mutex i_fragtree_mutex;
@@ -419,38 +420,35 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
419/* 420/*
420 * Ceph inode. 421 * Ceph inode.
421 */ 422 */
422#define CEPH_I_COMPLETE 1 /* we have complete directory cached */
423#define CEPH_I_NODELAY 4 /* do not delay cap release */ 423#define CEPH_I_NODELAY 4 /* do not delay cap release */
424#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ 424#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
425#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ 425#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
426 426
427static inline void ceph_i_clear(struct inode *inode, unsigned mask) 427static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
428 int release_count)
428{ 429{
429 struct ceph_inode_info *ci = ceph_inode(inode); 430 atomic_set(&ci->i_complete_count, release_count);
430
431 spin_lock(&ci->i_ceph_lock);
432 ci->i_ceph_flags &= ~mask;
433 spin_unlock(&ci->i_ceph_lock);
434} 431}
435 432
436static inline void ceph_i_set(struct inode *inode, unsigned mask) 433static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
437{ 434{
438 struct ceph_inode_info *ci = ceph_inode(inode); 435 atomic_inc(&ci->i_release_count);
436}
439 437
440 spin_lock(&ci->i_ceph_lock); 438static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
441 ci->i_ceph_flags |= mask; 439{
442 spin_unlock(&ci->i_ceph_lock); 440 return atomic_read(&ci->i_complete_count) ==
441 atomic_read(&ci->i_release_count);
443} 442}
444 443
445static inline bool ceph_i_test(struct inode *inode, unsigned mask) 444static inline void ceph_dir_clear_complete(struct inode *inode)
446{ 445{
447 struct ceph_inode_info *ci = ceph_inode(inode); 446 __ceph_dir_clear_complete(ceph_inode(inode));
448 bool r; 447}
449 448
450 spin_lock(&ci->i_ceph_lock); 449static inline bool ceph_dir_is_complete(struct inode *inode)
451 r = (ci->i_ceph_flags & mask) == mask; 450{
452 spin_unlock(&ci->i_ceph_lock); 451 return __ceph_dir_is_complete(ceph_inode(inode));
453 return r;
454} 452}
455 453
456 454
@@ -565,7 +563,7 @@ struct ceph_file_info {
565 u64 next_offset; /* offset of next chunk (last_name's + 1) */ 563 u64 next_offset; /* offset of next chunk (last_name's + 1) */
566 char *last_name; /* last entry in previous chunk */ 564 char *last_name; /* last entry in previous chunk */
567 struct dentry *dentry; /* next dentry (for dcache readdir) */ 565 struct dentry *dentry; /* next dentry (for dcache readdir) */
568 unsigned long dir_release_count; 566 int dir_release_count;
569 567
570 /* used for -o dirstat read() on directory thing */ 568 /* used for -o dirstat read() on directory thing */
571 char *dir_info; 569 char *dir_info;