path: root/fs/ceph/dir.c
author     Linus Torvalds <torvalds@linux-foundation.org>   2016-05-26 17:10:32 -0400
committer  Linus Torvalds <torvalds@linux-foundation.org>   2016-05-26 17:10:32 -0400
commit     a10c38a4f385f5d7c173a263ff6bb2d36021b3bb (patch)
tree       3cbaa916940b36a9fdb27c8a231e1488fbc352d6   /fs/ceph/dir.c
parent     ea8ea737c46cffa5d0ee74309f81e55a7e5e9c2a (diff)
parent     e536030934aebf049fe6aaebc58dd37aeee21840 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "This changeset has a few main parts:

   - Ilya has finished a huge refactoring effort to sync up the
     client-side logic in libceph with the user-space client code, which
     has evolved significantly over the last couple years, with lots of
     additional behaviors (e.g., how requests are handled when cluster
     is full and transitions from full to non-full).

     This structure of the code is more closely aligned with userspace
     now such that it will be much easier to maintain going forward when
     behavior changes take place. There are some locking improvements
     bundled in as well.

   - Zheng adds multi-filesystem support (multiple namespaces within the
     same Ceph cluster)

   - Zheng has changed the readdir offsets and directory enumeration so
     that dentry offsets are hash-based and therefore stable across
     directory fragmentation events on the MDS.

   - Zheng has a smorgasbord of bug fixes across fs/ceph"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits)
  ceph: fix wake_up_session_cb()
  ceph: don't use truncate_pagecache() to invalidate read cache
  ceph: SetPageError() for writeback pages if writepages fails
  ceph: handle interrupted ceph_writepage()
  ceph: make ceph_update_writeable_page() uninterruptible
  libceph: make ceph_osdc_wait_request() uninterruptible
  ceph: handle -EAGAIN returned by ceph_update_writeable_page()
  ceph: make fault/page_mkwrite return VM_FAULT_OOM for -ENOMEM
  ceph: block non-fatal signals for fault/page_mkwrite
  ceph: make logical calculation functions return bool
  ceph: tolerate bad i_size for symlink inode
  ceph: improve fragtree change detection
  ceph: keep leaf frag when updating fragtree
  ceph: fix dir_auth check in ceph_fill_dirfrag()
  ceph: don't assume frag tree splits in mds reply are sorted
  ceph: fix inode reference leak
  ceph: using hash value to compose dentry offset
  ceph: don't forbid marking directory complete after forward seek
  ceph: record 'offset' for each entry of readdir result
  ceph: define 'end/complete' in readdir reply as bit flags
  ...
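The readdir rework Sage calls out above changes how ceph packs a directory position into f_pos: instead of (frag << 32 | offset-within-frag), a position can now carry a 24-bit name hash plus a collision counter, which stays valid when the MDS fragments the directory. Below is a minimal, stand-alone user-space sketch of that encoding; the constants mirror the ones the diff adds to fs/ceph/dir.c, while make_fpos() and fpos_high() are illustrative stand-ins rather than the kernel helpers themselves. Compare with the ceph_make_fpos()/is_hash_order() hunk at the top of the diff.

/* Minimal sketch of the new readdir offset encoding (user space, C99). */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define OFFSET_BITS     28
#define OFFSET_MASK     ((1 << OFFSET_BITS) - 1)
#define HASH_ORDER      (0xffull << (OFFSET_BITS + 24))

/* high = frag value (frag+name order) or 24-bit name hash (hash order) */
static int64_t make_fpos(unsigned high, unsigned off, bool hash_order)
{
        int64_t fpos = ((int64_t)high << OFFSET_BITS) | (int64_t)off;
        if (hash_order)
                fpos |= HASH_ORDER;
        return fpos;
}

static bool is_hash_order(int64_t p)  { return (p & HASH_ORDER) == HASH_ORDER; }
static unsigned fpos_high(int64_t p)  { return (p & ~HASH_ORDER) >> OFFSET_BITS; }
static unsigned fpos_off(int64_t p)   { return p & OFFSET_MASK; }

int main(void)
{
        /* hash-order offset: stable across MDS directory fragmentation */
        int64_t pos = make_fpos(0xabcdef, 3, true);
        printf("hash_order=%d hash=0x%x off=%u\n",
               is_hash_order(pos), fpos_high(pos), fpos_off(pos));
        return 0;
}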
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r--   fs/ceph/dir.c | 376
1 file changed, 248 insertions(+), 128 deletions(-)
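Two hunks below (in __dcache_readdir() and ceph_readdir()) replace linear scans with the same lower-bound binary search over entry offsets to find where to resume enumeration. A stand-alone sketch of that search pattern, with hypothetical names and sample data only:

/* Find the first index whose offset is not smaller than pos, mirroring
 * the "while (nr > 0) { step = nr >> 1; ... }" loops added in the diff. */
#include <stdio.h>

static int lower_bound(const long long *offsets, int nr, long long pos)
{
        int i = 0, step;
        while (nr > 0) {
                step = nr >> 1;
                if (offsets[i + step] < pos) {
                        i += step + 1;
                        nr -= step + 1;
                } else {
                        nr = step;
                }
        }
        return i;
}

int main(void)
{
        long long offsets[] = { 2, 3, 5, 8, 13 };
        printf("%d\n", lower_bound(offsets, 5, 5));  /* prints 2 */
        return 0;
}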
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 3ab1192d2029..6e0fedf6713b 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -70,16 +70,42 @@ out_unlock:
 }
 
 /*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * for f_pos for readdir:
+ * - hash order:
+ *      (0xff << 52) | ((24 bits hash) << 28) |
+ *      (the nth entry has hash collision);
+ * - frag+name order;
+ *      ((frag value) << 28) | (the nth entry in frag);
 */
+#define OFFSET_BITS     28
+#define OFFSET_MASK     ((1 << OFFSET_BITS) - 1)
+#define HASH_ORDER      (0xffull << (OFFSET_BITS + 24))
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+        loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
+        if (hash_order)
+                fpos |= HASH_ORDER;
+        return fpos;
+}
+
+static bool is_hash_order(loff_t p)
+{
+        return (p & HASH_ORDER) == HASH_ORDER;
+}
+
 static unsigned fpos_frag(loff_t p)
 {
-        return p >> 32;
+        return p >> OFFSET_BITS;
 }
+
+static unsigned fpos_hash(loff_t p)
+{
+        return ceph_frag_value(fpos_frag(p));
+}
+
 static unsigned fpos_off(loff_t p)
 {
-        return p & 0xffffffff;
+        return p & OFFSET_MASK;
 }
 
 static int fpos_cmp(loff_t l, loff_t r)
@@ -111,6 +137,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
         return 0;
 }
 
+
+static struct dentry *
+__dcache_find_get_entry(struct dentry *parent, u64 idx,
+                        struct ceph_readdir_cache_control *cache_ctl)
+{
+        struct inode *dir = d_inode(parent);
+        struct dentry *dentry;
+        unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
+        loff_t ptr_pos = idx * sizeof(struct dentry *);
+        pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
+
+        if (ptr_pos >= i_size_read(dir))
+                return NULL;
+
+        if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
+                ceph_readdir_cache_release(cache_ctl);
+                cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
+                if (!cache_ctl->page) {
+                        dout(" page %lu not found\n", ptr_pgoff);
+                        return ERR_PTR(-EAGAIN);
+                }
+                /* reading/filling the cache are serialized by
+                   i_mutex, no need to use page lock */
+                unlock_page(cache_ctl->page);
+                cache_ctl->dentries = kmap(cache_ctl->page);
+        }
+
+        cache_ctl->index = idx & idx_mask;
+
+        rcu_read_lock();
+        spin_lock(&parent->d_lock);
+        /* check i_size again here, because empty directory can be
+         * marked as complete while not holding the i_mutex. */
+        if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
+                dentry = cache_ctl->dentries[cache_ctl->index];
+        else
+                dentry = NULL;
+        spin_unlock(&parent->d_lock);
+        if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+                dentry = NULL;
+        rcu_read_unlock();
+        return dentry ? : ERR_PTR(-EAGAIN);
+}
+
 /*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache. We make this work by carefully ordering dentries on
@@ -130,75 +200,68 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
         struct inode *dir = d_inode(parent);
         struct dentry *dentry, *last = NULL;
         struct ceph_dentry_info *di;
-        unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
-        int err = 0;
-        loff_t ptr_pos = 0;
         struct ceph_readdir_cache_control cache_ctl = {};
+        u64 idx = 0;
+        int err = 0;
 
-        dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+        dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
+
+        /* search start position */
+        if (ctx->pos > 2) {
+                u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
+                while (count > 0) {
+                        u64 step = count >> 1;
+                        dentry = __dcache_find_get_entry(parent, idx + step,
+                                                         &cache_ctl);
+                        if (!dentry) {
+                                /* use linar search */
+                                idx = 0;
+                                break;
+                        }
+                        if (IS_ERR(dentry)) {
+                                err = PTR_ERR(dentry);
+                                goto out;
+                        }
+                        di = ceph_dentry(dentry);
+                        spin_lock(&dentry->d_lock);
+                        if (fpos_cmp(di->offset, ctx->pos) < 0) {
+                                idx += step + 1;
+                                count -= step + 1;
+                        } else {
+                                count = step;
+                        }
+                        spin_unlock(&dentry->d_lock);
+                        dput(dentry);
+                }
 
-        /* we can calculate cache index for the first dirfrag */
-        if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
-                cache_ctl.index = fpos_off(ctx->pos) - 2;
-                BUG_ON(cache_ctl.index < 0);
-                ptr_pos = cache_ctl.index * sizeof(struct dentry *);
+                dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
         }
 
-        while (true) {
-                pgoff_t pgoff;
-                bool emit_dentry;
 
-                if (ptr_pos >= i_size_read(dir)) {
+        for (;;) {
+                bool emit_dentry = false;
+                dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
+                if (!dentry) {
                         fi->flags |= CEPH_F_ATEND;
                         err = 0;
                         break;
                 }
-
-                err = -EAGAIN;
-                pgoff = ptr_pos >> PAGE_SHIFT;
-                if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
-                        ceph_readdir_cache_release(&cache_ctl);
-                        cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
-                        if (!cache_ctl.page) {
-                                dout(" page %lu not found\n", pgoff);
-                                break;
-                        }
-                        /* reading/filling the cache are serialized by
-                         * i_mutex, no need to use page lock */
-                        unlock_page(cache_ctl.page);
-                        cache_ctl.dentries = kmap(cache_ctl.page);
+                if (IS_ERR(dentry)) {
+                        err = PTR_ERR(dentry);
+                        goto out;
                 }
 
-                rcu_read_lock();
-                spin_lock(&parent->d_lock);
-                /* check i_size again here, because empty directory can be
-                 * marked as complete while not holding the i_mutex. */
-                if (ceph_dir_is_complete_ordered(dir) &&
-                    ptr_pos < i_size_read(dir))
-                        dentry = cache_ctl.dentries[cache_ctl.index % nsize];
-                else
-                        dentry = NULL;
-                spin_unlock(&parent->d_lock);
-                if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
-                        dentry = NULL;
-                rcu_read_unlock();
-                if (!dentry)
-                        break;
-
-                emit_dentry = false;
                 di = ceph_dentry(dentry);
                 spin_lock(&dentry->d_lock);
                 if (di->lease_shared_gen == shared_gen &&
                     d_really_is_positive(dentry) &&
-                    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
-                    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
                     fpos_cmp(ctx->pos, di->offset) <= 0) {
                         emit_dentry = true;
                 }
                 spin_unlock(&dentry->d_lock);
 
                 if (emit_dentry) {
-                        dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+                        dout(" %llx dentry %p %pd %p\n", di->offset,
                              dentry, dentry, d_inode(dentry));
                         ctx->pos = di->offset;
                         if (!dir_emit(ctx, dentry->d_name.name,
@@ -218,10 +281,8 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
                 } else {
                         dput(dentry);
                 }
-
-                cache_ctl.index++;
-                ptr_pos += sizeof(struct dentry *);
         }
+out:
         ceph_readdir_cache_release(&cache_ctl);
         if (last) {
                 int ret;
@@ -235,6 +296,16 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
         return err;
 }
 
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+        if (!fi->last_readdir)
+                return true;
+        if (is_hash_order(pos))
+                return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+        else
+                return fi->frag != fpos_frag(pos);
+}
+
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
         struct ceph_file_info *fi = file->private_data;
@@ -242,13 +313,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
         struct ceph_inode_info *ci = ceph_inode(inode);
         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
         struct ceph_mds_client *mdsc = fsc->mdsc;
-        unsigned frag = fpos_frag(ctx->pos);
-        int off = fpos_off(ctx->pos);
+        int i;
         int err;
         u32 ftype;
         struct ceph_mds_reply_info_parsed *rinfo;
 
-        dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
+        dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
         if (fi->flags & CEPH_F_ATEND)
                 return 0;
 
@@ -260,7 +330,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                             inode->i_mode >> 12))
                         return 0;
                 ctx->pos = 1;
-                off = 1;
         }
         if (ctx->pos == 1) {
                 ino_t ino = parent_ino(file->f_path.dentry);
@@ -270,7 +339,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                             inode->i_mode >> 12))
                         return 0;
                 ctx->pos = 2;
-                off = 2;
         }
 
         /* can we use the dcache? */
@@ -285,8 +353,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                 err = __dcache_readdir(file, ctx, shared_gen);
                 if (err != -EAGAIN)
                         return err;
-                frag = fpos_frag(ctx->pos);
-                off = fpos_off(ctx->pos);
         } else {
                 spin_unlock(&ci->i_ceph_lock);
         }
@@ -294,8 +360,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
         /* proceed with a normal readdir */
 more:
         /* do we have the correct frag content buffered? */
-        if (fi->frag != frag || fi->last_readdir == NULL) {
+        if (need_send_readdir(fi, ctx->pos)) {
                 struct ceph_mds_request *req;
+                unsigned frag;
                 int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                         CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
@@ -305,6 +372,13 @@ more:
                         fi->last_readdir = NULL;
                 }
 
+                if (is_hash_order(ctx->pos)) {
+                        frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+                                                NULL, NULL);
+                } else {
+                        frag = fpos_frag(ctx->pos);
+                }
+
                 dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                      ceph_vinop(inode), frag, fi->last_name);
                 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -331,6 +405,8 @@ more:
                 req->r_readdir_cache_idx = fi->readdir_cache_idx;
                 req->r_readdir_offset = fi->next_offset;
                 req->r_args.readdir.frag = cpu_to_le32(frag);
+                req->r_args.readdir.flags =
+                                cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
 
                 req->r_inode = inode;
                 ihold(inode);
@@ -340,22 +416,26 @@ more:
                         ceph_mdsc_put_request(req);
                         return err;
                 }
-                dout("readdir got and parsed readdir result=%d"
-                     " on frag %x, end=%d, complete=%d\n", err, frag,
+                dout("readdir got and parsed readdir result=%d on "
+                     "frag %x, end=%d, complete=%d, hash_order=%d\n",
+                     err, frag,
                      (int)req->r_reply_info.dir_end,
-                     (int)req->r_reply_info.dir_complete);
-
+                     (int)req->r_reply_info.dir_complete,
+                     (int)req->r_reply_info.hash_order);
 
-                /* note next offset and last dentry name */
                 rinfo = &req->r_reply_info;
                 if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                         frag = le32_to_cpu(rinfo->dir_dir->frag);
-                        off = req->r_readdir_offset;
-                        fi->next_offset = off;
+                        if (!rinfo->hash_order) {
+                                fi->next_offset = req->r_readdir_offset;
+                                /* adjust ctx->pos to beginning of frag */
+                                ctx->pos = ceph_make_fpos(frag,
+                                                          fi->next_offset,
+                                                          false);
+                        }
                 }
 
                 fi->frag = frag;
-                fi->offset = fi->next_offset;
                 fi->last_readdir = req;
 
                 if (req->r_did_prepopulate) {
@@ -363,7 +443,8 @@ more:
                         if (fi->readdir_cache_idx < 0) {
                                 /* preclude from marking dir ordered */
                                 fi->dir_ordered_count = 0;
-                        } else if (ceph_frag_is_leftmost(frag) && off == 2) {
+                        } else if (ceph_frag_is_leftmost(frag) &&
+                                   fi->next_offset == 2) {
                                 /* note dir version at start of readdir so
                                  * we can tell if any dentries get dropped */
                                 fi->dir_release_count = req->r_dir_release_cnt;
@@ -377,65 +458,87 @@ more:
                         fi->dir_release_count = 0;
                 }
 
-                if (req->r_reply_info.dir_end) {
-                        kfree(fi->last_name);
-                        fi->last_name = NULL;
-                        if (ceph_frag_is_rightmost(frag))
-                                fi->next_offset = 2;
-                        else
-                                fi->next_offset = 0;
-                } else {
-                        err = note_last_dentry(fi,
-                                               rinfo->dir_dname[rinfo->dir_nr-1],
-                                               rinfo->dir_dname_len[rinfo->dir_nr-1],
-                                               fi->next_offset + rinfo->dir_nr);
+                /* note next offset and last dentry name */
+                if (rinfo->dir_nr > 0) {
+                        struct ceph_mds_reply_dir_entry *rde =
+                                        rinfo->dir_entries + (rinfo->dir_nr-1);
+                        unsigned next_offset = req->r_reply_info.dir_end ?
+                                        2 : (fpos_off(rde->offset) + 1);
+                        err = note_last_dentry(fi, rde->name, rde->name_len,
+                                               next_offset);
                         if (err)
                                 return err;
+                } else if (req->r_reply_info.dir_end) {
+                        fi->next_offset = 2;
+                        /* keep last name */
                 }
         }
 
         rinfo = &fi->last_readdir->r_reply_info;
-        dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
-             rinfo->dir_nr, off, fi->offset);
-
-        ctx->pos = ceph_make_fpos(frag, off);
-        while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
-                struct ceph_mds_reply_inode *in =
-                        rinfo->dir_in[off - fi->offset].in;
+        dout("readdir frag %x num %d pos %llx chunk first %llx\n",
+             fi->frag, rinfo->dir_nr, ctx->pos,
+             rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
+
+        i = 0;
+        /* search start position */
+        if (rinfo->dir_nr > 0) {
+                int step, nr = rinfo->dir_nr;
+                while (nr > 0) {
+                        step = nr >> 1;
+                        if (rinfo->dir_entries[i + step].offset < ctx->pos) {
+                                i += step + 1;
+                                nr -= step + 1;
+                        } else {
+                                nr = step;
+                        }
+                }
+        }
+        for (; i < rinfo->dir_nr; i++) {
+                struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                 struct ceph_vino vino;
                 ino_t ino;
 
-                dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
-                     off, off - fi->offset, rinfo->dir_nr, ctx->pos,
-                     rinfo->dir_dname_len[off - fi->offset],
-                     rinfo->dir_dname[off - fi->offset], in);
-                BUG_ON(!in);
-                ftype = le32_to_cpu(in->mode) >> 12;
-                vino.ino = le64_to_cpu(in->ino);
-                vino.snap = le64_to_cpu(in->snapid);
+                BUG_ON(rde->offset < ctx->pos);
+
+                ctx->pos = rde->offset;
+                dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
+                     i, rinfo->dir_nr, ctx->pos,
+                     rde->name_len, rde->name, &rde->inode.in);
+
+                BUG_ON(!rde->inode.in);
+                ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
+                vino.ino = le64_to_cpu(rde->inode.in->ino);
+                vino.snap = le64_to_cpu(rde->inode.in->snapid);
                 ino = ceph_vino_to_ino(vino);
-                if (!dir_emit(ctx,
-                              rinfo->dir_dname[off - fi->offset],
-                              rinfo->dir_dname_len[off - fi->offset],
-                              ceph_translate_ino(inode->i_sb, ino), ftype)) {
+
+                if (!dir_emit(ctx, rde->name, rde->name_len,
+                              ceph_translate_ino(inode->i_sb, ino), ftype)) {
                         dout("filldir stopping us...\n");
                         return 0;
                 }
-                off++;
                 ctx->pos++;
         }
 
-        if (fi->last_name) {
+        if (fi->next_offset > 2) {
                 ceph_mdsc_put_request(fi->last_readdir);
                 fi->last_readdir = NULL;
                 goto more;
         }
 
         /* more frags? */
-        if (!ceph_frag_is_rightmost(frag)) {
-                frag = ceph_frag_next(frag);
-                off = 0;
-                ctx->pos = ceph_make_fpos(frag, off);
+        if (!ceph_frag_is_rightmost(fi->frag)) {
+                unsigned frag = ceph_frag_next(fi->frag);
+                if (is_hash_order(ctx->pos)) {
+                        loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+                                                        fi->next_offset, true);
+                        if (new_pos > ctx->pos)
+                                ctx->pos = new_pos;
+                        /* keep last_name */
+                } else {
+                        ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+                        kfree(fi->last_name);
+                        fi->last_name = NULL;
+                }
                 dout("readdir next frag is %x\n", frag);
                 goto more;
         }
@@ -467,7 +570,7 @@ more:
         return 0;
 }
 
-static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
+static void reset_readdir(struct ceph_file_info *fi)
 {
         if (fi->last_readdir) {
                 ceph_mdsc_put_request(fi->last_readdir);
@@ -477,18 +580,38 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
         fi->last_name = NULL;
         fi->dir_release_count = 0;
         fi->readdir_cache_idx = -1;
-        if (ceph_frag_is_leftmost(frag))
-                fi->next_offset = 2;  /* compensate for . and .. */
-        else
-                fi->next_offset = 0;
+        fi->next_offset = 2;  /* compensate for . and .. */
         fi->flags &= ~CEPH_F_ATEND;
 }
 
+/*
+ * discard buffered readdir content on seekdir(0), or seek to new frag,
+ * or seek prior to current chunk
+ */
+static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
+{
+        struct ceph_mds_reply_info_parsed *rinfo;
+        loff_t chunk_offset;
+        if (new_pos == 0)
+                return true;
+        if (is_hash_order(new_pos)) {
+                /* no need to reset last_name for a forward seek when
+                 * dentries are sotred in hash order */
+        } else if (fi->frag |= fpos_frag(new_pos)) {
+                return true;
+        }
+        rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
+        if (!rinfo || !rinfo->dir_nr)
+                return true;
+        chunk_offset = rinfo->dir_entries[0].offset;
+        return new_pos < chunk_offset ||
+               is_hash_order(new_pos) != is_hash_order(chunk_offset);
+}
+
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
         struct ceph_file_info *fi = file->private_data;
         struct inode *inode = file->f_mapping->host;
-        loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
         loff_t retval;
 
         inode_lock(inode);
@@ -505,25 +628,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
         }
 
         if (offset >= 0) {
+                if (need_reset_readdir(fi, offset)) {
+                        dout("dir_llseek dropping %p content\n", file);
+                        reset_readdir(fi);
+                } else if (is_hash_order(offset) && offset > file->f_pos) {
+                        /* for hash offset, we don't know if a forward seek
+                         * is within same frag */
+                        fi->dir_release_count = 0;
+                        fi->readdir_cache_idx = -1;
+                }
+
                 if (offset != file->f_pos) {
                         file->f_pos = offset;
                         file->f_version = 0;
                         fi->flags &= ~CEPH_F_ATEND;
                 }
                 retval = offset;
-
-                if (offset == 0 ||
-                    fpos_frag(offset) != fi->frag ||
-                    fpos_off(offset) < fi->offset) {
-                        /* discard buffered readdir content on seekdir(0), or
-                         * seek to new frag, or seek prior to current chunk */
-                        dout("dir_llseek dropping %p content\n", file);
-                        reset_readdir(fi, fpos_frag(offset));
-                } else if (fpos_cmp(offset, old_offset) > 0) {
-                        /* reset dir_release_count if we did a forward seek */
-                        fi->dir_release_count = 0;
-                        fi->readdir_cache_idx = -1;
-                }
         }
 out:
         inode_unlock(inode);
@@ -591,7 +711,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
         return dentry;
 }
 
-static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
+static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
 {
         return ceph_ino(inode) == CEPH_INO_ROOT &&
                 strncmp(dentry->d_name.name, ".ceph", 5) == 0;
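For a sense of what the stable, hash-based offsets buy applications (this is an illustration, not part of the commit; the mount point and directory name are made up): a telldir() cookie taken before the MDS fragments a directory should still be a valid seekdir() target afterwards.

/* User-space illustration using only standard <dirent.h> calls. */
#include <dirent.h>
#include <stdio.h>

int main(void)
{
        DIR *dp = opendir("/mnt/cephfs/somedir");  /* example path */
        if (!dp)
                return 1;

        readdir(dp);                    /* consume "." */
        readdir(dp);                    /* consume ".." */
        long cookie = telldir(dp);      /* hash-based f_pos with this series */

        /* ... the directory may get fragmented by the MDS here ... */

        seekdir(dp, cookie);            /* resumes at the same dentry */
        struct dirent *de = readdir(dp);
        if (de)
                printf("first entry after cookie: %s\n", de->d_name);
        closedir(dp);
        return 0;
}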