aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/journal.c
diff options
context:
space:
mode:
authorJan Kara <jack@suse.cz>2008-10-20 17:50:38 -0400
committerMark Fasheh <mfasheh@suse.com>2009-01-05 11:40:24 -0500
commit2205363dce7447b8e85f1ead14387664c1a98753 (patch)
tree729b2716f2e31bda2e035a11cc39aa5472dff2c4 /fs/ocfs2/journal.c
parent171bf93ce11f4c9929fdce6ce63df8da2f3c4475 (diff)
ocfs2: Implement quota recovery
Implement functions for recovery after a crash. Functions just read local quota file and sync info to global quota file. Signed-off-by: Jan Kara <jack@suse.cz> Signed-off-by: Mark Fasheh <mfasheh@suse.com>
Diffstat (limited to 'fs/ocfs2/journal.c')
-rw-r--r--fs/ocfs2/journal.c108
1 files changed, 86 insertions, 22 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 11a1178d5ee8..c60242018d9a 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -45,6 +45,7 @@
45#include "slot_map.h" 45#include "slot_map.h"
46#include "super.h" 46#include "super.h"
47#include "sysfile.h" 47#include "sysfile.h"
48#include "quota.h"
48 49
49#include "buffer_head_io.h" 50#include "buffer_head_io.h"
50 51
@@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
52 53
53static int ocfs2_force_read_journal(struct inode *inode); 54static int ocfs2_force_read_journal(struct inode *inode);
54static int ocfs2_recover_node(struct ocfs2_super *osb, 55static int ocfs2_recover_node(struct ocfs2_super *osb,
55 int node_num); 56 int node_num, int slot_num);
56static int __ocfs2_recovery_thread(void *arg); 57static int __ocfs2_recovery_thread(void *arg);
57static int ocfs2_commit_cache(struct ocfs2_super *osb); 58static int ocfs2_commit_cache(struct ocfs2_super *osb);
58static int ocfs2_wait_on_mount(struct ocfs2_super *osb); 59static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
@@ -857,6 +858,7 @@ struct ocfs2_la_recovery_item {
857 int lri_slot; 858 int lri_slot;
858 struct ocfs2_dinode *lri_la_dinode; 859 struct ocfs2_dinode *lri_la_dinode;
859 struct ocfs2_dinode *lri_tl_dinode; 860 struct ocfs2_dinode *lri_tl_dinode;
861 struct ocfs2_quota_recovery *lri_qrec;
860}; 862};
861 863
862/* Does the second half of the recovery process. By this point, the 864/* Does the second half of the recovery process. By this point, the
@@ -877,6 +879,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
877 struct ocfs2_super *osb = journal->j_osb; 879 struct ocfs2_super *osb = journal->j_osb;
878 struct ocfs2_dinode *la_dinode, *tl_dinode; 880 struct ocfs2_dinode *la_dinode, *tl_dinode;
879 struct ocfs2_la_recovery_item *item, *n; 881 struct ocfs2_la_recovery_item *item, *n;
882 struct ocfs2_quota_recovery *qrec;
880 LIST_HEAD(tmp_la_list); 883 LIST_HEAD(tmp_la_list);
881 884
882 mlog_entry_void(); 885 mlog_entry_void();
@@ -922,6 +925,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
922 if (ret < 0) 925 if (ret < 0)
923 mlog_errno(ret); 926 mlog_errno(ret);
924 927
928 qrec = item->lri_qrec;
929 if (qrec) {
930 mlog(0, "Recovering quota files");
931 ret = ocfs2_finish_quota_recovery(osb, qrec,
932 item->lri_slot);
933 if (ret < 0)
934 mlog_errno(ret);
935 /* Recovery info is already freed now */
936 }
937
925 kfree(item); 938 kfree(item);
926 } 939 }
927 940
@@ -935,7 +948,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
935static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, 948static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
936 int slot_num, 949 int slot_num,
937 struct ocfs2_dinode *la_dinode, 950 struct ocfs2_dinode *la_dinode,
938 struct ocfs2_dinode *tl_dinode) 951 struct ocfs2_dinode *tl_dinode,
952 struct ocfs2_quota_recovery *qrec)
939{ 953{
940 struct ocfs2_la_recovery_item *item; 954 struct ocfs2_la_recovery_item *item;
941 955
@@ -950,6 +964,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
950 if (tl_dinode) 964 if (tl_dinode)
951 kfree(tl_dinode); 965 kfree(tl_dinode);
952 966
967 if (qrec)
968 ocfs2_free_quota_recovery(qrec);
969
953 mlog_errno(-ENOMEM); 970 mlog_errno(-ENOMEM);
954 return; 971 return;
955 } 972 }
@@ -958,6 +975,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
958 item->lri_la_dinode = la_dinode; 975 item->lri_la_dinode = la_dinode;
959 item->lri_slot = slot_num; 976 item->lri_slot = slot_num;
960 item->lri_tl_dinode = tl_dinode; 977 item->lri_tl_dinode = tl_dinode;
978 item->lri_qrec = qrec;
961 979
962 spin_lock(&journal->j_lock); 980 spin_lock(&journal->j_lock);
963 list_add_tail(&item->lri_list, &journal->j_la_cleanups); 981 list_add_tail(&item->lri_list, &journal->j_la_cleanups);
@@ -977,6 +995,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
977 ocfs2_queue_recovery_completion(journal, 995 ocfs2_queue_recovery_completion(journal,
978 osb->slot_num, 996 osb->slot_num,
979 osb->local_alloc_copy, 997 osb->local_alloc_copy,
998 NULL,
980 NULL); 999 NULL);
981 ocfs2_schedule_truncate_log_flush(osb, 0); 1000 ocfs2_schedule_truncate_log_flush(osb, 0);
982 1001
@@ -985,11 +1004,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
985 } 1004 }
986} 1005}
987 1006
1007void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
1008{
1009 if (osb->quota_rec) {
1010 ocfs2_queue_recovery_completion(osb->journal,
1011 osb->slot_num,
1012 NULL,
1013 NULL,
1014 osb->quota_rec);
1015 osb->quota_rec = NULL;
1016 }
1017}
1018
988static int __ocfs2_recovery_thread(void *arg) 1019static int __ocfs2_recovery_thread(void *arg)
989{ 1020{
990 int status, node_num; 1021 int status, node_num, slot_num;
991 struct ocfs2_super *osb = arg; 1022 struct ocfs2_super *osb = arg;
992 struct ocfs2_recovery_map *rm = osb->recovery_map; 1023 struct ocfs2_recovery_map *rm = osb->recovery_map;
1024 int *rm_quota = NULL;
1025 int rm_quota_used = 0, i;
1026 struct ocfs2_quota_recovery *qrec;
993 1027
994 mlog_entry_void(); 1028 mlog_entry_void();
995 1029
@@ -998,6 +1032,11 @@ static int __ocfs2_recovery_thread(void *arg)
998 goto bail; 1032 goto bail;
999 } 1033 }
1000 1034
1035 rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
1036 if (!rm_quota) {
1037 status = -ENOMEM;
1038 goto bail;
1039 }
1001restart: 1040restart:
1002 status = ocfs2_super_lock(osb, 1); 1041 status = ocfs2_super_lock(osb, 1);
1003 if (status < 0) { 1042 if (status < 0) {
@@ -1011,8 +1050,28 @@ restart:
1011 * clear it until ocfs2_recover_node() has succeeded. */ 1050 * clear it until ocfs2_recover_node() has succeeded. */
1012 node_num = rm->rm_entries[0]; 1051 node_num = rm->rm_entries[0];
1013 spin_unlock(&osb->osb_lock); 1052 spin_unlock(&osb->osb_lock);
1014 1053 mlog(0, "checking node %d\n", node_num);
1015 status = ocfs2_recover_node(osb, node_num); 1054 slot_num = ocfs2_node_num_to_slot(osb, node_num);
1055 if (slot_num == -ENOENT) {
1056 status = 0;
1057 mlog(0, "no slot for this node, so no recovery"
1058 "required.\n");
1059 goto skip_recovery;
1060 }
1061 mlog(0, "node %d was using slot %d\n", node_num, slot_num);
1062
1063 /* It is a bit subtle with quota recovery. We cannot do it
1064 * immediately because we have to obtain cluster locks from
1065 * quota files and we also don't want to just skip it because
1066 * then quota usage would be out of sync until some node takes
1067 * the slot. So we remember which nodes need quota recovery
1068 * and when everything else is done, we recover quotas. */
1069 for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
1070 if (i == rm_quota_used)
1071 rm_quota[rm_quota_used++] = slot_num;
1072
1073 status = ocfs2_recover_node(osb, node_num, slot_num);
1074skip_recovery:
1016 if (!status) { 1075 if (!status) {
1017 ocfs2_recovery_map_clear(osb, node_num); 1076 ocfs2_recovery_map_clear(osb, node_num);
1018 } else { 1077 } else {
@@ -1034,13 +1093,27 @@ restart:
1034 if (status < 0) 1093 if (status < 0)
1035 mlog_errno(status); 1094 mlog_errno(status);
1036 1095
1096 /* Now it is right time to recover quotas... We have to do this under
1097 * superblock lock so that noone can start using the slot (and crash)
1098 * before we recover it */
1099 for (i = 0; i < rm_quota_used; i++) {
1100 qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
1101 if (IS_ERR(qrec)) {
1102 status = PTR_ERR(qrec);
1103 mlog_errno(status);
1104 continue;
1105 }
1106 ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
1107 NULL, NULL, qrec);
1108 }
1109
1037 ocfs2_super_unlock(osb, 1); 1110 ocfs2_super_unlock(osb, 1);
1038 1111
1039 /* We always run recovery on our own orphan dir - the dead 1112 /* We always run recovery on our own orphan dir - the dead
1040 * node(s) may have disallowd a previos inode delete. Re-processing 1113 * node(s) may have disallowd a previos inode delete. Re-processing
1041 * is therefore required. */ 1114 * is therefore required. */
1042 ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, 1115 ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
1043 NULL); 1116 NULL, NULL);
1044 1117
1045bail: 1118bail:
1046 mutex_lock(&osb->recovery_lock); 1119 mutex_lock(&osb->recovery_lock);
@@ -1055,6 +1128,9 @@ bail:
1055 1128
1056 mutex_unlock(&osb->recovery_lock); 1129 mutex_unlock(&osb->recovery_lock);
1057 1130
1131 if (rm_quota)
1132 kfree(rm_quota);
1133
1058 mlog_exit(status); 1134 mlog_exit(status);
1059 /* no one is callint kthread_stop() for us so the kthread() api 1135 /* no one is callint kthread_stop() for us so the kthread() api
1060 * requires that we call do_exit(). And it isn't exported, but 1136 * requires that we call do_exit(). And it isn't exported, but
@@ -1282,31 +1358,19 @@ done:
1282 * far less concerning. 1358 * far less concerning.
1283 */ 1359 */
1284static int ocfs2_recover_node(struct ocfs2_super *osb, 1360static int ocfs2_recover_node(struct ocfs2_super *osb,
1285 int node_num) 1361 int node_num, int slot_num)
1286{ 1362{
1287 int status = 0; 1363 int status = 0;
1288 int slot_num;
1289 struct ocfs2_dinode *la_copy = NULL; 1364 struct ocfs2_dinode *la_copy = NULL;
1290 struct ocfs2_dinode *tl_copy = NULL; 1365 struct ocfs2_dinode *tl_copy = NULL;
1291 1366
1292 mlog_entry("(node_num=%d, osb->node_num = %d)\n", 1367 mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
1293 node_num, osb->node_num); 1368 node_num, slot_num, osb->node_num);
1294
1295 mlog(0, "checking node %d\n", node_num);
1296 1369
1297 /* Should not ever be called to recover ourselves -- in that 1370 /* Should not ever be called to recover ourselves -- in that
1298 * case we should've called ocfs2_journal_load instead. */ 1371 * case we should've called ocfs2_journal_load instead. */
1299 BUG_ON(osb->node_num == node_num); 1372 BUG_ON(osb->node_num == node_num);
1300 1373
1301 slot_num = ocfs2_node_num_to_slot(osb, node_num);
1302 if (slot_num == -ENOENT) {
1303 status = 0;
1304 mlog(0, "no slot for this node, so no recovery required.\n");
1305 goto done;
1306 }
1307
1308 mlog(0, "node %d was using slot %d\n", node_num, slot_num);
1309
1310 status = ocfs2_replay_journal(osb, node_num, slot_num); 1374 status = ocfs2_replay_journal(osb, node_num, slot_num);
1311 if (status < 0) { 1375 if (status < 0) {
1312 if (status == -EBUSY) { 1376 if (status == -EBUSY) {
@@ -1342,7 +1406,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
1342 1406
1343 /* This will kfree the memory pointed to by la_copy and tl_copy */ 1407 /* This will kfree the memory pointed to by la_copy and tl_copy */
1344 ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, 1408 ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
1345 tl_copy); 1409 tl_copy, NULL);
1346 1410
1347 status = 0; 1411 status = 0;
1348done: 1412done: