aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
author Yan, Zheng <zyan@redhat.com> 2014-10-21 21:09:56 -0400
committer Ilya Dryomov <idryomov@redhat.com> 2014-12-17 12:09:50 -0500
commit 70db4f3629b3476cf506be869ef9d15688d2d44a (patch)
tree 1a15d801442d91ec3ea98a84eedc03d61ee3127f /fs
parent 4965fc38c460b274b2a1789e1165a25fb0409d7e (diff)
ceph: introduce a new inode flag indicating if cached dentries are ordered
After a file is created, deleted, or renamed, the offsets of sibling dentries may change, so we cannot use cached dentries to satisfy readdir. We can, however, still use the cached dentries to conclude -ENOENT for lookup.

This patch introduces a new inode flag indicating whether child dentries are ordered. The flag is set at the same time a directory is marked complete. After a file is created, deleted, or renamed, we clear the flag on the directory inode. This prevents ceph_readdir() from using cached dentries to satisfy the readdir syscall.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r-- fs/ceph/dir.c 23
-rw-r--r-- fs/ceph/inode.c 13
-rw-r--r-- fs/ceph/super.h 38
3 files changed, 55 insertions, 19 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index e6d63f8f98c0..652619950fa9 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@ more:
183 spin_unlock(&parent->d_lock); 183 spin_unlock(&parent->d_lock);
184 184
185 /* make sure a dentry wasn't dropped while we didn't have parent lock */ 185 /* make sure a dentry wasn't dropped while we didn't have parent lock */
186 if (!ceph_dir_is_complete(dir)) { 186 if (!ceph_dir_is_complete_ordered(dir)) {
187 dout(" lost dir complete on %p; falling back to mds\n", dir); 187 dout(" lost dir complete on %p; falling back to mds\n", dir);
188 dput(dentry); 188 dput(dentry);
189 err = -EAGAIN; 189 err = -EAGAIN;
@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
261 261
262 /* always start with . and .. */ 262 /* always start with . and .. */
263 if (ctx->pos == 0) { 263 if (ctx->pos == 0) {
264 /* note dir version at start of readdir so we can tell
265 * if any dentries get dropped */
266 fi->dir_release_count = atomic_read(&ci->i_release_count);
267
268 dout("readdir off 0 -> '.'\n"); 264 dout("readdir off 0 -> '.'\n");
269 if (!dir_emit(ctx, ".", 1, 265 if (!dir_emit(ctx, ".", 1,
270 ceph_translate_ino(inode->i_sb, inode->i_ino), 266 ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
289 if ((ctx->pos == 2 || fi->dentry) && 285 if ((ctx->pos == 2 || fi->dentry) &&
290 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 286 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
291 ceph_snap(inode) != CEPH_SNAPDIR && 287 ceph_snap(inode) != CEPH_SNAPDIR &&
292 __ceph_dir_is_complete(ci) && 288 __ceph_dir_is_complete_ordered(ci) &&
293 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 289 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
294 u32 shared_gen = ci->i_shared_gen; 290 u32 shared_gen = ci->i_shared_gen;
295 spin_unlock(&ci->i_ceph_lock); 291 spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
312 308
313 /* proceed with a normal readdir */ 309 /* proceed with a normal readdir */
314 310
311 if (ctx->pos == 2) {
312 /* note dir version at start of readdir so we can tell
313 * if any dentries get dropped */
314 fi->dir_release_count = atomic_read(&ci->i_release_count);
315 fi->dir_ordered_count = ci->i_ordered_count;
316 }
317
315more: 318more:
316 /* do we have the correct frag content buffered? */ 319 /* do we have the correct frag content buffered? */
317 if (fi->frag != frag || fi->last_readdir == NULL) { 320 if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@ more:
446 */ 449 */
447 spin_lock(&ci->i_ceph_lock); 450 spin_lock(&ci->i_ceph_lock);
448 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { 451 if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
449 dout(" marking %p complete\n", inode); 452 if (ci->i_ordered_count == fi->dir_ordered_count)
450 __ceph_dir_set_complete(ci, fi->dir_release_count); 453 dout(" marking %p complete and ordered\n", inode);
454 else
455 dout(" marking %p complete\n", inode);
456 __ceph_dir_set_complete(ci, fi->dir_release_count,
457 fi->dir_ordered_count);
451 } 458 }
452 spin_unlock(&ci->i_ceph_lock); 459 spin_unlock(&ci->i_ceph_lock);
453 460
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 7b6139004401..72607c17e6fd 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
389 ci->i_version = 0; 389 ci->i_version = 0;
390 ci->i_time_warp_seq = 0; 390 ci->i_time_warp_seq = 0;
391 ci->i_ceph_flags = 0; 391 ci->i_ceph_flags = 0;
392 ci->i_ordered_count = 0;
392 atomic_set(&ci->i_release_count, 1); 393 atomic_set(&ci->i_release_count, 1);
393 atomic_set(&ci->i_complete_count, 0); 394 atomic_set(&ci->i_complete_count, 0);
394 ci->i_symlink = NULL; 395 ci->i_symlink = NULL;
@@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode,
845 (issued & CEPH_CAP_FILE_EXCL) == 0 && 846 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
846 !__ceph_dir_is_complete(ci)) { 847 !__ceph_dir_is_complete(ci)) {
847 dout(" marking %p complete (empty)\n", inode); 848 dout(" marking %p complete (empty)\n", inode);
848 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); 849 __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
850 ci->i_ordered_count);
849 } 851 }
850 852
851 /* were we issued a capability? */ 853 /* were we issued a capability? */
@@ -1206,8 +1208,8 @@ retry_lookup:
1206 ceph_invalidate_dentry_lease(dn); 1208 ceph_invalidate_dentry_lease(dn);
1207 1209
1208 /* d_move screws up sibling dentries' offsets */ 1210 /* d_move screws up sibling dentries' offsets */
1209 ceph_dir_clear_complete(dir); 1211 ceph_dir_clear_ordered(dir);
1210 ceph_dir_clear_complete(olddir); 1212 ceph_dir_clear_ordered(olddir);
1211 1213
1212 dout("dn %p gets new offset %lld\n", req->r_old_dentry, 1214 dout("dn %p gets new offset %lld\n", req->r_old_dentry,
1213 ceph_dentry(req->r_old_dentry)->offset); 1215 ceph_dentry(req->r_old_dentry)->offset);
@@ -1219,6 +1221,7 @@ retry_lookup:
1219 if (!rinfo->head->is_target) { 1221 if (!rinfo->head->is_target) {
1220 dout("fill_trace null dentry\n"); 1222 dout("fill_trace null dentry\n");
1221 if (dn->d_inode) { 1223 if (dn->d_inode) {
1224 ceph_dir_clear_ordered(dir);
1222 dout("d_delete %p\n", dn); 1225 dout("d_delete %p\n", dn);
1223 d_delete(dn); 1226 d_delete(dn);
1224 } else { 1227 } else {
@@ -1235,7 +1238,7 @@ retry_lookup:
1235 1238
1236 /* attach proper inode */ 1239 /* attach proper inode */
1237 if (!dn->d_inode) { 1240 if (!dn->d_inode) {
1238 ceph_dir_clear_complete(dir); 1241 ceph_dir_clear_ordered(dir);
1239 ihold(in); 1242 ihold(in);
1240 dn = splice_dentry(dn, in, &have_lease); 1243 dn = splice_dentry(dn, in, &have_lease);
1241 if (IS_ERR(dn)) { 1244 if (IS_ERR(dn)) {
@@ -1265,7 +1268,7 @@ retry_lookup:
1265 BUG_ON(!dir); 1268 BUG_ON(!dir);
1266 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); 1269 BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
1267 dout(" linking snapped dir %p to dn %p\n", in, dn); 1270 dout(" linking snapped dir %p to dn %p\n", in, dn);
1268 ceph_dir_clear_complete(dir); 1271 ceph_dir_clear_ordered(dir);
1269 ihold(in); 1272 ihold(in);
1270 dn = splice_dentry(dn, in, NULL); 1273 dn = splice_dentry(dn, in, NULL);
1271 if (IS_ERR(dn)) { 1274 if (IS_ERR(dn)) {
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507979b8..aca22879b41f 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -256,6 +256,7 @@ struct ceph_inode_info {
256 u32 i_time_warp_seq; 256 u32 i_time_warp_seq;
257 257
258 unsigned i_ceph_flags; 258 unsigned i_ceph_flags;
259 int i_ordered_count;
259 atomic_t i_release_count; 260 atomic_t i_release_count;
260 atomic_t i_complete_count; 261 atomic_t i_complete_count;
261 262
@@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
434/* 435/*
435 * Ceph inode. 436 * Ceph inode.
436 */ 437 */
437#define CEPH_I_NODELAY 4 /* do not delay cap release */ 438#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
438#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ 439#define CEPH_I_NODELAY 4 /* do not delay cap release */
439#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ 440#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
441#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
440 442
441static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, 443static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
442 int release_count) 444 int release_count, int ordered_count)
443{ 445{
444 atomic_set(&ci->i_complete_count, release_count); 446 atomic_set(&ci->i_complete_count, release_count);
447 if (ci->i_ordered_count == ordered_count)
448 ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
449 else
450 ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
445} 451}
446 452
447static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) 453static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
455 atomic_read(&ci->i_release_count); 461 atomic_read(&ci->i_release_count);
456} 462}
457 463
464static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
465{
466 return __ceph_dir_is_complete(ci) &&
467 (ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
468}
469
458static inline void ceph_dir_clear_complete(struct inode *inode) 470static inline void ceph_dir_clear_complete(struct inode *inode)
459{ 471{
460 __ceph_dir_clear_complete(ceph_inode(inode)); 472 __ceph_dir_clear_complete(ceph_inode(inode));
461} 473}
462 474
463static inline bool ceph_dir_is_complete(struct inode *inode) 475static inline void ceph_dir_clear_ordered(struct inode *inode)
464{ 476{
465 return __ceph_dir_is_complete(ceph_inode(inode)); 477 struct ceph_inode_info *ci = ceph_inode(inode);
478 spin_lock(&ci->i_ceph_lock);
479 ci->i_ordered_count++;
480 ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
481 spin_unlock(&ci->i_ceph_lock);
466} 482}
467 483
484static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
485{
486 struct ceph_inode_info *ci = ceph_inode(inode);
487 bool ret;
488 spin_lock(&ci->i_ceph_lock);
489 ret = __ceph_dir_is_complete_ordered(ci);
490 spin_unlock(&ci->i_ceph_lock);
491 return ret;
492}
468 493
469/* find a specific frag @f */ 494/* find a specific frag @f */
470extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, 495extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +605,7 @@ struct ceph_file_info {
580 char *last_name; /* last entry in previous chunk */ 605 char *last_name; /* last entry in previous chunk */
581 struct dentry *dentry; /* next dentry (for dcache readdir) */ 606 struct dentry *dentry; /* next dentry (for dcache readdir) */
582 int dir_release_count; 607 int dir_release_count;
608 int dir_ordered_count;
583 609
584 /* used for -o dirstat read() on directory thing */ 610 /* used for -o dirstat read() on directory thing */
585 char *dir_info; 611 char *dir_info;