aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2010-02-16 14:39:45 -0500
committerSage Weil <sage@newdream.net>2010-02-17 13:02:47 -0500
commit7c1332b8cb5b27656cf6ab1f5fe808a8eb8bb2c0 (patch)
treef990ab6b339a88896f41a6b3541d0676684c935d
parent85ccce43a3fc15a40ded6ae1603e3f68a17f4d24 (diff)
ceph: fix iterate_caps removal race
We need to be able to iterate over all caps on a session with a possibly slow callback on each cap. To allow this, we used to prevent cap reordering while we were iterating. However, we were not safe from races with removal: removing the 'next' cap would make the next pointer from list_for_each_entry_safe be invalid, and cause a lock up or similar badness. Instead, we keep an iterator pointer in the session pointing to the current cap. As before, we avoid reordering. For removal, if the cap isn't the current cap we are iterating over, we are fine. If it is, we clear cap->ci (to mark the cap as pending removal) but leave it in the session list. In iterate_caps, we can safely finish removal and get the next cap pointer. While we're at it, clean up put_cap to not take a cap reservation context, as it was never used. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/caps.c47
-rw-r--r--fs/ceph/mds_client.c51
-rw-r--r--fs/ceph/mds_client.h2
-rw-r--r--fs/ceph/super.h6
4 files changed, 70 insertions, 36 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f94b56faba3b..4958a2ef3e04 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -266,12 +266,11 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
266 return cap; 266 return cap;
267} 267}
268 268
269static void put_cap(struct ceph_cap *cap, 269void ceph_put_cap(struct ceph_cap *cap)
270 struct ceph_cap_reservation *ctx)
271{ 270{
272 spin_lock(&caps_list_lock); 271 spin_lock(&caps_list_lock);
273 dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", 272 dout("put_cap %p %d = %d used + %d resv + %d avail\n",
274 ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count, 273 cap, caps_total_count, caps_use_count,
275 caps_reserve_count, caps_avail_count); 274 caps_reserve_count, caps_avail_count);
276 caps_use_count--; 275 caps_use_count--;
277 /* 276 /*
@@ -282,12 +281,7 @@ static void put_cap(struct ceph_cap *cap,
282 caps_total_count--; 281 caps_total_count--;
283 kmem_cache_free(ceph_cap_cachep, cap); 282 kmem_cache_free(ceph_cap_cachep, cap);
284 } else { 283 } else {
285 if (ctx) { 284 caps_avail_count++;
286 ctx->count++;
287 caps_reserve_count++;
288 } else {
289 caps_avail_count++;
290 }
291 list_add(&cap->caps_item, &caps_list); 285 list_add(&cap->caps_item, &caps_list);
292 } 286 }
293 287
@@ -709,7 +703,7 @@ static void __touch_cap(struct ceph_cap *cap)
709 struct ceph_mds_session *s = cap->session; 703 struct ceph_mds_session *s = cap->session;
710 704
711 spin_lock(&s->s_cap_lock); 705 spin_lock(&s->s_cap_lock);
712 if (!s->s_iterating_caps) { 706 if (s->s_cap_iterator == NULL) {
713 dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap, 707 dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
714 s->s_mds); 708 s->s_mds);
715 list_move_tail(&cap->session_caps, &s->s_caps); 709 list_move_tail(&cap->session_caps, &s->s_caps);
@@ -865,8 +859,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
865 * caller should hold i_lock, and session s_mutex. 859 * caller should hold i_lock, and session s_mutex.
866 * returns true if this is the last cap. if so, caller should iput. 860 * returns true if this is the last cap. if so, caller should iput.
867 */ 861 */
868void __ceph_remove_cap(struct ceph_cap *cap, 862void __ceph_remove_cap(struct ceph_cap *cap)
869 struct ceph_cap_reservation *ctx)
870{ 863{
871 struct ceph_mds_session *session = cap->session; 864 struct ceph_mds_session *session = cap->session;
872 struct ceph_inode_info *ci = cap->ci; 865 struct ceph_inode_info *ci = cap->ci;
@@ -874,19 +867,27 @@ void __ceph_remove_cap(struct ceph_cap *cap,
874 867
875 dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); 868 dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
876 869
877 /* remove from session list */
878 spin_lock(&session->s_cap_lock);
879 list_del_init(&cap->session_caps);
880 session->s_nr_caps--;
881 spin_unlock(&session->s_cap_lock);
882
883 /* remove from inode list */ 870 /* remove from inode list */
884 rb_erase(&cap->ci_node, &ci->i_caps); 871 rb_erase(&cap->ci_node, &ci->i_caps);
885 cap->session = NULL; 872 cap->ci = NULL;
886 if (ci->i_auth_cap == cap) 873 if (ci->i_auth_cap == cap)
887 ci->i_auth_cap = NULL; 874 ci->i_auth_cap = NULL;
888 875
889 put_cap(cap, ctx); 876 /* remove from session list */
877 spin_lock(&session->s_cap_lock);
878 if (session->s_cap_iterator == cap) {
879 /* not yet, we are iterating over this very cap */
880 dout("__ceph_remove_cap delaying %p removal from session %p\n",
881 cap, cap->session);
882 } else {
883 list_del_init(&cap->session_caps);
884 session->s_nr_caps--;
885 cap->session = NULL;
886 }
887 spin_unlock(&session->s_cap_lock);
888
889 if (cap->session == NULL)
890 ceph_put_cap(cap);
890 891
891 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { 892 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
892 struct ceph_snap_realm *realm = ci->i_snap_realm; 893 struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1022,7 +1023,7 @@ void ceph_queue_caps_release(struct inode *inode)
1022 } 1023 }
1023 spin_unlock(&session->s_cap_lock); 1024 spin_unlock(&session->s_cap_lock);
1024 p = rb_next(p); 1025 p = rb_next(p);
1025 __ceph_remove_cap(cap, NULL); 1026 __ceph_remove_cap(cap);
1026 1027
1027 } 1028 }
1028 spin_unlock(&inode->i_lock); 1029 spin_unlock(&inode->i_lock);
@@ -2521,7 +2522,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2521 ci->i_cap_exporting_mseq = mseq; 2522 ci->i_cap_exporting_mseq = mseq;
2522 ci->i_cap_exporting_issued = cap->issued; 2523 ci->i_cap_exporting_issued = cap->issued;
2523 } 2524 }
2524 __ceph_remove_cap(cap, NULL); 2525 __ceph_remove_cap(cap);
2525 } else { 2526 } else {
2526 WARN_ON(!cap); 2527 WARN_ON(!cap);
2527 } 2528 }
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 02834cecc3a0..124c0c17a14a 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -344,7 +344,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
344 INIT_LIST_HEAD(&s->s_waiting); 344 INIT_LIST_HEAD(&s->s_waiting);
345 INIT_LIST_HEAD(&s->s_unsafe); 345 INIT_LIST_HEAD(&s->s_unsafe);
346 s->s_num_cap_releases = 0; 346 s->s_num_cap_releases = 0;
347 s->s_iterating_caps = false; 347 s->s_cap_iterator = NULL;
348 INIT_LIST_HEAD(&s->s_cap_releases); 348 INIT_LIST_HEAD(&s->s_cap_releases);
349 INIT_LIST_HEAD(&s->s_cap_releases_done); 349 INIT_LIST_HEAD(&s->s_cap_releases_done);
350 INIT_LIST_HEAD(&s->s_cap_flushing); 350 INIT_LIST_HEAD(&s->s_cap_flushing);
@@ -729,28 +729,61 @@ static int iterate_session_caps(struct ceph_mds_session *session,
729 int (*cb)(struct inode *, struct ceph_cap *, 729 int (*cb)(struct inode *, struct ceph_cap *,
730 void *), void *arg) 730 void *), void *arg)
731{ 731{
732 struct ceph_cap *cap, *ncap; 732 struct list_head *p;
733 struct inode *inode; 733 struct ceph_cap *cap;
734 struct inode *inode, *last_inode = NULL;
735 struct ceph_cap *old_cap = NULL;
734 int ret; 736 int ret;
735 737
736 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 738 dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
737 spin_lock(&session->s_cap_lock); 739 spin_lock(&session->s_cap_lock);
738 session->s_iterating_caps = true; 740 p = session->s_caps.next;
739 list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) { 741 while (p != &session->s_caps) {
742 cap = list_entry(p, struct ceph_cap, session_caps);
740 inode = igrab(&cap->ci->vfs_inode); 743 inode = igrab(&cap->ci->vfs_inode);
741 if (!inode) 744 if (!inode) {
745 p = p->next;
742 continue; 746 continue;
747 }
748 session->s_cap_iterator = cap;
743 spin_unlock(&session->s_cap_lock); 749 spin_unlock(&session->s_cap_lock);
750
751 if (last_inode) {
752 iput(last_inode);
753 last_inode = NULL;
754 }
755 if (old_cap) {
756 ceph_put_cap(old_cap);
757 old_cap = NULL;
758 }
759
744 ret = cb(inode, cap, arg); 760 ret = cb(inode, cap, arg);
745 iput(inode); 761 last_inode = inode;
762
746 spin_lock(&session->s_cap_lock); 763 spin_lock(&session->s_cap_lock);
764 p = p->next;
765 if (cap->ci == NULL) {
766 dout("iterate_session_caps finishing cap %p removal\n",
767 cap);
768 BUG_ON(cap->session != session);
769 list_del_init(&cap->session_caps);
770 session->s_nr_caps--;
771 cap->session = NULL;
772 old_cap = cap; /* put_cap it w/o locks held */
773 }
747 if (ret < 0) 774 if (ret < 0)
748 goto out; 775 goto out;
749 } 776 }
750 ret = 0; 777 ret = 0;
751out: 778out:
752 session->s_iterating_caps = false; 779 session->s_cap_iterator = NULL;
753 spin_unlock(&session->s_cap_lock); 780 spin_unlock(&session->s_cap_lock);
781
782 if (last_inode)
783 iput(last_inode);
784 if (old_cap)
785 ceph_put_cap(old_cap);
786
754 return ret; 787 return ret;
755} 788}
756 789
@@ -942,7 +975,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
942 session->s_trim_caps--; 975 session->s_trim_caps--;
943 if (oissued) { 976 if (oissued) {
944 /* we aren't the only cap.. just remove us */ 977 /* we aren't the only cap.. just remove us */
945 __ceph_remove_cap(cap, NULL); 978 __ceph_remove_cap(cap);
946 } else { 979 } else {
947 /* try to drop referring dentries */ 980 /* try to drop referring dentries */
948 spin_unlock(&inode->i_lock); 981 spin_unlock(&inode->i_lock);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 9d6b90173879..961cc6f65878 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -114,7 +114,7 @@ struct ceph_mds_session {
114 int s_num_cap_releases; 114 int s_num_cap_releases;
115 struct list_head s_cap_releases; /* waiting cap_release messages */ 115 struct list_head s_cap_releases; /* waiting cap_release messages */
116 struct list_head s_cap_releases_done; /* ready to send */ 116 struct list_head s_cap_releases_done; /* ready to send */
117 bool s_iterating_caps; 117 struct ceph_cap *s_cap_iterator;
118 118
119 /* protected by mutex */ 119 /* protected by mutex */
120 struct list_head s_cap_flushing; /* inodes w/ flushing caps */ 120 struct list_head s_cap_flushing; /* inodes w/ flushing caps */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 3b5faf9980f8..384f0e2e7c68 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -795,15 +795,15 @@ extern int ceph_add_cap(struct inode *inode,
795 int fmode, unsigned issued, unsigned wanted, 795 int fmode, unsigned issued, unsigned wanted,
796 unsigned cap, unsigned seq, u64 realmino, int flags, 796 unsigned cap, unsigned seq, u64 realmino, int flags,
797 struct ceph_cap_reservation *caps_reservation); 797 struct ceph_cap_reservation *caps_reservation);
798extern void __ceph_remove_cap(struct ceph_cap *cap, 798extern void __ceph_remove_cap(struct ceph_cap *cap);
799 struct ceph_cap_reservation *ctx);
800static inline void ceph_remove_cap(struct ceph_cap *cap) 799static inline void ceph_remove_cap(struct ceph_cap *cap)
801{ 800{
802 struct inode *inode = &cap->ci->vfs_inode; 801 struct inode *inode = &cap->ci->vfs_inode;
803 spin_lock(&inode->i_lock); 802 spin_lock(&inode->i_lock);
804 __ceph_remove_cap(cap, NULL); 803 __ceph_remove_cap(cap);
805 spin_unlock(&inode->i_lock); 804 spin_unlock(&inode->i_lock);
806} 805}
806extern void ceph_put_cap(struct ceph_cap *cap);
807 807
808extern void ceph_queue_caps_release(struct inode *inode); 808extern void ceph_queue_caps_release(struct inode *inode);
809extern int ceph_write_inode(struct inode *inode, int unused); 809extern int ceph_write_inode(struct inode *inode, int unused);