author    Yan, Zheng <zyan@redhat.com>    2015-06-16 08:48:56 -0400
committer Ilya Dryomov <idryomov@gmail.com>    2015-06-25 04:49:32 -0400
commit    fdd4e15838e59c394a1ec4963b57c22c12608685 (patch)
tree      b5486f7b0f12abf9ed670d187f4841dfdb2aa13e /fs/ceph
parent    b459be739f97e2062b2ba77cfe8ea198dbd58904 (diff)
ceph: rework dcache readdir
Previously our dcache readdir code relied on the child dentries in a directory dentry's d_subdirs list being sorted by dentry offset in descending order. When adding dentries to the dcache, if a dentry already existed, our readdir code moved it to the head of the directory dentry's d_subdirs list. This design depended on dcache internals. Al Viro suggested using ncpfs's approach: keep an array of pointers to dentries in the page cache of the directory inode. The validity of those pointers is indicated by the directory inode's complete and ordered flags. When a dentry gets pruned, we clear the directory inode's complete flag in the d_prune() callback. Before moving a dentry to another directory, we clear the ordered flag for both the old and new directories.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
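[Editor's note] To make the bookkeeping concrete, here is a minimal userspace C sketch of the validity scheme described above. The names (struct dir_cache, struct dir_state, cache_is_complete_ordered) are illustrative stand-ins for the kernel's i_release_count/i_ordered_count machinery, not the actual API; it models only the rule that the cached dentry pointers are trusted while neither counter has moved since the cache was filled.

#include <stdio.h>
#include <stdint.h>

#define CACHE_SLOTS 64

struct dir_cache {
	const char *entries[CACHE_SLOTS]; /* stands in for struct dentry * */
	int nr;                           /* number of cached entries */
	/* snapshot of the directory's counters when the cache was filled */
	uint64_t fill_release_cnt;
	uint64_t fill_ordered_cnt;
};

struct dir_state {
	uint64_t release_count; /* bumped when a dentry is pruned (d_prune) */
	uint64_t ordered_count; /* bumped when a dentry is moved (d_move) */
};

/* the cache is usable only if nothing was dropped or reordered since fill */
static int cache_is_complete_ordered(const struct dir_cache *c,
				     const struct dir_state *d)
{
	return c->fill_release_cnt == d->release_count &&
	       c->fill_ordered_cnt == d->ordered_count;
}

int main(void)
{
	struct dir_state dir = { .release_count = 1, .ordered_count = 1 };
	struct dir_cache cache = { .nr = 0 };
	const char *names[] = { "a", "b", "c" };

	/* fill the cache from a (pretend) MDS readdir reply */
	for (int i = 0; i < 3; i++)
		cache.entries[cache.nr++] = names[i];
	cache.fill_release_cnt = dir.release_count;
	cache.fill_ordered_cnt = dir.ordered_count;
	printf("valid after fill: %d\n",
	       cache_is_complete_ordered(&cache, &dir)); /* prints 1 */

	dir.release_count++; /* a dentry was pruned: cache no longer complete */
	printf("valid after prune: %d\n",
	       cache_is_complete_ordered(&cache, &dir)); /* prints 0 */
	return 0;
}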
Diffstat (limited to 'fs/ceph')
-rw-r--r--  fs/ceph/caps.c        14
-rw-r--r--  fs/ceph/dir.c        313
-rw-r--r--  fs/ceph/file.c         2
-rw-r--r--  fs/ceph/inode.c      118
-rw-r--r--  fs/ceph/mds_client.h   3
-rw-r--r--  fs/ceph/super.h       60
6 files changed, 295 insertions, 215 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index dd7b20adf1d4..dc10c9dd36c1 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
 		used |= CEPH_CAP_PIN;
 	if (ci->i_rd_ref)
 		used |= CEPH_CAP_FILE_RD;
-	if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
+	if (ci->i_rdcache_ref ||
+	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+	     ci->vfs_inode.i_data.nrpages))
 		used |= CEPH_CAP_FILE_CACHE;
 	if (ci->i_wr_ref)
 		used |= CEPH_CAP_FILE_WR;
@@ -1651,9 +1653,10 @@ retry_locked:
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
 	if ((!is_delayed || mdsc->stopping) &&
-	    ci->i_wrbuffer_ref == 0 &&          /* no dirty pages... */
-	    inode->i_data.nrpages &&            /* have cached pages */
-	    (file_wanted == 0 ||                /* no open files */
+	    !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
+	    ci->i_wrbuffer_ref == 0 &&          /* no dirty pages... */
+	    inode->i_data.nrpages &&            /* have cached pages */
+	    (file_wanted == 0 ||                /* no open files */
 	     (revoking & (CEPH_CAP_FILE_CACHE|
 			  CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
 	    !tried_invalidate) {
@@ -2805,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	 * try to invalidate (once).  (If there are dirty buffers, we
 	 * will invalidate _after_ writeback.)
 	 */
-	if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
+	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
 	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
 	    !ci->i_wrbuffer_ref) {
 		if (try_nonblocking_invalidate(inode)) {
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index b99f2ff8189d..9314b4ea2375 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -107,6 +107,27 @@ static int fpos_cmp(loff_t l, loff_t r)
 }
 
 /*
+ * make note of the last dentry we read, so we can
+ * continue at the same lexicographical point,
+ * regardless of what dir changes take place on the
+ * server.
+ */
+static int note_last_dentry(struct ceph_file_info *fi, const char *name,
+			    int len, unsigned next_offset)
+{
+	char *buf = kmalloc(len+1, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+	kfree(fi->last_name);
+	fi->last_name = buf;
+	memcpy(fi->last_name, name, len);
+	fi->last_name[len] = 0;
+	fi->next_offset = next_offset;
+	dout("note_last_dentry '%s'\n", fi->last_name);
+	return 0;
+}
+
+/*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache.  We make this work by carefully ordering dentries on
  * d_child when we initially get results back from the MDS, and
@@ -123,123 +144,113 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
 	struct ceph_file_info *fi = file->private_data;
 	struct dentry *parent = file->f_path.dentry;
 	struct inode *dir = d_inode(parent);
-	struct list_head *p;
-	struct dentry *dentry, *last;
+	struct dentry *dentry, *last = NULL;
 	struct ceph_dentry_info *di;
+	unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *);
 	int err = 0;
+	loff_t ptr_pos = 0;
+	struct ceph_readdir_cache_control cache_ctl = {};
 
-	/* claim ref on last dentry we returned */
-	last = fi->dentry;
-	fi->dentry = NULL;
-
-	dout("__dcache_readdir %p v%u at %llu (last %p)\n",
-	     dir, shared_gen, ctx->pos, last);
-
-	spin_lock(&parent->d_lock);
+	dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
 
-	/* start at beginning? */
-	if (ctx->pos == 2 || last == NULL ||
-	    fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) {
-		if (list_empty(&parent->d_subdirs))
-			goto out_unlock;
-		p = parent->d_subdirs.prev;
-		dout(" initial p %p/%p\n", p->prev, p->next);
-	} else {
-		p = last->d_child.prev;
+	/* we can calculate cache index for the first dirfrag */
+	if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
+		cache_ctl.index = fpos_off(ctx->pos) - 2;
+		BUG_ON(cache_ctl.index < 0);
+		ptr_pos = cache_ctl.index * sizeof(struct dentry *);
 	}
 
-more:
-	dentry = list_entry(p, struct dentry, d_child);
-	di = ceph_dentry(dentry);
-	while (1) {
-		dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next,
-		     d_unhashed(dentry) ? "!hashed" : "hashed",
-		     parent->d_subdirs.prev, parent->d_subdirs.next);
-		if (p == &parent->d_subdirs) {
+	while (true) {
+		pgoff_t pgoff;
+		bool emit_dentry;
+
+		if (ptr_pos >= i_size_read(dir)) {
 			fi->flags |= CEPH_F_ATEND;
-			goto out_unlock;
+			err = 0;
+			break;
 		}
-		spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
+
+		err = -EAGAIN;
+		pgoff = ptr_pos >> PAGE_CACHE_SHIFT;
+		if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
+			ceph_readdir_cache_release(&cache_ctl);
+			cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
+			if (!cache_ctl.page) {
+				dout(" page %lu not found\n", pgoff);
+				break;
+			}
+			/* reading/filling the cache are serialized by
+			 * i_mutex, no need to use page lock */
+			unlock_page(cache_ctl.page);
+			cache_ctl.dentries = kmap(cache_ctl.page);
+		}
+
+		rcu_read_lock();
+		spin_lock(&parent->d_lock);
+		/* check i_size again here, because empty directory can be
+		 * marked as complete while not holding the i_mutex. */
+		if (ceph_dir_is_complete_ordered(dir) &&
+		    ptr_pos < i_size_read(dir))
+			dentry = cache_ctl.dentries[cache_ctl.index % nsize];
+		else
+			dentry = NULL;
+		spin_unlock(&parent->d_lock);
+		if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+			dentry = NULL;
+		rcu_read_unlock();
+		if (!dentry)
+			break;
+
+		emit_dentry = false;
+		di = ceph_dentry(dentry);
+		spin_lock(&dentry->d_lock);
 		if (di->lease_shared_gen == shared_gen &&
-		    !d_unhashed(dentry) && d_really_is_positive(dentry) &&
+		    d_really_is_positive(dentry) &&
 		    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
 		    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
-		    fpos_cmp(ctx->pos, di->offset) <= 0)
-			break;
-		dout(" skipping %p %pd at %llu (%llu)%s%s\n", dentry,
-		     dentry, di->offset,
-		     ctx->pos, d_unhashed(dentry) ? " unhashed" : "",
-		     !d_inode(dentry) ? " null" : "");
+		    fpos_cmp(ctx->pos, di->offset) <= 0) {
+			emit_dentry = true;
+		}
 		spin_unlock(&dentry->d_lock);
-		p = p->prev;
-		dentry = list_entry(p, struct dentry, d_child);
-		di = ceph_dentry(dentry);
-	}
-
-	dget_dlock(dentry);
-	spin_unlock(&dentry->d_lock);
-	spin_unlock(&parent->d_lock);
 
-	/* make sure a dentry wasn't dropped while we didn't have parent lock */
-	if (!ceph_dir_is_complete_ordered(dir)) {
-		dout(" lost dir complete on %p; falling back to mds\n", dir);
-		dput(dentry);
-		err = -EAGAIN;
-		goto out;
-	}
+		if (emit_dentry) {
+			dout(" %llu (%llu) dentry %p %pd %p\n", di->offset,
+			     ctx->pos, dentry, dentry, d_inode(dentry));
+			ctx->pos = di->offset;
+			if (!dir_emit(ctx, dentry->d_name.name,
+				      dentry->d_name.len,
+				      ceph_translate_ino(dentry->d_sb,
+							 d_inode(dentry)->i_ino),
+				      d_inode(dentry)->i_mode >> 12)) {
+				dput(dentry);
+				err = 0;
+				break;
+			}
+			ctx->pos++;
 
-	dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
-	     dentry, dentry, d_inode(dentry));
-	if (!dir_emit(ctx, dentry->d_name.name,
-		      dentry->d_name.len,
-		      ceph_translate_ino(dentry->d_sb, d_inode(dentry)->i_ino),
-		      d_inode(dentry)->i_mode >> 12)) {
-		if (last) {
-			/* remember our position */
-			fi->dentry = last;
-			fi->next_offset = fpos_off(di->offset);
+			if (last)
+				dput(last);
+			last = dentry;
+		} else {
+			dput(dentry);
 		}
-		dput(dentry);
-		return 0;
-	}
-
-	ctx->pos = di->offset + 1;
-
-	if (last)
-		dput(last);
-	last = dentry;
-
-	spin_lock(&parent->d_lock);
-	p = p->prev;	/* advance to next dentry */
-	goto more;
 
-out_unlock:
-	spin_unlock(&parent->d_lock);
-out:
-	if (last)
+		cache_ctl.index++;
+		ptr_pos += sizeof(struct dentry *);
+	}
+	ceph_readdir_cache_release(&cache_ctl);
+	if (last) {
+		int ret;
+		di = ceph_dentry(last);
+		ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
+				       fpos_off(di->offset) + 1);
+		if (ret < 0)
+			err = ret;
 		dput(last);
+	}
 	return err;
 }
 
-/*
- * make note of the last dentry we read, so we can
- * continue at the same lexicographical point,
- * regardless of what dir changes take place on the
- * server.
- */
-static int note_last_dentry(struct ceph_file_info *fi, const char *name,
-			    int len)
-{
-	kfree(fi->last_name);
-	fi->last_name = kmalloc(len+1, GFP_KERNEL);
-	if (!fi->last_name)
-		return -ENOMEM;
-	memcpy(fi->last_name, name, len);
-	fi->last_name[len] = 0;
-	dout("note_last_dentry '%s'\n", fi->last_name);
-	return 0;
-}
-
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
 	struct ceph_file_info *fi = file->private_data;
@@ -280,8 +291,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 
 	/* can we use the dcache? */
 	spin_lock(&ci->i_ceph_lock);
-	if ((ctx->pos == 2 || fi->dentry) &&
-	    ceph_test_mount_opt(fsc, DCACHE) &&
+	if (ceph_test_mount_opt(fsc, DCACHE) &&
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
 	    __ceph_dir_is_complete_ordered(ci) &&
@@ -296,24 +306,8 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
 	}
-	if (fi->dentry) {
-		err = note_last_dentry(fi, fi->dentry->d_name.name,
-				       fi->dentry->d_name.len);
-		if (err)
-			return err;
-		dput(fi->dentry);
-		fi->dentry = NULL;
-	}
 
 	/* proceed with a normal readdir */
-
-	if (ctx->pos == 2) {
-		/* note dir version at start of readdir so we can tell
-		 * if any dentries get dropped */
-		fi->dir_release_count = atomic_read(&ci->i_release_count);
-		fi->dir_ordered_count = ci->i_ordered_count;
-	}
-
 more:
 	/* do we have the correct frag content buffered? */
 	if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -348,6 +342,9 @@ more:
 			return -ENOMEM;
 		}
 	}
+	req->r_dir_release_cnt = fi->dir_release_count;
+	req->r_dir_ordered_cnt = fi->dir_ordered_count;
+	req->r_readdir_cache_idx = fi->readdir_cache_idx;
 	req->r_readdir_offset = fi->next_offset;
 	req->r_args.readdir.frag = cpu_to_le32(frag);
 
@@ -364,26 +361,38 @@ more:
 	     (int)req->r_reply_info.dir_end,
 	     (int)req->r_reply_info.dir_complete);
 
-	if (!req->r_did_prepopulate) {
-		dout("readdir !did_prepopulate");
-		/* preclude from marking dir complete */
-		fi->dir_release_count--;
-	}
 
 	/* note next offset and last dentry name */
 	rinfo = &req->r_reply_info;
 	if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
 		frag = le32_to_cpu(rinfo->dir_dir->frag);
-		if (ceph_frag_is_leftmost(frag))
-			fi->next_offset = 2;
-		else
-			fi->next_offset = 0;
-		off = fi->next_offset;
+		off = req->r_readdir_offset;
+		fi->next_offset = off;
 	}
+
 	fi->frag = frag;
 	fi->offset = fi->next_offset;
 	fi->last_readdir = req;
 
+	if (req->r_did_prepopulate) {
+		fi->readdir_cache_idx = req->r_readdir_cache_idx;
+		if (fi->readdir_cache_idx < 0) {
+			/* preclude from marking dir ordered */
+			fi->dir_ordered_count = 0;
+		} else if (ceph_frag_is_leftmost(frag) && off == 2) {
+			/* note dir version at start of readdir so
+			 * we can tell if any dentries get dropped */
+			fi->dir_release_count = req->r_dir_release_cnt;
+			fi->dir_ordered_count = req->r_dir_ordered_cnt;
+		}
+	} else {
+		dout("readdir !did_prepopulate");
+		/* disable readdir cache */
+		fi->readdir_cache_idx = -1;
+		/* preclude from marking dir complete */
+		fi->dir_release_count = 0;
+	}
+
 	if (req->r_reply_info.dir_end) {
 		kfree(fi->last_name);
 		fi->last_name = NULL;
@@ -394,10 +403,10 @@ more:
 		} else {
 			err = note_last_dentry(fi,
 				       rinfo->dir_dname[rinfo->dir_nr-1],
-				       rinfo->dir_dname_len[rinfo->dir_nr-1]);
+				       rinfo->dir_dname_len[rinfo->dir_nr-1],
+				       fi->next_offset + rinfo->dir_nr);
 			if (err)
 				return err;
-			fi->next_offset += rinfo->dir_nr;
 		}
 	}
 
@@ -453,16 +462,22 @@ more:
 	 * were released during the whole readdir, and we should have
 	 * the complete dir contents in our cache.
 	 */
-	spin_lock(&ci->i_ceph_lock);
-	if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
-		if (ci->i_ordered_count == fi->dir_ordered_count)
+	if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
+		spin_lock(&ci->i_ceph_lock);
+		if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
 			dout(" marking %p complete and ordered\n", inode);
-		else
+			/* use i_size to track number of entries in
+			 * readdir cache */
+			BUG_ON(fi->readdir_cache_idx < 0);
+			i_size_write(inode, fi->readdir_cache_idx *
+				     sizeof(struct dentry*));
+		} else {
 			dout(" marking %p complete\n", inode);
+		}
 		__ceph_dir_set_complete(ci, fi->dir_release_count,
 					fi->dir_ordered_count);
+		spin_unlock(&ci->i_ceph_lock);
 	}
-	spin_unlock(&ci->i_ceph_lock);
 
 	dout("readdir %p file %p done.\n", inode, file);
 	return 0;
@@ -476,14 +491,12 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
 	}
 	kfree(fi->last_name);
 	fi->last_name = NULL;
+	fi->dir_release_count = 0;
+	fi->readdir_cache_idx = -1;
 	if (ceph_frag_is_leftmost(frag))
 		fi->next_offset = 2;  /* compensate for . and .. */
 	else
 		fi->next_offset = 0;
-	if (fi->dentry) {
-		dput(fi->dentry);
-		fi->dentry = NULL;
-	}
 	fi->flags &= ~CEPH_F_ATEND;
 }
 
@@ -497,13 +510,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 	mutex_lock(&inode->i_mutex);
 	retval = -EINVAL;
 	switch (whence) {
-	case SEEK_END:
-		offset += inode->i_size + 2;   /* FIXME */
-		break;
 	case SEEK_CUR:
 		offset += file->f_pos;
 	case SEEK_SET:
 		break;
+	case SEEK_END:
+		retval = -EOPNOTSUPP;
 	default:
 		goto out;
 	}
@@ -516,20 +528,18 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 	}
 	retval = offset;
 
-	/*
-	 * discard buffered readdir content on seekdir(0), or
-	 * seek to new frag, or seek prior to current chunk.
-	 */
 	if (offset == 0 ||
 	    fpos_frag(offset) != fi->frag ||
 	    fpos_off(offset) < fi->offset) {
+		/* discard buffered readdir content on seekdir(0), or
+		 * seek to new frag, or seek prior to current chunk */
 		dout("dir_llseek dropping %p content\n", file);
 		reset_readdir(fi, fpos_frag(offset));
+	} else if (fpos_cmp(offset, old_offset) > 0) {
+		/* reset dir_release_count if we did a forward seek */
+		fi->dir_release_count = 0;
+		fi->readdir_cache_idx = -1;
 	}
-
-	/* bump dir_release_count if we did a forward seek */
-	if (fpos_cmp(offset, old_offset) > 0)
-		fi->dir_release_count--;
 	}
 out:
 	mutex_unlock(&inode->i_mutex);
@@ -985,16 +995,15 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	 * to do it here.
 	 */
 
+	/* d_move screws up sibling dentries' offsets */
+	ceph_dir_clear_complete(old_dir);
+	ceph_dir_clear_complete(new_dir);
+
 	d_move(old_dentry, new_dentry);
 
 	/* ensure target dentry is invalidated, despite
 	   rehashing bug in vfs_rename_dir */
 	ceph_invalidate_dentry_lease(new_dentry);
-
-	/* d_move screws up sibling dentries' offsets */
-	ceph_dir_clear_complete(old_dir);
-	ceph_dir_clear_complete(new_dir);
-
 	}
 	ceph_mdsc_put_request(req);
 	return err;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 424b5b540207..faf92095e105 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -96,6 +96,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 	}
 	cf->fmode = fmode;
 	cf->next_offset = 2;
+	cf->readdir_cache_idx = -1;
 	file->private_data = cf;
 	BUG_ON(inode->i_fop->release != ceph_release);
 	break;
@@ -324,7 +325,6 @@ int ceph_release(struct inode *inode, struct file *file)
 	ceph_mdsc_put_request(cf->last_readdir);
 	kfree(cf->last_name);
 	kfree(cf->dir_info);
-	dput(cf->dentry);
 	kmem_cache_free(ceph_file_cachep, cf);
 
 	/* wake up anyone waiting for caps on this inode */
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e86d1a4efc46..2a6d93befbae 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -390,9 +390,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	ci->i_inline_version = 0;
 	ci->i_time_warp_seq = 0;
 	ci->i_ceph_flags = 0;
-	ci->i_ordered_count = 0;
-	atomic_set(&ci->i_release_count, 1);
-	atomic_set(&ci->i_complete_count, 0);
+	atomic64_set(&ci->i_ordered_count, 1);
+	atomic64_set(&ci->i_release_count, 1);
+	atomic64_set(&ci->i_complete_seq[0], 0);
+	atomic64_set(&ci->i_complete_seq[1], 0);
 	ci->i_symlink = NULL;
 
 	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
@@ -860,9 +861,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 	    !__ceph_dir_is_complete(ci)) {
 		dout(" marking %p complete (empty)\n", inode);
+		i_size_write(inode, 0);
 		__ceph_dir_set_complete(ci,
-					atomic_read(&ci->i_release_count),
-					ci->i_ordered_count);
+					atomic64_read(&ci->i_release_count),
+					atomic64_read(&ci->i_ordered_count));
 	}
 
 	wake = true;
@@ -1214,6 +1216,10 @@ retry_lookup:
 			dout("fill_trace doing d_move %p -> %p\n",
 			     req->r_old_dentry, dn);
 
+			/* d_move screws up sibling dentries' offsets */
+			ceph_dir_clear_ordered(dir);
+			ceph_dir_clear_ordered(olddir);
+
 			d_move(req->r_old_dentry, dn);
 			dout(" src %p '%pd' dst %p '%pd'\n",
 			     req->r_old_dentry,
@@ -1224,10 +1230,6 @@ retry_lookup:
 			   rehashing bug in vfs_rename_dir */
 			ceph_invalidate_dentry_lease(dn);
 
-			/* d_move screws up sibling dentries' offsets */
-			ceph_dir_clear_ordered(dir);
-			ceph_dir_clear_ordered(olddir);
-
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
 
@@ -1335,6 +1337,49 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
 	return err;
 }
 
+void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
+{
+	if (ctl->page) {
+		kunmap(ctl->page);
+		page_cache_release(ctl->page);
+		ctl->page = NULL;
+	}
+}
+
+static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
+			      struct ceph_readdir_cache_control *ctl,
+			      struct ceph_mds_request *req)
+{
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*);
+	unsigned idx = ctl->index % nsize;
+	pgoff_t pgoff = ctl->index / nsize;
+
+	if (!ctl->page || pgoff != page_index(ctl->page)) {
+		ceph_readdir_cache_release(ctl);
+		ctl->page = grab_cache_page(&dir->i_data, pgoff);
+		if (!ctl->page) {
+			ctl->index = -1;
+			return -ENOMEM;
+		}
+		/* reading/filling the cache are serialized by
+		 * i_mutex, no need to use page lock */
+		unlock_page(ctl->page);
+		ctl->dentries = kmap(ctl->page);
+	}
+
+	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
+	    req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
+		dout("readdir cache dn %p idx %d\n", dn, ctl->index);
+		ctl->dentries[idx] = dn;
+		ctl->index++;
+	} else {
+		dout("disable readdir cache\n");
+		ctl->index = -1;
+	}
+	return 0;
+}
+
 int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 			     struct ceph_mds_session *session)
 {
@@ -1347,8 +1392,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	struct ceph_dentry_info *di;
-	u64 r_readdir_offset = req->r_readdir_offset;
 	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+	struct ceph_readdir_cache_control cache_ctl = {};
+
+	if (req->r_aborted)
+		return readdir_prepopulate_inodes_only(req, session);
 
 	if (rinfo->dir_dir &&
 	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {
@@ -1356,14 +1404,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		     frag, le32_to_cpu(rinfo->dir_dir->frag));
 		frag = le32_to_cpu(rinfo->dir_dir->frag);
 		if (ceph_frag_is_leftmost(frag))
-			r_readdir_offset = 2;
+			req->r_readdir_offset = 2;
 		else
-			r_readdir_offset = 0;
+			req->r_readdir_offset = 0;
 	}
 
-	if (req->r_aborted)
-		return readdir_prepopulate_inodes_only(req, session);
-
 	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
 		snapdir = ceph_get_snapdir(d_inode(parent));
 		parent = d_find_alias(snapdir);
@@ -1376,6 +1421,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
 	}
 
+	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
+		/* note dir version at start of readdir so we can tell
+		 * if any dentries get dropped */
+		struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
+		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
+		req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
+		req->r_readdir_cache_idx = 0;
+	}
+
+	cache_ctl.index = req->r_readdir_cache_idx;
+
 	/* FIXME: release caps/leases if error occurs */
 	for (i = 0; i < rinfo->dir_nr; i++) {
 		struct ceph_vino vino;
@@ -1415,13 +1471,6 @@ retry_lookup:
 			d_delete(dn);
 			dput(dn);
 			goto retry_lookup;
-		} else {
-			/* reorder parent's d_subdirs */
-			spin_lock(&parent->d_lock);
-			spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
-			list_move(&dn->d_child, &parent->d_subdirs);
-			spin_unlock(&dn->d_lock);
-			spin_unlock(&parent->d_lock);
 		}
 
 		/* inode */
@@ -1438,13 +1487,15 @@ retry_lookup:
 			}
 		}
 
-		if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
-			       req->r_request_started, -1,
-			       &req->r_caps_reservation) < 0) {
+		ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+				 req->r_request_started, -1,
+				 &req->r_caps_reservation);
+		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
 			if (d_really_is_negative(dn))
 				iput(in);
 			d_drop(dn);
+			err = ret;
 			goto next_item;
 		}
 
@@ -1460,19 +1511,28 @@ retry_lookup:
 		}
 
 		di = dn->d_fsdata;
-		di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
+		di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
 
 		update_dentry_lease(dn, rinfo->dir_dlease[i],
 				    req->r_session,
 				    req->r_request_started);
+
+		if (err == 0 && cache_ctl.index >= 0) {
+			ret = fill_readdir_cache(d_inode(parent), dn,
+						 &cache_ctl, req);
+			if (ret < 0)
+				err = ret;
+		}
 next_item:
 		if (dn)
 			dput(dn);
 	}
-	if (err == 0)
-		req->r_did_prepopulate = true;
-
 out:
+	if (err == 0) {
+		req->r_did_prepopulate = true;
+		req->r_readdir_cache_idx = cache_ctl.index;
+	}
+	ceph_readdir_cache_release(&cache_ctl);
 	if (snapdir) {
 		iput(snapdir);
 		dput(parent);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 470be4eb25f3..762757e6cebf 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -253,6 +253,9 @@ struct ceph_mds_request {
 	bool r_got_unsafe, r_got_safe, r_got_result;
 
 	bool r_did_prepopulate;
+	long long r_dir_release_cnt;
+	long long r_dir_ordered_cnt;
+	int r_readdir_cache_idx;
 	u32 r_readdir_offset;
 
 	struct ceph_cap_reservation r_caps_reservation;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 4415e977d72b..860cc016e70d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -282,9 +282,9 @@ struct ceph_inode_info {
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
-	int i_ordered_count;
-	atomic_t i_release_count;
-	atomic_t i_complete_count;
+	atomic64_t i_release_count;
+	atomic64_t i_ordered_count;
+	atomic64_t i_complete_seq[2];
 
 	struct ceph_dir_layout i_dir_layout;
 	struct ceph_file_layout i_layout;
@@ -471,30 +471,36 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-					   int release_count, int ordered_count)
+					   long long release_count,
+					   long long ordered_count)
 {
-	atomic_set(&ci->i_complete_count, release_count);
-	if (ci->i_ordered_count == ordered_count)
-		ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
-	else
-		ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+	smp_mb__before_atomic();
+	atomic64_set(&ci->i_complete_seq[0], release_count);
+	atomic64_set(&ci->i_complete_seq[1], ordered_count);
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
 {
-	atomic_inc(&ci->i_release_count);
+	atomic64_inc(&ci->i_release_count);
+}
+
+static inline void __ceph_dir_clear_ordered(struct ceph_inode_info *ci)
+{
+	atomic64_inc(&ci->i_ordered_count);
 }
 
 static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
 {
-	return atomic_read(&ci->i_complete_count) ==
-		atomic_read(&ci->i_release_count);
+	return atomic64_read(&ci->i_complete_seq[0]) ==
+		atomic64_read(&ci->i_release_count);
 }
 
 static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
 {
-	return __ceph_dir_is_complete(ci) &&
-		(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+	return atomic64_read(&ci->i_complete_seq[0]) ==
+		atomic64_read(&ci->i_release_count) &&
+		atomic64_read(&ci->i_complete_seq[1]) ==
+		atomic64_read(&ci->i_ordered_count);
 }
 
 static inline void ceph_dir_clear_complete(struct inode *inode)
@@ -504,20 +510,13 @@ static inline void ceph_dir_clear_complete(struct inode *inode)
 
 static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	spin_lock(&ci->i_ceph_lock);
-	ci->i_ordered_count++;
-	ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
-	spin_unlock(&ci->i_ceph_lock);
+	__ceph_dir_clear_ordered(ceph_inode(inode));
 }
 
 static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	bool ret;
-	spin_lock(&ci->i_ceph_lock);
-	ret = __ceph_dir_is_complete_ordered(ci);
-	spin_unlock(&ci->i_ceph_lock);
+	bool ret = __ceph_dir_is_complete_ordered(ceph_inode(inode));
+	smp_rmb();
 	return ret;
 }
 
@@ -636,16 +635,20 @@ struct ceph_file_info {
 	unsigned offset;       /* offset of last chunk, adjusted for . and .. */
 	unsigned next_offset;  /* offset of next chunk (last_name's + 1) */
 	char *last_name;       /* last entry in previous chunk */
-	struct dentry *dentry; /* next dentry (for dcache readdir) */
-	int dir_release_count;
-	int dir_ordered_count;
+	long long dir_release_count;
+	long long dir_ordered_count;
+	int readdir_cache_idx;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;
 	int dir_info_len;
 };
 
+struct ceph_readdir_cache_control {
+	struct page *page;
+	struct dentry **dentries;
+	int index;
+};
 
 /*
  * A "snap realm" describes a subset of the file hierarchy sharing
@@ -944,6 +947,7 @@ extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
 extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
 extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
+extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
 
 /*
  * our d_ops vary depending on whether the inode is live,