aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/caps.c24
-rw-r--r--fs/ceph/mds_client.c9
-rw-r--r--fs/ceph/snap.c54
-rw-r--r--fs/ceph/super.h3
4 files changed, 63 insertions, 27 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index d0618e8412fd..8ed1192606d9 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
577 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, 577 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
578 realmino); 578 realmino);
579 if (realm) { 579 if (realm) {
580 ceph_get_snap_realm(mdsc, realm);
581 spin_lock(&realm->inodes_with_caps_lock); 580 spin_lock(&realm->inodes_with_caps_lock);
582 ci->i_snap_realm = realm; 581 ci->i_snap_realm = realm;
583 list_add(&ci->i_snap_realm_item, 582 list_add(&ci->i_snap_realm_item,
@@ -2447,13 +2446,13 @@ static void invalidate_aliases(struct inode *inode)
2447 */ 2446 */
2448static void handle_cap_grant(struct ceph_mds_client *mdsc, 2447static void handle_cap_grant(struct ceph_mds_client *mdsc,
2449 struct inode *inode, struct ceph_mds_caps *grant, 2448 struct inode *inode, struct ceph_mds_caps *grant,
2450 void *snaptrace, int snaptrace_len,
2451 u64 inline_version, 2449 u64 inline_version,
2452 void *inline_data, int inline_len, 2450 void *inline_data, int inline_len,
2453 struct ceph_buffer *xattr_buf, 2451 struct ceph_buffer *xattr_buf,
2454 struct ceph_mds_session *session, 2452 struct ceph_mds_session *session,
2455 struct ceph_cap *cap, int issued) 2453 struct ceph_cap *cap, int issued)
2456 __releases(ci->i_ceph_lock) 2454 __releases(ci->i_ceph_lock)
2455 __releases(mdsc->snap_rwsem)
2457{ 2456{
2458 struct ceph_inode_info *ci = ceph_inode(inode); 2457 struct ceph_inode_info *ci = ceph_inode(inode);
2459 int mds = session->s_mds; 2458 int mds = session->s_mds;
@@ -2654,10 +2653,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
2654 spin_unlock(&ci->i_ceph_lock); 2653 spin_unlock(&ci->i_ceph_lock);
2655 2654
2656 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 2655 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
2657 down_write(&mdsc->snap_rwsem);
2658 ceph_update_snap_trace(mdsc, snaptrace,
2659 snaptrace + snaptrace_len, false);
2660 downgrade_write(&mdsc->snap_rwsem);
2661 kick_flushing_inode_caps(mdsc, session, inode); 2656 kick_flushing_inode_caps(mdsc, session, inode);
2662 up_read(&mdsc->snap_rwsem); 2657 up_read(&mdsc->snap_rwsem);
2663 if (newcaps & ~issued) 2658 if (newcaps & ~issued)
@@ -3067,6 +3062,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3067 struct ceph_cap *cap; 3062 struct ceph_cap *cap;
3068 struct ceph_mds_caps *h; 3063 struct ceph_mds_caps *h;
3069 struct ceph_mds_cap_peer *peer = NULL; 3064 struct ceph_mds_cap_peer *peer = NULL;
3065 struct ceph_snap_realm *realm;
3070 int mds = session->s_mds; 3066 int mds = session->s_mds;
3071 int op, issued; 3067 int op, issued;
3072 u32 seq, mseq; 3068 u32 seq, mseq;
@@ -3168,11 +3164,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3168 goto done_unlocked; 3164 goto done_unlocked;
3169 3165
3170 case CEPH_CAP_OP_IMPORT: 3166 case CEPH_CAP_OP_IMPORT:
3167 realm = NULL;
3168 if (snaptrace_len) {
3169 down_write(&mdsc->snap_rwsem);
3170 ceph_update_snap_trace(mdsc, snaptrace,
3171 snaptrace + snaptrace_len,
3172 false, &realm);
3173 downgrade_write(&mdsc->snap_rwsem);
3174 } else {
3175 down_read(&mdsc->snap_rwsem);
3176 }
3171 handle_cap_import(mdsc, inode, h, peer, session, 3177 handle_cap_import(mdsc, inode, h, peer, session,
3172 &cap, &issued); 3178 &cap, &issued);
3173 handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, 3179 handle_cap_grant(mdsc, inode, h,
3174 inline_version, inline_data, inline_len, 3180 inline_version, inline_data, inline_len,
3175 msg->middle, session, cap, issued); 3181 msg->middle, session, cap, issued);
3182 if (realm)
3183 ceph_put_snap_realm(mdsc, realm);
3176 goto done_unlocked; 3184 goto done_unlocked;
3177 } 3185 }
3178 3186
@@ -3192,7 +3200,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3192 case CEPH_CAP_OP_GRANT: 3200 case CEPH_CAP_OP_GRANT:
3193 __ceph_caps_issued(ci, &issued); 3201 __ceph_caps_issued(ci, &issued);
3194 issued |= __ceph_caps_dirty(ci); 3202 issued |= __ceph_caps_dirty(ci);
3195 handle_cap_grant(mdsc, inode, h, NULL, 0, 3203 handle_cap_grant(mdsc, inode, h,
3196 inline_version, inline_data, inline_len, 3204 inline_version, inline_data, inline_len,
3197 msg->middle, session, cap, issued); 3205 msg->middle, session, cap, issued);
3198 goto done_unlocked; 3206 goto done_unlocked;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c6c33b411a2f..85c67ae03e46 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2286,6 +2286,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2286 struct ceph_mds_request *req; 2286 struct ceph_mds_request *req;
2287 struct ceph_mds_reply_head *head = msg->front.iov_base; 2287 struct ceph_mds_reply_head *head = msg->front.iov_base;
2288 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2288 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
2289 struct ceph_snap_realm *realm;
2289 u64 tid; 2290 u64 tid;
2290 int err, result; 2291 int err, result;
2291 int mds = session->s_mds; 2292 int mds = session->s_mds;
@@ -2401,11 +2402,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2401 } 2402 }
2402 2403
2403 /* snap trace */ 2404 /* snap trace */
2405 realm = NULL;
2404 if (rinfo->snapblob_len) { 2406 if (rinfo->snapblob_len) {
2405 down_write(&mdsc->snap_rwsem); 2407 down_write(&mdsc->snap_rwsem);
2406 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2408 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2407 rinfo->snapblob + rinfo->snapblob_len, 2409 rinfo->snapblob + rinfo->snapblob_len,
2408 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2410 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2411 &realm);
2409 downgrade_write(&mdsc->snap_rwsem); 2412 downgrade_write(&mdsc->snap_rwsem);
2410 } else { 2413 } else {
2411 down_read(&mdsc->snap_rwsem); 2414 down_read(&mdsc->snap_rwsem);
@@ -2423,6 +2426,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2423 mutex_unlock(&req->r_fill_mutex); 2426 mutex_unlock(&req->r_fill_mutex);
2424 2427
2425 up_read(&mdsc->snap_rwsem); 2428 up_read(&mdsc->snap_rwsem);
2429 if (realm)
2430 ceph_put_snap_realm(mdsc, realm);
2426out_err: 2431out_err:
2427 mutex_lock(&mdsc->mutex); 2432 mutex_lock(&mdsc->mutex);
2428 if (!req->r_aborted) { 2433 if (!req->r_aborted) {
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index ce35fbd4ba5d..a97e39f09ba6 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
70 * safe. we do need to protect against concurrent empty list 70 * safe. we do need to protect against concurrent empty list
71 * additions, however. 71 * additions, however.
72 */ 72 */
73 if (atomic_read(&realm->nref) == 0) { 73 if (atomic_inc_return(&realm->nref) == 1) {
74 spin_lock(&mdsc->snap_empty_lock); 74 spin_lock(&mdsc->snap_empty_lock);
75 list_del_init(&realm->empty_item); 75 list_del_init(&realm->empty_item);
76 spin_unlock(&mdsc->snap_empty_lock); 76 spin_unlock(&mdsc->snap_empty_lock);
77 } 77 }
78
79 atomic_inc(&realm->nref);
80} 78}
81 79
82static void __insert_snap_realm(struct rb_root *root, 80static void __insert_snap_realm(struct rb_root *root,
@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
116 if (!realm) 114 if (!realm)
117 return ERR_PTR(-ENOMEM); 115 return ERR_PTR(-ENOMEM);
118 116
119 atomic_set(&realm->nref, 0); /* tree does not take a ref */ 117 atomic_set(&realm->nref, 1); /* for caller */
120 realm->ino = ino; 118 realm->ino = ino;
121 INIT_LIST_HEAD(&realm->children); 119 INIT_LIST_HEAD(&realm->children);
122 INIT_LIST_HEAD(&realm->child_item); 120 INIT_LIST_HEAD(&realm->child_item);
@@ -134,8 +132,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
134 * 132 *
135 * caller must hold snap_rwsem for write. 133 * caller must hold snap_rwsem for write.
136 */ 134 */
137struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, 135static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
138 u64 ino) 136 u64 ino)
139{ 137{
140 struct rb_node *n = mdsc->snap_realms.rb_node; 138 struct rb_node *n = mdsc->snap_realms.rb_node;
141 struct ceph_snap_realm *r; 139 struct ceph_snap_realm *r;
@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
154 return NULL; 152 return NULL;
155} 153}
156 154
155struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
156 u64 ino)
157{
158 struct ceph_snap_realm *r;
159 r = __lookup_snap_realm(mdsc, ino);
160 if (r)
161 ceph_get_snap_realm(mdsc, r);
162 return r;
163}
164
157static void __put_snap_realm(struct ceph_mds_client *mdsc, 165static void __put_snap_realm(struct ceph_mds_client *mdsc,
158 struct ceph_snap_realm *realm); 166 struct ceph_snap_realm *realm);
159 167
@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
273 } 281 }
274 realm->parent_ino = parentino; 282 realm->parent_ino = parentino;
275 realm->parent = parent; 283 realm->parent = parent;
276 ceph_get_snap_realm(mdsc, parent);
277 list_add(&realm->child_item, &parent->children); 284 list_add(&realm->child_item, &parent->children);
278 return 1; 285 return 1;
279} 286}
@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
631 * Caller must hold snap_rwsem for write. 638 * Caller must hold snap_rwsem for write.
632 */ 639 */
633int ceph_update_snap_trace(struct ceph_mds_client *mdsc, 640int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
634 void *p, void *e, bool deletion) 641 void *p, void *e, bool deletion,
642 struct ceph_snap_realm **realm_ret)
635{ 643{
636 struct ceph_mds_snap_realm *ri; /* encoded */ 644 struct ceph_mds_snap_realm *ri; /* encoded */
637 __le64 *snaps; /* encoded */ 645 __le64 *snaps; /* encoded */
638 __le64 *prior_parent_snaps; /* encoded */ 646 __le64 *prior_parent_snaps; /* encoded */
639 struct ceph_snap_realm *realm; 647 struct ceph_snap_realm *realm = NULL;
648 struct ceph_snap_realm *first_realm = NULL;
640 int invalidate = 0; 649 int invalidate = 0;
641 int err = -ENOMEM; 650 int err = -ENOMEM;
642 LIST_HEAD(dirty_realms); 651 LIST_HEAD(dirty_realms);
@@ -704,13 +713,18 @@ more:
704 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, 713 dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
705 realm, invalidate, p, e); 714 realm, invalidate, p, e);
706 715
707 if (p < e)
708 goto more;
709
710 /* invalidate when we reach the _end_ (root) of the trace */ 716 /* invalidate when we reach the _end_ (root) of the trace */
711 if (invalidate) 717 if (invalidate && p >= e)
712 rebuild_snap_realms(realm); 718 rebuild_snap_realms(realm);
713 719
720 if (!first_realm)
721 first_realm = realm;
722 else
723 ceph_put_snap_realm(mdsc, realm);
724
725 if (p < e)
726 goto more;
727
714 /* 728 /*
715 * queue cap snaps _after_ we've built the new snap contexts, 729 * queue cap snaps _after_ we've built the new snap contexts,
716 * so that i_head_snapc can be set appropriately. 730 * so that i_head_snapc can be set appropriately.
@@ -721,12 +735,21 @@ more:
721 queue_realm_cap_snaps(realm); 735 queue_realm_cap_snaps(realm);
722 } 736 }
723 737
738 if (realm_ret)
739 *realm_ret = first_realm;
740 else
741 ceph_put_snap_realm(mdsc, first_realm);
742
724 __cleanup_empty_realms(mdsc); 743 __cleanup_empty_realms(mdsc);
725 return 0; 744 return 0;
726 745
727bad: 746bad:
728 err = -EINVAL; 747 err = -EINVAL;
729fail: 748fail:
749 if (realm && !IS_ERR(realm))
750 ceph_put_snap_realm(mdsc, realm);
751 if (first_realm)
752 ceph_put_snap_realm(mdsc, first_realm);
730 pr_err("update_snap_trace error %d\n", err); 753 pr_err("update_snap_trace error %d\n", err);
731 return err; 754 return err;
732} 755}
@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
844 if (IS_ERR(realm)) 867 if (IS_ERR(realm))
845 goto out; 868 goto out;
846 } 869 }
847 ceph_get_snap_realm(mdsc, realm);
848 870
849 dout("splitting snap_realm %llx %p\n", realm->ino, realm); 871 dout("splitting snap_realm %llx %p\n", realm->ino, realm);
850 for (i = 0; i < num_split_inos; i++) { 872 for (i = 0; i < num_split_inos; i++) {
@@ -905,7 +927,7 @@ skip_inode:
905 /* we may have taken some of the old realm's children. */ 927 /* we may have taken some of the old realm's children. */
906 for (i = 0; i < num_split_realms; i++) { 928 for (i = 0; i < num_split_realms; i++) {
907 struct ceph_snap_realm *child = 929 struct ceph_snap_realm *child =
908 ceph_lookup_snap_realm(mdsc, 930 __lookup_snap_realm(mdsc,
909 le64_to_cpu(split_realms[i])); 931 le64_to_cpu(split_realms[i]));
910 if (!child) 932 if (!child)
911 continue; 933 continue;
@@ -918,7 +940,7 @@ skip_inode:
918 * snap, we can avoid queueing cap_snaps. 940 * snap, we can avoid queueing cap_snaps.
919 */ 941 */
920 ceph_update_snap_trace(mdsc, p, e, 942 ceph_update_snap_trace(mdsc, p, e,
921 op == CEPH_SNAP_OP_DESTROY); 943 op == CEPH_SNAP_OP_DESTROY, NULL);
922 944
923 if (op == CEPH_SNAP_OP_SPLIT) 945 if (op == CEPH_SNAP_OP_SPLIT)
924 /* we took a reference when we created the realm, above */ 946 /* we took a reference when we created the realm, above */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index e1aa32d0759d..72bc05a73b69 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, 693extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
694 struct ceph_snap_realm *realm); 694 struct ceph_snap_realm *realm);
695extern int ceph_update_snap_trace(struct ceph_mds_client *m, 695extern int ceph_update_snap_trace(struct ceph_mds_client *m,
696 void *p, void *e, bool deletion); 696 void *p, void *e, bool deletion,
697 struct ceph_snap_realm **realm_ret);
697extern void ceph_handle_snap(struct ceph_mds_client *mdsc, 698extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
698 struct ceph_mds_session *session, 699 struct ceph_mds_session *session,
699 struct ceph_msg *msg); 700 struct ceph_msg *msg);