aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoel Becker <joel.becker@oracle.com>2008-02-01 15:03:57 -0500
committerMark Fasheh <mfasheh@suse.com>2008-04-18 11:56:02 -0400
commit553abd046af609191a91af7289d87d477adc659f (patch)
treecff21f65d49c0041993095a051edf76840c2af28
parentd85b20e4b300edfd290f21fc2d790ba16d2f225b (diff)
ocfs2: Change the recovery map to an array of node numbers.
The old recovery map was a bitmap of node numbers. This was sufficient for the maximum node number of 254. Going forward, we want node numbers to be UINT32. Thus, we need a new recovery map. Note that we can't keep track of slots here. We must write down the node number to recovery *before* we get the locks needed to convert a node number into a slot number. The recovery map is now an array of unsigned ints, max_slots in size. It moves to journal.c with the rest of recovery. Because it needs to be initialized, we move all of recovery initialization into a new function, ocfs2_recovery_init(). This actually cleans up ocfs2_initialize_super() a little as well. Following on, recovery cleaup becomes part of ocfs2_recovery_exit(). A number of node map functions are rendered obsolete and are removed. Finally, waiting on recovery is wrapped in a function rather than naked checks on the recovery_event. This is a cleanup from Mark. Signed-off-by: Joel Becker <joel.becker@oracle.com> Signed-off-by: Mark Fasheh <mfasheh@suse.com>
-rw-r--r--fs/ocfs2/dlmglue.c6
-rw-r--r--fs/ocfs2/heartbeat.c111
-rw-r--r--fs/ocfs2/heartbeat.h14
-rw-r--r--fs/ocfs2/journal.c181
-rw-r--r--fs/ocfs2/journal.h4
-rw-r--r--fs/ocfs2/ocfs2.h3
-rw-r--r--fs/ocfs2/super.c33
7 files changed, 182 insertions, 170 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 1a80fa9e7c9a..15a5167e0513 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -1950,8 +1950,7 @@ int ocfs2_inode_lock_full(struct inode *inode,
1950 goto local; 1950 goto local;
1951 1951
1952 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1952 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1953 wait_event(osb->recovery_event, 1953 ocfs2_wait_for_recovery(osb);
1954 ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1955 1954
1956 lockres = &OCFS2_I(inode)->ip_inode_lockres; 1955 lockres = &OCFS2_I(inode)->ip_inode_lockres;
1957 level = ex ? LKM_EXMODE : LKM_PRMODE; 1956 level = ex ? LKM_EXMODE : LKM_PRMODE;
@@ -1974,8 +1973,7 @@ int ocfs2_inode_lock_full(struct inode *inode,
1974 * committed to owning this lock so we don't allow signals to 1973 * committed to owning this lock so we don't allow signals to
1975 * abort the operation. */ 1974 * abort the operation. */
1976 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1975 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1977 wait_event(osb->recovery_event, 1976 ocfs2_wait_for_recovery(osb);
1978 ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1979 1977
1980local: 1978local:
1981 /* 1979 /*
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index 0758daf64da0..80de2397c161 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -48,7 +48,6 @@ static inline void __ocfs2_node_map_set_bit(struct ocfs2_node_map *map,
48 int bit); 48 int bit);
49static inline void __ocfs2_node_map_clear_bit(struct ocfs2_node_map *map, 49static inline void __ocfs2_node_map_clear_bit(struct ocfs2_node_map *map,
50 int bit); 50 int bit);
51static inline int __ocfs2_node_map_is_empty(struct ocfs2_node_map *map);
52 51
53/* special case -1 for now 52/* special case -1 for now
54 * TODO: should *really* make sure the calling func never passes -1!! */ 53 * TODO: should *really* make sure the calling func never passes -1!! */
@@ -62,7 +61,6 @@ static void ocfs2_node_map_init(struct ocfs2_node_map *map)
62void ocfs2_init_node_maps(struct ocfs2_super *osb) 61void ocfs2_init_node_maps(struct ocfs2_super *osb)
63{ 62{
64 spin_lock_init(&osb->node_map_lock); 63 spin_lock_init(&osb->node_map_lock);
65 ocfs2_node_map_init(&osb->recovery_map);
66 ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs); 64 ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
67} 65}
68 66
@@ -192,112 +190,3 @@ int ocfs2_node_map_test_bit(struct ocfs2_super *osb,
192 return ret; 190 return ret;
193} 191}
194 192
195static inline int __ocfs2_node_map_is_empty(struct ocfs2_node_map *map)
196{
197 int bit;
198 bit = find_next_bit(map->map, map->num_nodes, 0);
199 if (bit < map->num_nodes)
200 return 0;
201 return 1;
202}
203
204int ocfs2_node_map_is_empty(struct ocfs2_super *osb,
205 struct ocfs2_node_map *map)
206{
207 int ret;
208 BUG_ON(map->num_nodes == 0);
209 spin_lock(&osb->node_map_lock);
210 ret = __ocfs2_node_map_is_empty(map);
211 spin_unlock(&osb->node_map_lock);
212 return ret;
213}
214
215#if 0
216
217static void __ocfs2_node_map_dup(struct ocfs2_node_map *target,
218 struct ocfs2_node_map *from)
219{
220 BUG_ON(from->num_nodes == 0);
221 ocfs2_node_map_init(target);
222 __ocfs2_node_map_set(target, from);
223}
224
225/* returns 1 if bit is the only bit set in target, 0 otherwise */
226int ocfs2_node_map_is_only(struct ocfs2_super *osb,
227 struct ocfs2_node_map *target,
228 int bit)
229{
230 struct ocfs2_node_map temp;
231 int ret;
232
233 spin_lock(&osb->node_map_lock);
234 __ocfs2_node_map_dup(&temp, target);
235 __ocfs2_node_map_clear_bit(&temp, bit);
236 ret = __ocfs2_node_map_is_empty(&temp);
237 spin_unlock(&osb->node_map_lock);
238
239 return ret;
240}
241
242static void __ocfs2_node_map_set(struct ocfs2_node_map *target,
243 struct ocfs2_node_map *from)
244{
245 int num_longs, i;
246
247 BUG_ON(target->num_nodes != from->num_nodes);
248 BUG_ON(target->num_nodes == 0);
249
250 num_longs = BITS_TO_LONGS(target->num_nodes);
251 for (i = 0; i < num_longs; i++)
252 target->map[i] = from->map[i];
253}
254
255#endif /* 0 */
256
257/* Returns whether the recovery bit was actually set - it may not be
258 * if a node is still marked as needing recovery */
259int ocfs2_recovery_map_set(struct ocfs2_super *osb,
260 int num)
261{
262 int set = 0;
263
264 spin_lock(&osb->node_map_lock);
265
266 if (!test_bit(num, osb->recovery_map.map)) {
267 __ocfs2_node_map_set_bit(&osb->recovery_map, num);
268 set = 1;
269 }
270
271 spin_unlock(&osb->node_map_lock);
272
273 return set;
274}
275
276void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
277 int num)
278{
279 ocfs2_node_map_clear_bit(osb, &osb->recovery_map, num);
280}
281
282int ocfs2_node_map_iterate(struct ocfs2_super *osb,
283 struct ocfs2_node_map *map,
284 int idx)
285{
286 int i = idx;
287
288 idx = O2NM_INVALID_NODE_NUM;
289 spin_lock(&osb->node_map_lock);
290 if ((i != O2NM_INVALID_NODE_NUM) &&
291 (i >= 0) &&
292 (i < map->num_nodes)) {
293 while(i < map->num_nodes) {
294 if (test_bit(i, map->map)) {
295 idx = i;
296 break;
297 }
298 i++;
299 }
300 }
301 spin_unlock(&osb->node_map_lock);
302 return idx;
303}
diff --git a/fs/ocfs2/heartbeat.h b/fs/ocfs2/heartbeat.h
index eac63aed7611..98d8ffc995b1 100644
--- a/fs/ocfs2/heartbeat.h
+++ b/fs/ocfs2/heartbeat.h
@@ -33,8 +33,6 @@ void ocfs2_stop_heartbeat(struct ocfs2_super *osb);
33 33
34/* node map functions - used to keep track of mounted and in-recovery 34/* node map functions - used to keep track of mounted and in-recovery
35 * nodes. */ 35 * nodes. */
36int ocfs2_node_map_is_empty(struct ocfs2_super *osb,
37 struct ocfs2_node_map *map);
38void ocfs2_node_map_set_bit(struct ocfs2_super *osb, 36void ocfs2_node_map_set_bit(struct ocfs2_super *osb,
39 struct ocfs2_node_map *map, 37 struct ocfs2_node_map *map,
40 int bit); 38 int bit);
@@ -44,17 +42,5 @@ void ocfs2_node_map_clear_bit(struct ocfs2_super *osb,
44int ocfs2_node_map_test_bit(struct ocfs2_super *osb, 42int ocfs2_node_map_test_bit(struct ocfs2_super *osb,
45 struct ocfs2_node_map *map, 43 struct ocfs2_node_map *map,
46 int bit); 44 int bit);
47int ocfs2_node_map_iterate(struct ocfs2_super *osb,
48 struct ocfs2_node_map *map,
49 int idx);
50static inline int ocfs2_node_map_first_set_bit(struct ocfs2_super *osb,
51 struct ocfs2_node_map *map)
52{
53 return ocfs2_node_map_iterate(osb, map, 0);
54}
55int ocfs2_recovery_map_set(struct ocfs2_super *osb,
56 int num);
57void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
58 int num);
59 45
60#endif /* OCFS2_HEARTBEAT_H */ 46#endif /* OCFS2_HEARTBEAT_H */
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index ed0c6d0850d7..ca4c0ea5a4cd 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -64,6 +64,137 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
64 int slot); 64 int slot);
65static int ocfs2_commit_thread(void *arg); 65static int ocfs2_commit_thread(void *arg);
66 66
67
68/*
69 * The recovery_list is a simple linked list of node numbers to recover.
70 * It is protected by the recovery_lock.
71 */
72
73struct ocfs2_recovery_map {
74 int rm_used;
75 unsigned int *rm_entries;
76};
77
78int ocfs2_recovery_init(struct ocfs2_super *osb)
79{
80 struct ocfs2_recovery_map *rm;
81
82 mutex_init(&osb->recovery_lock);
83 osb->disable_recovery = 0;
84 osb->recovery_thread_task = NULL;
85 init_waitqueue_head(&osb->recovery_event);
86
87 rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
88 osb->max_slots * sizeof(unsigned int),
89 GFP_KERNEL);
90 if (!rm) {
91 mlog_errno(-ENOMEM);
92 return -ENOMEM;
93 }
94
95 rm->rm_entries = (unsigned int *)((char *)rm +
96 sizeof(struct ocfs2_recovery_map));
97 osb->recovery_map = rm;
98
99 return 0;
100}
101
102/* we can't grab the goofy sem lock from inside wait_event, so we use
103 * memory barriers to make sure that we'll see the null task before
104 * being woken up */
105static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
106{
107 mb();
108 return osb->recovery_thread_task != NULL;
109}
110
111void ocfs2_recovery_exit(struct ocfs2_super *osb)
112{
113 struct ocfs2_recovery_map *rm;
114
115 /* disable any new recovery threads and wait for any currently
116 * running ones to exit. Do this before setting the vol_state. */
117 mutex_lock(&osb->recovery_lock);
118 osb->disable_recovery = 1;
119 mutex_unlock(&osb->recovery_lock);
120 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
121
122 /* At this point, we know that no more recovery threads can be
123 * launched, so wait for any recovery completion work to
124 * complete. */
125 flush_workqueue(ocfs2_wq);
126
127 /*
128 * Now that recovery is shut down, and the osb is about to be
129 * freed, the osb_lock is not taken here.
130 */
131 rm = osb->recovery_map;
132 /* XXX: Should we bug if there are dirty entries? */
133
134 kfree(rm);
135}
136
137static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
138 unsigned int node_num)
139{
140 int i;
141 struct ocfs2_recovery_map *rm = osb->recovery_map;
142
143 assert_spin_locked(&osb->osb_lock);
144
145 for (i = 0; i < rm->rm_used; i++) {
146 if (rm->rm_entries[i] == node_num)
147 return 1;
148 }
149
150 return 0;
151}
152
153/* Behaves like test-and-set. Returns the previous value */
154static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
155 unsigned int node_num)
156{
157 struct ocfs2_recovery_map *rm = osb->recovery_map;
158
159 spin_lock(&osb->osb_lock);
160 if (__ocfs2_recovery_map_test(osb, node_num)) {
161 spin_unlock(&osb->osb_lock);
162 return 1;
163 }
164
165 /* XXX: Can this be exploited? Not from o2dlm... */
166 BUG_ON(rm->rm_used >= osb->max_slots);
167
168 rm->rm_entries[rm->rm_used] = node_num;
169 rm->rm_used++;
170 spin_unlock(&osb->osb_lock);
171
172 return 0;
173}
174
175static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
176 unsigned int node_num)
177{
178 int i;
179 struct ocfs2_recovery_map *rm = osb->recovery_map;
180
181 spin_lock(&osb->osb_lock);
182
183 for (i = 0; i < rm->rm_used; i++) {
184 if (rm->rm_entries[i] == node_num)
185 break;
186 }
187
188 if (i < rm->rm_used) {
189 /* XXX: be careful with the pointer math */
190 memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
191 (rm->rm_used - i - 1) * sizeof(unsigned int));
192 rm->rm_used--;
193 }
194
195 spin_unlock(&osb->osb_lock);
196}
197
67static int ocfs2_commit_cache(struct ocfs2_super *osb) 198static int ocfs2_commit_cache(struct ocfs2_super *osb)
68{ 199{
69 int status = 0; 200 int status = 0;
@@ -650,6 +781,23 @@ bail:
650 return status; 781 return status;
651} 782}
652 783
784static int ocfs2_recovery_completed(struct ocfs2_super *osb)
785{
786 int empty;
787 struct ocfs2_recovery_map *rm = osb->recovery_map;
788
789 spin_lock(&osb->osb_lock);
790 empty = (rm->rm_used == 0);
791 spin_unlock(&osb->osb_lock);
792
793 return empty;
794}
795
796void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
797{
798 wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
799}
800
653/* 801/*
654 * JBD Might read a cached version of another nodes journal file. We 802 * JBD Might read a cached version of another nodes journal file. We
655 * don't want this as this file changes often and we get no 803 * don't want this as this file changes often and we get no
@@ -848,6 +996,7 @@ static int __ocfs2_recovery_thread(void *arg)
848{ 996{
849 int status, node_num; 997 int status, node_num;
850 struct ocfs2_super *osb = arg; 998 struct ocfs2_super *osb = arg;
999 struct ocfs2_recovery_map *rm = osb->recovery_map;
851 1000
852 mlog_entry_void(); 1001 mlog_entry_void();
853 1002
@@ -863,26 +1012,29 @@ restart:
863 goto bail; 1012 goto bail;
864 } 1013 }
865 1014
866 while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) { 1015 spin_lock(&osb->osb_lock);
867 node_num = ocfs2_node_map_first_set_bit(osb, 1016 while (rm->rm_used) {
868 &osb->recovery_map); 1017 /* It's always safe to remove entry zero, as we won't
869 if (node_num == O2NM_INVALID_NODE_NUM) { 1018 * clear it until ocfs2_recover_node() has succeeded. */
870 mlog(0, "Out of nodes to recover.\n"); 1019 node_num = rm->rm_entries[0];
871 break; 1020 spin_unlock(&osb->osb_lock);
872 }
873 1021
874 status = ocfs2_recover_node(osb, node_num); 1022 status = ocfs2_recover_node(osb, node_num);
875 if (status < 0) { 1023 if (!status) {
1024 ocfs2_recovery_map_clear(osb, node_num);
1025 } else {
876 mlog(ML_ERROR, 1026 mlog(ML_ERROR,
877 "Error %d recovering node %d on device (%u,%u)!\n", 1027 "Error %d recovering node %d on device (%u,%u)!\n",
878 status, node_num, 1028 status, node_num,
879 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1029 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
880 mlog(ML_ERROR, "Volume requires unmount.\n"); 1030 mlog(ML_ERROR, "Volume requires unmount.\n");
881 continue;
882 } 1031 }
883 1032
884 ocfs2_recovery_map_clear(osb, node_num); 1033 spin_lock(&osb->osb_lock);
885 } 1034 }
1035 spin_unlock(&osb->osb_lock);
1036 mlog(0, "All nodes recovered\n");
1037
886 ocfs2_super_unlock(osb, 1); 1038 ocfs2_super_unlock(osb, 1);
887 1039
888 /* We always run recovery on our own orphan dir - the dead 1040 /* We always run recovery on our own orphan dir - the dead
@@ -893,8 +1045,7 @@ restart:
893 1045
894bail: 1046bail:
895 mutex_lock(&osb->recovery_lock); 1047 mutex_lock(&osb->recovery_lock);
896 if (!status && 1048 if (!status && !ocfs2_recovery_completed(osb)) {
897 !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
898 mutex_unlock(&osb->recovery_lock); 1049 mutex_unlock(&osb->recovery_lock);
899 goto restart; 1050 goto restart;
900 } 1051 }
@@ -924,8 +1075,8 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
924 1075
925 /* People waiting on recovery will wait on 1076 /* People waiting on recovery will wait on
926 * the recovery map to empty. */ 1077 * the recovery map to empty. */
927 if (!ocfs2_recovery_map_set(osb, node_num)) 1078 if (ocfs2_recovery_map_set(osb, node_num))
928 mlog(0, "node %d already be in recovery.\n", node_num); 1079 mlog(0, "node %d already in recovery map.\n", node_num);
929 1080
930 mlog(0, "starting recovery thread...\n"); 1081 mlog(0, "starting recovery thread...\n");
931 1082
@@ -1197,7 +1348,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
1197 if (status == -ENOENT) 1348 if (status == -ENOENT)
1198 continue; 1349 continue;
1199 1350
1200 if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num)) 1351 if (__ocfs2_recovery_map_test(osb, node_num))
1201 continue; 1352 continue;
1202 spin_unlock(&osb->osb_lock); 1353 spin_unlock(&osb->osb_lock);
1203 1354
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 220f3e818e78..db82be2532ed 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -134,6 +134,10 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb,
134 134
135/* Exported only for the journal struct init code in super.c. Do not call. */ 135/* Exported only for the journal struct init code in super.c. Do not call. */
136void ocfs2_complete_recovery(struct work_struct *work); 136void ocfs2_complete_recovery(struct work_struct *work);
137void ocfs2_wait_for_recovery(struct ocfs2_super *osb);
138
139int ocfs2_recovery_init(struct ocfs2_super *osb);
140void ocfs2_recovery_exit(struct ocfs2_super *osb);
137 141
138/* 142/*
139 * Journal Control: 143 * Journal Control:
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index ee3f675a4210..c6ed8c35de0d 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -180,6 +180,7 @@ enum ocfs2_mount_options
180 180
181struct ocfs2_journal; 181struct ocfs2_journal;
182struct ocfs2_slot_info; 182struct ocfs2_slot_info;
183struct ocfs2_recovery_map;
183struct ocfs2_super 184struct ocfs2_super
184{ 185{
185 struct task_struct *commit_task; 186 struct task_struct *commit_task;
@@ -191,7 +192,6 @@ struct ocfs2_super
191 struct ocfs2_slot_info *slot_info; 192 struct ocfs2_slot_info *slot_info;
192 193
193 spinlock_t node_map_lock; 194 spinlock_t node_map_lock;
194 struct ocfs2_node_map recovery_map;
195 195
196 u64 root_blkno; 196 u64 root_blkno;
197 u64 system_dir_blkno; 197 u64 system_dir_blkno;
@@ -226,6 +226,7 @@ struct ocfs2_super
226 226
227 atomic_t vol_state; 227 atomic_t vol_state;
228 struct mutex recovery_lock; 228 struct mutex recovery_lock;
229 struct ocfs2_recovery_map *recovery_map;
229 struct task_struct *recovery_thread_task; 230 struct task_struct *recovery_thread_task;
230 int disable_recovery; 231 int disable_recovery;
231 wait_queue_head_t checkpoint_event; 232 wait_queue_head_t checkpoint_event;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index fad37af2af9c..1a4c7c7850f2 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1224,15 +1224,6 @@ leave:
1224 return status; 1224 return status;
1225} 1225}
1226 1226
1227/* we can't grab the goofy sem lock from inside wait_event, so we use
1228 * memory barriers to make sure that we'll see the null task before
1229 * being woken up */
1230static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
1231{
1232 mb();
1233 return osb->recovery_thread_task != NULL;
1234}
1235
1236static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err) 1227static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1237{ 1228{
1238 int tmp; 1229 int tmp;
@@ -1249,17 +1240,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1249 1240
1250 ocfs2_truncate_log_shutdown(osb); 1241 ocfs2_truncate_log_shutdown(osb);
1251 1242
1252 /* disable any new recovery threads and wait for any currently 1243 /* This will disable recovery and flush any recovery work. */
1253 * running ones to exit. Do this before setting the vol_state. */ 1244 ocfs2_recovery_exit(osb);
1254 mutex_lock(&osb->recovery_lock);
1255 osb->disable_recovery = 1;
1256 mutex_unlock(&osb->recovery_lock);
1257 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
1258
1259 /* At this point, we know that no more recovery threads can be
1260 * launched, so wait for any recovery completion work to
1261 * complete. */
1262 flush_workqueue(ocfs2_wq);
1263 1245
1264 ocfs2_journal_shutdown(osb); 1246 ocfs2_journal_shutdown(osb);
1265 1247
@@ -1368,7 +1350,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
1368 osb->s_sectsize_bits = blksize_bits(sector_size); 1350 osb->s_sectsize_bits = blksize_bits(sector_size);
1369 BUG_ON(!osb->s_sectsize_bits); 1351 BUG_ON(!osb->s_sectsize_bits);
1370 1352
1371 init_waitqueue_head(&osb->recovery_event);
1372 spin_lock_init(&osb->dc_task_lock); 1353 spin_lock_init(&osb->dc_task_lock);
1373 init_waitqueue_head(&osb->dc_event); 1354 init_waitqueue_head(&osb->dc_event);
1374 osb->dc_work_sequence = 0; 1355 osb->dc_work_sequence = 0;
@@ -1388,10 +1369,12 @@ static int ocfs2_initialize_super(struct super_block *sb,
1388 snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u", 1369 snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u",
1389 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1370 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
1390 1371
1391 mutex_init(&osb->recovery_lock); 1372 status = ocfs2_recovery_init(osb);
1392 1373 if (status) {
1393 osb->disable_recovery = 0; 1374 mlog(ML_ERROR, "Unable to initialize recovery state\n");
1394 osb->recovery_thread_task = NULL; 1375 mlog_errno(status);
1376 goto bail;
1377 }
1395 1378
1396 init_waitqueue_head(&osb->checkpoint_event); 1379 init_waitqueue_head(&osb->checkpoint_event);
1397 atomic_set(&osb->needs_checkpoint, 0); 1380 atomic_set(&osb->needs_checkpoint, 0);