aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h6
-rw-r--r--fs/ocfs2/dlm/dlmlock.c14
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c103
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c38
4 files changed, 142 insertions, 19 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 9c772583744a..a8aec9341347 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -658,6 +658,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
658int dlm_launch_recovery_thread(struct dlm_ctxt *dlm); 658int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
659void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); 659void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
660void dlm_wait_for_recovery(struct dlm_ctxt *dlm); 660void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
661void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
661int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); 662int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
662int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); 663int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
663 664
@@ -762,6 +763,11 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
762int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data); 763int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
763int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data); 764int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
764int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data); 765int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
766int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
767 u8 nodenum, u8 *real_master);
768int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
769 struct dlm_lock_resource *res, u8 *real_master);
770
765 771
766int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, 772int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
767 struct dlm_lock_resource *res, 773 struct dlm_lock_resource *res,
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 671d4ff222cc..6fea28318d6d 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
141 res->lockname.len)) { 141 res->lockname.len)) {
142 kick_thread = 1; 142 kick_thread = 1;
143 call_ast = 1; 143 call_ast = 1;
144 } else {
145 mlog(0, "%s: returning DLM_NORMAL to "
146 "node %u for reco lock\n", dlm->name,
147 lock->ml.node);
144 } 148 }
145 } else { 149 } else {
146 /* for NOQUEUE request, unless we get the 150 /* for NOQUEUE request, unless we get the
147 * lock right away, return DLM_NOTQUEUED */ 151 * lock right away, return DLM_NOTQUEUED */
148 if (flags & LKM_NOQUEUE) 152 if (flags & LKM_NOQUEUE) {
149 status = DLM_NOTQUEUED; 153 status = DLM_NOTQUEUED;
150 else { 154 if (dlm_is_recovery_lock(res->lockname.name,
155 res->lockname.len)) {
156 mlog(0, "%s: returning NOTQUEUED to "
157 "node %u for reco lock\n", dlm->name,
158 lock->ml.node);
159 }
160 } else {
151 dlm_lock_get(lock); 161 dlm_lock_get(lock);
152 list_add_tail(&lock->list, &res->blocked); 162 list_add_tail(&lock->list, &res->blocked);
153 kick_thread = 1; 163 kick_thread = 1;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 78ac3a00eb54..940be4c13b1f 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
239static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm, 239static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
240 struct dlm_lock_resource *res, 240 struct dlm_lock_resource *res,
241 u8 target); 241 u8 target);
242static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
243 struct dlm_lock_resource *res);
242 244
243 245
244int dlm_is_host_down(int errno) 246int dlm_is_host_down(int errno)
@@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
677 struct dlm_node_iter iter; 679 struct dlm_node_iter iter;
678 unsigned int namelen; 680 unsigned int namelen;
679 int tries = 0; 681 int tries = 0;
682 int bit, wait_on_recovery = 0;
680 683
681 BUG_ON(!lockid); 684 BUG_ON(!lockid);
682 685
@@ -762,6 +765,18 @@ lookup:
762 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0); 765 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
763 set_bit(dlm->node_num, mle->maybe_map); 766 set_bit(dlm->node_num, mle->maybe_map);
764 list_add(&mle->list, &dlm->master_list); 767 list_add(&mle->list, &dlm->master_list);
768
769 /* still holding the dlm spinlock, check the recovery map
770 * to see if there are any nodes that still need to be
771 * considered. these will not appear in the mle nodemap
772 * but they might own this lockres. wait on them. */
773 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
774 if (bit < O2NM_MAX_NODES) {
775 mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
776 "recover before lock mastery can begin\n",
777 dlm->name, namelen, (char *)lockid, bit);
778 wait_on_recovery = 1;
779 }
765 } 780 }
766 781
767 /* at this point there is either a DLM_MLE_BLOCK or a 782 /* at this point there is either a DLM_MLE_BLOCK or a
@@ -779,6 +794,39 @@ lookup:
779 spin_unlock(&dlm->master_lock); 794 spin_unlock(&dlm->master_lock);
780 spin_unlock(&dlm->spinlock); 795 spin_unlock(&dlm->spinlock);
781 796
797 while (wait_on_recovery) {
798 /* any cluster changes that occurred after dropping the
 799 * dlm spinlock would be detectable by a change on the mle,
800 * so we only need to clear out the recovery map once. */
801 if (dlm_is_recovery_lock(lockid, namelen)) {
802 mlog(ML_NOTICE, "%s: recovery map is not empty, but "
803 "must master $RECOVERY lock now\n", dlm->name);
804 if (!dlm_pre_master_reco_lockres(dlm, res))
805 wait_on_recovery = 0;
806 else {
807 mlog(0, "%s: waiting 500ms for heartbeat state "
808 "change\n", dlm->name);
809 msleep(500);
810 }
811 continue;
812 }
813
814 dlm_kick_recovery_thread(dlm);
815 msleep(100);
816 dlm_wait_for_recovery(dlm);
817
818 spin_lock(&dlm->spinlock);
819 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
820 if (bit < O2NM_MAX_NODES) {
821 mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
822 "recover before lock mastery can begin\n",
823 dlm->name, namelen, (char *)lockid, bit);
824 wait_on_recovery = 1;
825 } else
826 wait_on_recovery = 0;
827 spin_unlock(&dlm->spinlock);
828 }
829
782 /* must wait for lock to be mastered elsewhere */ 830 /* must wait for lock to be mastered elsewhere */
783 if (blocked) 831 if (blocked)
784 goto wait; 832 goto wait;
@@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1835 mlog(0, "finished with dlm_assert_master_worker\n"); 1883 mlog(0, "finished with dlm_assert_master_worker\n");
1836} 1884}
1837 1885
1886/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
1887 * We cannot wait for node recovery to complete to begin mastering this
1888 * lockres because this lockres is used to kick off recovery! ;-)
1889 * So, do a pre-check on all living nodes to see if any of those nodes
1890 * think that $RECOVERY is currently mastered by a dead node. If so,
1891 * we wait a short time to allow that node to get notified by its own
1892 * heartbeat stack, then check again. All $RECOVERY lock resources
1893 * mastered by dead nodes are purged when the heartbeat callback is
1894 * fired, so we can know for sure that it is safe to continue once
1895 * the node returns a live node or no node. */
1896static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
1897 struct dlm_lock_resource *res)
1898{
1899 struct dlm_node_iter iter;
1900 int nodenum;
1901 int ret = 0;
1902 u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
1903
1904 spin_lock(&dlm->spinlock);
1905 dlm_node_iter_init(dlm->domain_map, &iter);
1906 spin_unlock(&dlm->spinlock);
1907
1908 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
1909 /* do not send to self */
1910 if (nodenum == dlm->node_num)
1911 continue;
1912 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
1913 if (ret < 0) {
1914 mlog_errno(ret);
1915 if (!dlm_is_host_down(ret))
1916 BUG();
1917 /* host is down, so answer for that node would be
1918 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
1919 }
1920
1921 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1922 /* check to see if this master is in the recovery map */
1923 spin_lock(&dlm->spinlock);
1924 if (test_bit(master, dlm->recovery_map)) {
1925 mlog(ML_NOTICE, "%s: node %u has not seen "
1926 "node %u go down yet, and thinks the "
1927 "dead node is mastering the recovery "
1928 "lock. must wait.\n", dlm->name,
1929 nodenum, master);
1930 ret = -EAGAIN;
1931 }
1932 spin_unlock(&dlm->spinlock);
1933 mlog(0, "%s: reco lock master is %u\n", dlm->name,
1934 master);
1935 break;
1936 }
1937 }
1938 return ret;
1939}
1940
1838 1941
1839/* 1942/*
1840 * DLM_MIGRATE_LOCKRES 1943 * DLM_MIGRATE_LOCKRES
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 1e232000f3f7..36610bdf1231 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
58static int dlm_recovery_thread(void *data); 58static int dlm_recovery_thread(void *data);
59void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); 59void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
60int dlm_launch_recovery_thread(struct dlm_ctxt *dlm); 60int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
61static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); 61void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
62static int dlm_do_recovery(struct dlm_ctxt *dlm); 62static int dlm_do_recovery(struct dlm_ctxt *dlm);
63 63
64static int dlm_pick_recovery_master(struct dlm_ctxt *dlm); 64static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
@@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
78 u8 send_to, 78 u8 send_to,
79 struct dlm_lock_resource *res, 79 struct dlm_lock_resource *res,
80 int total_locks); 80 int total_locks);
81static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
82 struct dlm_lock_resource *res,
83 u8 *real_master);
84static int dlm_process_recovery_data(struct dlm_ctxt *dlm, 81static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
85 struct dlm_lock_resource *res, 82 struct dlm_lock_resource *res,
86 struct dlm_migratable_lockres *mres); 83 struct dlm_migratable_lockres *mres);
87static int dlm_do_master_requery(struct dlm_ctxt *dlm,
88 struct dlm_lock_resource *res,
89 u8 nodenum, u8 *real_master);
90static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm); 84static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
91static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, 85static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
92 u8 dead_node, u8 send_to); 86 u8 dead_node, u8 send_to);
@@ -165,7 +159,7 @@ void dlm_dispatch_work(void *data)
165 * RECOVERY THREAD 159 * RECOVERY THREAD
166 */ 160 */
167 161
168static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm) 162void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
169{ 163{
170 /* wake the recovery thread 164 /* wake the recovery thread
171 * this will wake the reco thread in one of three places 165 * this will wake the reco thread in one of three places
@@ -1316,9 +1310,8 @@ leave:
1316 1310
1317 1311
1318 1312
1319static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, 1313int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
1320 struct dlm_lock_resource *res, 1314 struct dlm_lock_resource *res, u8 *real_master)
1321 u8 *real_master)
1322{ 1315{
1323 struct dlm_node_iter iter; 1316 struct dlm_node_iter iter;
1324 int nodenum; 1317 int nodenum;
@@ -1360,8 +1353,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
1360 ret = dlm_do_master_requery(dlm, res, nodenum, real_master); 1353 ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
1361 if (ret < 0) { 1354 if (ret < 0) {
1362 mlog_errno(ret); 1355 mlog_errno(ret);
1363 BUG(); 1356 if (!dlm_is_host_down(ret))
1364 /* TODO: need to figure a way to restart this */ 1357 BUG();
1358 /* host is down, so answer for that node would be
1359 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
1365 } 1360 }
1366 if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) { 1361 if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1367 mlog(0, "lock master is %u\n", *real_master); 1362 mlog(0, "lock master is %u\n", *real_master);
@@ -1372,9 +1367,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
1372} 1367}
1373 1368
1374 1369
1375static int dlm_do_master_requery(struct dlm_ctxt *dlm, 1370int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1376 struct dlm_lock_resource *res, 1371 u8 nodenum, u8 *real_master)
1377 u8 nodenum, u8 *real_master)
1378{ 1372{
1379 int ret = -EINVAL; 1373 int ret = -EINVAL;
1380 struct dlm_master_requery req; 1374 struct dlm_master_requery req;
@@ -1739,6 +1733,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1739 } else 1733 } else
1740 continue; 1734 continue;
1741 1735
1736 if (!list_empty(&res->recovering)) {
1737 mlog(0, "%s:%.*s: lockres was "
1738 "marked RECOVERING, owner=%u\n",
1739 dlm->name, res->lockname.len,
1740 res->lockname.name, res->owner);
1741 list_del_init(&res->recovering);
1742 }
1742 spin_lock(&res->spinlock); 1743 spin_lock(&res->spinlock);
1743 dlm_change_lockres_owner(dlm, res, new_master); 1744 dlm_change_lockres_owner(dlm, res, new_master);
1744 res->state &= ~DLM_LOCK_RES_RECOVERING; 1745 res->state &= ~DLM_LOCK_RES_RECOVERING;
@@ -2258,7 +2259,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2258 mlog(0, "%u not in domain/live_nodes map " 2259 mlog(0, "%u not in domain/live_nodes map "
2259 "so setting it in reco map manually\n", 2260 "so setting it in reco map manually\n",
2260 br->dead_node); 2261 br->dead_node);
2261 set_bit(br->dead_node, dlm->recovery_map); 2262 /* force the recovery cleanup in __dlm_hb_node_down
2263 * both of these will be cleared in a moment */
2264 set_bit(br->dead_node, dlm->domain_map);
2265 set_bit(br->dead_node, dlm->live_nodes_map);
2262 __dlm_hb_node_down(dlm, br->dead_node); 2266 __dlm_hb_node_down(dlm, br->dead_node);
2263 } 2267 }
2264 spin_unlock(&dlm->spinlock); 2268 spin_unlock(&dlm->spinlock);