aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/dir.c
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-10-06 14:31:08 -0400
committerSage Weil <sage@newdream.net>2009-10-06 14:31:08 -0400
commit2817b000b02c5f0c05af67c01fb2684e1381d6ef (patch)
tree32efd3c3cddde6824d4f1e44deb5e7694899dbe1 /fs/ceph/dir.c
parent355da1eb7a1f91c276b991764e951bbcd8047599 (diff)
ceph: directory operations
Directory operations, including lookup, are defined here. We take advantage of lookup intents when possible. For the most part, we just need to build the proper requests for the metadata server(s) and pass things off to the mds_client. The results of most operations are normally incorporated into the client's cache when the reply is parsed by ceph_fill_trace(). However, if the MDS replies without a trace (e.g., when retrying an update after an MDS failure recovery), some operation-specific cleanup may be needed. We can validate cached dentries in two ways. A per-dentry lease may be issued by the MDS, or a per-directory cap may be issued that acts as a lease on the entire directory. In the latter case, a 'gen' value is used to determine which dentries belong to the currently leased directory contents. We normally prepopulate the dcache and icache with readdir results. This makes subsequent lookups and getattrs avoid any server interaction. It also lets us satisfy readdir operation by peeking at the dcache IFF we hold the per-directory cap/lease, previously performed a readdir, and haven't dropped any of the resulting dentries. Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r--fs/ceph/dir.c1212
1 files changed, 1212 insertions, 0 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
new file mode 100644
index 000000000000..7bb8db524e58
--- /dev/null
+++ b/fs/ceph/dir.c
@@ -0,0 +1,1212 @@
1#include "ceph_debug.h"
2
3#include <linux/spinlock.h>
4#include <linux/fs_struct.h>
5#include <linux/namei.h>
6#include <linux/sched.h>
7
8#include "super.h"
9
10/*
11 * Directory operations: readdir, lookup, create, link, unlink,
12 * rename, etc.
13 */
14
15/*
16 * Ceph MDS operations are specified in terms of a base ino and
17 * relative path. Thus, the client can specify an operation on a
18 * specific inode (e.g., a getattr due to fstat(2)), or as a path
19 * relative to, say, the root directory.
20 *
21 * Normally, we limit ourselves to strict inode ops (no path component)
22 * or dentry operations (a single path component relative to an ino). The
23 * exception to this is open_root_dentry(), which will open the mount
24 * point by name.
25 */
26
27const struct inode_operations ceph_dir_iops;
28const struct file_operations ceph_dir_fops;
29struct dentry_operations ceph_dentry_ops;
30
31/*
32 * Initialize ceph dentry state.
33 */
34int ceph_init_dentry(struct dentry *dentry)
35{
36 struct ceph_dentry_info *di;
37
38 if (dentry->d_fsdata)
39 return 0;
40
41 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
42 dentry->d_op = &ceph_dentry_ops;
43 else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
44 dentry->d_op = &ceph_snapdir_dentry_ops;
45 else
46 dentry->d_op = &ceph_snap_dentry_ops;
47
48 di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS);
49 if (!di)
50 return -ENOMEM; /* oh well */
51
52 spin_lock(&dentry->d_lock);
53 if (dentry->d_fsdata) /* lost a race */
54 goto out_unlock;
55 di->dentry = dentry;
56 di->lease_session = NULL;
57 dentry->d_fsdata = di;
58 dentry->d_time = jiffies;
59 ceph_dentry_lru_add(dentry);
60out_unlock:
61 spin_unlock(&dentry->d_lock);
62 return 0;
63}
64
65
66
67/*
68 * for readdir, we encode the directory frag and offset within that
69 * frag into f_pos.
70 */
71static unsigned fpos_frag(loff_t p)
72{
73 return p >> 32;
74}
75static unsigned fpos_off(loff_t p)
76{
77 return p & 0xffffffff;
78}
79
80/*
81 * When possible, we try to satisfy a readdir by peeking at the
82 * dcache. We make this work by carefully ordering dentries on
83 * d_u.d_child when we initially get results back from the MDS, and
84 * falling back to a "normal" sync readdir if any dentries in the dir
85 * are dropped.
86 *
87 * I_COMPLETE tells indicates we have all dentries in the dir. It is
88 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
89 * the MDS if/when the directory is modified).
90 */
91static int __dcache_readdir(struct file *filp,
92 void *dirent, filldir_t filldir)
93{
94 struct inode *inode = filp->f_dentry->d_inode;
95 struct ceph_file_info *fi = filp->private_data;
96 struct dentry *parent = filp->f_dentry;
97 struct inode *dir = parent->d_inode;
98 struct list_head *p;
99 struct dentry *dentry, *last;
100 struct ceph_dentry_info *di;
101 int err = 0;
102
103 /* claim ref on last dentry we returned */
104 last = fi->dentry;
105 fi->dentry = NULL;
106
107 dout("__dcache_readdir %p at %llu (last %p)\n", dir, filp->f_pos,
108 last);
109
110 spin_lock(&dcache_lock);
111
112 /* start at beginning? */
113 if (filp->f_pos == 2 || (last &&
114 filp->f_pos < ceph_dentry(last)->offset)) {
115 if (list_empty(&parent->d_subdirs))
116 goto out_unlock;
117 p = parent->d_subdirs.prev;
118 dout(" initial p %p/%p\n", p->prev, p->next);
119 } else {
120 p = last->d_u.d_child.prev;
121 }
122
123more:
124 dentry = list_entry(p, struct dentry, d_u.d_child);
125 di = ceph_dentry(dentry);
126 while (1) {
127 dout(" p %p/%p d_subdirs %p/%p\n", p->prev, p->next,
128 parent->d_subdirs.prev, parent->d_subdirs.next);
129 if (p == &parent->d_subdirs) {
130 fi->at_end = 1;
131 goto out_unlock;
132 }
133 if (!d_unhashed(dentry) && dentry->d_inode &&
134 filp->f_pos <= di->offset)
135 break;
136 dout(" skipping %p %.*s at %llu (%llu)%s%s\n", dentry,
137 dentry->d_name.len, dentry->d_name.name, di->offset,
138 filp->f_pos, d_unhashed(dentry) ? " unhashed" : "",
139 !dentry->d_inode ? " null" : "");
140 p = p->prev;
141 dentry = list_entry(p, struct dentry, d_u.d_child);
142 di = ceph_dentry(dentry);
143 }
144
145 atomic_inc(&dentry->d_count);
146 spin_unlock(&dcache_lock);
147 spin_unlock(&inode->i_lock);
148
149 dout(" %llu (%llu) dentry %p %.*s %p\n", di->offset, filp->f_pos,
150 dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
151 filp->f_pos = di->offset;
152 err = filldir(dirent, dentry->d_name.name,
153 dentry->d_name.len, di->offset,
154 dentry->d_inode->i_ino,
155 dentry->d_inode->i_mode >> 12);
156
157 if (last) {
158 if (err < 0) {
159 /* remember our position */
160 fi->dentry = last;
161 fi->next_offset = di->offset;
162 } else {
163 dput(last);
164 }
165 last = NULL;
166 }
167
168 spin_lock(&inode->i_lock);
169 spin_lock(&dcache_lock);
170
171 if (err < 0)
172 goto out_unlock;
173
174 last = dentry;
175
176 p = p->prev;
177 filp->f_pos++;
178
179 /* make sure a dentry wasn't dropped while we didn't have dcache_lock */
180 if ((ceph_inode(dir)->i_ceph_flags & CEPH_I_COMPLETE))
181 goto more;
182 dout(" lost I_COMPLETE on %p; falling back to mds\n", dir);
183 err = -EAGAIN;
184
185out_unlock:
186 spin_unlock(&dcache_lock);
187
188 if (last) {
189 spin_unlock(&inode->i_lock);
190 dput(last);
191 spin_lock(&inode->i_lock);
192 }
193
194 return err;
195}
196
197/*
198 * make note of the last dentry we read, so we can
199 * continue at the same lexicographical point,
200 * regardless of what dir changes take place on the
201 * server.
202 */
203static int note_last_dentry(struct ceph_file_info *fi, const char *name,
204 int len)
205{
206 kfree(fi->last_name);
207 fi->last_name = kmalloc(len+1, GFP_NOFS);
208 if (!fi->last_name)
209 return -ENOMEM;
210 memcpy(fi->last_name, name, len);
211 fi->last_name[len] = 0;
212 dout("note_last_dentry '%s'\n", fi->last_name);
213 return 0;
214}
215
216static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
217{
218 struct ceph_file_info *fi = filp->private_data;
219 struct inode *inode = filp->f_dentry->d_inode;
220 struct ceph_inode_info *ci = ceph_inode(inode);
221 struct ceph_client *client = ceph_inode_to_client(inode);
222 struct ceph_mds_client *mdsc = &client->mdsc;
223 unsigned frag = fpos_frag(filp->f_pos);
224 int off = fpos_off(filp->f_pos);
225 int err;
226 u32 ftype;
227 struct ceph_mds_reply_info_parsed *rinfo;
228 const int max_entries = client->mount_args.max_readdir;
229
230 dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
231 if (fi->at_end)
232 return 0;
233
234 /* always start with . and .. */
235 if (filp->f_pos == 0) {
236 /* note dir version at start of readdir so we can tell
237 * if any dentries get dropped */
238 fi->dir_release_count = ci->i_release_count;
239
240 dout("readdir off 0 -> '.'\n");
241 if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
242 inode->i_ino, inode->i_mode >> 12) < 0)
243 return 0;
244 filp->f_pos = 1;
245 off = 1;
246 }
247 if (filp->f_pos == 1) {
248 dout("readdir off 1 -> '..'\n");
249 if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1),
250 filp->f_dentry->d_parent->d_inode->i_ino,
251 inode->i_mode >> 12) < 0)
252 return 0;
253 filp->f_pos = 2;
254 off = 2;
255 }
256
257 /* can we use the dcache? */
258 spin_lock(&inode->i_lock);
259 if ((filp->f_pos == 2 || fi->dentry) &&
260 !ceph_test_opt(client, NOASYNCREADDIR) &&
261 (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
262 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
263 err = __dcache_readdir(filp, dirent, filldir);
264 if (err != -EAGAIN) {
265 spin_unlock(&inode->i_lock);
266 return err;
267 }
268 }
269 spin_unlock(&inode->i_lock);
270 if (fi->dentry) {
271 err = note_last_dentry(fi, fi->dentry->d_name.name,
272 fi->dentry->d_name.len);
273 if (err)
274 return err;
275 dput(fi->dentry);
276 fi->dentry = NULL;
277 }
278
279 /* proceed with a normal readdir */
280
281more:
282 /* do we have the correct frag content buffered? */
283 if (fi->frag != frag || fi->last_readdir == NULL) {
284 struct ceph_mds_request *req;
285 int op = ceph_snap(inode) == CEPH_SNAPDIR ?
286 CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
287
288 /* discard old result, if any */
289 if (fi->last_readdir)
290 ceph_mdsc_put_request(fi->last_readdir);
291
292 /* requery frag tree, as the frag topology may have changed */
293 frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
294
295 dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
296 ceph_vinop(inode), frag, fi->last_name);
297 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
298 if (IS_ERR(req))
299 return PTR_ERR(req);
300 req->r_inode = igrab(inode);
301 req->r_dentry = dget(filp->f_dentry);
302 /* hints to request -> mds selection code */
303 req->r_direct_mode = USE_AUTH_MDS;
304 req->r_direct_hash = ceph_frag_value(frag);
305 req->r_direct_is_hash = true;
306 req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
307 req->r_readdir_offset = fi->next_offset;
308 req->r_args.readdir.frag = cpu_to_le32(frag);
309 req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
310 req->r_num_caps = max_entries;
311 err = ceph_mdsc_do_request(mdsc, NULL, req);
312 if (err < 0) {
313 ceph_mdsc_put_request(req);
314 return err;
315 }
316 dout("readdir got and parsed readdir result=%d"
317 " on frag %x, end=%d, complete=%d\n", err, frag,
318 (int)req->r_reply_info.dir_end,
319 (int)req->r_reply_info.dir_complete);
320
321 if (!req->r_did_prepopulate) {
322 dout("readdir !did_prepopulate");
323 fi->dir_release_count--; /* preclude I_COMPLETE */
324 }
325
326 /* note next offset and last dentry name */
327 fi->offset = fi->next_offset;
328 fi->last_readdir = req;
329
330 if (req->r_reply_info.dir_end) {
331 kfree(fi->last_name);
332 fi->last_name = NULL;
333 fi->next_offset = 0;
334 } else {
335 rinfo = &req->r_reply_info;
336 err = note_last_dentry(fi,
337 rinfo->dir_dname[rinfo->dir_nr-1],
338 rinfo->dir_dname_len[rinfo->dir_nr-1]);
339 if (err)
340 return err;
341 fi->next_offset += rinfo->dir_nr;
342 }
343 }
344
345 rinfo = &fi->last_readdir->r_reply_info;
346 dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
347 rinfo->dir_nr, off, fi->offset);
348 while (off - fi->offset >= 0 && off - fi->offset < rinfo->dir_nr) {
349 u64 pos = ceph_make_fpos(frag, off);
350 struct ceph_mds_reply_inode *in =
351 rinfo->dir_in[off - fi->offset].in;
352 dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
353 off, off - fi->offset, rinfo->dir_nr, pos,
354 rinfo->dir_dname_len[off - fi->offset],
355 rinfo->dir_dname[off - fi->offset], in);
356 BUG_ON(!in);
357 ftype = le32_to_cpu(in->mode) >> 12;
358 if (filldir(dirent,
359 rinfo->dir_dname[off - fi->offset],
360 rinfo->dir_dname_len[off - fi->offset],
361 pos,
362 le64_to_cpu(in->ino),
363 ftype) < 0) {
364 dout("filldir stopping us...\n");
365 return 0;
366 }
367 off++;
368 filp->f_pos = pos + 1;
369 }
370
371 if (fi->last_name) {
372 ceph_mdsc_put_request(fi->last_readdir);
373 fi->last_readdir = NULL;
374 goto more;
375 }
376
377 /* more frags? */
378 if (!ceph_frag_is_rightmost(frag)) {
379 frag = ceph_frag_next(frag);
380 off = 0;
381 filp->f_pos = ceph_make_fpos(frag, off);
382 dout("readdir next frag is %x\n", frag);
383 goto more;
384 }
385 fi->at_end = 1;
386
387 /*
388 * if dir_release_count still matches the dir, no dentries
389 * were released during the whole readdir, and we should have
390 * the complete dir contents in our cache.
391 */
392 spin_lock(&inode->i_lock);
393 if (ci->i_release_count == fi->dir_release_count) {
394 dout(" marking %p complete\n", inode);
395 ci->i_ceph_flags |= CEPH_I_COMPLETE;
396 ci->i_max_offset = filp->f_pos;
397 }
398 spin_unlock(&inode->i_lock);
399
400 dout("readdir %p filp %p done.\n", inode, filp);
401 return 0;
402}
403
404static void reset_readdir(struct ceph_file_info *fi)
405{
406 if (fi->last_readdir) {
407 ceph_mdsc_put_request(fi->last_readdir);
408 fi->last_readdir = NULL;
409 }
410 kfree(fi->last_name);
411 fi->next_offset = 2; /* compensate for . and .. */
412 if (fi->dentry) {
413 dput(fi->dentry);
414 fi->dentry = NULL;
415 }
416 fi->at_end = 0;
417}
418
419static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
420{
421 struct ceph_file_info *fi = file->private_data;
422 struct inode *inode = file->f_mapping->host;
423 loff_t old_offset = offset;
424 loff_t retval;
425
426 mutex_lock(&inode->i_mutex);
427 switch (origin) {
428 case SEEK_END:
429 offset += inode->i_size + 2; /* FIXME */
430 break;
431 case SEEK_CUR:
432 offset += file->f_pos;
433 }
434 retval = -EINVAL;
435 if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
436 if (offset != file->f_pos) {
437 file->f_pos = offset;
438 file->f_version = 0;
439 fi->at_end = 0;
440 }
441 retval = offset;
442
443 /*
444 * discard buffered readdir content on seekdir(0), or
445 * seek to new frag, or seek prior to current chunk.
446 */
447 if (offset == 0 ||
448 fpos_frag(offset) != fpos_frag(old_offset) ||
449 fpos_off(offset) < fi->offset) {
450 dout("dir_llseek dropping %p content\n", file);
451 reset_readdir(fi);
452 }
453
454 /* bump dir_release_count if we did a forward seek */
455 if (offset > old_offset)
456 fi->dir_release_count--;
457 }
458 mutex_unlock(&inode->i_mutex);
459 return retval;
460}
461
462/*
463 * Process result of a lookup/open request.
464 *
465 * Mainly, make sure we return the final req->r_dentry (if it already
466 * existed) in place of the original VFS-provided dentry when they
467 * differ.
468 *
469 * Gracefully handle the case where the MDS replies with -ENOENT and
470 * no trace (which it may do, at its discretion, e.g., if it doesn't
471 * care to issue a lease on the negative dentry).
472 */
473struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
474 struct dentry *dentry, int err)
475{
476 struct ceph_client *client = ceph_client(dentry->d_sb);
477 struct inode *parent = dentry->d_parent->d_inode;
478
479 /* .snap dir? */
480 if (err == -ENOENT &&
481 ceph_vino(parent).ino != CEPH_INO_ROOT && /* no .snap in root dir */
482 strcmp(dentry->d_name.name, client->mount_args.snapdir_name) == 0) {
483 struct inode *inode = ceph_get_snapdir(parent);
484 dout("ENOENT on snapdir %p '%.*s', linking to snapdir %p\n",
485 dentry, dentry->d_name.len, dentry->d_name.name, inode);
486 d_add(dentry, inode);
487 err = 0;
488 }
489
490 if (err == -ENOENT) {
491 /* no trace? */
492 err = 0;
493 if (!req->r_reply_info.head->is_dentry) {
494 dout("ENOENT and no trace, dentry %p inode %p\n",
495 dentry, dentry->d_inode);
496 if (dentry->d_inode) {
497 d_drop(dentry);
498 err = -ENOENT;
499 } else {
500 d_add(dentry, NULL);
501 }
502 }
503 }
504 if (err)
505 dentry = ERR_PTR(err);
506 else if (dentry != req->r_dentry)
507 dentry = dget(req->r_dentry); /* we got spliced */
508 else
509 dentry = NULL;
510 return dentry;
511}
512
513/*
514 * Look up a single dir entry. If there is a lookup intent, inform
515 * the MDS so that it gets our 'caps wanted' value in a single op.
516 */
517static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
518 struct nameidata *nd)
519{
520 struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
521 struct ceph_mds_client *mdsc = &client->mdsc;
522 struct ceph_mds_request *req;
523 int op;
524 int err;
525
526 dout("lookup %p dentry %p '%.*s'\n",
527 dir, dentry, dentry->d_name.len, dentry->d_name.name);
528
529 if (dentry->d_name.len > NAME_MAX)
530 return ERR_PTR(-ENAMETOOLONG);
531
532 err = ceph_init_dentry(dentry);
533 if (err < 0)
534 return ERR_PTR(err);
535
536 /* open (but not create!) intent? */
537 if (nd &&
538 (nd->flags & LOOKUP_OPEN) &&
539 (nd->flags & LOOKUP_CONTINUE) == 0 && /* only open last component */
540 !(nd->intent.open.flags & O_CREAT)) {
541 int mode = nd->intent.open.create_mode & ~current->fs->umask;
542 return ceph_lookup_open(dir, dentry, nd, mode, 1);
543 }
544
545 /* can we conclude ENOENT locally? */
546 if (dentry->d_inode == NULL) {
547 struct ceph_inode_info *ci = ceph_inode(dir);
548 struct ceph_dentry_info *di = ceph_dentry(dentry);
549
550 spin_lock(&dir->i_lock);
551 dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
552 if (strncmp(dentry->d_name.name,
553 client->mount_args.snapdir_name,
554 dentry->d_name.len) &&
555 (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
556 (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
557 di->offset = ci->i_max_offset++;
558 spin_unlock(&dir->i_lock);
559 dout(" dir %p complete, -ENOENT\n", dir);
560 d_add(dentry, NULL);
561 di->lease_shared_gen = ci->i_shared_gen;
562 return NULL;
563 }
564 spin_unlock(&dir->i_lock);
565 }
566
567 op = ceph_snap(dir) == CEPH_SNAPDIR ?
568 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
569 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
570 if (IS_ERR(req))
571 return ERR_PTR(PTR_ERR(req));
572 req->r_dentry = dget(dentry);
573 req->r_num_caps = 2;
574 /* we only need inode linkage */
575 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
576 req->r_locked_dir = dir;
577 err = ceph_mdsc_do_request(mdsc, NULL, req);
578 dentry = ceph_finish_lookup(req, dentry, err);
579 ceph_mdsc_put_request(req); /* will dput(dentry) */
580 dout("lookup result=%p\n", dentry);
581 return dentry;
582}
583
584/*
585 * If we do a create but get no trace back from the MDS, follow up with
586 * a lookup (the VFS expects us to link up the provided dentry).
587 */
588int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
589{
590 struct dentry *result = ceph_lookup(dir, dentry, NULL);
591
592 if (result && !IS_ERR(result)) {
593 /*
594 * We created the item, then did a lookup, and found
595 * it was already linked to another inode we already
596 * had in our cache (and thus got spliced). Link our
597 * dentry to that inode, but don't hash it, just in
598 * case the VFS wants to dereference it.
599 */
600 BUG_ON(!result->d_inode);
601 d_instantiate(dentry, result->d_inode);
602 return 0;
603 }
604 return PTR_ERR(result);
605}
606
607static int ceph_mknod(struct inode *dir, struct dentry *dentry,
608 int mode, dev_t rdev)
609{
610 struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
611 struct ceph_mds_client *mdsc = &client->mdsc;
612 struct ceph_mds_request *req;
613 int err;
614
615 if (ceph_snap(dir) != CEPH_NOSNAP)
616 return -EROFS;
617
618 dout("mknod in dir %p dentry %p mode 0%o rdev %d\n",
619 dir, dentry, mode, rdev);
620 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
621 if (IS_ERR(req)) {
622 d_drop(dentry);
623 return PTR_ERR(req);
624 }
625 req->r_dentry = dget(dentry);
626 req->r_num_caps = 2;
627 req->r_locked_dir = dir;
628 req->r_args.mknod.mode = cpu_to_le32(mode);
629 req->r_args.mknod.rdev = cpu_to_le32(rdev);
630 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
631 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
632 err = ceph_mdsc_do_request(mdsc, dir, req);
633 if (!err && !req->r_reply_info.head->is_dentry)
634 err = ceph_handle_notrace_create(dir, dentry);
635 ceph_mdsc_put_request(req);
636 if (err)
637 d_drop(dentry);
638 return err;
639}
640
641static int ceph_create(struct inode *dir, struct dentry *dentry, int mode,
642 struct nameidata *nd)
643{
644 dout("create in dir %p dentry %p name '%.*s'\n",
645 dir, dentry, dentry->d_name.len, dentry->d_name.name);
646
647 if (ceph_snap(dir) != CEPH_NOSNAP)
648 return -EROFS;
649
650 if (nd) {
651 BUG_ON((nd->flags & LOOKUP_OPEN) == 0);
652 dentry = ceph_lookup_open(dir, dentry, nd, mode, 0);
653 /* hrm, what should i do here if we get aliased? */
654 if (IS_ERR(dentry))
655 return PTR_ERR(dentry);
656 return 0;
657 }
658
659 /* fall back to mknod */
660 return ceph_mknod(dir, dentry, (mode & ~S_IFMT) | S_IFREG, 0);
661}
662
663static int ceph_symlink(struct inode *dir, struct dentry *dentry,
664 const char *dest)
665{
666 struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
667 struct ceph_mds_client *mdsc = &client->mdsc;
668 struct ceph_mds_request *req;
669 int err;
670
671 if (ceph_snap(dir) != CEPH_NOSNAP)
672 return -EROFS;
673
674 dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
675 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
676 if (IS_ERR(req)) {
677 d_drop(dentry);
678 return PTR_ERR(req);
679 }
680 req->r_dentry = dget(dentry);
681 req->r_num_caps = 2;
682 req->r_path2 = kstrdup(dest, GFP_NOFS);
683 req->r_locked_dir = dir;
684 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
685 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
686 err = ceph_mdsc_do_request(mdsc, dir, req);
687 if (!err && !req->r_reply_info.head->is_dentry)
688 err = ceph_handle_notrace_create(dir, dentry);
689 ceph_mdsc_put_request(req);
690 if (err)
691 d_drop(dentry);
692 return err;
693}
694
695static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
696{
697 struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
698 struct ceph_mds_client *mdsc = &client->mdsc;
699 struct ceph_mds_request *req;
700 int err = -EROFS;
701 int op;
702
703 if (ceph_snap(dir) == CEPH_SNAPDIR) {
704 /* mkdir .snap/foo is a MKSNAP */
705 op = CEPH_MDS_OP_MKSNAP;
706 dout("mksnap dir %p snap '%.*s' dn %p\n", dir,
707 dentry->d_name.len, dentry->d_name.name, dentry);
708 } else if (ceph_snap(dir) == CEPH_NOSNAP) {
709 dout("mkdir dir %p dn %p mode 0%o\n", dir, dentry, mode);
710 op = CEPH_MDS_OP_MKDIR;
711 } else {
712 goto out;
713 }
714 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
715 if (IS_ERR(req)) {
716 err = PTR_ERR(req);
717 goto out;
718 }
719
720 req->r_dentry = dget(dentry);
721 req->r_num_caps = 2;
722 req->r_locked_dir = dir;
723 req->r_args.mkdir.mode = cpu_to_le32(mode);
724 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
725 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
726 err = ceph_mdsc_do_request(mdsc, dir, req);
727 if (!err && !req->r_reply_info.head->is_dentry)
728 err = ceph_handle_notrace_create(dir, dentry);
729 ceph_mdsc_put_request(req);
730out:
731 if (err < 0)
732 d_drop(dentry);
733 return err;
734}
735
736static int ceph_link(struct dentry *old_dentry, struct inode *dir,
737 struct dentry *dentry)
738{
739 struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
740 struct ceph_mds_client *mdsc = &client->mdsc;
741 struct ceph_mds_request *req;
742 int err;
743
744 if (ceph_snap(dir) != CEPH_NOSNAP)
745 return -EROFS;
746
747 dout("link in dir %p old_dentry %p dentry %p\n", dir,
748 old_dentry, dentry);
749 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
750 if (IS_ERR(req)) {
751 d_drop(dentry);
752 return PTR_ERR(req);
753 }
754 req->r_dentry = dget(dentry);
755 req->r_num_caps = 2;
756 req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
757 req->r_locked_dir = dir;
758 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
759 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
760 err = ceph_mdsc_do_request(mdsc, dir, req);
761 if (err)
762 d_drop(dentry);
763 else if (!req->r_reply_info.head->is_dentry)
764 d_instantiate(dentry, igrab(old_dentry->d_inode));
765 ceph_mdsc_put_request(req);
766 return err;
767}
768
769/*
770 * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it
771 * looks like the link count will hit 0, drop any other caps (other
772 * than PIN) we don't specifically want (due to the file still being
773 * open).
774 */
775static int drop_caps_for_unlink(struct inode *inode)
776{
777 struct ceph_inode_info *ci = ceph_inode(inode);
778 int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
779
780 spin_lock(&inode->i_lock);
781 if (inode->i_nlink == 1) {
782 drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
783 ci->i_ceph_flags |= CEPH_I_NODELAY;
784 }
785 spin_unlock(&inode->i_lock);
786 return drop;
787}
788
789/*
790 * rmdir and unlink are differ only by the metadata op code
791 */
792static int ceph_unlink(struct inode *dir, struct dentry *dentry)
793{
794 struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
795 struct ceph_mds_client *mdsc = &client->mdsc;
796 struct inode *inode = dentry->d_inode;
797 struct ceph_mds_request *req;
798 int err = -EROFS;
799 int op;
800
801 if (ceph_snap(dir) == CEPH_SNAPDIR) {
802 /* rmdir .snap/foo is RMSNAP */
803 dout("rmsnap dir %p '%.*s' dn %p\n", dir, dentry->d_name.len,
804 dentry->d_name.name, dentry);
805 op = CEPH_MDS_OP_RMSNAP;
806 } else if (ceph_snap(dir) == CEPH_NOSNAP) {
807 dout("unlink/rmdir dir %p dn %p inode %p\n",
808 dir, dentry, inode);
809 op = ((dentry->d_inode->i_mode & S_IFMT) == S_IFDIR) ?
810 CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
811 } else
812 goto out;
813 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
814 if (IS_ERR(req)) {
815 err = PTR_ERR(req);
816 goto out;
817 }
818 req->r_dentry = dget(dentry);
819 req->r_num_caps = 2;
820 req->r_locked_dir = dir;
821 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
822 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
823 req->r_inode_drop = drop_caps_for_unlink(inode);
824 err = ceph_mdsc_do_request(mdsc, dir, req);
825 if (!err && !req->r_reply_info.head->is_dentry)
826 d_delete(dentry);
827 ceph_mdsc_put_request(req);
828out:
829 return err;
830}
831
832static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
833 struct inode *new_dir, struct dentry *new_dentry)
834{
835 struct ceph_client *client = ceph_sb_to_client(old_dir->i_sb);
836 struct ceph_mds_client *mdsc = &client->mdsc;
837 struct ceph_mds_request *req;
838 int err;
839
840 if (ceph_snap(old_dir) != ceph_snap(new_dir))
841 return -EXDEV;
842 if (ceph_snap(old_dir) != CEPH_NOSNAP ||
843 ceph_snap(new_dir) != CEPH_NOSNAP)
844 return -EROFS;
845 dout("rename dir %p dentry %p to dir %p dentry %p\n",
846 old_dir, old_dentry, new_dir, new_dentry);
847 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS);
848 if (IS_ERR(req))
849 return PTR_ERR(req);
850 req->r_dentry = dget(new_dentry);
851 req->r_num_caps = 2;
852 req->r_old_dentry = dget(old_dentry);
853 req->r_locked_dir = new_dir;
854 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
855 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
856 req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
857 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
858 /* release LINK_RDCACHE on source inode (mds will lock it) */
859 req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
860 if (new_dentry->d_inode)
861 req->r_inode_drop = drop_caps_for_unlink(new_dentry->d_inode);
862 err = ceph_mdsc_do_request(mdsc, old_dir, req);
863 if (!err && !req->r_reply_info.head->is_dentry) {
864 /*
865 * Normally d_move() is done by fill_trace (called by
866 * do_request, above). If there is no trace, we need
867 * to do it here.
868 */
869 d_move(old_dentry, new_dentry);
870 }
871 ceph_mdsc_put_request(req);
872 return err;
873}
874
875
876/*
877 * Check if dentry lease is valid. If not, delete the lease. Try to
878 * renew if the least is more than half up.
879 */
880static int dentry_lease_is_valid(struct dentry *dentry)
881{
882 struct ceph_dentry_info *di;
883 struct ceph_mds_session *s;
884 int valid = 0;
885 u32 gen;
886 unsigned long ttl;
887 struct ceph_mds_session *session = NULL;
888 struct inode *dir = NULL;
889 u32 seq = 0;
890
891 spin_lock(&dentry->d_lock);
892 di = ceph_dentry(dentry);
893 if (di && di->lease_session) {
894 s = di->lease_session;
895 spin_lock(&s->s_cap_lock);
896 gen = s->s_cap_gen;
897 ttl = s->s_cap_ttl;
898 spin_unlock(&s->s_cap_lock);
899
900 if (di->lease_gen == gen &&
901 time_before(jiffies, dentry->d_time) &&
902 time_before(jiffies, ttl)) {
903 valid = 1;
904 if (di->lease_renew_after &&
905 time_after(jiffies, di->lease_renew_after)) {
906 /* we should renew */
907 dir = dentry->d_parent->d_inode;
908 session = ceph_get_mds_session(s);
909 seq = di->lease_seq;
910 di->lease_renew_after = 0;
911 di->lease_renew_from = jiffies;
912 }
913 } else {
914 __ceph_mdsc_drop_dentry_lease(dentry);
915 }
916 }
917 spin_unlock(&dentry->d_lock);
918
919 if (session) {
920 ceph_mdsc_lease_send_msg(session, dir, dentry,
921 CEPH_MDS_LEASE_RENEW, seq);
922 ceph_put_mds_session(session);
923 }
924 dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
925 return valid;
926}
927
928/*
929 * Check if directory-wide content lease/cap is valid.
930 */
931static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
932{
933 struct ceph_inode_info *ci = ceph_inode(dir);
934 struct ceph_dentry_info *di = ceph_dentry(dentry);
935 int valid = 0;
936
937 spin_lock(&dir->i_lock);
938 if (ci->i_shared_gen == di->lease_shared_gen)
939 valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
940 spin_unlock(&dir->i_lock);
941 dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
942 dir, (unsigned)ci->i_shared_gen, dentry,
943 (unsigned)di->lease_shared_gen, valid);
944 return valid;
945}
946
947/*
948 * Check if cached dentry can be trusted.
949 */
950static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd)
951{
952 struct inode *dir = dentry->d_parent->d_inode;
953
954 dout("d_revalidate %p '%.*s' inode %p\n", dentry,
955 dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
956
957 /* always trust cached snapped dentries, snapdir dentry */
958 if (ceph_snap(dir) != CEPH_NOSNAP) {
959 dout("d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry,
960 dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
961 goto out_touch;
962 }
963 if (dentry->d_inode && ceph_snap(dentry->d_inode) == CEPH_SNAPDIR)
964 goto out_touch;
965
966 if (dentry_lease_is_valid(dentry) ||
967 dir_lease_is_valid(dir, dentry))
968 goto out_touch;
969
970 dout("d_revalidate %p invalid\n", dentry);
971 d_drop(dentry);
972 return 0;
973out_touch:
974 ceph_dentry_lru_touch(dentry);
975 return 1;
976}
977
978/*
979 * When a dentry is released, clear the dir I_COMPLETE if it was part
980 * of the current dir gen.
981 */
982static void ceph_dentry_release(struct dentry *dentry)
983{
984 struct ceph_dentry_info *di = ceph_dentry(dentry);
985 struct inode *parent_inode = dentry->d_parent->d_inode;
986
987 if (parent_inode) {
988 struct ceph_inode_info *ci = ceph_inode(parent_inode);
989
990 spin_lock(&parent_inode->i_lock);
991 if (ci->i_shared_gen == di->lease_shared_gen) {
992 dout(" clearing %p complete (d_release)\n",
993 parent_inode);
994 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
995 ci->i_release_count++;
996 }
997 spin_unlock(&parent_inode->i_lock);
998 }
999 if (di) {
1000 ceph_dentry_lru_del(dentry);
1001 if (di->lease_session)
1002 ceph_put_mds_session(di->lease_session);
1003 kmem_cache_free(ceph_dentry_cachep, di);
1004 dentry->d_fsdata = NULL;
1005 }
1006}
1007
1008static int ceph_snapdir_d_revalidate(struct dentry *dentry,
1009 struct nameidata *nd)
1010{
1011 /*
1012 * Eventually, we'll want to revalidate snapped metadata
1013 * too... probably...
1014 */
1015 return 1;
1016}
1017
1018
1019
1020/*
1021 * read() on a dir. This weird interface hack only works if mounted
1022 * with '-o dirstat'.
1023 */
1024static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
1025 loff_t *ppos)
1026{
1027 struct ceph_file_info *cf = file->private_data;
1028 struct inode *inode = file->f_dentry->d_inode;
1029 struct ceph_inode_info *ci = ceph_inode(inode);
1030 int left;
1031
1032 if (!ceph_test_opt(ceph_client(inode->i_sb), DIRSTAT))
1033 return -EISDIR;
1034
1035 if (!cf->dir_info) {
1036 cf->dir_info = kmalloc(1024, GFP_NOFS);
1037 if (!cf->dir_info)
1038 return -ENOMEM;
1039 cf->dir_info_len =
1040 sprintf(cf->dir_info,
1041 "entries: %20lld\n"
1042 " files: %20lld\n"
1043 " subdirs: %20lld\n"
1044 "rentries: %20lld\n"
1045 " rfiles: %20lld\n"
1046 " rsubdirs: %20lld\n"
1047 "rbytes: %20lld\n"
1048 "rctime: %10ld.%09ld\n",
1049 ci->i_files + ci->i_subdirs,
1050 ci->i_files,
1051 ci->i_subdirs,
1052 ci->i_rfiles + ci->i_rsubdirs,
1053 ci->i_rfiles,
1054 ci->i_rsubdirs,
1055 ci->i_rbytes,
1056 (long)ci->i_rctime.tv_sec,
1057 (long)ci->i_rctime.tv_nsec);
1058 }
1059
1060 if (*ppos >= cf->dir_info_len)
1061 return 0;
1062 size = min_t(unsigned, size, cf->dir_info_len-*ppos);
1063 left = copy_to_user(buf, cf->dir_info + *ppos, size);
1064 if (left == size)
1065 return -EFAULT;
1066 *ppos += (size - left);
1067 return size - left;
1068}
1069
1070/*
1071 * an fsync() on a dir will wait for any uncommitted directory
1072 * operations to commit.
1073 */
1074static int ceph_dir_fsync(struct file *file, struct dentry *dentry,
1075 int datasync)
1076{
1077 struct inode *inode = dentry->d_inode;
1078 struct ceph_inode_info *ci = ceph_inode(inode);
1079 struct list_head *head = &ci->i_unsafe_dirops;
1080 struct ceph_mds_request *req;
1081 u64 last_tid;
1082 int ret = 0;
1083
1084 dout("dir_fsync %p\n", inode);
1085 spin_lock(&ci->i_unsafe_lock);
1086 if (list_empty(head))
1087 goto out;
1088
1089 req = list_entry(head->prev,
1090 struct ceph_mds_request, r_unsafe_dir_item);
1091 last_tid = req->r_tid;
1092
1093 do {
1094 ceph_mdsc_get_request(req);
1095 spin_unlock(&ci->i_unsafe_lock);
1096 dout("dir_fsync %p wait on tid %llu (until %llu)\n",
1097 inode, req->r_tid, last_tid);
1098 if (req->r_timeout) {
1099 ret = wait_for_completion_timeout(
1100 &req->r_safe_completion, req->r_timeout);
1101 if (ret > 0)
1102 ret = 0;
1103 else if (ret == 0)
1104 ret = -EIO; /* timed out */
1105 } else {
1106 wait_for_completion(&req->r_safe_completion);
1107 }
1108 spin_lock(&ci->i_unsafe_lock);
1109 ceph_mdsc_put_request(req);
1110
1111 if (ret || list_empty(head))
1112 break;
1113 req = list_entry(head->next,
1114 struct ceph_mds_request, r_unsafe_dir_item);
1115 } while (req->r_tid < last_tid);
1116out:
1117 spin_unlock(&ci->i_unsafe_lock);
1118 return ret;
1119}
1120
1121/*
1122 * We maintain a private dentry LRU.
1123 *
1124 * FIXME: this needs to be changed to a per-mds lru to be useful.
1125 */
1126void ceph_dentry_lru_add(struct dentry *dn)
1127{
1128 struct ceph_dentry_info *di = ceph_dentry(dn);
1129 struct ceph_mds_client *mdsc;
1130 dout("dentry_lru_add %p %p\t%.*s\n",
1131 di, dn, dn->d_name.len, dn->d_name.name);
1132
1133 if (di) {
1134 mdsc = &ceph_client(dn->d_sb)->mdsc;
1135 spin_lock(&mdsc->dentry_lru_lock);
1136 list_add_tail(&di->lru, &mdsc->dentry_lru);
1137 mdsc->num_dentry++;
1138 spin_unlock(&mdsc->dentry_lru_lock);
1139 }
1140}
1141
1142void ceph_dentry_lru_touch(struct dentry *dn)
1143{
1144 struct ceph_dentry_info *di = ceph_dentry(dn);
1145 struct ceph_mds_client *mdsc;
1146 dout("dentry_lru_touch %p %p\t%.*s\n",
1147 di, dn, dn->d_name.len, dn->d_name.name);
1148
1149 if (di) {
1150 mdsc = &ceph_client(dn->d_sb)->mdsc;
1151 spin_lock(&mdsc->dentry_lru_lock);
1152 list_move_tail(&di->lru, &mdsc->dentry_lru);
1153 spin_unlock(&mdsc->dentry_lru_lock);
1154 }
1155}
1156
1157void ceph_dentry_lru_del(struct dentry *dn)
1158{
1159 struct ceph_dentry_info *di = ceph_dentry(dn);
1160 struct ceph_mds_client *mdsc;
1161
1162 dout("dentry_lru_del %p %p\t%.*s\n",
1163 di, dn, dn->d_name.len, dn->d_name.name);
1164 if (di) {
1165 mdsc = &ceph_client(dn->d_sb)->mdsc;
1166 spin_lock(&mdsc->dentry_lru_lock);
1167 list_del_init(&di->lru);
1168 mdsc->num_dentry--;
1169 spin_unlock(&mdsc->dentry_lru_lock);
1170 }
1171}
1172
1173const struct file_operations ceph_dir_fops = {
1174 .read = ceph_read_dir,
1175 .readdir = ceph_readdir,
1176 .llseek = ceph_dir_llseek,
1177 .open = ceph_open,
1178 .release = ceph_release,
1179 .unlocked_ioctl = ceph_ioctl,
1180 .fsync = ceph_dir_fsync,
1181};
1182
1183const struct inode_operations ceph_dir_iops = {
1184 .lookup = ceph_lookup,
1185 .permission = ceph_permission,
1186 .getattr = ceph_getattr,
1187 .setattr = ceph_setattr,
1188 .setxattr = ceph_setxattr,
1189 .getxattr = ceph_getxattr,
1190 .listxattr = ceph_listxattr,
1191 .removexattr = ceph_removexattr,
1192 .mknod = ceph_mknod,
1193 .symlink = ceph_symlink,
1194 .mkdir = ceph_mkdir,
1195 .link = ceph_link,
1196 .unlink = ceph_unlink,
1197 .rmdir = ceph_unlink,
1198 .rename = ceph_rename,
1199 .create = ceph_create,
1200};
1201
1202struct dentry_operations ceph_dentry_ops = {
1203 .d_revalidate = ceph_d_revalidate,
1204 .d_release = ceph_dentry_release,
1205};
1206
1207struct dentry_operations ceph_snapdir_dentry_ops = {
1208 .d_revalidate = ceph_snapdir_d_revalidate,
1209};
1210
1211struct dentry_operations ceph_snap_dentry_ops = {
1212};