aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/journal.c
diff options
context:
space:
mode:
authorSrinivas Eeda <srinivas.eeda@oracle.com>2009-03-06 17:21:46 -0500
committerMark Fasheh <mfasheh@suse.com>2009-04-03 14:39:26 -0400
commit9140db04ef185f934acf2b1b15b3dd5e6a6bfc22 (patch)
tree354dbc0351195ae2b6ea5f7dfbf68a99307c3d69 /fs/ocfs2/journal.c
parent1fca3a05ef2823830925dfb66711d6d920265a8d (diff)
ocfs2: recover orphans in offline slots during recovery and mount
During recovery, a node recovers orphans in its own slot and in the slot(s) of the dead node(s). But if the dead nodes were holding orphans in offline slots, those orphans will be left unrecovered. If the dead node is the last one to die and is holding orphans in other slots and is the first one to mount, then it only recovers its own slot, which leaves orphans in offline slots. This patch queues complete_recovery to clean orphans for all offline slots during mount and node recovery. Signed-off-by: Srinivas Eeda <srinivas.eeda@oracle.com> Acked-by: Joel Becker <joel.becker@oracle.com> Signed-off-by: Mark Fasheh <mfasheh@suse.com>
Diffstat (limited to 'fs/ocfs2/journal.c')
-rw-r--r--fs/ocfs2/journal.c141
1 files changed, 123 insertions, 18 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index a70d49da2ceb..a20a0f1e37fd 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -65,6 +65,11 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb,
65static int ocfs2_recover_orphans(struct ocfs2_super *osb, 65static int ocfs2_recover_orphans(struct ocfs2_super *osb,
66 int slot); 66 int slot);
67static int ocfs2_commit_thread(void *arg); 67static int ocfs2_commit_thread(void *arg);
68static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
69 int slot_num,
70 struct ocfs2_dinode *la_dinode,
71 struct ocfs2_dinode *tl_dinode,
72 struct ocfs2_quota_recovery *qrec);
68 73
69static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb) 74static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
70{ 75{
@@ -76,6 +81,97 @@ static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
76 return __ocfs2_wait_on_mount(osb, 1); 81 return __ocfs2_wait_on_mount(osb, 1);
77} 82}
78 83
84/*
85 * This replay_map is to track online/offline slots, so we could recover
86 * offline slots during recovery and mount
87 */
88
89enum ocfs2_replay_state {
90 REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */
91 REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */
92 REPLAY_DONE /* Replay was already queued */
93};
94
95struct ocfs2_replay_map {
96 unsigned int rm_slots;
97 enum ocfs2_replay_state rm_state;
98 unsigned char rm_replay_slots[0];
99};
100
101void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
102{
103 if (!osb->replay_map)
104 return;
105
106 /* If we've already queued the replay, we don't have any more to do */
107 if (osb->replay_map->rm_state == REPLAY_DONE)
108 return;
109
110 osb->replay_map->rm_state = state;
111}
112
113int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
114{
115 struct ocfs2_replay_map *replay_map;
116 int i, node_num;
117
118 /* If replay map is already set, we don't do it again */
119 if (osb->replay_map)
120 return 0;
121
122 replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
123 (osb->max_slots * sizeof(char)), GFP_KERNEL);
124
125 if (!replay_map) {
126 mlog_errno(-ENOMEM);
127 return -ENOMEM;
128 }
129
130 spin_lock(&osb->osb_lock);
131
132 replay_map->rm_slots = osb->max_slots;
133 replay_map->rm_state = REPLAY_UNNEEDED;
134
135 /* set rm_replay_slots for offline slot(s) */
136 for (i = 0; i < replay_map->rm_slots; i++) {
137 if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
138 replay_map->rm_replay_slots[i] = 1;
139 }
140
141 osb->replay_map = replay_map;
142 spin_unlock(&osb->osb_lock);
143 return 0;
144}
145
146void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
147{
148 struct ocfs2_replay_map *replay_map = osb->replay_map;
149 int i;
150
151 if (!replay_map)
152 return;
153
154 if (replay_map->rm_state != REPLAY_NEEDED)
155 return;
156
157 for (i = 0; i < replay_map->rm_slots; i++)
158 if (replay_map->rm_replay_slots[i])
159 ocfs2_queue_recovery_completion(osb->journal, i, NULL,
160 NULL, NULL);
161 replay_map->rm_state = REPLAY_DONE;
162}
163
164void ocfs2_free_replay_slots(struct ocfs2_super *osb)
165{
166 struct ocfs2_replay_map *replay_map = osb->replay_map;
167
168 if (!osb->replay_map)
169 return;
170
171 kfree(replay_map);
172 osb->replay_map = NULL;
173}
174
79int ocfs2_recovery_init(struct ocfs2_super *osb) 175int ocfs2_recovery_init(struct ocfs2_super *osb)
80{ 176{
81 struct ocfs2_recovery_map *rm; 177 struct ocfs2_recovery_map *rm;
@@ -1194,24 +1290,24 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
1194} 1290}
1195 1291
1196/* Called by the mount code to queue recovery the last part of 1292/* Called by the mount code to queue recovery the last part of
1197 * recovery for it's own slot. */ 1293 * recovery for it's own and offline slot(s). */
1198void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) 1294void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
1199{ 1295{
1200 struct ocfs2_journal *journal = osb->journal; 1296 struct ocfs2_journal *journal = osb->journal;
1201 1297
1202 if (osb->dirty) { 1298 /* No need to queue up our truncate_log as regular cleanup will catch
1203 /* No need to queue up our truncate_log as regular 1299 * that */
1204 * cleanup will catch that. */ 1300 ocfs2_queue_recovery_completion(journal, osb->slot_num,
1205 ocfs2_queue_recovery_completion(journal, 1301 osb->local_alloc_copy, NULL, NULL);
1206 osb->slot_num, 1302 ocfs2_schedule_truncate_log_flush(osb, 0);
1207 osb->local_alloc_copy,
1208 NULL,
1209 NULL);
1210 ocfs2_schedule_truncate_log_flush(osb, 0);
1211 1303
1212 osb->local_alloc_copy = NULL; 1304 osb->local_alloc_copy = NULL;
1213 osb->dirty = 0; 1305 osb->dirty = 0;
1214 } 1306
1307 /* queue to recover orphan slots for all offline slots */
1308 ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
1309 ocfs2_queue_replay_slots(osb);
1310 ocfs2_free_replay_slots(osb);
1215} 1311}
1216 1312
1217void ocfs2_complete_quota_recovery(struct ocfs2_super *osb) 1313void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
@@ -1254,6 +1350,14 @@ restart:
1254 goto bail; 1350 goto bail;
1255 } 1351 }
1256 1352
1353 status = ocfs2_compute_replay_slots(osb);
1354 if (status < 0)
1355 mlog_errno(status);
1356
1357 /* queue recovery for our own slot */
1358 ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
1359 NULL, NULL);
1360
1257 spin_lock(&osb->osb_lock); 1361 spin_lock(&osb->osb_lock);
1258 while (rm->rm_used) { 1362 while (rm->rm_used) {
1259 /* It's always safe to remove entry zero, as we won't 1363 /* It's always safe to remove entry zero, as we won't
@@ -1319,11 +1423,8 @@ skip_recovery:
1319 1423
1320 ocfs2_super_unlock(osb, 1); 1424 ocfs2_super_unlock(osb, 1);
1321 1425
1322 /* We always run recovery on our own orphan dir - the dead 1426 /* queue recovery for offline slots */
1323 * node(s) may have disallowd a previos inode delete. Re-processing 1427 ocfs2_queue_replay_slots(osb);
1324 * is therefore required. */
1325 ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
1326 NULL, NULL);
1327 1428
1328bail: 1429bail:
1329 mutex_lock(&osb->recovery_lock); 1430 mutex_lock(&osb->recovery_lock);
@@ -1332,6 +1433,7 @@ bail:
1332 goto restart; 1433 goto restart;
1333 } 1434 }
1334 1435
1436 ocfs2_free_replay_slots(osb);
1335 osb->recovery_thread_task = NULL; 1437 osb->recovery_thread_task = NULL;
1336 mb(); /* sync with ocfs2_recovery_thread_running */ 1438 mb(); /* sync with ocfs2_recovery_thread_running */
1337 wake_up(&osb->recovery_event); 1439 wake_up(&osb->recovery_event);
@@ -1483,6 +1585,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
1483 goto done; 1585 goto done;
1484 } 1586 }
1485 1587
1588 /* we need to run complete recovery for offline orphan slots */
1589 ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
1590
1486 mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", 1591 mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n",
1487 node_num, slot_num, 1592 node_num, slot_num,
1488 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1593 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));