author      Sage Weil <sage@newdream.net>  2009-10-06 14:31:12 -0400
committer   Sage Weil <sage@newdream.net>  2009-10-06 14:31:12 -0400
commit      963b61eb041e8850807d95f8d7a4c6a454c45000 (patch)
tree        00d7926c7b2102c8c19ec9abd95b241b8bff1b7a /fs
parent      a8599bd821d084d04a3290fffae1071624ec00ea (diff)
ceph: snapshot management

Ceph snapshots rely on client cooperation in determining which operations
apply to which snapshots, and appropriately flushing snapshotted data and
metadata back to the OSD and MDS clusters.  Because snapshots apply to
subtrees of the file hierarchy and can be created at any time, there is a
fair bit of bookkeeping required to make this work.

Portions of the hierarchy that belong to the same set of snapshots are
described by a single 'snap realm.'  A 'snap context' describes the set of
snapshots that exist for a given file or directory.

Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs')
-rw-r--r--  fs/ceph/snap.c  897
1 file changed, 897 insertions, 0 deletions
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
new file mode 100644
index 000000000000..2e3cb40b7e48
--- /dev/null
+++ b/fs/ceph/snap.c
@@ -0,0 +1,897 @@
#include "ceph_debug.h"

#include <linux/radix-tree.h>
#include <linux/sort.h>

#include "super.h"
#include "decode.h"

/*
 * Snapshots in ceph are driven in large part by cooperation from the
 * client.  In contrast to local file systems or file servers that
 * implement snapshots at a single point in the system, ceph's
 * distributed access to storage requires clients to help decide
 * whether a write logically occurs before or after a recently created
 * snapshot.
 *
 * This provides a perfect instantaneous client-wide snapshot.  Between
 * clients, however, snapshots may appear to be applied at slightly
 * different points in time, depending on delays in delivering the
 * snapshot notification.
 *
 * Snapshots are _not_ file system-wide.  Instead, each snapshot
 * applies to the subdirectory nested beneath some directory.  This
 * effectively divides the hierarchy into multiple "realms," where all
 * of the files contained by each realm share the same set of
 * snapshots.  An individual realm's snap set contains snapshots
 * explicitly created on that realm, as well as any snaps in its
 * parent's snap set _after_ the point at which the parent became its
 * parent (due to, say, a rename).  Similarly, snaps from prior parents
 * are included for the intervals during which they were the parent.
 *
 * The client is spared most of this detail, fortunately... it need only
 * maintain a hierarchy of realms reflecting the current parent/child
 * realm relationship, and, for each realm, an explicit list of snaps
 * inherited from prior parents.
 *
 * A snap_realm struct is maintained for realms containing every inode
 * with an open cap in the system.  (The needed snap realm information is
 * provided by the MDS whenever a cap is issued, i.e., on open.)  A 'seq'
 * version number is used to ensure that as realm parameters change (new
 * snapshot, new parent, etc.) the client's realm hierarchy is updated.
 *
 * The realm hierarchy drives the generation of a 'snap context' for each
 * realm, which simply lists the resulting set of snaps for the realm.  This
 * is attached to any writes sent to OSDs.
 */
/*
 * Unfortunately error handling is a bit mixed here.  If we get a snap
 * update, but don't have enough memory to update our realm hierarchy,
 * it's not clear what we can do about it (besides complaining to the
 * console).
 */
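
/*
 * Editor's sketch (not part of this patch): the snap context layout
 * implied by the allocation and field accesses in build_snap_context()
 * below -- a refcount, the seq at which it was built, and an inline,
 * reverse-sorted vector of snap ids.  The real definition lives in
 * super.h; the exact field order here is an assumption:
 *
 *      struct ceph_snap_context {
 *              atomic_t nref;
 *              u64 seq;
 *              int num_snaps;
 *              u64 snaps[];    // allocated inline, newest id first
 *      };
 */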


/*
 * increase ref count for the realm
 *
 * caller must hold snap_rwsem for write.
 */
void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
                         struct ceph_snap_realm *realm)
{
        dout("get_realm %p %d -> %d\n", realm,
             atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
        /*
         * since we _only_ increment realm refs or empty the empty
         * list with snap_rwsem held, adjusting the empty list here is
         * safe.  we do need to protect against concurrent empty list
         * additions, however.
         */
        if (atomic_read(&realm->nref) == 0) {
                spin_lock(&mdsc->snap_empty_lock);
                list_del_init(&realm->empty_item);
                spin_unlock(&mdsc->snap_empty_lock);
        }

        atomic_inc(&realm->nref);
}

/*
 * create the realm rooted at @ino.  the new realm is returned with a
 * ref count of zero (the radix tree does not take a reference); the
 * caller is responsible for taking one if it keeps the pointer.
 *
 * caller must hold snap_rwsem for write.
 */
static struct ceph_snap_realm *ceph_create_snap_realm(
        struct ceph_mds_client *mdsc,
        u64 ino)
{
        struct ceph_snap_realm *realm;

        realm = kzalloc(sizeof(*realm), GFP_NOFS);
        if (!realm)
                return ERR_PTR(-ENOMEM);

        radix_tree_insert(&mdsc->snap_realms, ino, realm);

        atomic_set(&realm->nref, 0);    /* tree does not take a ref */
        realm->ino = ino;
        INIT_LIST_HEAD(&realm->children);
        INIT_LIST_HEAD(&realm->child_item);
        INIT_LIST_HEAD(&realm->empty_item);
        INIT_LIST_HEAD(&realm->inodes_with_caps);
        spin_lock_init(&realm->inodes_with_caps_lock);
        dout("create_snap_realm %llx %p\n", realm->ino, realm);
        return realm;
}

/*
 * find the realm rooted at @ino, if any.  note that this does _not_
 * bump the ref count; the caller must do that via
 * ceph_get_snap_realm() if it keeps the pointer.
 *
 * caller must hold snap_rwsem for write.
 */
struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
                                               u64 ino)
{
        struct ceph_snap_realm *realm;

        realm = radix_tree_lookup(&mdsc->snap_realms, ino);
        if (realm)
                dout("lookup_snap_realm %llx %p\n", realm->ino, realm);
        return realm;
}

static void __put_snap_realm(struct ceph_mds_client *mdsc,
                             struct ceph_snap_realm *realm);

/*
 * called with snap_rwsem (write)
 */
static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
                                 struct ceph_snap_realm *realm)
{
        dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);

        radix_tree_delete(&mdsc->snap_realms, realm->ino);

        if (realm->parent) {
                list_del_init(&realm->child_item);
                __put_snap_realm(mdsc, realm->parent);
        }

        kfree(realm->prior_parent_snaps);
        kfree(realm->snaps);
        ceph_put_snap_context(realm->cached_context);
        kfree(realm);
}

/*
 * caller holds snap_rwsem (write)
 */
static void __put_snap_realm(struct ceph_mds_client *mdsc,
                             struct ceph_snap_realm *realm)
{
        dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
             atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
        if (atomic_dec_and_test(&realm->nref))
                __destroy_snap_realm(mdsc, realm);
}

/*
 * caller needn't hold any locks
 */
void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
                         struct ceph_snap_realm *realm)
{
        dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
             atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
        if (!atomic_dec_and_test(&realm->nref))
                return;

        if (down_write_trylock(&mdsc->snap_rwsem)) {
                __destroy_snap_realm(mdsc, realm);
                up_write(&mdsc->snap_rwsem);
        } else {
                spin_lock(&mdsc->snap_empty_lock);
                list_add(&realm->empty_item, &mdsc->snap_empty);
                spin_unlock(&mdsc->snap_empty_lock);
        }
}
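
/*
 * Editor's sketch (not part of this patch): the lookup/get/put
 * protocol the functions above imply.  Lookup does not take a
 * reference, so a caller that keeps the realm must pair it with a
 * get; the final put parks the realm on snap_empty when the rwsem
 * can't be taken, to be reaped later:
 *
 *      down_write(&mdsc->snap_rwsem);
 *      realm = ceph_lookup_snap_realm(mdsc, ino);
 *      if (realm)
 *              ceph_get_snap_realm(mdsc, realm);  // may unpark from snap_empty
 *      up_write(&mdsc->snap_rwsem);
 *      ...use realm...
 *      ceph_put_snap_realm(mdsc, realm);  // destroys now, or defers
 */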

/*
 * Clean up any realms whose ref counts have dropped to zero.  Note
 * that this does not include realms that were created but not yet
 * used.
 *
 * Called under snap_rwsem (write)
 */
static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
{
        struct ceph_snap_realm *realm;

        spin_lock(&mdsc->snap_empty_lock);
        while (!list_empty(&mdsc->snap_empty)) {
                realm = list_first_entry(&mdsc->snap_empty,
                                   struct ceph_snap_realm, empty_item);
                list_del(&realm->empty_item);
                spin_unlock(&mdsc->snap_empty_lock);
                __destroy_snap_realm(mdsc, realm);
                spin_lock(&mdsc->snap_empty_lock);
        }
        spin_unlock(&mdsc->snap_empty_lock);
}

void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc)
{
        down_write(&mdsc->snap_rwsem);
        __cleanup_empty_realms(mdsc);
        up_write(&mdsc->snap_rwsem);
}

/*
 * adjust the parent realm of a given @realm.  adjust child lists,
 * parent pointers, and ref counts appropriately.
 *
 * return 1 if the parent was changed, 0 if unchanged, <0 on error.
 *
 * caller must hold snap_rwsem for write.
 */
static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
                                    struct ceph_snap_realm *realm,
                                    u64 parentino)
{
        struct ceph_snap_realm *parent;

        if (realm->parent_ino == parentino)
                return 0;

        parent = ceph_lookup_snap_realm(mdsc, parentino);
        if (IS_ERR(parent))
                return PTR_ERR(parent);
        if (!parent) {
                parent = ceph_create_snap_realm(mdsc, parentino);
                if (IS_ERR(parent))
                        return PTR_ERR(parent);
        }
        dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n",
             realm->ino, realm, realm->parent_ino, realm->parent,
             parentino, parent);
        if (realm->parent) {
                list_del_init(&realm->child_item);
                ceph_put_snap_realm(mdsc, realm->parent);
        }
        realm->parent_ino = parentino;
        realm->parent = parent;
        ceph_get_snap_realm(mdsc, parent);
        list_add(&realm->child_item, &parent->children);
        return 1;
}


static int cmpu64_rev(const void *a, const void *b)
{
        if (*(u64 *)a < *(u64 *)b)
                return 1;
        if (*(u64 *)a > *(u64 *)b)
                return -1;
        return 0;
}
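
/*
 * Editor's illustration (not part of this patch): cmpu64_rev inverts
 * the usual comparison so sort() yields descending order, newest snap
 * id first:
 *
 *      u64 v[] = { 3, 9, 1 };
 *      sort(v, 3, sizeof(u64), cmpu64_rev, NULL);
 *      // v is now { 9, 3, 1 }
 */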

/*
 * build the snap context for a given realm.
 */
static int build_snap_context(struct ceph_snap_realm *realm)
{
        struct ceph_snap_realm *parent = realm->parent;
        struct ceph_snap_context *snapc;
        int err = 0;
        int i;
        int num = realm->num_prior_parent_snaps + realm->num_snaps;

        /*
         * build parent context, if it hasn't been built.
         * conservatively estimate that all parent snaps might be
         * included by us.
         */
        if (parent) {
                if (!parent->cached_context) {
                        err = build_snap_context(parent);
                        if (err)
                                goto fail;
                }
                num += parent->cached_context->num_snaps;
        }

        /* do i actually need to update?  not if my context seq
           matches realm seq, and my parents' does too.  (this works
           because rebuild_snap_realms() works _downward_ in the
           hierarchy after each update.) */
        if (realm->cached_context &&
            realm->cached_context->seq <= realm->seq &&
            (!parent ||
             realm->cached_context->seq <= parent->cached_context->seq)) {
                dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
                     " (unchanged)\n",
                     realm->ino, realm, realm->cached_context,
                     realm->cached_context->seq,
                     realm->cached_context->num_snaps);
                return 0;
        }

        /* alloc new snap context */
        err = -ENOMEM;
        if (num > ULONG_MAX / sizeof(u64) - sizeof(*snapc))
                goto fail;
        snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS);
        if (!snapc)
                goto fail;
        atomic_set(&snapc->nref, 1);

        /* build (reverse sorted) snap vector */
        num = 0;
        snapc->seq = realm->seq;
        if (parent) {
                /* include any of parent's snaps occurring _after_ my
                   parent became my parent */
                for (i = 0; i < parent->cached_context->num_snaps; i++)
                        if (parent->cached_context->snaps[i] >=
                            realm->parent_since)
                                snapc->snaps[num++] =
                                        parent->cached_context->snaps[i];
                if (parent->cached_context->seq > snapc->seq)
                        snapc->seq = parent->cached_context->seq;
        }
        memcpy(snapc->snaps + num, realm->snaps,
               sizeof(u64)*realm->num_snaps);
        num += realm->num_snaps;
        memcpy(snapc->snaps + num, realm->prior_parent_snaps,
               sizeof(u64)*realm->num_prior_parent_snaps);
        num += realm->num_prior_parent_snaps;

        sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
        snapc->num_snaps = num;
        dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n",
             realm->ino, realm, snapc, snapc->seq, snapc->num_snaps);

        if (realm->cached_context)
                ceph_put_snap_context(realm->cached_context);
        realm->cached_context = snapc;
        return 0;

fail:
        /*
         * if we fail, clear old (incorrect) cached_context... hopefully
         * we'll have better luck building it later
         */
        if (realm->cached_context) {
                ceph_put_snap_context(realm->cached_context);
                realm->cached_context = NULL;
        }
        pr_err("build_snap_context %llx %p fail %d\n", realm->ino,
               realm, err);
        return err;
}
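
/*
 * Worked example (editor's illustration, not part of this patch):
 * suppose a realm has snaps {7,3}, prior_parent_snaps {4}, and
 * parent_since 5, and its parent's context is {8,6,2}.  Parent snaps
 * 8 and 6 are >= parent_since and are inherited; 2 predates the
 * relationship and is not.  The resulting context, reverse sorted,
 * is {8,7,6,4,3}, with seq = max(realm->seq, parent context seq).
 */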

/*
 * rebuild snap context for the given realm and all of its children.
 */
static void rebuild_snap_realms(struct ceph_snap_realm *realm)
{
        struct ceph_snap_realm *child;

        dout("rebuild_snap_realms %llx %p\n", realm->ino, realm);
        build_snap_context(realm);

        list_for_each_entry(child, &realm->children, child_item)
                rebuild_snap_realms(child);
}


/*
 * helper to allocate and decode an array of snapids.  free prior
 * instance, if any.
 */
static int dup_array(u64 **dst, __le64 *src, int num)
{
        int i;

        kfree(*dst);
        if (num) {
                *dst = kcalloc(num, sizeof(u64), GFP_NOFS);
                if (!*dst)
                        return -ENOMEM;
                for (i = 0; i < num; i++)
                        (*dst)[i] = get_unaligned_le64(src + i);
        } else {
                *dst = NULL;
        }
        return 0;
}
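
/*
 * Usage sketch (editor's illustration, mirroring the call in
 * ceph_update_snap_trace() below): replace a realm's snap vector
 * with a freshly decoded one.  The helper frees the old array and
 * converts each little-endian, possibly unaligned wire value to
 * host order:
 *
 *      err = dup_array(&realm->snaps, snaps, realm->num_snaps);
 *      if (err < 0)
 *              goto fail;
 */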


/*
 * When a snapshot is applied, the size/mtime inode metadata is queued
 * in a ceph_cap_snap (one for each snapshot) until writeback
 * completes and the metadata can be flushed back to the MDS.
 *
 * However, if a (sync) write is currently in-progress when we apply
 * the snapshot, we have to wait until the write succeeds or fails
 * (and a final size/mtime is known).  In this case the cap_snap has
 * writing = 1 and is said to be "pending."  When the write finishes,
 * we __ceph_finish_cap_snap().
 *
 * Caller must hold snap_rwsem for read (i.e., the realm topology won't
 * change).
 */
void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                         struct ceph_snap_context *snapc)
{
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap_snap *capsnap;
        int used;

        capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
        if (!capsnap) {
                pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
                return;
        }

        spin_lock(&inode->i_lock);
        used = __ceph_caps_used(ci);
        if (__ceph_have_pending_cap_snap(ci)) {
                /* there is no point in queuing multiple "pending" cap_snaps,
                   as no new writes are allowed to start when pending, so any
                   writes in progress now were started before the previous
                   cap_snap.  lucky us. */
                dout("queue_cap_snap %p snapc %p seq %llu used %d"
                     " already pending\n", inode, snapc, snapc->seq, used);
                kfree(capsnap);
        } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
                igrab(inode);

                atomic_set(&capsnap->nref, 1);
                capsnap->ci = ci;
                INIT_LIST_HEAD(&capsnap->ci_item);
                INIT_LIST_HEAD(&capsnap->flushing_item);

                capsnap->follows = snapc->seq - 1;
                capsnap->context = ceph_get_snap_context(snapc);
                capsnap->issued = __ceph_caps_issued(ci, NULL);
                capsnap->dirty = __ceph_caps_dirty(ci);

                capsnap->mode = inode->i_mode;
                capsnap->uid = inode->i_uid;
                capsnap->gid = inode->i_gid;

                /* fixme? */
                capsnap->xattr_blob = NULL;
                capsnap->xattr_len = 0;

                /* dirty page count moved from _head to this cap_snap;
                   all subsequent write page dirties occur _after_ this
                   snapshot. */
                capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
                ci->i_wrbuffer_ref_head = 0;
                ceph_put_snap_context(ci->i_head_snapc);
                ci->i_head_snapc = NULL;
                list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);

                if (used & CEPH_CAP_FILE_WR) {
                        dout("queue_cap_snap %p cap_snap %p snapc %p"
                             " seq %llu used WR, now pending\n", inode,
                             capsnap, snapc, snapc->seq);
                        capsnap->writing = 1;
                } else {
                        /* note mtime, size NOW. */
                        __ceph_finish_cap_snap(ci, capsnap);
                }
        } else {
                dout("queue_cap_snap %p nothing dirty|writing\n", inode);
                kfree(capsnap);
        }

        spin_unlock(&inode->i_lock);
}

/*
 * Finalize the size and mtime for a cap_snap; that is, settle on the
 * final values to be used for the snapshot and flushed back to the MDS.
 *
 * If the capsnap can now be flushed, add it to the snap_flush list,
 * and return 1.
 *
 * Caller must hold i_lock.
 */
int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
                           struct ceph_cap_snap *capsnap)
{
        struct inode *inode = &ci->vfs_inode;
        struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;

        BUG_ON(capsnap->writing);
        capsnap->size = inode->i_size;
        capsnap->mtime = inode->i_mtime;
        capsnap->atime = inode->i_atime;
        capsnap->ctime = inode->i_ctime;
        capsnap->time_warp_seq = ci->i_time_warp_seq;
        if (capsnap->dirty_pages) {
                dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu "
                     "still has %d dirty pages\n", inode, capsnap,
                     capsnap->context, capsnap->context->seq,
                     capsnap->size, capsnap->dirty_pages);
                return 0;
        }
        dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n",
             inode, capsnap, capsnap->context,
             capsnap->context->seq, capsnap->size);

        spin_lock(&mdsc->snap_flush_lock);
        list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
        spin_unlock(&mdsc->snap_flush_lock);
        return 1;  /* caller may want to ceph_flush_snaps */
}
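
/*
 * Editor's sketch of an assumed caller (not part of this patch): when
 * the sync write that made a cap_snap "pending" completes, the write
 * path would clear the pending flag and finish the cap_snap roughly
 * like so -- note __ceph_finish_cap_snap() samples the final i_size
 * and times itself:
 *
 *      spin_lock(&inode->i_lock);
 *      capsnap->writing = 0;
 *      if (__ceph_finish_cap_snap(ci, capsnap))
 *              flush = 1;
 *      spin_unlock(&inode->i_lock);
 *      if (flush)
 *              ceph_flush_snaps(ci);   // cf. flush_snaps() below
 */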


/*
 * Parse and apply a snapblob "snap trace" from the MDS.  This specifies
 * the snap realm parameters from a given realm and all of its ancestors,
 * up to the root.
 *
 * Caller must hold snap_rwsem for write.
 */
int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
                           void *p, void *e, bool deletion)
{
        struct ceph_mds_snap_realm *ri;    /* encoded */
        __le64 *snaps;                     /* encoded */
        __le64 *prior_parent_snaps;        /* encoded */
        struct ceph_snap_realm *realm;
        int invalidate = 0;
        int err = -ENOMEM;

        dout("update_snap_trace deletion=%d\n", deletion);
more:
        ceph_decode_need(&p, e, sizeof(*ri), bad);
        ri = p;
        p += sizeof(*ri);
        ceph_decode_need(&p, e, sizeof(u64)*(le32_to_cpu(ri->num_snaps) +
                            le32_to_cpu(ri->num_prior_parent_snaps)), bad);
        snaps = p;
        p += sizeof(u64) * le32_to_cpu(ri->num_snaps);
        prior_parent_snaps = p;
        p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps);

        realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino));
        if (IS_ERR(realm)) {
                err = PTR_ERR(realm);
                goto fail;
        }
        if (!realm) {
                realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino));
                if (IS_ERR(realm)) {
                        err = PTR_ERR(realm);
                        goto fail;
                }
        }

        if (le64_to_cpu(ri->seq) > realm->seq) {
                dout("update_snap_trace updating %llx %p %lld -> %lld\n",
                     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
                /*
                 * if the realm seq has changed, queue a cap_snap for every
                 * inode with open caps.  we do this _before_ we update
                 * the realm info so that we prepare for writeback under the
                 * _previous_ snap context.
                 *
                 * ...unless it's a snap deletion!
                 */
                if (!deletion) {
                        struct ceph_inode_info *ci;
                        struct inode *lastinode = NULL;

                        spin_lock(&realm->inodes_with_caps_lock);
                        list_for_each_entry(ci, &realm->inodes_with_caps,
                                            i_snap_realm_item) {
                                struct inode *inode = igrab(&ci->vfs_inode);
                                if (!inode)
                                        continue;
                                spin_unlock(&realm->inodes_with_caps_lock);
                                if (lastinode)
                                        iput(lastinode);
                                lastinode = inode;
                                ceph_queue_cap_snap(ci, realm->cached_context);
                                spin_lock(&realm->inodes_with_caps_lock);
                        }
                        spin_unlock(&realm->inodes_with_caps_lock);
                        if (lastinode)
                                iput(lastinode);
                        dout("update_snap_trace cap_snaps queued\n");
                }

        } else {
                dout("update_snap_trace %llx %p seq %lld unchanged\n",
                     realm->ino, realm, realm->seq);
        }

        /* ensure the parent is correct */
        err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
        if (err < 0)
                goto fail;
        invalidate += err;

        if (le64_to_cpu(ri->seq) > realm->seq) {
                /* update realm parameters, snap lists */
                realm->seq = le64_to_cpu(ri->seq);
                realm->created = le64_to_cpu(ri->created);
                realm->parent_since = le64_to_cpu(ri->parent_since);

                realm->num_snaps = le32_to_cpu(ri->num_snaps);
                err = dup_array(&realm->snaps, snaps, realm->num_snaps);
                if (err < 0)
                        goto fail;

                realm->num_prior_parent_snaps =
                        le32_to_cpu(ri->num_prior_parent_snaps);
                err = dup_array(&realm->prior_parent_snaps, prior_parent_snaps,
                                realm->num_prior_parent_snaps);
                if (err < 0)
                        goto fail;

                invalidate = 1;
        } else if (!realm->cached_context) {
                invalidate = 1;
        }

        dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
             realm, invalidate, p, e);

        if (p < e)
                goto more;

        /* invalidate when we reach the _end_ (root) of the trace */
        if (invalidate)
                rebuild_snap_realms(realm);

        __cleanup_empty_realms(mdsc);
        return 0;

bad:
        err = -EINVAL;
fail:
        pr_err("update_snap_trace error %d\n", err);
        return err;
}
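
/*
 * Wire layout sketch (editor's illustration, inferred from the decode
 * loop above; not an authoritative definition).  A snap trace is a
 * packed sequence of realm records, child first, ending at the root:
 *
 *      struct ceph_mds_snap_realm ri;                  // fixed header
 *      __le64 snaps[ri.num_snaps];
 *      __le64 prior_parent_snaps[ri.num_prior_parent_snaps];
 *      ... next (ancestor) realm record ...
 *
 * Rebuilding of the cached snap contexts is deferred until the whole
 * trace has been applied, i.e. until the root record is reached.
 */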


/*
 * Send any cap_snaps that are queued for flush.  Try to carry
 * s_mutex across multiple snap flushes to avoid locking overhead.
 *
 * Caller holds no locks.
 */
static void flush_snaps(struct ceph_mds_client *mdsc)
{
        struct ceph_inode_info *ci;
        struct inode *inode;
        struct ceph_mds_session *session = NULL;

        dout("flush_snaps\n");
        spin_lock(&mdsc->snap_flush_lock);
        while (!list_empty(&mdsc->snap_flush_list)) {
                ci = list_first_entry(&mdsc->snap_flush_list,
                                struct ceph_inode_info, i_snap_flush_item);
                inode = &ci->vfs_inode;
                igrab(inode);
                spin_unlock(&mdsc->snap_flush_lock);
                spin_lock(&inode->i_lock);
                __ceph_flush_snaps(ci, &session);
                spin_unlock(&inode->i_lock);
                iput(inode);
                spin_lock(&mdsc->snap_flush_lock);
        }
        spin_unlock(&mdsc->snap_flush_lock);

        if (session) {
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);
        }
        dout("flush_snaps done\n");
}


/*
 * Handle a snap notification from the MDS.
 *
 * This can take two basic forms: the simplest is just a snap creation
 * or deletion notification on an existing realm.  This should update the
 * realm and its children.
 *
 * The more difficult case is realm creation, due to snap creation at a
 * new point in the file hierarchy, or due to a rename that moves a file or
 * directory into another realm.
 */
void ceph_handle_snap(struct ceph_mds_client *mdsc,
                      struct ceph_msg *msg)
{
        struct super_block *sb = mdsc->client->sb;
        struct ceph_mds_session *session;
        int mds;
        u64 split;
        int op;
        int trace_len;
        struct ceph_snap_realm *realm = NULL;
        void *p = msg->front.iov_base;
        void *e = p + msg->front.iov_len;
        struct ceph_mds_snap_head *h;
        int num_split_inos, num_split_realms;
        __le64 *split_inos = NULL, *split_realms = NULL;
        int i;
        int locked_rwsem = 0;

        if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
                return;
        mds = le64_to_cpu(msg->hdr.src.name.num);

        /* decode */
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = p;
        op = le32_to_cpu(h->op);
        split = le64_to_cpu(h->split);   /* non-zero if we are splitting an
                                          * existing realm */
        num_split_inos = le32_to_cpu(h->num_split_inos);
        num_split_realms = le32_to_cpu(h->num_split_realms);
        trace_len = le32_to_cpu(h->trace_len);
        p += sizeof(*h);

        dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
             ceph_snap_op_name(op), split, trace_len);

        /* find session */
        mutex_lock(&mdsc->mutex);
        session = __ceph_lookup_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                dout("WTF, got snap but no session for mds%d\n", mds);
                return;
        }

        mutex_lock(&session->s_mutex);
        session->s_seq++;
        mutex_unlock(&session->s_mutex);

        down_write(&mdsc->snap_rwsem);
        locked_rwsem = 1;

        if (op == CEPH_SNAP_OP_SPLIT) {
                struct ceph_mds_snap_realm *ri;

                /*
                 * A "split" breaks part of an existing realm off into
                 * a new realm.  The MDS provides a list of inodes
                 * (with caps) and child realms that belong to the new
                 * child.
                 */
                split_inos = p;
                p += sizeof(u64) * num_split_inos;
                split_realms = p;
                p += sizeof(u64) * num_split_realms;
                ceph_decode_need(&p, e, sizeof(*ri), bad);
                /* we will peek at realm info here, but will _not_
                 * advance p, as the realm update will occur below in
                 * ceph_update_snap_trace. */
                ri = p;

                realm = ceph_lookup_snap_realm(mdsc, split);
                if (IS_ERR(realm))
                        goto out;
                if (!realm) {
                        realm = ceph_create_snap_realm(mdsc, split);
                        if (IS_ERR(realm))
                                goto out;
                }
                ceph_get_snap_realm(mdsc, realm);

                dout("splitting snap_realm %llx %p\n", realm->ino, realm);
                for (i = 0; i < num_split_inos; i++) {
                        struct ceph_vino vino = {
                                .ino = le64_to_cpu(split_inos[i]),
                                .snap = CEPH_NOSNAP,
                        };
                        struct inode *inode = ceph_find_inode(sb, vino);
                        struct ceph_inode_info *ci;

                        if (!inode)
                                continue;
                        ci = ceph_inode(inode);

                        spin_lock(&inode->i_lock);
                        if (!ci->i_snap_realm)
                                goto skip_inode;
                        /*
                         * If this inode belongs to a realm that was
                         * created after our new realm, we experienced
                         * a race (due to another split notification
                         * arriving from a different MDS).  So skip
                         * this inode.
                         */
                        if (ci->i_snap_realm->created >
                            le64_to_cpu(ri->created)) {
                                dout(" leaving %p in newer realm %llx %p\n",
                                     inode, ci->i_snap_realm->ino,
                                     ci->i_snap_realm);
                                goto skip_inode;
                        }
                        dout(" will move %p to split realm %llx %p\n",
                             inode, realm->ino, realm);
                        /*
                         * Remove the inode from the realm's inode
                         * list, but don't add it to the new realm
                         * yet.  We don't want the cap_snap to be
                         * queued (again) by ceph_update_snap_trace()
                         * below.  Queue it _now_, under the old context.
                         */
                        list_del_init(&ci->i_snap_realm_item);
                        spin_unlock(&inode->i_lock);

                        ceph_queue_cap_snap(ci,
                                            ci->i_snap_realm->cached_context);

                        iput(inode);
                        continue;

skip_inode:
                        spin_unlock(&inode->i_lock);
                        iput(inode);
                }

                /* we may have taken some of the old realm's children. */
                for (i = 0; i < num_split_realms; i++) {
                        struct ceph_snap_realm *child =
                                ceph_lookup_snap_realm(mdsc,
                                           le64_to_cpu(split_realms[i]));
                        if (IS_ERR(child))
                                continue;
                        if (!child)
                                continue;
                        adjust_snap_realm_parent(mdsc, child, realm->ino);
                }
        }

        /*
         * update using the provided snap trace.  if we are deleting a
         * snap, we can avoid queueing cap_snaps.
         */
        ceph_update_snap_trace(mdsc, p, e,
                               op == CEPH_SNAP_OP_DESTROY);

        if (op == CEPH_SNAP_OP_SPLIT) {
                /*
                 * ok, _now_ add the inodes into the new realm.
                 */
                for (i = 0; i < num_split_inos; i++) {
                        struct ceph_vino vino = {
                                .ino = le64_to_cpu(split_inos[i]),
                                .snap = CEPH_NOSNAP,
                        };
                        struct inode *inode = ceph_find_inode(sb, vino);
                        struct ceph_inode_info *ci;

                        if (!inode)
                                continue;
                        ci = ceph_inode(inode);
                        spin_lock(&inode->i_lock);
                        if (!ci->i_snap_realm)
                                goto split_skip_inode;
                        ceph_put_snap_realm(mdsc, ci->i_snap_realm);
                        spin_lock(&realm->inodes_with_caps_lock);
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
                        ci->i_snap_realm = realm;
                        spin_unlock(&realm->inodes_with_caps_lock);
                        ceph_get_snap_realm(mdsc, realm);
split_skip_inode:
                        spin_unlock(&inode->i_lock);
                        iput(inode);
                }

                /* we took a reference when we created the realm, above */
                ceph_put_snap_realm(mdsc, realm);
        }

        __cleanup_empty_realms(mdsc);

        up_write(&mdsc->snap_rwsem);

        flush_snaps(mdsc);
        return;

bad:
        pr_err("corrupt snap message from mds%d\n", mds);
out:
        if (locked_rwsem)
                up_write(&mdsc->snap_rwsem);
        return;
}