aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/journal.c')
-rw-r--r--fs/ocfs2/journal.c211
1 files changed, 180 insertions, 31 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index f31c7e8c19c3..9698338adc39 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -64,6 +64,137 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
64 int slot); 64 int slot);
65static int ocfs2_commit_thread(void *arg); 65static int ocfs2_commit_thread(void *arg);
66 66
67
68/*
69 * The recovery map is a simple array of node numbers to recover.
70 * It is protected by osb->osb_lock.
71 */
72
/*
 * Bookkeeping for the nodes that still need recovery. The helpers in
 * this file read and modify it while holding osb->osb_lock.
 */
73struct ocfs2_recovery_map {
/* Number of valid entries at the front of rm_entries. */
74 unsigned int rm_used;
/* Node numbers queued for recovery; pointed at its storage by
 * ocfs2_recovery_init(). */
75 unsigned int *rm_entries;
76};
77
78int ocfs2_recovery_init(struct ocfs2_super *osb)
79{
80 struct ocfs2_recovery_map *rm;
81
82 mutex_init(&osb->recovery_lock);
83 osb->disable_recovery = 0;
84 osb->recovery_thread_task = NULL;
85 init_waitqueue_head(&osb->recovery_event);
86
87 rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
88 osb->max_slots * sizeof(unsigned int),
89 GFP_KERNEL);
90 if (!rm) {
91 mlog_errno(-ENOMEM);
92 return -ENOMEM;
93 }
94
95 rm->rm_entries = (unsigned int *)((char *)rm +
96 sizeof(struct ocfs2_recovery_map));
97 osb->recovery_map = rm;
98
99 return 0;
100}
101
102/* we can't grab the goofy sem lock from inside wait_event, so we use
103 * memory barriers to make sure that we'll see the null task before
104 * being woken up */
105static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
106{
107 mb();
108 return osb->recovery_thread_task != NULL;
109}
110
111void ocfs2_recovery_exit(struct ocfs2_super *osb)
112{
113 struct ocfs2_recovery_map *rm;
114
115 /* disable any new recovery threads and wait for any currently
116 * running ones to exit. Do this before setting the vol_state. */
117 mutex_lock(&osb->recovery_lock);
118 osb->disable_recovery = 1;
119 mutex_unlock(&osb->recovery_lock);
120 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
121
122 /* At this point, we know that no more recovery threads can be
123 * launched, so wait for any recovery completion work to
124 * complete. */
125 flush_workqueue(ocfs2_wq);
126
127 /*
128 * Now that recovery is shut down, and the osb is about to be
129 * freed, the osb_lock is not taken here.
130 */
131 rm = osb->recovery_map;
132 /* XXX: Should we bug if there are dirty entries? */
133
134 kfree(rm);
135}
136
137static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
138 unsigned int node_num)
139{
140 int i;
141 struct ocfs2_recovery_map *rm = osb->recovery_map;
142
143 assert_spin_locked(&osb->osb_lock);
144
145 for (i = 0; i < rm->rm_used; i++) {
146 if (rm->rm_entries[i] == node_num)
147 return 1;
148 }
149
150 return 0;
151}
152
153/* Behaves like test-and-set. Returns the previous value */
154static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
155 unsigned int node_num)
156{
157 struct ocfs2_recovery_map *rm = osb->recovery_map;
158
159 spin_lock(&osb->osb_lock);
160 if (__ocfs2_recovery_map_test(osb, node_num)) {
161 spin_unlock(&osb->osb_lock);
162 return 1;
163 }
164
165 /* XXX: Can this be exploited? Not from o2dlm... */
166 BUG_ON(rm->rm_used >= osb->max_slots);
167
168 rm->rm_entries[rm->rm_used] = node_num;
169 rm->rm_used++;
170 spin_unlock(&osb->osb_lock);
171
172 return 0;
173}
174
175static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
176 unsigned int node_num)
177{
178 int i;
179 struct ocfs2_recovery_map *rm = osb->recovery_map;
180
181 spin_lock(&osb->osb_lock);
182
183 for (i = 0; i < rm->rm_used; i++) {
184 if (rm->rm_entries[i] == node_num)
185 break;
186 }
187
188 if (i < rm->rm_used) {
189 /* XXX: be careful with the pointer math */
190 memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
191 (rm->rm_used - i - 1) * sizeof(unsigned int));
192 rm->rm_used--;
193 }
194
195 spin_unlock(&osb->osb_lock);
196}
197
67static int ocfs2_commit_cache(struct ocfs2_super *osb) 198static int ocfs2_commit_cache(struct ocfs2_super *osb)
68{ 199{
69 int status = 0; 200 int status = 0;
@@ -586,8 +717,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
586 717
587 mlog_entry_void(); 718 mlog_entry_void();
588 719
589 if (!journal) 720 BUG_ON(!journal);
590 BUG();
591 721
592 osb = journal->j_osb; 722 osb = journal->j_osb;
593 723
@@ -650,6 +780,23 @@ bail:
650 return status; 780 return status;
651} 781}
652 782
783static int ocfs2_recovery_completed(struct ocfs2_super *osb)
784{
785 int empty;
786 struct ocfs2_recovery_map *rm = osb->recovery_map;
787
788 spin_lock(&osb->osb_lock);
789 empty = (rm->rm_used == 0);
790 spin_unlock(&osb->osb_lock);
791
792 return empty;
793}
794
/*
 * Wait on osb->recovery_event until ocfs2_recovery_completed() reports
 * that the recovery map for this volume is empty.
 */
795void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
796{
797 wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
798}
799
653/* 800/*
654 * JBD Might read a cached version of another nodes journal file. We 801 * JBD Might read a cached version of another nodes journal file. We
655 * don't want this as this file changes often and we get no 802 * don't want this as this file changes often and we get no
@@ -848,6 +995,7 @@ static int __ocfs2_recovery_thread(void *arg)
848{ 995{
849 int status, node_num; 996 int status, node_num;
850 struct ocfs2_super *osb = arg; 997 struct ocfs2_super *osb = arg;
998 struct ocfs2_recovery_map *rm = osb->recovery_map;
851 999
852 mlog_entry_void(); 1000 mlog_entry_void();
853 1001
@@ -863,26 +1011,29 @@ restart:
863 goto bail; 1011 goto bail;
864 } 1012 }
865 1013
866 while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) { 1014 spin_lock(&osb->osb_lock);
867 node_num = ocfs2_node_map_first_set_bit(osb, 1015 while (rm->rm_used) {
868 &osb->recovery_map); 1016 /* It's always safe to remove entry zero, as we won't
869 if (node_num == O2NM_INVALID_NODE_NUM) { 1017 * clear it until ocfs2_recover_node() has succeeded. */
870 mlog(0, "Out of nodes to recover.\n"); 1018 node_num = rm->rm_entries[0];
871 break; 1019 spin_unlock(&osb->osb_lock);
872 }
873 1020
874 status = ocfs2_recover_node(osb, node_num); 1021 status = ocfs2_recover_node(osb, node_num);
875 if (status < 0) { 1022 if (!status) {
1023 ocfs2_recovery_map_clear(osb, node_num);
1024 } else {
876 mlog(ML_ERROR, 1025 mlog(ML_ERROR,
877 "Error %d recovering node %d on device (%u,%u)!\n", 1026 "Error %d recovering node %d on device (%u,%u)!\n",
878 status, node_num, 1027 status, node_num,
879 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1028 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
880 mlog(ML_ERROR, "Volume requires unmount.\n"); 1029 mlog(ML_ERROR, "Volume requires unmount.\n");
881 continue;
882 } 1030 }
883 1031
884 ocfs2_recovery_map_clear(osb, node_num); 1032 spin_lock(&osb->osb_lock);
885 } 1033 }
1034 spin_unlock(&osb->osb_lock);
1035 mlog(0, "All nodes recovered\n");
1036
886 ocfs2_super_unlock(osb, 1); 1037 ocfs2_super_unlock(osb, 1);
887 1038
888 /* We always run recovery on our own orphan dir - the dead 1039 /* We always run recovery on our own orphan dir - the dead
@@ -893,8 +1044,7 @@ restart:
893 1044
894bail: 1045bail:
895 mutex_lock(&osb->recovery_lock); 1046 mutex_lock(&osb->recovery_lock);
896 if (!status && 1047 if (!status && !ocfs2_recovery_completed(osb)) {
897 !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
898 mutex_unlock(&osb->recovery_lock); 1048 mutex_unlock(&osb->recovery_lock);
899 goto restart; 1049 goto restart;
900 } 1050 }
@@ -924,8 +1074,8 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
924 1074
925 /* People waiting on recovery will wait on 1075 /* People waiting on recovery will wait on
926 * the recovery map to empty. */ 1076 * the recovery map to empty. */
927 if (!ocfs2_recovery_map_set(osb, node_num)) 1077 if (ocfs2_recovery_map_set(osb, node_num))
928 mlog(0, "node %d already be in recovery.\n", node_num); 1078 mlog(0, "node %d already in recovery map.\n", node_num);
929 1079
930 mlog(0, "starting recovery thread...\n"); 1080 mlog(0, "starting recovery thread...\n");
931 1081
@@ -1079,7 +1229,6 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
1079{ 1229{
1080 int status = 0; 1230 int status = 0;
1081 int slot_num; 1231 int slot_num;
1082 struct ocfs2_slot_info *si = osb->slot_info;
1083 struct ocfs2_dinode *la_copy = NULL; 1232 struct ocfs2_dinode *la_copy = NULL;
1084 struct ocfs2_dinode *tl_copy = NULL; 1233 struct ocfs2_dinode *tl_copy = NULL;
1085 1234
@@ -1092,8 +1241,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
1092 * case we should've called ocfs2_journal_load instead. */ 1241 * case we should've called ocfs2_journal_load instead. */
1093 BUG_ON(osb->node_num == node_num); 1242 BUG_ON(osb->node_num == node_num);
1094 1243
1095 slot_num = ocfs2_node_num_to_slot(si, node_num); 1244 slot_num = ocfs2_node_num_to_slot(osb, node_num);
1096 if (slot_num == OCFS2_INVALID_SLOT) { 1245 if (slot_num == -ENOENT) {
1097 status = 0; 1246 status = 0;
1098 mlog(0, "no slot for this node, so no recovery required.\n"); 1247 mlog(0, "no slot for this node, so no recovery required.\n");
1099 goto done; 1248 goto done;
@@ -1123,8 +1272,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
1123 1272
1124 /* Likewise, this would be a strange but ultimately not so 1273 /* Likewise, this would be a strange but ultimately not so
1125 * harmful place to get an error... */ 1274 * harmful place to get an error... */
1126 ocfs2_clear_slot(si, slot_num); 1275 status = ocfs2_clear_slot(osb, slot_num);
1127 status = ocfs2_update_disk_slots(osb, si);
1128 if (status < 0) 1276 if (status < 0)
1129 mlog_errno(status); 1277 mlog_errno(status);
1130 1278
@@ -1184,23 +1332,24 @@ bail:
1184 * slot info struct has been updated from disk. */ 1332 * slot info struct has been updated from disk. */
1185int ocfs2_mark_dead_nodes(struct ocfs2_super *osb) 1333int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
1186{ 1334{
1187 int status, i, node_num; 1335 unsigned int node_num;
1188 struct ocfs2_slot_info *si = osb->slot_info; 1336 int status, i;
1189 1337
1190 /* This is called with the super block cluster lock, so we 1338 /* This is called with the super block cluster lock, so we
1191 * know that the slot map can't change underneath us. */ 1339 * know that the slot map can't change underneath us. */
1192 1340
1193 spin_lock(&si->si_lock); 1341 spin_lock(&osb->osb_lock);
1194 for(i = 0; i < si->si_num_slots; i++) { 1342 for (i = 0; i < osb->max_slots; i++) {
1195 if (i == osb->slot_num) 1343 if (i == osb->slot_num)
1196 continue; 1344 continue;
1197 if (ocfs2_is_empty_slot(si, i)) 1345
1346 status = ocfs2_slot_to_node_num_locked(osb, i, &node_num);
1347 if (status == -ENOENT)
1198 continue; 1348 continue;
1199 1349
1200 node_num = si->si_global_node_nums[i]; 1350 if (__ocfs2_recovery_map_test(osb, node_num))
1201 if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
1202 continue; 1351 continue;
1203 spin_unlock(&si->si_lock); 1352 spin_unlock(&osb->osb_lock);
1204 1353
1205 /* Ok, we have a slot occupied by another node which 1354 /* Ok, we have a slot occupied by another node which
1206 * is not in the recovery map. We trylock his journal 1355 * is not in the recovery map. We trylock his journal
@@ -1216,9 +1365,9 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
1216 goto bail; 1365 goto bail;
1217 } 1366 }
1218 1367
1219 spin_lock(&si->si_lock); 1368 spin_lock(&osb->osb_lock);
1220 } 1369 }
1221 spin_unlock(&si->si_lock); 1370 spin_unlock(&osb->osb_lock);
1222 1371
1223 status = 0; 1372 status = 0;
1224bail: 1373bail: