aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-10-06 14:31:08 -0400
committerSage Weil <sage@newdream.net>2009-10-06 14:31:08 -0400
commit355da1eb7a1f91c276b991764e951bbcd8047599 (patch)
tree18b30761cbbeaa2b104957f5d50fb4c5296a52c5
parent16725b9d2a2e3d0fd2b0034482e2eb0a2d78050f (diff)
ceph: inode operations
Inode cache and inode operations. We also include routines to incorporate metadata structures returned by the MDS into the client cache, and some helpers to deal with file capabilities and metadata leases. The bulk of that work is done by fill_inode() and fill_trace(). Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/inode.c1620
-rw-r--r--fs/ceph/xattr.c833
2 files changed, 2453 insertions, 0 deletions
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
new file mode 100644
index 00000000000..6097af79004
--- /dev/null
+++ b/fs/ceph/inode.c
@@ -0,0 +1,1620 @@
1#include "ceph_debug.h"
2
3#include <linux/module.h>
4#include <linux/fs.h>
5#include <linux/smp_lock.h>
6#include <linux/slab.h>
7#include <linux/string.h>
8#include <linux/uaccess.h>
9#include <linux/kernel.h>
10#include <linux/namei.h>
11#include <linux/writeback.h>
12#include <linux/vmalloc.h>
13
14#include "super.h"
15#include "decode.h"
16
17/*
18 * Ceph inode operations
19 *
20 * Implement basic inode helpers (get, alloc) and inode ops (getattr,
21 * setattr, etc.), xattr helpers, and helpers for assimilating
22 * metadata returned by the MDS into our cache.
23 *
24 * Also define helpers for doing asynchronous writeback, invalidation,
25 * and truncation for the benefit of those who can't afford to block
26 * (typically because they are in the message handler path).
27 */
28
29static const struct inode_operations ceph_symlink_iops;
30
31static void ceph_inode_invalidate_pages(struct work_struct *work);
32
33/*
34 * find or create an inode, given the ceph ino number
35 */
36struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
37{
38 struct inode *inode;
39 ino_t t = ceph_vino_to_ino(vino);
40
41 inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
42 if (inode == NULL)
43 return ERR_PTR(-ENOMEM);
44 if (inode->i_state & I_NEW) {
45 dout("get_inode created new inode %p %llx.%llx ino %llx\n",
46 inode, ceph_vinop(inode), (u64)inode->i_ino);
47 unlock_new_inode(inode);
48 }
49
50 dout("get_inode on %lu=%llx.%llx got %p\n", inode->i_ino, vino.ino,
51 vino.snap, inode);
52 return inode;
53}
54
55/*
56 * get/constuct snapdir inode for a given directory
57 */
58struct inode *ceph_get_snapdir(struct inode *parent)
59{
60 struct ceph_vino vino = {
61 .ino = ceph_ino(parent),
62 .snap = CEPH_SNAPDIR,
63 };
64 struct inode *inode = ceph_get_inode(parent->i_sb, vino);
65
66 BUG_ON(!S_ISDIR(parent->i_mode));
67 if (IS_ERR(inode))
68 return ERR_PTR(PTR_ERR(inode));
69 inode->i_mode = parent->i_mode;
70 inode->i_uid = parent->i_uid;
71 inode->i_gid = parent->i_gid;
72 inode->i_op = &ceph_dir_iops;
73 inode->i_fop = &ceph_dir_fops;
74 ceph_inode(inode)->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
75 return inode;
76}
77
78const struct inode_operations ceph_file_iops = {
79 .permission = ceph_permission,
80 .setattr = ceph_setattr,
81 .getattr = ceph_getattr,
82 .setxattr = ceph_setxattr,
83 .getxattr = ceph_getxattr,
84 .listxattr = ceph_listxattr,
85 .removexattr = ceph_removexattr,
86};
87
88
89/*
90 * We use a 'frag tree' to keep track of the MDS's directory fragments
91 * for a given inode (usually there is just a single fragment). We
92 * need to know when a child frag is delegated to a new MDS, or when
93 * it is flagged as replicated, so we can direct our requests
94 * accordingly.
95 */
96
97/*
98 * find/create a frag in the tree
99 */
100static struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci,
101 u32 f)
102{
103 struct rb_node **p;
104 struct rb_node *parent = NULL;
105 struct ceph_inode_frag *frag;
106 int c;
107
108 p = &ci->i_fragtree.rb_node;
109 while (*p) {
110 parent = *p;
111 frag = rb_entry(parent, struct ceph_inode_frag, node);
112 c = ceph_frag_compare(f, frag->frag);
113 if (c < 0)
114 p = &(*p)->rb_left;
115 else if (c > 0)
116 p = &(*p)->rb_right;
117 else
118 return frag;
119 }
120
121 frag = kmalloc(sizeof(*frag), GFP_NOFS);
122 if (!frag) {
123 pr_err("__get_or_create_frag ENOMEM on %p %llx.%llx "
124 "frag %x\n", &ci->vfs_inode,
125 ceph_vinop(&ci->vfs_inode), f);
126 return ERR_PTR(-ENOMEM);
127 }
128 frag->frag = f;
129 frag->split_by = 0;
130 frag->mds = -1;
131 frag->ndist = 0;
132
133 rb_link_node(&frag->node, parent, p);
134 rb_insert_color(&frag->node, &ci->i_fragtree);
135
136 dout("get_or_create_frag added %llx.%llx frag %x\n",
137 ceph_vinop(&ci->vfs_inode), f);
138 return frag;
139}
140
141/*
142 * find a specific frag @f
143 */
144struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
145{
146 struct rb_node *n = ci->i_fragtree.rb_node;
147
148 while (n) {
149 struct ceph_inode_frag *frag =
150 rb_entry(n, struct ceph_inode_frag, node);
151 int c = ceph_frag_compare(f, frag->frag);
152 if (c < 0)
153 n = n->rb_left;
154 else if (c > 0)
155 n = n->rb_right;
156 else
157 return frag;
158 }
159 return NULL;
160}
161
162/*
163 * Choose frag containing the given value @v. If @pfrag is
164 * specified, copy the frag delegation info to the caller if
165 * it is present.
166 */
167u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
168 struct ceph_inode_frag *pfrag,
169 int *found)
170{
171 u32 t = ceph_frag_make(0, 0);
172 struct ceph_inode_frag *frag;
173 unsigned nway, i;
174 u32 n;
175
176 if (found)
177 *found = 0;
178
179 mutex_lock(&ci->i_fragtree_mutex);
180 while (1) {
181 WARN_ON(!ceph_frag_contains_value(t, v));
182 frag = __ceph_find_frag(ci, t);
183 if (!frag)
184 break; /* t is a leaf */
185 if (frag->split_by == 0) {
186 if (pfrag)
187 memcpy(pfrag, frag, sizeof(*pfrag));
188 if (found)
189 *found = 1;
190 break;
191 }
192
193 /* choose child */
194 nway = 1 << frag->split_by;
195 dout("choose_frag(%x) %x splits by %d (%d ways)\n", v, t,
196 frag->split_by, nway);
197 for (i = 0; i < nway; i++) {
198 n = ceph_frag_make_child(t, frag->split_by, i);
199 if (ceph_frag_contains_value(n, v)) {
200 t = n;
201 break;
202 }
203 }
204 BUG_ON(i == nway);
205 }
206 dout("choose_frag(%x) = %x\n", v, t);
207
208 mutex_unlock(&ci->i_fragtree_mutex);
209 return t;
210}
211
212/*
213 * Process dirfrag (delegation) info from the mds. Include leaf
214 * fragment in tree ONLY if ndist > 0. Otherwise, only
215 * branches/splits are included in i_fragtree)
216 */
217static int ceph_fill_dirfrag(struct inode *inode,
218 struct ceph_mds_reply_dirfrag *dirinfo)
219{
220 struct ceph_inode_info *ci = ceph_inode(inode);
221 struct ceph_inode_frag *frag;
222 u32 id = le32_to_cpu(dirinfo->frag);
223 int mds = le32_to_cpu(dirinfo->auth);
224 int ndist = le32_to_cpu(dirinfo->ndist);
225 int i;
226 int err = 0;
227
228 mutex_lock(&ci->i_fragtree_mutex);
229 if (ndist == 0) {
230 /* no delegation info needed. */
231 frag = __ceph_find_frag(ci, id);
232 if (!frag)
233 goto out;
234 if (frag->split_by == 0) {
235 /* tree leaf, remove */
236 dout("fill_dirfrag removed %llx.%llx frag %x"
237 " (no ref)\n", ceph_vinop(inode), id);
238 rb_erase(&frag->node, &ci->i_fragtree);
239 kfree(frag);
240 } else {
241 /* tree branch, keep and clear */
242 dout("fill_dirfrag cleared %llx.%llx frag %x"
243 " referral\n", ceph_vinop(inode), id);
244 frag->mds = -1;
245 frag->ndist = 0;
246 }
247 goto out;
248 }
249
250
251 /* find/add this frag to store mds delegation info */
252 frag = __get_or_create_frag(ci, id);
253 if (IS_ERR(frag)) {
254 /* this is not the end of the world; we can continue
255 with bad/inaccurate delegation info */
256 pr_err("fill_dirfrag ENOMEM on mds ref %llx.%llx fg %x\n",
257 ceph_vinop(inode), le32_to_cpu(dirinfo->frag));
258 err = -ENOMEM;
259 goto out;
260 }
261
262 frag->mds = mds;
263 frag->ndist = min_t(u32, ndist, CEPH_MAX_DIRFRAG_REP);
264 for (i = 0; i < frag->ndist; i++)
265 frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
266 dout("fill_dirfrag %llx.%llx frag %x ndist=%d\n",
267 ceph_vinop(inode), frag->frag, frag->ndist);
268
269out:
270 mutex_unlock(&ci->i_fragtree_mutex);
271 return err;
272}
273
274
275/*
276 * initialize a newly allocated inode.
277 */
278struct inode *ceph_alloc_inode(struct super_block *sb)
279{
280 struct ceph_inode_info *ci;
281 int i;
282
283 ci = kmem_cache_alloc(ceph_inode_cachep, GFP_NOFS);
284 if (!ci)
285 return NULL;
286
287 dout("alloc_inode %p\n", &ci->vfs_inode);
288
289 ci->i_version = 0;
290 ci->i_time_warp_seq = 0;
291 ci->i_ceph_flags = 0;
292 ci->i_release_count = 0;
293 ci->i_symlink = NULL;
294
295 ci->i_fragtree = RB_ROOT;
296 mutex_init(&ci->i_fragtree_mutex);
297
298 ci->i_xattrs.blob = NULL;
299 ci->i_xattrs.prealloc_blob = NULL;
300 ci->i_xattrs.dirty = false;
301 ci->i_xattrs.index = RB_ROOT;
302 ci->i_xattrs.count = 0;
303 ci->i_xattrs.names_size = 0;
304 ci->i_xattrs.vals_size = 0;
305 ci->i_xattrs.version = 0;
306 ci->i_xattrs.index_version = 0;
307
308 ci->i_caps = RB_ROOT;
309 ci->i_auth_cap = NULL;
310 ci->i_dirty_caps = 0;
311 ci->i_flushing_caps = 0;
312 INIT_LIST_HEAD(&ci->i_dirty_item);
313 INIT_LIST_HEAD(&ci->i_flushing_item);
314 ci->i_cap_flush_seq = 0;
315 ci->i_cap_flush_last_tid = 0;
316 memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid));
317 init_waitqueue_head(&ci->i_cap_wq);
318 ci->i_hold_caps_min = 0;
319 ci->i_hold_caps_max = 0;
320 INIT_LIST_HEAD(&ci->i_cap_delay_list);
321 ci->i_cap_exporting_mds = 0;
322 ci->i_cap_exporting_mseq = 0;
323 ci->i_cap_exporting_issued = 0;
324 INIT_LIST_HEAD(&ci->i_cap_snaps);
325 ci->i_head_snapc = NULL;
326 ci->i_snap_caps = 0;
327
328 for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
329 ci->i_nr_by_mode[i] = 0;
330
331 ci->i_truncate_seq = 0;
332 ci->i_truncate_size = 0;
333 ci->i_truncate_pending = 0;
334
335 ci->i_max_size = 0;
336 ci->i_reported_size = 0;
337 ci->i_wanted_max_size = 0;
338 ci->i_requested_max_size = 0;
339
340 ci->i_pin_ref = 0;
341 ci->i_rd_ref = 0;
342 ci->i_rdcache_ref = 0;
343 ci->i_wr_ref = 0;
344 ci->i_wrbuffer_ref = 0;
345 ci->i_wrbuffer_ref_head = 0;
346 ci->i_shared_gen = 0;
347 ci->i_rdcache_gen = 0;
348 ci->i_rdcache_revoking = 0;
349
350 INIT_LIST_HEAD(&ci->i_unsafe_writes);
351 INIT_LIST_HEAD(&ci->i_unsafe_dirops);
352 spin_lock_init(&ci->i_unsafe_lock);
353
354 ci->i_snap_realm = NULL;
355 INIT_LIST_HEAD(&ci->i_snap_realm_item);
356 INIT_LIST_HEAD(&ci->i_snap_flush_item);
357
358 INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
359 INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
360
361 INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
362
363 return &ci->vfs_inode;
364}
365
366void ceph_destroy_inode(struct inode *inode)
367{
368 struct ceph_inode_info *ci = ceph_inode(inode);
369 struct ceph_inode_frag *frag;
370 struct rb_node *n;
371
372 dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
373
374 ceph_queue_caps_release(inode);
375
376 kfree(ci->i_symlink);
377 while ((n = rb_first(&ci->i_fragtree)) != NULL) {
378 frag = rb_entry(n, struct ceph_inode_frag, node);
379 rb_erase(n, &ci->i_fragtree);
380 kfree(frag);
381 }
382
383 __ceph_destroy_xattrs(ci);
384 ceph_buffer_put(ci->i_xattrs.blob);
385 ceph_buffer_put(ci->i_xattrs.prealloc_blob);
386
387 kmem_cache_free(ceph_inode_cachep, ci);
388}
389
390
391/*
392 * Helpers to fill in size, ctime, mtime, and atime. We have to be
393 * careful because either the client or MDS may have more up to date
394 * info, depending on which capabilities are held, and whether
395 * time_warp_seq or truncate_seq have increased. (Ordinarily, mtime
396 * and size are monotonically increasing, except when utimes() or
397 * truncate() increments the corresponding _seq values.)
398 */
399int ceph_fill_file_size(struct inode *inode, int issued,
400 u32 truncate_seq, u64 truncate_size, u64 size)
401{
402 struct ceph_inode_info *ci = ceph_inode(inode);
403 int queue_trunc = 0;
404
405 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
406 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
407 dout("size %lld -> %llu\n", inode->i_size, size);
408 inode->i_size = size;
409 inode->i_blocks = (size + (1<<9) - 1) >> 9;
410 ci->i_reported_size = size;
411 if (truncate_seq != ci->i_truncate_seq) {
412 dout("truncate_seq %u -> %u\n",
413 ci->i_truncate_seq, truncate_seq);
414 ci->i_truncate_seq = truncate_seq;
415 if (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
416 CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
417 CEPH_CAP_FILE_EXCL)) {
418 ci->i_truncate_pending++;
419 queue_trunc = 1;
420 }
421 }
422 }
423 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0 &&
424 ci->i_truncate_size != truncate_size) {
425 dout("truncate_size %lld -> %llu\n", ci->i_truncate_size,
426 truncate_size);
427 ci->i_truncate_size = truncate_size;
428 }
429 return queue_trunc;
430}
431
432void ceph_fill_file_time(struct inode *inode, int issued,
433 u64 time_warp_seq, struct timespec *ctime,
434 struct timespec *mtime, struct timespec *atime)
435{
436 struct ceph_inode_info *ci = ceph_inode(inode);
437 int warn = 0;
438
439 if (issued & (CEPH_CAP_FILE_EXCL|
440 CEPH_CAP_FILE_WR|
441 CEPH_CAP_FILE_BUFFER)) {
442 if (timespec_compare(ctime, &inode->i_ctime) > 0) {
443 dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",
444 inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
445 ctime->tv_sec, ctime->tv_nsec);
446 inode->i_ctime = *ctime;
447 }
448 if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) > 0) {
449 /* the MDS did a utimes() */
450 dout("mtime %ld.%09ld -> %ld.%09ld "
451 "tw %d -> %d\n",
452 inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
453 mtime->tv_sec, mtime->tv_nsec,
454 ci->i_time_warp_seq, (int)time_warp_seq);
455
456 inode->i_mtime = *mtime;
457 inode->i_atime = *atime;
458 ci->i_time_warp_seq = time_warp_seq;
459 } else if (time_warp_seq == ci->i_time_warp_seq) {
460 /* nobody did utimes(); take the max */
461 if (timespec_compare(mtime, &inode->i_mtime) > 0) {
462 dout("mtime %ld.%09ld -> %ld.%09ld inc\n",
463 inode->i_mtime.tv_sec,
464 inode->i_mtime.tv_nsec,
465 mtime->tv_sec, mtime->tv_nsec);
466 inode->i_mtime = *mtime;
467 }
468 if (timespec_compare(atime, &inode->i_atime) > 0) {
469 dout("atime %ld.%09ld -> %ld.%09ld inc\n",
470 inode->i_atime.tv_sec,
471 inode->i_atime.tv_nsec,
472 atime->tv_sec, atime->tv_nsec);
473 inode->i_atime = *atime;
474 }
475 } else if (issued & CEPH_CAP_FILE_EXCL) {
476 /* we did a utimes(); ignore mds values */
477 } else {
478 warn = 1;
479 }
480 } else {
481 /* we have no write caps; whatever the MDS says is true */
482 if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
483 inode->i_ctime = *ctime;
484 inode->i_mtime = *mtime;
485 inode->i_atime = *atime;
486 ci->i_time_warp_seq = time_warp_seq;
487 } else {
488 warn = 1;
489 }
490 }
491 if (warn) /* time_warp_seq shouldn't go backwards */
492 dout("%p mds time_warp_seq %llu < %u\n",
493 inode, time_warp_seq, ci->i_time_warp_seq);
494}
495
496/*
497 * Populate an inode based on info from mds. May be called on new or
498 * existing inodes.
499 */
500static int fill_inode(struct inode *inode,
501 struct ceph_mds_reply_info_in *iinfo,
502 struct ceph_mds_reply_dirfrag *dirinfo,
503 struct ceph_mds_session *session,
504 unsigned long ttl_from, int cap_fmode,
505 struct ceph_cap_reservation *caps_reservation)
506{
507 struct ceph_mds_reply_inode *info = iinfo->in;
508 struct ceph_inode_info *ci = ceph_inode(inode);
509 int i;
510 int issued, implemented;
511 struct timespec mtime, atime, ctime;
512 u32 nsplits;
513 struct ceph_buffer *xattr_blob = NULL;
514 int err = 0;
515 int queue_trunc = 0;
516
517 dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
518 inode, ceph_vinop(inode), le64_to_cpu(info->version),
519 ci->i_version);
520
521 /*
522 * prealloc xattr data, if it looks like we'll need it. only
523 * if len > 4 (meaning there are actually xattrs; the first 4
524 * bytes are the xattr count).
525 */
526 if (iinfo->xattr_len > 4) {
527 xattr_blob = ceph_buffer_new_alloc(iinfo->xattr_len, GFP_NOFS);
528 if (!xattr_blob)
529 pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
530 iinfo->xattr_len);
531 }
532
533 spin_lock(&inode->i_lock);
534
535 /*
536 * provided version will be odd if inode value is projected,
537 * even if stable. skip the update if we have a newer info
538 * (e.g., due to inode info racing form multiple MDSs), or if
539 * we are getting projected (unstable) inode info.
540 */
541 if (le64_to_cpu(info->version) > 0 &&
542 (ci->i_version & ~1) > le64_to_cpu(info->version))
543 goto no_change;
544
545 issued = __ceph_caps_issued(ci, &implemented);
546 issued |= implemented | __ceph_caps_dirty(ci);
547
548 /* update inode */
549 ci->i_version = le64_to_cpu(info->version);
550 inode->i_version++;
551 inode->i_rdev = le32_to_cpu(info->rdev);
552
553 if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
554 inode->i_mode = le32_to_cpu(info->mode);
555 inode->i_uid = le32_to_cpu(info->uid);
556 inode->i_gid = le32_to_cpu(info->gid);
557 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
558 inode->i_uid, inode->i_gid);
559 }
560
561 if ((issued & CEPH_CAP_LINK_EXCL) == 0)
562 inode->i_nlink = le32_to_cpu(info->nlink);
563
564 /* be careful with mtime, atime, size */
565 ceph_decode_timespec(&atime, &info->atime);
566 ceph_decode_timespec(&mtime, &info->mtime);
567 ceph_decode_timespec(&ctime, &info->ctime);
568 queue_trunc = ceph_fill_file_size(inode, issued,
569 le32_to_cpu(info->truncate_seq),
570 le64_to_cpu(info->truncate_size),
571 S_ISDIR(inode->i_mode) ?
572 ci->i_rbytes :
573 le64_to_cpu(info->size));
574 ceph_fill_file_time(inode, issued,
575 le32_to_cpu(info->time_warp_seq),
576 &ctime, &mtime, &atime);
577
578 ci->i_max_size = le64_to_cpu(info->max_size);
579 ci->i_layout = info->layout;
580 inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
581
582 /* xattrs */
583 /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
584 if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
585 le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
586 if (ci->i_xattrs.blob)
587 ceph_buffer_put(ci->i_xattrs.blob);
588 ci->i_xattrs.blob = xattr_blob;
589 if (xattr_blob)
590 memcpy(ci->i_xattrs.blob->vec.iov_base,
591 iinfo->xattr_data, iinfo->xattr_len);
592 ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
593 }
594
595 inode->i_mapping->a_ops = &ceph_aops;
596 inode->i_mapping->backing_dev_info =
597 &ceph_client(inode->i_sb)->backing_dev_info;
598
599 switch (inode->i_mode & S_IFMT) {
600 case S_IFIFO:
601 case S_IFBLK:
602 case S_IFCHR:
603 case S_IFSOCK:
604 init_special_inode(inode, inode->i_mode, inode->i_rdev);
605 inode->i_op = &ceph_file_iops;
606 break;
607 case S_IFREG:
608 inode->i_op = &ceph_file_iops;
609 inode->i_fop = &ceph_file_fops;
610 break;
611 case S_IFLNK:
612 inode->i_op = &ceph_symlink_iops;
613 if (!ci->i_symlink) {
614 int symlen = iinfo->symlink_len;
615 char *sym;
616
617 BUG_ON(symlen != inode->i_size);
618 spin_unlock(&inode->i_lock);
619
620 err = -ENOMEM;
621 sym = kmalloc(symlen+1, GFP_NOFS);
622 if (!sym)
623 goto out;
624 memcpy(sym, iinfo->symlink, symlen);
625 sym[symlen] = 0;
626
627 spin_lock(&inode->i_lock);
628 if (!ci->i_symlink)
629 ci->i_symlink = sym;
630 else
631 kfree(sym); /* lost a race */
632 }
633 break;
634 case S_IFDIR:
635 inode->i_op = &ceph_dir_iops;
636 inode->i_fop = &ceph_dir_fops;
637
638 ci->i_files = le64_to_cpu(info->files);
639 ci->i_subdirs = le64_to_cpu(info->subdirs);
640 ci->i_rbytes = le64_to_cpu(info->rbytes);
641 ci->i_rfiles = le64_to_cpu(info->rfiles);
642 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
643 ceph_decode_timespec(&ci->i_rctime, &info->rctime);
644
645 /* set dir completion flag? */
646 if (ci->i_files == 0 && ci->i_subdirs == 0 &&
647 ceph_snap(inode) == CEPH_NOSNAP &&
648 (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED)) {
649 dout(" marking %p complete (empty)\n", inode);
650 ci->i_ceph_flags |= CEPH_I_COMPLETE;
651 ci->i_max_offset = 2;
652 }
653
654 /* it may be better to set st_size in getattr instead? */
655 if (ceph_test_opt(ceph_client(inode->i_sb), RBYTES))
656 inode->i_size = ci->i_rbytes;
657 break;
658 default:
659 pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
660 ceph_vinop(inode), inode->i_mode);
661 }
662
663no_change:
664 spin_unlock(&inode->i_lock);
665
666 /* queue truncate if we saw i_size decrease */
667 if (queue_trunc)
668 if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
669 &ci->i_vmtruncate_work))
670 igrab(inode);
671
672 /* populate frag tree */
673 /* FIXME: move me up, if/when version reflects fragtree changes */
674 nsplits = le32_to_cpu(info->fragtree.nsplits);
675 mutex_lock(&ci->i_fragtree_mutex);
676 for (i = 0; i < nsplits; i++) {
677 u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
678 struct ceph_inode_frag *frag = __get_or_create_frag(ci, id);
679
680 if (IS_ERR(frag))
681 continue;
682 frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
683 dout(" frag %x split by %d\n", frag->frag, frag->split_by);
684 }
685 mutex_unlock(&ci->i_fragtree_mutex);
686
687 /* were we issued a capability? */
688 if (info->cap.caps) {
689 if (ceph_snap(inode) == CEPH_NOSNAP) {
690 ceph_add_cap(inode, session,
691 le64_to_cpu(info->cap.cap_id),
692 cap_fmode,
693 le32_to_cpu(info->cap.caps),
694 le32_to_cpu(info->cap.wanted),
695 le32_to_cpu(info->cap.seq),
696 le32_to_cpu(info->cap.mseq),
697 le64_to_cpu(info->cap.realm),
698 info->cap.flags,
699 caps_reservation);
700 } else {
701 spin_lock(&inode->i_lock);
702 dout(" %p got snap_caps %s\n", inode,
703 ceph_cap_string(le32_to_cpu(info->cap.caps)));
704 ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
705 if (cap_fmode >= 0)
706 __ceph_get_fmode(ci, cap_fmode);
707 spin_unlock(&inode->i_lock);
708 }
709 }
710
711 /* update delegation info? */
712 if (dirinfo)
713 ceph_fill_dirfrag(inode, dirinfo);
714
715 err = 0;
716
717out:
718 ceph_buffer_put(xattr_blob);
719 return err;
720}
721
722/*
723 * caller should hold session s_mutex.
724 */
725static void update_dentry_lease(struct dentry *dentry,
726 struct ceph_mds_reply_lease *lease,
727 struct ceph_mds_session *session,
728 unsigned long from_time)
729{
730 struct ceph_dentry_info *di = ceph_dentry(dentry);
731 long unsigned duration = le32_to_cpu(lease->duration_ms);
732 long unsigned ttl = from_time + (duration * HZ) / 1000;
733 long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
734 struct inode *dir;
735
736 /* only track leases on regular dentries */
737 if (dentry->d_op != &ceph_dentry_ops)
738 return;
739
740 spin_lock(&dentry->d_lock);
741 dout("update_dentry_lease %p mask %d duration %lu ms ttl %lu\n",
742 dentry, le16_to_cpu(lease->mask), duration, ttl);
743
744 /* make lease_rdcache_gen match directory */
745 dir = dentry->d_parent->d_inode;
746 di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
747
748 if (lease->mask == 0)
749 goto out_unlock;
750
751 if (di->lease_gen == session->s_cap_gen &&
752 time_before(ttl, dentry->d_time))
753 goto out_unlock; /* we already have a newer lease. */
754
755 if (di->lease_session && di->lease_session != session)
756 goto out_unlock;
757
758 ceph_dentry_lru_touch(dentry);
759
760 if (!di->lease_session)
761 di->lease_session = ceph_get_mds_session(session);
762 di->lease_gen = session->s_cap_gen;
763 di->lease_seq = le32_to_cpu(lease->seq);
764 di->lease_renew_after = half_ttl;
765 di->lease_renew_from = 0;
766 dentry->d_time = ttl;
767out_unlock:
768 spin_unlock(&dentry->d_lock);
769 return;
770}
771
772/*
773 * splice a dentry to an inode.
774 * caller must hold directory i_mutex for this to be safe.
775 *
776 * we will only rehash the resulting dentry if @prehash is
777 * true; @prehash will be set to false (for the benefit of
778 * the caller) if we fail.
779 */
780static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
781 bool *prehash)
782{
783 struct dentry *realdn;
784
785 /* dn must be unhashed */
786 if (!d_unhashed(dn))
787 d_drop(dn);
788 realdn = d_materialise_unique(dn, in);
789 if (IS_ERR(realdn)) {
790 pr_err("splice_dentry error %p inode %p ino %llx.%llx\n",
791 dn, in, ceph_vinop(in));
792 if (prehash)
793 *prehash = false; /* don't rehash on error */
794 dn = realdn; /* note realdn contains the error */
795 goto out;
796 } else if (realdn) {
797 dout("dn %p (%d) spliced with %p (%d) "
798 "inode %p ino %llx.%llx\n",
799 dn, atomic_read(&dn->d_count),
800 realdn, atomic_read(&realdn->d_count),
801 realdn->d_inode, ceph_vinop(realdn->d_inode));
802 dput(dn);
803 dn = realdn;
804 } else {
805 BUG_ON(!ceph_dentry(dn));
806
807 dout("dn %p attached to %p ino %llx.%llx\n",
808 dn, dn->d_inode, ceph_vinop(dn->d_inode));
809 }
810 if ((!prehash || *prehash) && d_unhashed(dn))
811 d_rehash(dn);
812out:
813 return dn;
814}
815
816/*
817 * Incorporate results into the local cache. This is either just
818 * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
819 * after a lookup).
820 *
821 * A reply may contain
822 * a directory inode along with a dentry.
823 * and/or a target inode
824 *
825 * Called with snap_rwsem (read).
826 */
827int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
828 struct ceph_mds_session *session)
829{
830 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
831 struct inode *in = NULL;
832 struct ceph_mds_reply_inode *ininfo;
833 struct ceph_vino vino;
834 int i = 0;
835 int err = 0;
836
837 dout("fill_trace %p is_dentry %d is_target %d\n", req,
838 rinfo->head->is_dentry, rinfo->head->is_target);
839
840#if 0
841 /*
842 * Debugging hook:
843 *
844 * If we resend completed ops to a recovering mds, we get no
845 * trace. Since that is very rare, pretend this is the case
846 * to ensure the 'no trace' handlers in the callers behave.
847 *
848 * Fill in inodes unconditionally to avoid breaking cap
849 * invariants.
850 */
851 if (rinfo->head->op & CEPH_MDS_OP_WRITE) {
852 pr_info("fill_trace faking empty trace on %lld %s\n",
853 req->r_tid, ceph_mds_op_name(rinfo->head->op));
854 if (rinfo->head->is_dentry) {
855 rinfo->head->is_dentry = 0;
856 err = fill_inode(req->r_locked_dir,
857 &rinfo->diri, rinfo->dirfrag,
858 session, req->r_request_started, -1);
859 }
860 if (rinfo->head->is_target) {
861 rinfo->head->is_target = 0;
862 ininfo = rinfo->targeti.in;
863 vino.ino = le64_to_cpu(ininfo->ino);
864 vino.snap = le64_to_cpu(ininfo->snapid);
865 in = ceph_get_inode(sb, vino);
866 err = fill_inode(in, &rinfo->targeti, NULL,
867 session, req->r_request_started,
868 req->r_fmode);
869 iput(in);
870 }
871 }
872#endif
873
874 if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
875 dout("fill_trace reply is empty!\n");
876 if (rinfo->head->result == 0 && req->r_locked_dir) {
877 struct ceph_inode_info *ci =
878 ceph_inode(req->r_locked_dir);
879 dout(" clearing %p complete (empty trace)\n",
880 req->r_locked_dir);
881 ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
882 ci->i_release_count++;
883 }
884 return 0;
885 }
886
887 if (rinfo->head->is_dentry) {
888 /*
889 * lookup link rename : null -> possibly existing inode
890 * mknod symlink mkdir : null -> new inode
891 * unlink : linked -> null
892 */
893 struct inode *dir = req->r_locked_dir;
894 struct dentry *dn = req->r_dentry;
895 bool have_dir_cap, have_lease;
896
897 BUG_ON(!dn);
898 BUG_ON(!dir);
899 BUG_ON(dn->d_parent->d_inode != dir);
900 BUG_ON(ceph_ino(dir) !=
901 le64_to_cpu(rinfo->diri.in->ino));
902 BUG_ON(ceph_snap(dir) !=
903 le64_to_cpu(rinfo->diri.in->snapid));
904
905 err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
906 session, req->r_request_started, -1,
907 &req->r_caps_reservation);
908 if (err < 0)
909 return err;
910
911 /* do we have a lease on the whole dir? */
912 have_dir_cap =
913 (le32_to_cpu(rinfo->diri.in->cap.caps) &
914 CEPH_CAP_FILE_SHARED);
915
916 /* do we have a dn lease? */
917 have_lease = have_dir_cap ||
918 (le16_to_cpu(rinfo->dlease->mask) &
919 CEPH_LOCK_DN);
920
921 if (!have_lease)
922 dout("fill_trace no dentry lease or dir cap\n");
923
924 /* rename? */
925 if (req->r_old_dentry && req->r_op == CEPH_MDS_OP_RENAME) {
926 dout(" src %p '%.*s' dst %p '%.*s'\n",
927 req->r_old_dentry,
928 req->r_old_dentry->d_name.len,
929 req->r_old_dentry->d_name.name,
930 dn, dn->d_name.len, dn->d_name.name);
931 dout("fill_trace doing d_move %p -> %p\n",
932 req->r_old_dentry, dn);
933 d_move(req->r_old_dentry, dn);
934 dout(" src %p '%.*s' dst %p '%.*s'\n",
935 req->r_old_dentry,
936 req->r_old_dentry->d_name.len,
937 req->r_old_dentry->d_name.name,
938 dn, dn->d_name.len, dn->d_name.name);
939 /* take overwritten dentry's readdir offset */
940 ceph_dentry(req->r_old_dentry)->offset =
941 ceph_dentry(dn)->offset;
942 dn = req->r_old_dentry; /* use old_dentry */
943 in = dn->d_inode;
944 }
945
946 /* null dentry? */
947 if (!rinfo->head->is_target) {
948 dout("fill_trace null dentry\n");
949 if (dn->d_inode) {
950 dout("d_delete %p\n", dn);
951 d_delete(dn);
952 } else {
953 dout("d_instantiate %p NULL\n", dn);
954 d_instantiate(dn, NULL);
955 if (have_lease && d_unhashed(dn))
956 d_rehash(dn);
957 update_dentry_lease(dn, rinfo->dlease,
958 session,
959 req->r_request_started);
960 }
961 goto done;
962 }
963
964 /* attach proper inode */
965 ininfo = rinfo->targeti.in;
966 vino.ino = le64_to_cpu(ininfo->ino);
967 vino.snap = le64_to_cpu(ininfo->snapid);
968 if (!dn->d_inode) {
969 in = ceph_get_inode(sb, vino);
970 if (IS_ERR(in)) {
971 pr_err("fill_trace bad get_inode "
972 "%llx.%llx\n", vino.ino, vino.snap);
973 err = PTR_ERR(in);
974 d_delete(dn);
975 goto done;
976 }
977 dn = splice_dentry(dn, in, &have_lease);
978 if (IS_ERR(dn)) {
979 err = PTR_ERR(dn);
980 goto done;
981 }
982 req->r_dentry = dn; /* may have spliced */
983 igrab(in);
984 } else if (ceph_ino(in) == vino.ino &&
985 ceph_snap(in) == vino.snap) {
986 igrab(in);
987 } else {
988 dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
989 dn, in, ceph_ino(in), ceph_snap(in),
990 vino.ino, vino.snap);
991 have_lease = false;
992 in = NULL;
993 }
994
995 if (have_lease)
996 update_dentry_lease(dn, rinfo->dlease, session,
997 req->r_request_started);
998 dout(" final dn %p\n", dn);
999 i++;
1000 } else if (req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
1001 req->r_op == CEPH_MDS_OP_MKSNAP) {
1002 struct dentry *dn = req->r_dentry;
1003
1004 /* fill out a snapdir LOOKUPSNAP dentry */
1005 BUG_ON(!dn);
1006 BUG_ON(!req->r_locked_dir);
1007 BUG_ON(ceph_snap(req->r_locked_dir) != CEPH_SNAPDIR);
1008 ininfo = rinfo->targeti.in;
1009 vino.ino = le64_to_cpu(ininfo->ino);
1010 vino.snap = le64_to_cpu(ininfo->snapid);
1011 in = ceph_get_inode(sb, vino);
1012 if (IS_ERR(in)) {
1013 pr_err("fill_inode get_inode badness %llx.%llx\n",
1014 vino.ino, vino.snap);
1015 err = PTR_ERR(in);
1016 d_delete(dn);
1017 goto done;
1018 }
1019 dout(" linking snapped dir %p to dn %p\n", in, dn);
1020 dn = splice_dentry(dn, in, NULL);
1021 if (IS_ERR(dn)) {
1022 err = PTR_ERR(dn);
1023 goto done;
1024 }
1025 req->r_dentry = dn; /* may have spliced */
1026 igrab(in);
1027 rinfo->head->is_dentry = 1; /* fool notrace handlers */
1028 }
1029
1030 if (rinfo->head->is_target) {
1031 vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1032 vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1033
1034 if (in == NULL || ceph_ino(in) != vino.ino ||
1035 ceph_snap(in) != vino.snap) {
1036 in = ceph_get_inode(sb, vino);
1037 if (IS_ERR(in)) {
1038 err = PTR_ERR(in);
1039 goto done;
1040 }
1041 }
1042 req->r_target_inode = in;
1043
1044 err = fill_inode(in,
1045 &rinfo->targeti, NULL,
1046 session, req->r_request_started,
1047 (le32_to_cpu(rinfo->head->result) == 0) ?
1048 req->r_fmode : -1,
1049 &req->r_caps_reservation);
1050 if (err < 0) {
1051 pr_err("fill_inode badness %p %llx.%llx\n",
1052 in, ceph_vinop(in));
1053 goto done;
1054 }
1055 }
1056
1057done:
1058 dout("fill_trace done err=%d\n", err);
1059 return err;
1060}
1061
1062/*
1063 * Prepopulate our cache with readdir results, leases, etc.
1064 */
1065int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1066 struct ceph_mds_session *session)
1067{
1068 struct dentry *parent = req->r_dentry;
1069 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1070 struct qstr dname;
1071 struct dentry *dn;
1072 struct inode *in;
1073 int err = 0, i;
1074 struct inode *snapdir = NULL;
1075 struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
1076 u64 frag = le32_to_cpu(rhead->args.readdir.frag);
1077 struct ceph_dentry_info *di;
1078
1079 if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
1080 snapdir = ceph_get_snapdir(parent->d_inode);
1081 parent = d_find_alias(snapdir);
1082 dout("readdir_prepopulate %d items under SNAPDIR dn %p\n",
1083 rinfo->dir_nr, parent);
1084 } else {
1085 dout("readdir_prepopulate %d items under dn %p\n",
1086 rinfo->dir_nr, parent);
1087 if (rinfo->dir_dir)
1088 ceph_fill_dirfrag(parent->d_inode, rinfo->dir_dir);
1089 }
1090
1091 for (i = 0; i < rinfo->dir_nr; i++) {
1092 struct ceph_vino vino;
1093
1094 dname.name = rinfo->dir_dname[i];
1095 dname.len = rinfo->dir_dname_len[i];
1096 dname.hash = full_name_hash(dname.name, dname.len);
1097
1098 vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
1099 vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
1100
1101retry_lookup:
1102 dn = d_lookup(parent, &dname);
1103 dout("d_lookup on parent=%p name=%.*s got %p\n",
1104 parent, dname.len, dname.name, dn);
1105
1106 if (!dn) {
1107 dn = d_alloc(parent, &dname);
1108 dout("d_alloc %p '%.*s' = %p\n", parent,
1109 dname.len, dname.name, dn);
1110 if (dn == NULL) {
1111 dout("d_alloc badness\n");
1112 err = -ENOMEM;
1113 goto out;
1114 }
1115 err = ceph_init_dentry(dn);
1116 if (err < 0)
1117 goto out;
1118 } else if (dn->d_inode &&
1119 (ceph_ino(dn->d_inode) != vino.ino ||
1120 ceph_snap(dn->d_inode) != vino.snap)) {
1121 dout(" dn %p points to wrong inode %p\n",
1122 dn, dn->d_inode);
1123 d_delete(dn);
1124 dput(dn);
1125 goto retry_lookup;
1126 } else {
1127 /* reorder parent's d_subdirs */
1128 spin_lock(&dcache_lock);
1129 spin_lock(&dn->d_lock);
1130 list_move(&dn->d_u.d_child, &parent->d_subdirs);
1131 spin_unlock(&dn->d_lock);
1132 spin_unlock(&dcache_lock);
1133 }
1134
1135 di = dn->d_fsdata;
1136 di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
1137
1138 /* inode */
1139 if (dn->d_inode) {
1140 in = dn->d_inode;
1141 } else {
1142 in = ceph_get_inode(parent->d_sb, vino);
1143 if (in == NULL) {
1144 dout("new_inode badness\n");
1145 d_delete(dn);
1146 dput(dn);
1147 err = -ENOMEM;
1148 goto out;
1149 }
1150 dn = splice_dentry(dn, in, NULL);
1151 }
1152
1153 if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
1154 req->r_request_started, -1,
1155 &req->r_caps_reservation) < 0) {
1156 pr_err("fill_inode badness on %p\n", in);
1157 dput(dn);
1158 continue;
1159 }
1160 update_dentry_lease(dn, rinfo->dir_dlease[i],
1161 req->r_session, req->r_request_started);
1162 dput(dn);
1163 }
1164 req->r_did_prepopulate = true;
1165
1166out:
1167 if (snapdir) {
1168 iput(snapdir);
1169 dput(parent);
1170 }
1171 dout("readdir_prepopulate done\n");
1172 return err;
1173}
1174
1175int ceph_inode_set_size(struct inode *inode, loff_t size)
1176{
1177 struct ceph_inode_info *ci = ceph_inode(inode);
1178 int ret = 0;
1179
1180 spin_lock(&inode->i_lock);
1181 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1182 inode->i_size = size;
1183 inode->i_blocks = (size + (1 << 9) - 1) >> 9;
1184
1185 /* tell the MDS if we are approaching max_size */
1186 if ((size << 1) >= ci->i_max_size &&
1187 (ci->i_reported_size << 1) < ci->i_max_size)
1188 ret = 1;
1189
1190 spin_unlock(&inode->i_lock);
1191 return ret;
1192}
1193
1194/*
1195 * Write back inode data in a worker thread. (This can't be done
1196 * in the message handler context.)
1197 */
1198void ceph_inode_writeback(struct work_struct *work)
1199{
1200 struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1201 i_wb_work);
1202 struct inode *inode = &ci->vfs_inode;
1203
1204 dout("writeback %p\n", inode);
1205 filemap_fdatawrite(&inode->i_data);
1206 iput(inode);
1207}
1208
1209/*
1210 * Invalidate inode pages in a worker thread. (This can't be done
1211 * in the message handler context.)
1212 */
1213static void ceph_inode_invalidate_pages(struct work_struct *work)
1214{
1215 struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1216 i_pg_inv_work);
1217 struct inode *inode = &ci->vfs_inode;
1218 u32 orig_gen;
1219 int check = 0;
1220
1221 spin_lock(&inode->i_lock);
1222 dout("invalidate_pages %p gen %d revoking %d\n", inode,
1223 ci->i_rdcache_gen, ci->i_rdcache_revoking);
1224 if (ci->i_rdcache_gen == 0 ||
1225 ci->i_rdcache_revoking != ci->i_rdcache_gen) {
1226 BUG_ON(ci->i_rdcache_revoking > ci->i_rdcache_gen);
1227 /* nevermind! */
1228 ci->i_rdcache_revoking = 0;
1229 spin_unlock(&inode->i_lock);
1230 goto out;
1231 }
1232 orig_gen = ci->i_rdcache_gen;
1233 spin_unlock(&inode->i_lock);
1234
1235 truncate_inode_pages(&inode->i_data, 0);
1236
1237 spin_lock(&inode->i_lock);
1238 if (orig_gen == ci->i_rdcache_gen) {
1239 dout("invalidate_pages %p gen %d successful\n", inode,
1240 ci->i_rdcache_gen);
1241 ci->i_rdcache_gen = 0;
1242 ci->i_rdcache_revoking = 0;
1243 check = 1;
1244 } else {
1245 dout("invalidate_pages %p gen %d raced, gen now %d\n",
1246 inode, orig_gen, ci->i_rdcache_gen);
1247 }
1248 spin_unlock(&inode->i_lock);
1249
1250 if (check)
1251 ceph_check_caps(ci, 0, NULL);
1252out:
1253 iput(inode);
1254}
1255
1256
1257/*
1258 * called by trunc_wq; take i_mutex ourselves
1259 *
1260 * We also truncate in a separate thread as well.
1261 */
1262void ceph_vmtruncate_work(struct work_struct *work)
1263{
1264 struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
1265 i_vmtruncate_work);
1266 struct inode *inode = &ci->vfs_inode;
1267
1268 dout("vmtruncate_work %p\n", inode);
1269 mutex_lock(&inode->i_mutex);
1270 __ceph_do_pending_vmtruncate(inode);
1271 mutex_unlock(&inode->i_mutex);
1272 iput(inode);
1273}
1274
1275/*
1276 * called with i_mutex held.
1277 *
1278 * Make sure any pending truncation is applied before doing anything
1279 * that may depend on it.
1280 */
1281void __ceph_do_pending_vmtruncate(struct inode *inode)
1282{
1283 struct ceph_inode_info *ci = ceph_inode(inode);
1284 u64 to;
1285 int wrbuffer_refs, wake = 0;
1286
1287retry:
1288 spin_lock(&inode->i_lock);
1289 if (ci->i_truncate_pending == 0) {
1290 dout("__do_pending_vmtruncate %p none pending\n", inode);
1291 spin_unlock(&inode->i_lock);
1292 return;
1293 }
1294
1295 /*
1296 * make sure any dirty snapped pages are flushed before we
1297 * possibly truncate them.. so write AND block!
1298 */
1299 if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
1300 dout("__do_pending_vmtruncate %p flushing snaps first\n",
1301 inode);
1302 spin_unlock(&inode->i_lock);
1303 filemap_write_and_wait_range(&inode->i_data, 0,
1304 inode->i_sb->s_maxbytes);
1305 goto retry;
1306 }
1307
1308 to = ci->i_truncate_size;
1309 wrbuffer_refs = ci->i_wrbuffer_ref;
1310 dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
1311 ci->i_truncate_pending, to);
1312 spin_unlock(&inode->i_lock);
1313
1314 truncate_inode_pages(inode->i_mapping, to);
1315
1316 spin_lock(&inode->i_lock);
1317 ci->i_truncate_pending--;
1318 if (ci->i_truncate_pending == 0)
1319 wake = 1;
1320 spin_unlock(&inode->i_lock);
1321
1322 if (wrbuffer_refs == 0)
1323 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
1324 if (wake)
1325 wake_up(&ci->i_cap_wq);
1326}
1327
1328
1329/*
1330 * symlinks
1331 */
1332static void *ceph_sym_follow_link(struct dentry *dentry, struct nameidata *nd)
1333{
1334 struct ceph_inode_info *ci = ceph_inode(dentry->d_inode);
1335 nd_set_link(nd, ci->i_symlink);
1336 return NULL;
1337}
1338
1339static const struct inode_operations ceph_symlink_iops = {
1340 .readlink = generic_readlink,
1341 .follow_link = ceph_sym_follow_link,
1342};
1343
1344/*
1345 * setattr
1346 */
1347int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1348{
1349 struct inode *inode = dentry->d_inode;
1350 struct ceph_inode_info *ci = ceph_inode(inode);
1351 struct inode *parent_inode = dentry->d_parent->d_inode;
1352 const unsigned int ia_valid = attr->ia_valid;
1353 struct ceph_mds_request *req;
1354 struct ceph_mds_client *mdsc = &ceph_client(dentry->d_sb)->mdsc;
1355 int issued;
1356 int release = 0, dirtied = 0;
1357 int mask = 0;
1358 int err = 0;
1359 int queue_trunc = 0;
1360
1361 if (ceph_snap(inode) != CEPH_NOSNAP)
1362 return -EROFS;
1363
1364 __ceph_do_pending_vmtruncate(inode);
1365
1366 err = inode_change_ok(inode, attr);
1367 if (err != 0)
1368 return err;
1369
1370 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR,
1371 USE_AUTH_MDS);
1372 if (IS_ERR(req))
1373 return PTR_ERR(req);
1374
1375 spin_lock(&inode->i_lock);
1376 issued = __ceph_caps_issued(ci, NULL);
1377 dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
1378
1379 if (ia_valid & ATTR_UID) {
1380 dout("setattr %p uid %d -> %d\n", inode,
1381 inode->i_uid, attr->ia_uid);
1382 if (issued & CEPH_CAP_AUTH_EXCL) {
1383 inode->i_uid = attr->ia_uid;
1384 dirtied |= CEPH_CAP_AUTH_EXCL;
1385 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1386 attr->ia_uid != inode->i_uid) {
1387 req->r_args.setattr.uid = cpu_to_le32(attr->ia_uid);
1388 mask |= CEPH_SETATTR_UID;
1389 release |= CEPH_CAP_AUTH_SHARED;
1390 }
1391 }
1392 if (ia_valid & ATTR_GID) {
1393 dout("setattr %p gid %d -> %d\n", inode,
1394 inode->i_gid, attr->ia_gid);
1395 if (issued & CEPH_CAP_AUTH_EXCL) {
1396 inode->i_gid = attr->ia_gid;
1397 dirtied |= CEPH_CAP_AUTH_EXCL;
1398 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1399 attr->ia_gid != inode->i_gid) {
1400 req->r_args.setattr.gid = cpu_to_le32(attr->ia_gid);
1401 mask |= CEPH_SETATTR_GID;
1402 release |= CEPH_CAP_AUTH_SHARED;
1403 }
1404 }
1405 if (ia_valid & ATTR_MODE) {
1406 dout("setattr %p mode 0%o -> 0%o\n", inode, inode->i_mode,
1407 attr->ia_mode);
1408 if (issued & CEPH_CAP_AUTH_EXCL) {
1409 inode->i_mode = attr->ia_mode;
1410 dirtied |= CEPH_CAP_AUTH_EXCL;
1411 } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
1412 attr->ia_mode != inode->i_mode) {
1413 req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
1414 mask |= CEPH_SETATTR_MODE;
1415 release |= CEPH_CAP_AUTH_SHARED;
1416 }
1417 }
1418
1419 if (ia_valid & ATTR_ATIME) {
1420 dout("setattr %p atime %ld.%ld -> %ld.%ld\n", inode,
1421 inode->i_atime.tv_sec, inode->i_atime.tv_nsec,
1422 attr->ia_atime.tv_sec, attr->ia_atime.tv_nsec);
1423 if (issued & CEPH_CAP_FILE_EXCL) {
1424 ci->i_time_warp_seq++;
1425 inode->i_atime = attr->ia_atime;
1426 dirtied |= CEPH_CAP_FILE_EXCL;
1427 } else if ((issued & CEPH_CAP_FILE_WR) &&
1428 timespec_compare(&inode->i_atime,
1429 &attr->ia_atime) < 0) {
1430 inode->i_atime = attr->ia_atime;
1431 dirtied |= CEPH_CAP_FILE_WR;
1432 } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
1433 !timespec_equal(&inode->i_atime, &attr->ia_atime)) {
1434 ceph_encode_timespec(&req->r_args.setattr.atime,
1435 &attr->ia_atime);
1436 mask |= CEPH_SETATTR_ATIME;
1437 release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD |
1438 CEPH_CAP_FILE_WR;
1439 }
1440 }
1441 if (ia_valid & ATTR_MTIME) {
1442 dout("setattr %p mtime %ld.%ld -> %ld.%ld\n", inode,
1443 inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
1444 attr->ia_mtime.tv_sec, attr->ia_mtime.tv_nsec);
1445 if (issued & CEPH_CAP_FILE_EXCL) {
1446 ci->i_time_warp_seq++;
1447 inode->i_mtime = attr->ia_mtime;
1448 dirtied |= CEPH_CAP_FILE_EXCL;
1449 } else if ((issued & CEPH_CAP_FILE_WR) &&
1450 timespec_compare(&inode->i_mtime,
1451 &attr->ia_mtime) < 0) {
1452 inode->i_mtime = attr->ia_mtime;
1453 dirtied |= CEPH_CAP_FILE_WR;
1454 } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
1455 !timespec_equal(&inode->i_mtime, &attr->ia_mtime)) {
1456 ceph_encode_timespec(&req->r_args.setattr.mtime,
1457 &attr->ia_mtime);
1458 mask |= CEPH_SETATTR_MTIME;
1459 release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
1460 CEPH_CAP_FILE_WR;
1461 }
1462 }
1463 if (ia_valid & ATTR_SIZE) {
1464 dout("setattr %p size %lld -> %lld\n", inode,
1465 inode->i_size, attr->ia_size);
1466 if (attr->ia_size > inode->i_sb->s_maxbytes) {
1467 err = -EINVAL;
1468 goto out;
1469 }
1470 if ((issued & CEPH_CAP_FILE_EXCL) &&
1471 attr->ia_size > inode->i_size) {
1472 inode->i_size = attr->ia_size;
1473 if (attr->ia_size < inode->i_size) {
1474 ci->i_truncate_size = attr->ia_size;
1475 ci->i_truncate_pending++;
1476 queue_trunc = 1;
1477 }
1478 inode->i_blocks =
1479 (attr->ia_size + (1 << 9) - 1) >> 9;
1480 inode->i_ctime = attr->ia_ctime;
1481 ci->i_reported_size = attr->ia_size;
1482 dirtied |= CEPH_CAP_FILE_EXCL;
1483 } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
1484 attr->ia_size != inode->i_size) {
1485 req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
1486 req->r_args.setattr.old_size =
1487 cpu_to_le64(inode->i_size);
1488 mask |= CEPH_SETATTR_SIZE;
1489 release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
1490 CEPH_CAP_FILE_WR;
1491 }
1492 }
1493
1494 /* these do nothing */
1495 if (ia_valid & ATTR_CTIME) {
1496 bool only = (ia_valid & (ATTR_SIZE|ATTR_MTIME|ATTR_ATIME|
1497 ATTR_MODE|ATTR_UID|ATTR_GID)) == 0;
1498 dout("setattr %p ctime %ld.%ld -> %ld.%ld (%s)\n", inode,
1499 inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
1500 attr->ia_ctime.tv_sec, attr->ia_ctime.tv_nsec,
1501 only ? "ctime only" : "ignored");
1502 inode->i_ctime = attr->ia_ctime;
1503 if (only) {
1504 /*
1505 * if kernel wants to dirty ctime but nothing else,
1506 * we need to choose a cap to dirty under, or do
1507 * a almost-no-op setattr
1508 */
1509 if (issued & CEPH_CAP_AUTH_EXCL)
1510 dirtied |= CEPH_CAP_AUTH_EXCL;
1511 else if (issued & CEPH_CAP_FILE_EXCL)
1512 dirtied |= CEPH_CAP_FILE_EXCL;
1513 else if (issued & CEPH_CAP_XATTR_EXCL)
1514 dirtied |= CEPH_CAP_XATTR_EXCL;
1515 else
1516 mask |= CEPH_SETATTR_CTIME;
1517 }
1518 }
1519 if (ia_valid & ATTR_FILE)
1520 dout("setattr %p ATTR_FILE ... hrm!\n", inode);
1521
1522 if (dirtied) {
1523 __ceph_mark_dirty_caps(ci, dirtied);
1524 inode->i_ctime = CURRENT_TIME;
1525 }
1526
1527 release &= issued;
1528 spin_unlock(&inode->i_lock);
1529
1530 if (queue_trunc)
1531 __ceph_do_pending_vmtruncate(inode);
1532
1533 if (mask) {
1534 req->r_inode = igrab(inode);
1535 req->r_inode_drop = release;
1536 req->r_args.setattr.mask = cpu_to_le32(mask);
1537 req->r_num_caps = 1;
1538 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
1539 }
1540 dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
1541 ceph_cap_string(dirtied), mask);
1542
1543 ceph_mdsc_put_request(req);
1544 __ceph_do_pending_vmtruncate(inode);
1545 return err;
1546out:
1547 spin_unlock(&inode->i_lock);
1548 ceph_mdsc_put_request(req);
1549 return err;
1550}
1551
1552/*
1553 * Verify that we have a lease on the given mask. If not,
1554 * do a getattr against an mds.
1555 */
1556int ceph_do_getattr(struct inode *inode, int mask)
1557{
1558 struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
1559 struct ceph_mds_client *mdsc = &client->mdsc;
1560 struct ceph_mds_request *req;
1561 int err;
1562
1563 if (ceph_snap(inode) == CEPH_SNAPDIR) {
1564 dout("do_getattr inode %p SNAPDIR\n", inode);
1565 return 0;
1566 }
1567
1568 dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask));
1569 if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
1570 return 0;
1571
1572 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
1573 if (IS_ERR(req))
1574 return PTR_ERR(req);
1575 req->r_inode = igrab(inode);
1576 req->r_num_caps = 1;
1577 req->r_args.getattr.mask = cpu_to_le32(mask);
1578 err = ceph_mdsc_do_request(mdsc, NULL, req);
1579 ceph_mdsc_put_request(req);
1580 dout("do_getattr result=%d\n", err);
1581 return err;
1582}
1583
1584
1585/*
1586 * Check inode permissions. We verify we have a valid value for
1587 * the AUTH cap, then call the generic handler.
1588 */
1589int ceph_permission(struct inode *inode, int mask)
1590{
1591 int err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED);
1592
1593 if (!err)
1594 err = generic_permission(inode, mask, NULL);
1595 return err;
1596}
1597
1598/*
1599 * Get all attributes. Hopefully somedata we'll have a statlite()
1600 * and can limit the fields we require to be accurate.
1601 */
1602int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
1603 struct kstat *stat)
1604{
1605 struct inode *inode = dentry->d_inode;
1606 int err;
1607
1608 err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
1609 if (!err) {
1610 generic_fillattr(inode, stat);
1611 stat->ino = inode->i_ino;
1612 if (ceph_snap(inode) != CEPH_NOSNAP)
1613 stat->dev = ceph_snap(inode);
1614 else
1615 stat->dev = 0;
1616 if (S_ISDIR(inode->i_mode))
1617 stat->blksize = 65536;
1618 }
1619 return err;
1620}
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
new file mode 100644
index 00000000000..8eaac04d1b8
--- /dev/null
+++ b/fs/ceph/xattr.c
@@ -0,0 +1,833 @@
1#include "ceph_debug.h"
2#include "super.h"
3#include "decode.h"
4
5#include <linux/xattr.h>
6
7static bool ceph_is_valid_xattr(const char *name)
8{
9 return !strncmp(name, XATTR_SECURITY_PREFIX,
10 XATTR_SECURITY_PREFIX_LEN) ||
11 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
12 !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
13}
14
15/*
16 * These define virtual xattrs exposing the recursive directory
17 * statistics and layout metadata.
18 */
19struct ceph_vxattr_cb {
20 bool readonly;
21 char *name;
22 size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
23 size_t size);
24};
25
26/* directories */
27
28static size_t ceph_vxattrcb_entries(struct ceph_inode_info *ci, char *val,
29 size_t size)
30{
31 return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
32}
33
34static size_t ceph_vxattrcb_files(struct ceph_inode_info *ci, char *val,
35 size_t size)
36{
37 return snprintf(val, size, "%lld", ci->i_files);
38}
39
40static size_t ceph_vxattrcb_subdirs(struct ceph_inode_info *ci, char *val,
41 size_t size)
42{
43 return snprintf(val, size, "%lld", ci->i_subdirs);
44}
45
46static size_t ceph_vxattrcb_rentries(struct ceph_inode_info *ci, char *val,
47 size_t size)
48{
49 return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
50}
51
52static size_t ceph_vxattrcb_rfiles(struct ceph_inode_info *ci, char *val,
53 size_t size)
54{
55 return snprintf(val, size, "%lld", ci->i_rfiles);
56}
57
58static size_t ceph_vxattrcb_rsubdirs(struct ceph_inode_info *ci, char *val,
59 size_t size)
60{
61 return snprintf(val, size, "%lld", ci->i_rsubdirs);
62}
63
64static size_t ceph_vxattrcb_rbytes(struct ceph_inode_info *ci, char *val,
65 size_t size)
66{
67 return snprintf(val, size, "%lld", ci->i_rbytes);
68}
69
70static size_t ceph_vxattrcb_rctime(struct ceph_inode_info *ci, char *val,
71 size_t size)
72{
73 return snprintf(val, size, "%ld.%ld", (long)ci->i_rctime.tv_sec,
74 (long)ci->i_rctime.tv_nsec);
75}
76
77static struct ceph_vxattr_cb ceph_dir_vxattrs[] = {
78 { true, "user.ceph.dir.entries", ceph_vxattrcb_entries},
79 { true, "user.ceph.dir.files", ceph_vxattrcb_files},
80 { true, "user.ceph.dir.subdirs", ceph_vxattrcb_subdirs},
81 { true, "user.ceph.dir.rentries", ceph_vxattrcb_rentries},
82 { true, "user.ceph.dir.rfiles", ceph_vxattrcb_rfiles},
83 { true, "user.ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs},
84 { true, "user.ceph.dir.rbytes", ceph_vxattrcb_rbytes},
85 { true, "user.ceph.dir.rctime", ceph_vxattrcb_rctime},
86 { true, NULL, NULL }
87};
88
89/* files */
90
91static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
92 size_t size)
93{
94 return snprintf(val, size,
95 "chunk_bytes=%lld\nstripe_count=%lld\nobject_size=%lld\n",
96 (unsigned long long)ceph_file_layout_su(ci->i_layout),
97 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
98 (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
99}
100
101static struct ceph_vxattr_cb ceph_file_vxattrs[] = {
102 { true, "user.ceph.layout", ceph_vxattrcb_layout},
103 { NULL, NULL }
104};
105
106static struct ceph_vxattr_cb *ceph_inode_vxattrs(struct inode *inode)
107{
108 if (S_ISDIR(inode->i_mode))
109 return ceph_dir_vxattrs;
110 else if (S_ISREG(inode->i_mode))
111 return ceph_file_vxattrs;
112 return NULL;
113}
114
115static struct ceph_vxattr_cb *ceph_match_vxattr(struct ceph_vxattr_cb *vxattr,
116 const char *name)
117{
118 do {
119 if (strcmp(vxattr->name, name) == 0)
120 return vxattr;
121 vxattr++;
122 } while (vxattr->name);
123 return NULL;
124}
125
126static int __set_xattr(struct ceph_inode_info *ci,
127 const char *name, int name_len,
128 const char *val, int val_len,
129 int dirty,
130 int should_free_name, int should_free_val,
131 struct ceph_inode_xattr **newxattr)
132{
133 struct rb_node **p;
134 struct rb_node *parent = NULL;
135 struct ceph_inode_xattr *xattr = NULL;
136 int c;
137 int new = 0;
138
139 p = &ci->i_xattrs.index.rb_node;
140 while (*p) {
141 parent = *p;
142 xattr = rb_entry(parent, struct ceph_inode_xattr, node);
143 c = strncmp(name, xattr->name, min(name_len, xattr->name_len));
144 if (c < 0)
145 p = &(*p)->rb_left;
146 else if (c > 0)
147 p = &(*p)->rb_right;
148 else {
149 if (name_len == xattr->name_len)
150 break;
151 else if (name_len < xattr->name_len)
152 p = &(*p)->rb_left;
153 else
154 p = &(*p)->rb_right;
155 }
156 xattr = NULL;
157 }
158
159 if (!xattr) {
160 new = 1;
161 xattr = *newxattr;
162 xattr->name = name;
163 xattr->name_len = name_len;
164 xattr->should_free_name = should_free_name;
165
166 ci->i_xattrs.count++;
167 dout("__set_xattr count=%d\n", ci->i_xattrs.count);
168 } else {
169 kfree(*newxattr);
170 *newxattr = NULL;
171 if (xattr->should_free_val)
172 kfree((void *)xattr->val);
173
174 if (should_free_name) {
175 kfree((void *)name);
176 name = xattr->name;
177 }
178 ci->i_xattrs.names_size -= xattr->name_len;
179 ci->i_xattrs.vals_size -= xattr->val_len;
180 }
181 if (!xattr) {
182 pr_err("__set_xattr ENOMEM on %p %llx.%llx xattr %s=%s\n",
183 &ci->vfs_inode, ceph_vinop(&ci->vfs_inode), name,
184 xattr->val);
185 return -ENOMEM;
186 }
187 ci->i_xattrs.names_size += name_len;
188 ci->i_xattrs.vals_size += val_len;
189 if (val)
190 xattr->val = val;
191 else
192 xattr->val = "";
193
194 xattr->val_len = val_len;
195 xattr->dirty = dirty;
196 xattr->should_free_val = (val && should_free_val);
197
198 if (new) {
199 rb_link_node(&xattr->node, parent, p);
200 rb_insert_color(&xattr->node, &ci->i_xattrs.index);
201 dout("__set_xattr_val p=%p\n", p);
202 }
203
204 dout("__set_xattr_val added %llx.%llx xattr %p %s=%.*s\n",
205 ceph_vinop(&ci->vfs_inode), xattr, name, val_len, val);
206
207 return 0;
208}
209
210static struct ceph_inode_xattr *__get_xattr(struct ceph_inode_info *ci,
211 const char *name)
212{
213 struct rb_node **p;
214 struct rb_node *parent = NULL;
215 struct ceph_inode_xattr *xattr = NULL;
216 int c;
217
218 p = &ci->i_xattrs.index.rb_node;
219 while (*p) {
220 parent = *p;
221 xattr = rb_entry(parent, struct ceph_inode_xattr, node);
222 c = strncmp(name, xattr->name, xattr->name_len);
223 if (c < 0)
224 p = &(*p)->rb_left;
225 else if (c > 0)
226 p = &(*p)->rb_right;
227 else {
228 dout("__get_xattr %s: found %.*s\n", name,
229 xattr->val_len, xattr->val);
230 return xattr;
231 }
232 }
233
234 dout("__get_xattr %s: not found\n", name);
235
236 return NULL;
237}
238
239static void __free_xattr(struct ceph_inode_xattr *xattr)
240{
241 BUG_ON(!xattr);
242
243 if (xattr->should_free_name)
244 kfree((void *)xattr->name);
245 if (xattr->should_free_val)
246 kfree((void *)xattr->val);
247
248 kfree(xattr);
249}
250
251static int __remove_xattr(struct ceph_inode_info *ci,
252 struct ceph_inode_xattr *xattr)
253{
254 if (!xattr)
255 return -EOPNOTSUPP;
256
257 rb_erase(&xattr->node, &ci->i_xattrs.index);
258
259 if (xattr->should_free_name)
260 kfree((void *)xattr->name);
261 if (xattr->should_free_val)
262 kfree((void *)xattr->val);
263
264 ci->i_xattrs.names_size -= xattr->name_len;
265 ci->i_xattrs.vals_size -= xattr->val_len;
266 ci->i_xattrs.count--;
267 kfree(xattr);
268
269 return 0;
270}
271
/*
 * Look an xattr up by name and remove it from the inode's index.
 *
 * Returns whatever __remove_xattr() returns for the lookup result
 * (including its error when no such xattr exists).
 */
static int __remove_xattr_by_name(struct ceph_inode_info *ci,
			   const char *name)
{
	struct ceph_inode_xattr *xattr;

	xattr = __get_xattr(ci, name);
	return __remove_xattr(ci, xattr);
}
284
285static char *__copy_xattr_names(struct ceph_inode_info *ci,
286 char *dest)
287{
288 struct rb_node *p;
289 struct ceph_inode_xattr *xattr = NULL;
290
291 p = rb_first(&ci->i_xattrs.index);
292 dout("__copy_xattr_names count=%d\n", ci->i_xattrs.count);
293
294 while (p) {
295 xattr = rb_entry(p, struct ceph_inode_xattr, node);
296 memcpy(dest, xattr->name, xattr->name_len);
297 dest[xattr->name_len] = '\0';
298
299 dout("dest=%s %p (%s) (%d/%d)\n", dest, xattr, xattr->name,
300 xattr->name_len, ci->i_xattrs.names_size);
301
302 dest += xattr->name_len + 1;
303 p = rb_next(p);
304 }
305
306 return dest;
307}
308
309void __ceph_destroy_xattrs(struct ceph_inode_info *ci)
310{
311 struct rb_node *p, *tmp;
312 struct ceph_inode_xattr *xattr = NULL;
313
314 p = rb_first(&ci->i_xattrs.index);
315
316 dout("__ceph_destroy_xattrs p=%p\n", p);
317
318 while (p) {
319 xattr = rb_entry(p, struct ceph_inode_xattr, node);
320 tmp = p;
321 p = rb_next(tmp);
322 dout("__ceph_destroy_xattrs next p=%p (%.*s)\n", p,
323 xattr->name_len, xattr->name);
324 rb_erase(tmp, &ci->i_xattrs.index);
325
326 __free_xattr(xattr);
327 }
328
329 ci->i_xattrs.names_size = 0;
330 ci->i_xattrs.vals_size = 0;
331 ci->i_xattrs.index_version = 0;
332 ci->i_xattrs.count = 0;
333 ci->i_xattrs.index = RB_ROOT;
334}
335
336static int __build_xattrs(struct inode *inode)
337{
338 u32 namelen;
339 u32 numattr = 0;
340 void *p, *end;
341 u32 len;
342 const char *name, *val;
343 struct ceph_inode_info *ci = ceph_inode(inode);
344 int xattr_version;
345 struct ceph_inode_xattr **xattrs = NULL;
346 int err;
347 int i;
348
349 dout("__build_xattrs() len=%d\n",
350 ci->i_xattrs.blob ? (int)ci->i_xattrs.blob->vec.iov_len : 0);
351
352 if (ci->i_xattrs.index_version >= ci->i_xattrs.version)
353 return 0; /* already built */
354
355 __ceph_destroy_xattrs(ci);
356
357start:
358 /* updated internal xattr rb tree */
359 if (ci->i_xattrs.blob && ci->i_xattrs.blob->vec.iov_len > 4) {
360 p = ci->i_xattrs.blob->vec.iov_base;
361 end = p + ci->i_xattrs.blob->vec.iov_len;
362 ceph_decode_32_safe(&p, end, numattr, bad);
363 xattr_version = ci->i_xattrs.version;
364 spin_unlock(&inode->i_lock);
365
366 xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *),
367 GFP_NOFS);
368 err = -ENOMEM;
369 if (!xattrs)
370 goto bad_lock;
371 memset(xattrs, 0, numattr*sizeof(struct ceph_xattr *));
372 for (i = 0; i < numattr; i++) {
373 xattrs[i] = kmalloc(sizeof(struct ceph_inode_xattr),
374 GFP_NOFS);
375 if (!xattrs[i])
376 goto bad_lock;
377 }
378
379 spin_lock(&inode->i_lock);
380 if (ci->i_xattrs.version != xattr_version) {
381 /* lost a race, retry */
382 for (i = 0; i < numattr; i++)
383 kfree(xattrs[i]);
384 kfree(xattrs);
385 goto start;
386 }
387 err = -EIO;
388 while (numattr--) {
389 ceph_decode_32_safe(&p, end, len, bad);
390 namelen = len;
391 name = p;
392 p += len;
393 ceph_decode_32_safe(&p, end, len, bad);
394 val = p;
395 p += len;
396
397 err = __set_xattr(ci, name, namelen, val, len,
398 0, 0, 0, &xattrs[numattr]);
399
400 if (err < 0)
401 goto bad;
402 }
403 kfree(xattrs);
404 }
405 ci->i_xattrs.index_version = ci->i_xattrs.version;
406 ci->i_xattrs.dirty = false;
407
408 return err;
409bad_lock:
410 spin_lock(&inode->i_lock);
411bad:
412 if (xattrs) {
413 for (i = 0; i < numattr; i++)
414 kfree(xattrs[i]);
415 kfree(xattrs);
416 }
417 ci->i_xattrs.names_size = 0;
418 return err;
419}
420
421static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size,
422 int val_size)
423{
424 /*
425 * 4 bytes for the length, and additional 4 bytes per each xattr name,
426 * 4 bytes per each value
427 */
428 int size = 4 + ci->i_xattrs.count*(4 + 4) +
429 ci->i_xattrs.names_size +
430 ci->i_xattrs.vals_size;
431 dout("__get_required_blob_size c=%d names.size=%d vals.size=%d\n",
432 ci->i_xattrs.count, ci->i_xattrs.names_size,
433 ci->i_xattrs.vals_size);
434
435 if (name_size)
436 size += 4 + 4 + name_size + val_size;
437
438 return size;
439}
440
441/*
442 * If there are dirty xattrs, reencode xattrs into the prealloc_blob
443 * and swap into place.
444 */
445void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
446{
447 struct rb_node *p;
448 struct ceph_inode_xattr *xattr = NULL;
449 void *dest;
450
451 dout("__build_xattrs_blob %p\n", &ci->vfs_inode);
452 if (ci->i_xattrs.dirty) {
453 int need = __get_required_blob_size(ci, 0, 0);
454
455 BUG_ON(need > ci->i_xattrs.prealloc_blob->alloc_len);
456
457 p = rb_first(&ci->i_xattrs.index);
458 dest = ci->i_xattrs.prealloc_blob->vec.iov_base;
459
460 ceph_encode_32(&dest, ci->i_xattrs.count);
461 while (p) {
462 xattr = rb_entry(p, struct ceph_inode_xattr, node);
463
464 ceph_encode_32(&dest, xattr->name_len);
465 memcpy(dest, xattr->name, xattr->name_len);
466 dest += xattr->name_len;
467 ceph_encode_32(&dest, xattr->val_len);
468 memcpy(dest, xattr->val, xattr->val_len);
469 dest += xattr->val_len;
470
471 p = rb_next(p);
472 }
473
474 /* adjust buffer len; it may be larger than we need */
475 ci->i_xattrs.prealloc_blob->vec.iov_len =
476 dest - ci->i_xattrs.prealloc_blob->vec.iov_base;
477
478 ceph_buffer_put(ci->i_xattrs.blob);
479 ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
480 ci->i_xattrs.prealloc_blob = NULL;
481 ci->i_xattrs.dirty = false;
482 }
483}
484
485ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
486 size_t size)
487{
488 struct inode *inode = dentry->d_inode;
489 struct ceph_inode_info *ci = ceph_inode(inode);
490 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
491 int err;
492 struct ceph_inode_xattr *xattr;
493 struct ceph_vxattr_cb *vxattr = NULL;
494
495 if (!ceph_is_valid_xattr(name))
496 return -ENODATA;
497
498 /* let's see if a virtual xattr was requested */
499 if (vxattrs)
500 vxattr = ceph_match_vxattr(vxattrs, name);
501
502 spin_lock(&inode->i_lock);
503 dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
504 ci->i_xattrs.version, ci->i_xattrs.index_version);
505
506 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
507 (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
508 goto get_xattr;
509 } else {
510 spin_unlock(&inode->i_lock);
511 /* get xattrs from mds (if we don't already have them) */
512 err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
513 if (err)
514 return err;
515 }
516
517 spin_lock(&inode->i_lock);
518
519 if (vxattr && vxattr->readonly) {
520 err = vxattr->getxattr_cb(ci, value, size);
521 goto out;
522 }
523
524 err = __build_xattrs(inode);
525 if (err < 0)
526 goto out;
527
528get_xattr:
529 err = -ENODATA; /* == ENOATTR */
530 xattr = __get_xattr(ci, name);
531 if (!xattr) {
532 if (vxattr)
533 err = vxattr->getxattr_cb(ci, value, size);
534 goto out;
535 }
536
537 err = -ERANGE;
538 if (size && size < xattr->val_len)
539 goto out;
540
541 err = xattr->val_len;
542 if (size == 0)
543 goto out;
544
545 memcpy(value, xattr->val, xattr->val_len);
546
547out:
548 spin_unlock(&inode->i_lock);
549 return err;
550}
551
552ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
553{
554 struct inode *inode = dentry->d_inode;
555 struct ceph_inode_info *ci = ceph_inode(inode);
556 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
557 u32 vir_namelen = 0;
558 u32 namelen;
559 int err;
560 u32 len;
561 int i;
562
563 spin_lock(&inode->i_lock);
564 dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
565 ci->i_xattrs.version, ci->i_xattrs.index_version);
566
567 if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
568 (ci->i_xattrs.index_version > ci->i_xattrs.version)) {
569 goto list_xattr;
570 } else {
571 spin_unlock(&inode->i_lock);
572 err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
573 if (err)
574 return err;
575 }
576
577 spin_lock(&inode->i_lock);
578
579 err = __build_xattrs(inode);
580 if (err < 0)
581 goto out;
582
583list_xattr:
584 vir_namelen = 0;
585 /* include virtual dir xattrs */
586 if (vxattrs)
587 for (i = 0; vxattrs[i].name; i++)
588 vir_namelen += strlen(vxattrs[i].name) + 1;
589 /* adding 1 byte per each variable due to the null termination */
590 namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
591 err = -ERANGE;
592 if (size && namelen > size)
593 goto out;
594
595 err = namelen;
596 if (size == 0)
597 goto out;
598
599 names = __copy_xattr_names(ci, names);
600
601 /* virtual xattr names, too */
602 if (vxattrs)
603 for (i = 0; vxattrs[i].name; i++) {
604 len = sprintf(names, "%s", vxattrs[i].name);
605 names += len + 1;
606 }
607
608out:
609 spin_unlock(&inode->i_lock);
610 return err;
611}
612
613static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
614 const char *value, size_t size, int flags)
615{
616 struct ceph_client *client = ceph_client(dentry->d_sb);
617 struct inode *inode = dentry->d_inode;
618 struct ceph_inode_info *ci = ceph_inode(inode);
619 struct inode *parent_inode = dentry->d_parent->d_inode;
620 struct ceph_mds_request *req;
621 struct ceph_mds_client *mdsc = &client->mdsc;
622 int err;
623 int i, nr_pages;
624 struct page **pages = NULL;
625 void *kaddr;
626
627 /* copy value into some pages */
628 nr_pages = calc_pages_for(0, size);
629 if (nr_pages) {
630 pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS);
631 if (!pages)
632 return -ENOMEM;
633 err = -ENOMEM;
634 for (i = 0; i < nr_pages; i++) {
635 pages[i] = alloc_page(GFP_NOFS);
636 if (!pages[i]) {
637 nr_pages = i;
638 goto out;
639 }
640 kaddr = kmap(pages[i]);
641 memcpy(kaddr, value + i*PAGE_CACHE_SIZE,
642 min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE));
643 }
644 }
645
646 dout("setxattr value=%.*s\n", (int)size, value);
647
648 /* do request */
649 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
650 USE_AUTH_MDS);
651 if (IS_ERR(req))
652 return PTR_ERR(req);
653 req->r_inode = igrab(inode);
654 req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
655 req->r_num_caps = 1;
656 req->r_args.setxattr.flags = cpu_to_le32(flags);
657 req->r_path2 = kstrdup(name, GFP_NOFS);
658
659 req->r_pages = pages;
660 req->r_num_pages = nr_pages;
661 req->r_data_len = size;
662
663 dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
664 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
665 ceph_mdsc_put_request(req);
666 dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
667
668out:
669 if (pages) {
670 for (i = 0; i < nr_pages; i++)
671 __free_page(pages[i]);
672 kfree(pages);
673 }
674 return err;
675}
676
677int ceph_setxattr(struct dentry *dentry, const char *name,
678 const void *value, size_t size, int flags)
679{
680 struct inode *inode = dentry->d_inode;
681 struct ceph_inode_info *ci = ceph_inode(inode);
682 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
683 int err;
684 int name_len = strlen(name);
685 int val_len = size;
686 char *newname = NULL;
687 char *newval = NULL;
688 struct ceph_inode_xattr *xattr = NULL;
689 int issued;
690 int required_blob_size;
691
692 if (ceph_snap(inode) != CEPH_NOSNAP)
693 return -EROFS;
694
695 if (!ceph_is_valid_xattr(name))
696 return -EOPNOTSUPP;
697
698 if (vxattrs) {
699 struct ceph_vxattr_cb *vxattr =
700 ceph_match_vxattr(vxattrs, name);
701 if (vxattr && vxattr->readonly)
702 return -EOPNOTSUPP;
703 }
704
705 /* preallocate memory for xattr name, value, index node */
706 err = -ENOMEM;
707 newname = kmalloc(name_len + 1, GFP_NOFS);
708 if (!newname)
709 goto out;
710 memcpy(newname, name, name_len + 1);
711
712 if (val_len) {
713 newval = kmalloc(val_len + 1, GFP_NOFS);
714 if (!newval)
715 goto out;
716 memcpy(newval, value, val_len);
717 newval[val_len] = '\0';
718 }
719
720 xattr = kmalloc(sizeof(struct ceph_inode_xattr), GFP_NOFS);
721 if (!xattr)
722 goto out;
723
724 spin_lock(&inode->i_lock);
725retry:
726 issued = __ceph_caps_issued(ci, NULL);
727 if (!(issued & CEPH_CAP_XATTR_EXCL))
728 goto do_sync;
729 __build_xattrs(inode);
730
731 required_blob_size = __get_required_blob_size(ci, name_len, val_len);
732
733 if (!ci->i_xattrs.prealloc_blob ||
734 required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
735 struct ceph_buffer *blob = NULL;
736
737 spin_unlock(&inode->i_lock);
738 dout(" preaallocating new blob size=%d\n", required_blob_size);
739 blob = ceph_buffer_new_alloc(required_blob_size, GFP_NOFS);
740 if (!blob)
741 goto out;
742 spin_lock(&inode->i_lock);
743 ceph_buffer_put(ci->i_xattrs.prealloc_blob);
744 ci->i_xattrs.prealloc_blob = blob;
745 goto retry;
746 }
747
748 dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
749 err = __set_xattr(ci, newname, name_len, newval,
750 val_len, 1, 1, 1, &xattr);
751 __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
752 ci->i_xattrs.dirty = true;
753 inode->i_ctime = CURRENT_TIME;
754 spin_unlock(&inode->i_lock);
755
756 return err;
757
758do_sync:
759 spin_unlock(&inode->i_lock);
760 err = ceph_sync_setxattr(dentry, name, value, size, flags);
761out:
762 kfree(newname);
763 kfree(newval);
764 kfree(xattr);
765 return err;
766}
767
768static int ceph_send_removexattr(struct dentry *dentry, const char *name)
769{
770 struct ceph_client *client = ceph_client(dentry->d_sb);
771 struct ceph_mds_client *mdsc = &client->mdsc;
772 struct inode *inode = dentry->d_inode;
773 struct inode *parent_inode = dentry->d_parent->d_inode;
774 struct ceph_mds_request *req;
775 int err;
776
777 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RMXATTR,
778 USE_AUTH_MDS);
779 if (IS_ERR(req))
780 return PTR_ERR(req);
781 req->r_inode = igrab(inode);
782 req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
783 req->r_num_caps = 1;
784 req->r_path2 = kstrdup(name, GFP_NOFS);
785
786 err = ceph_mdsc_do_request(mdsc, parent_inode, req);
787 ceph_mdsc_put_request(req);
788 return err;
789}
790
791int ceph_removexattr(struct dentry *dentry, const char *name)
792{
793 struct inode *inode = dentry->d_inode;
794 struct ceph_inode_info *ci = ceph_inode(inode);
795 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
796 int issued;
797 int err;
798
799 if (ceph_snap(inode) != CEPH_NOSNAP)
800 return -EROFS;
801
802 if (!ceph_is_valid_xattr(name))
803 return -EOPNOTSUPP;
804
805 if (vxattrs) {
806 struct ceph_vxattr_cb *vxattr =
807 ceph_match_vxattr(vxattrs, name);
808 if (vxattr && vxattr->readonly)
809 return -EOPNOTSUPP;
810 }
811
812 spin_lock(&inode->i_lock);
813 __build_xattrs(inode);
814 issued = __ceph_caps_issued(ci, NULL);
815 dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
816
817 if (!(issued & CEPH_CAP_XATTR_EXCL))
818 goto do_sync;
819
820 err = __remove_xattr_by_name(ceph_inode(inode), name);
821 __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
822 ci->i_xattrs.dirty = true;
823 inode->i_ctime = CURRENT_TIME;
824
825 spin_unlock(&inode->i_lock);
826
827 return err;
828do_sync:
829 spin_unlock(&inode->i_lock);
830 err = ceph_send_removexattr(dentry, name);
831 return err;
832}
833