author     Kurt Hackel <kurt.hackel@oracle.com>    2006-03-06 17:08:49 -0500
committer  Mark Fasheh <mark.fasheh@oracle.com>    2006-03-24 17:58:25 -0500
commit     c03872f5f50bc10f2a1a485f08879a8d01bcfe49 (patch)
tree       9ac370cf1a7c015522af75af3f60e9d6c4425bbc
parent     9c6510a5bfe2f1c5f5b93386c06954be02e974e4 (diff)
[PATCH] ocfs2: dlm recovery fixes
When starting lock mastery (excepting the recovery lock), wait on any nodes needing recovery. Fix one instance where lock resources were left attached to the recovery list after recovery completed. Ensure that the node_down code is run uniformly regardless of which node found the dead node first.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
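The core of the change is a new wait-on-recovery gate in dlm_get_lock_resource() (see the dlmmaster.c hunks below). The following condensed sketch is an editorial summary, not part of the patch: locking detail, log messages, and error paths are trimmed, and all identifiers are taken from the patch itself.

	/* sketch of the gate added to dlm_get_lock_resource(): ordinary lock
	 * mastery must wait for every node in the recovery map to be
	 * recovered; the $RECOVERY lock itself cannot wait (it drives
	 * recovery), so it only pre-checks that no live node believes a
	 * dead node still masters it */
	while (wait_on_recovery) {
		if (dlm_is_recovery_lock(lockid, namelen)) {
			if (!dlm_pre_master_reco_lockres(dlm, res))
				wait_on_recovery = 0;	/* safe to master now */
			else
				msleep(500);		/* let heartbeat catch up */
			continue;
		}

		/* push recovery along, then re-check the recovery map */
		dlm_kick_recovery_thread(dlm);
		msleep(100);
		dlm_wait_for_recovery(dlm);

		spin_lock(&dlm->spinlock);
		wait_on_recovery = (find_next_bit(dlm->recovery_map,
						  O2NM_MAX_NODES, 0) <
				    O2NM_MAX_NODES);
		spin_unlock(&dlm->spinlock);
	}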
-rw-r--r--  fs/ocfs2/dlm/dlmcommon.h   |   6
-rw-r--r--  fs/ocfs2/dlm/dlmlock.c     |  14
-rw-r--r--  fs/ocfs2/dlm/dlmmaster.c   | 103
-rw-r--r--  fs/ocfs2/dlm/dlmrecovery.c |  38
4 files changed, 142 insertions(+), 19 deletions(-)
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 9c772583744a..a8aec9341347 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -658,6 +658,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
 
@@ -762,6 +763,11 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+			  u8 nodenum, u8 *real_master);
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res, u8 *real_master);
+
 
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 671d4ff222cc..6fea28318d6d 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
						      res->lockname.len)) {
			kick_thread = 1;
			call_ast = 1;
+		} else {
+			mlog(0, "%s: returning DLM_NORMAL to "
+			     "node %u for reco lock\n", dlm->name,
+			     lock->ml.node);
		}
	} else {
		/* for NOQUEUE request, unless we get the
		 * lock right away, return DLM_NOTQUEUED */
-		if (flags & LKM_NOQUEUE)
+		if (flags & LKM_NOQUEUE) {
			status = DLM_NOTQUEUED;
-		else {
+			if (dlm_is_recovery_lock(res->lockname.name,
+						 res->lockname.len)) {
+				mlog(0, "%s: returning NOTQUEUED to "
+				     "node %u for reco lock\n", dlm->name,
+				     lock->ml.node);
+			}
+		} else {
			dlm_lock_get(lock);
			list_add_tail(&lock->list, &res->blocked);
			kick_thread = 1;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 78ac3a00eb54..940be4c13b1f 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 target);
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res);
 
 
 int dlm_is_host_down(int errno)
@@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
	struct dlm_node_iter iter;
	unsigned int namelen;
	int tries = 0;
+	int bit, wait_on_recovery = 0;
 
	BUG_ON(!lockid);
 
@@ -762,6 +765,18 @@ lookup:
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);
+
+		/* still holding the dlm spinlock, check the recovery map
+		 * to see if there are any nodes that still need to be
+		 * considered.  these will not appear in the mle nodemap
+		 * but they might own this lockres.  wait on them. */
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		}
	}
 
	/* at this point there is either a DLM_MLE_BLOCK or a
@@ -779,6 +794,39 @@ lookup:
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
 
+	while (wait_on_recovery) {
+		/* any cluster changes that occurred after dropping the
+		 * dlm spinlock would be detectable by a change on the mle,
+		 * so we only need to clear out the recovery map once. */
+		if (dlm_is_recovery_lock(lockid, namelen)) {
+			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
+			     "must master $RECOVERY lock now\n", dlm->name);
+			if (!dlm_pre_master_reco_lockres(dlm, res))
+				wait_on_recovery = 0;
+			else {
+				mlog(0, "%s: waiting 500ms for heartbeat state "
+				     "change\n", dlm->name);
+				msleep(500);
+			}
+			continue;
+		}
+
+		dlm_kick_recovery_thread(dlm);
+		msleep(100);
+		dlm_wait_for_recovery(dlm);
+
+		spin_lock(&dlm->spinlock);
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		} else
+			wait_on_recovery = 0;
+		spin_unlock(&dlm->spinlock);
+	}
+
	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;
@@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
	mlog(0, "finished with dlm_assert_master_worker\n");
 }
 
+/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
+ * We cannot wait for node recovery to complete to begin mastering this
+ * lockres because this lockres is used to kick off recovery! ;-)
+ * So, do a pre-check on all living nodes to see if any of those nodes
+ * think that $RECOVERY is currently mastered by a dead node.  If so,
+ * we wait a short time to allow that node to get notified by its own
+ * heartbeat stack, then check again.  All $RECOVERY lock resources
+ * mastered by dead nodes are purged when the heartbeat callback is
+ * fired, so we can know for sure that it is safe to continue once
+ * the node returns a live node or no node.  */
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res)
+{
+	struct dlm_node_iter iter;
+	int nodenum;
+	int ret = 0;
+	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+	spin_lock(&dlm->spinlock);
+	dlm_node_iter_init(dlm->domain_map, &iter);
+	spin_unlock(&dlm->spinlock);
+
+	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+		/* do not send to self */
+		if (nodenum == dlm->node_num)
+			continue;
+		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
+		if (ret < 0) {
+			mlog_errno(ret);
+			if (!dlm_is_host_down(ret))
+				BUG();
+			/* host is down, so answer for that node would be
+			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+		}
+
+		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* check to see if this master is in the recovery map */
+			spin_lock(&dlm->spinlock);
+			if (test_bit(master, dlm->recovery_map)) {
+				mlog(ML_NOTICE, "%s: node %u has not seen "
+				     "node %u go down yet, and thinks the "
+				     "dead node is mastering the recovery "
+				     "lock.  must wait.\n", dlm->name,
+				     nodenum, master);
+				ret = -EAGAIN;
+			}
+			spin_unlock(&dlm->spinlock);
+			mlog(0, "%s: reco lock master is %u\n", dlm->name,
+			     master);
+			break;
+		}
+	}
+	return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 1e232000f3f7..36610bdf1231 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
 static int dlm_recovery_thread(void *data);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 static int dlm_do_recovery(struct dlm_ctxt *dlm);
 
 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
@@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
					   u8 send_to,
					   struct dlm_lock_resource *res,
					   int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-				      struct dlm_lock_resource *res,
-				      u8 *real_master);
 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-				 struct dlm_lock_resource *res,
-				 u8 nodenum, u8 *real_master);
 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
				 u8 dead_node, u8 send_to);
@@ -165,7 +159,7 @@ void dlm_dispatch_work(void *data)
  * RECOVERY THREAD
  */
 
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
 {
	/* wake the recovery thread
	 * this will wake the reco thread in one of three places
@@ -1316,9 +1310,8 @@ leave:
 
 
 
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-				      struct dlm_lock_resource *res,
-				      u8 *real_master)
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res, u8 *real_master)
 {
	struct dlm_node_iter iter;
	int nodenum;
@@ -1360,8 +1353,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
		ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
		if (ret < 0) {
			mlog_errno(ret);
-			BUG();
-			/* TODO: need to figure a way to restart this */
+			if (!dlm_is_host_down(ret))
+				BUG();
+			/* host is down, so answer for that node would be
+			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
		}
		if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "lock master is %u\n", *real_master);
@@ -1372,9 +1367,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
 }
 
 
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-				 struct dlm_lock_resource *res,
-				 u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+			  u8 nodenum, u8 *real_master)
 {
	int ret = -EINVAL;
	struct dlm_master_requery req;
@@ -1739,6 +1733,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
				} else
					continue;
 
+				if (!list_empty(&res->recovering)) {
+					mlog(0, "%s:%.*s: lockres was "
+					     "marked RECOVERING, owner=%u\n",
+					     dlm->name, res->lockname.len,
+					     res->lockname.name, res->owner);
+					list_del_init(&res->recovering);
+				}
				spin_lock(&res->spinlock);
				dlm_change_lockres_owner(dlm, res, new_master);
				res->state &= ~DLM_LOCK_RES_RECOVERING;
@@ -2258,7 +2259,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
		mlog(0, "%u not in domain/live_nodes map "
		     "so setting it in reco map manually\n",
		     br->dead_node);
-		set_bit(br->dead_node, dlm->recovery_map);
+		/* force the recovery cleanup in __dlm_hb_node_down
+		 * both of these will be cleared in a moment */
+		set_bit(br->dead_node, dlm->domain_map);
+		set_bit(br->dead_node, dlm->live_nodes_map);
		__dlm_hb_node_down(dlm, br->dead_node);
	}
	spin_unlock(&dlm->spinlock);