aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmrecovery.c
diff options
context:
space:
mode:
authorKurt Hackel <kurt.hackel@oracle.com>2006-01-12 17:24:55 -0500
committerMark Fasheh <mark.fasheh@oracle.com>2006-02-03 16:47:20 -0500
commite2faea4ce340f199c1957986c4c3dc2de76f5746 (patch)
tree2336b06cf270b3cff2ff39ba75fc67639dc63df9 /fs/ocfs2/dlm/dlmrecovery.c
parent0d419a6a95ee158675aa184c6c3e476b22d02145 (diff)
[PATCH] ocfs2/dlm: fixes
* fix a hang which can occur during shutdown migration * do not allow nodes to join during recovery * when restarting lock mastery, do not ignore nodes which come up * more than one node could become recovery master, fix this * sleep to allow some time for heartbeat state to catch up to network * extra debug info for bad recovery state problems * make DLM_RECO_NODE_DATA_DONE a valid state for non-master recovery nodes * prune all locks from dead nodes on $RECOVERY lock resources * do NOT automatically add new nodes to mle nodemaps until they have properly joined the domain * make sure dlm_pick_recovery_master only exits when all nodes have synced * properly handle dlmunlock errors in dlm_pick_recovery_master * do not propagate network errors in dlm_send_begin_reco_message * dead nodes were not being put in the recovery map sometimes, fix this * dlmunlock was failing to clear the unlock actions on DLM_DENIED Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com> Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c249
1 files changed, 212 insertions, 37 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 0c8eb1093f00..325c9f5529c1 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -256,6 +256,27 @@ static int dlm_recovery_thread(void *data)
256 return 0; 256 return 0;
257} 257}
258 258
259/* returns true when the recovery master has contacted us */
260static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
261{
262 int ready;
263 spin_lock(&dlm->spinlock);
264 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
265 spin_unlock(&dlm->spinlock);
266 return ready;
267}
268
269/* returns true if node is no longer in the domain
270 * could be dead or just not joined */
271int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
272{
273 int dead;
274 spin_lock(&dlm->spinlock);
275 dead = test_bit(node, dlm->domain_map);
276 spin_unlock(&dlm->spinlock);
277 return dead;
278}
279
259/* callers of the top-level api calls (dlmlock/dlmunlock) should 280/* callers of the top-level api calls (dlmlock/dlmunlock) should
260 * block on the dlm->reco.event when recovery is in progress. 281 * block on the dlm->reco.event when recovery is in progress.
261 * the dlm recovery thread will set this state when it begins 282 * the dlm recovery thread will set this state when it begins
@@ -297,6 +318,7 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
297static int dlm_do_recovery(struct dlm_ctxt *dlm) 318static int dlm_do_recovery(struct dlm_ctxt *dlm)
298{ 319{
299 int status = 0; 320 int status = 0;
321 int ret;
300 322
301 spin_lock(&dlm->spinlock); 323 spin_lock(&dlm->spinlock);
302 324
@@ -343,10 +365,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
343 goto master_here; 365 goto master_here;
344 366
345 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { 367 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
346 /* choose a new master */ 368 /* choose a new master, returns 0 if this node
347 if (!dlm_pick_recovery_master(dlm)) { 369 * is the master, -EEXIST if it's another node.
370 * this does not return until a new master is chosen
371 * or recovery completes entirely. */
372 ret = dlm_pick_recovery_master(dlm);
373 if (!ret) {
348 /* already notified everyone. go. */ 374 /* already notified everyone. go. */
349 dlm->reco.new_master = dlm->node_num;
350 goto master_here; 375 goto master_here;
351 } 376 }
352 mlog(0, "another node will master this recovery session.\n"); 377 mlog(0, "another node will master this recovery session.\n");
@@ -371,8 +396,13 @@ master_here:
371 if (status < 0) { 396 if (status < 0) {
372 mlog(ML_ERROR, "error %d remastering locks for node %u, " 397 mlog(ML_ERROR, "error %d remastering locks for node %u, "
373 "retrying.\n", status, dlm->reco.dead_node); 398 "retrying.\n", status, dlm->reco.dead_node);
399 /* yield a bit to allow any final network messages
400 * to get handled on remaining nodes */
401 msleep(100);
374 } else { 402 } else {
375 /* success! see if any other nodes need recovery */ 403 /* success! see if any other nodes need recovery */
404 mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
405 dlm->name, dlm->reco.dead_node, dlm->node_num);
376 dlm_reset_recovery(dlm); 406 dlm_reset_recovery(dlm);
377 } 407 }
378 dlm_end_recovery(dlm); 408 dlm_end_recovery(dlm);
@@ -477,7 +507,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
477 BUG(); 507 BUG();
478 break; 508 break;
479 case DLM_RECO_NODE_DATA_DEAD: 509 case DLM_RECO_NODE_DATA_DEAD:
480 mlog(0, "node %u died after " 510 mlog(ML_NOTICE, "node %u died after "
481 "requesting recovery info for " 511 "requesting recovery info for "
482 "node %u\n", ndata->node_num, 512 "node %u\n", ndata->node_num,
483 dead_node); 513 dead_node);
@@ -485,6 +515,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
485 // start all over 515 // start all over
486 destroy = 1; 516 destroy = 1;
487 status = -EAGAIN; 517 status = -EAGAIN;
518 /* instead of spinning like crazy here,
519 * wait for the domain map to catch up
520 * with the network state. otherwise this
521 * can be hit hundreds of times before
522 * the node is really seen as dead. */
523 wait_event_timeout(dlm->dlm_reco_thread_wq,
524 dlm_is_node_dead(dlm,
525 ndata->node_num),
526 msecs_to_jiffies(1000));
527 mlog(0, "waited 1 sec for %u, "
528 "dead? %s\n", ndata->node_num,
529 dlm_is_node_dead(dlm, ndata->node_num) ?
530 "yes" : "no");
488 goto leave; 531 goto leave;
489 case DLM_RECO_NODE_DATA_RECEIVING: 532 case DLM_RECO_NODE_DATA_RECEIVING:
490 case DLM_RECO_NODE_DATA_REQUESTED: 533 case DLM_RECO_NODE_DATA_REQUESTED:
@@ -678,11 +721,27 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
678 dlm = item->dlm; 721 dlm = item->dlm;
679 dead_node = item->u.ral.dead_node; 722 dead_node = item->u.ral.dead_node;
680 reco_master = item->u.ral.reco_master; 723 reco_master = item->u.ral.reco_master;
724 mres = (struct dlm_migratable_lockres *)data;
725
726 if (dead_node != dlm->reco.dead_node ||
727 reco_master != dlm->reco.new_master) {
728 /* show extra debug info if the recovery state is messed */
729 mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
730 "request(dead=%u, master=%u)\n",
731 dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
732 dead_node, reco_master);
733 mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
734 "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
735 dlm->name, mres->lockname_len, mres->lockname, mres->master,
736 mres->num_locks, mres->total_locks, mres->flags,
737 mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
738 mres->ml[0].type, mres->ml[0].convert_type,
739 mres->ml[0].highest_blocked, mres->ml[0].node);
740 BUG();
741 }
681 BUG_ON(dead_node != dlm->reco.dead_node); 742 BUG_ON(dead_node != dlm->reco.dead_node);
682 BUG_ON(reco_master != dlm->reco.new_master); 743 BUG_ON(reco_master != dlm->reco.new_master);
683 744
684 mres = (struct dlm_migratable_lockres *)data;
685
686 /* lock resources should have already been moved to the 745 /* lock resources should have already been moved to the
687 * dlm->reco.resources list. now move items from that list 746 * dlm->reco.resources list. now move items from that list
688 * to a temp list if the dead owner matches. note that the 747 * to a temp list if the dead owner matches. note that the
@@ -757,15 +816,18 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
757 continue; 816 continue;
758 817
759 switch (ndata->state) { 818 switch (ndata->state) {
819 /* should have moved beyond INIT but not to FINALIZE yet */
760 case DLM_RECO_NODE_DATA_INIT: 820 case DLM_RECO_NODE_DATA_INIT:
761 case DLM_RECO_NODE_DATA_DEAD: 821 case DLM_RECO_NODE_DATA_DEAD:
762 case DLM_RECO_NODE_DATA_DONE:
763 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 822 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
764 mlog(ML_ERROR, "bad ndata state for node %u:" 823 mlog(ML_ERROR, "bad ndata state for node %u:"
765 " state=%d\n", ndata->node_num, 824 " state=%d\n", ndata->node_num,
766 ndata->state); 825 ndata->state);
767 BUG(); 826 BUG();
768 break; 827 break;
828 /* these states are possible at this point, anywhere along
829 * the line of recovery */
830 case DLM_RECO_NODE_DATA_DONE:
769 case DLM_RECO_NODE_DATA_RECEIVING: 831 case DLM_RECO_NODE_DATA_RECEIVING:
770 case DLM_RECO_NODE_DATA_REQUESTED: 832 case DLM_RECO_NODE_DATA_REQUESTED:
771 case DLM_RECO_NODE_DATA_REQUESTING: 833 case DLM_RECO_NODE_DATA_REQUESTING:
@@ -799,13 +861,31 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
799{ 861{
800 struct dlm_lock_resource *res; 862 struct dlm_lock_resource *res;
801 struct list_head *iter, *iter2; 863 struct list_head *iter, *iter2;
864 struct dlm_lock *lock;
802 865
803 spin_lock(&dlm->spinlock); 866 spin_lock(&dlm->spinlock);
804 list_for_each_safe(iter, iter2, &dlm->reco.resources) { 867 list_for_each_safe(iter, iter2, &dlm->reco.resources) {
805 res = list_entry (iter, struct dlm_lock_resource, recovering); 868 res = list_entry (iter, struct dlm_lock_resource, recovering);
869 /* always prune any $RECOVERY entries for dead nodes,
870 * otherwise hangs can occur during later recovery */
806 if (dlm_is_recovery_lock(res->lockname.name, 871 if (dlm_is_recovery_lock(res->lockname.name,
807 res->lockname.len)) 872 res->lockname.len)) {
873 spin_lock(&res->spinlock);
874 list_for_each_entry(lock, &res->granted, list) {
875 if (lock->ml.node == dead_node) {
876 mlog(0, "AHA! there was "
877 "a $RECOVERY lock for dead "
878 "node %u (%s)!\n",
879 dead_node, dlm->name);
880 list_del_init(&lock->list);
881 dlm_lock_put(lock);
882 break;
883 }
884 }
885 spin_unlock(&res->spinlock);
808 continue; 886 continue;
887 }
888
809 if (res->owner == dead_node) { 889 if (res->owner == dead_node) {
810 mlog(0, "found lockres owned by dead node while " 890 mlog(0, "found lockres owned by dead node while "
811 "doing recovery for node %u. sending it.\n", 891 "doing recovery for node %u. sending it.\n",
@@ -1179,7 +1259,7 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
1179again: 1259again:
1180 ret = dlm_lockres_master_requery(dlm, res, &real_master); 1260 ret = dlm_lockres_master_requery(dlm, res, &real_master);
1181 if (ret < 0) { 1261 if (ret < 0) {
1182 mlog(0, "dlm_lockres_master_requery failure: %d\n", 1262 mlog(0, "dlm_lockres_master_requery ret=%d\n",
1183 ret); 1263 ret);
1184 goto again; 1264 goto again;
1185 } 1265 }
@@ -1757,6 +1837,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1757 struct dlm_lock_resource *res; 1837 struct dlm_lock_resource *res;
1758 int i; 1838 int i;
1759 struct list_head *bucket; 1839 struct list_head *bucket;
1840 struct dlm_lock *lock;
1760 1841
1761 1842
1762 /* purge any stale mles */ 1843 /* purge any stale mles */
@@ -1780,10 +1861,25 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1780 bucket = &(dlm->resources[i]); 1861 bucket = &(dlm->resources[i]);
1781 list_for_each(iter, bucket) { 1862 list_for_each(iter, bucket) {
1782 res = list_entry (iter, struct dlm_lock_resource, list); 1863 res = list_entry (iter, struct dlm_lock_resource, list);
1864 /* always prune any $RECOVERY entries for dead nodes,
1865 * otherwise hangs can occur during later recovery */
1783 if (dlm_is_recovery_lock(res->lockname.name, 1866 if (dlm_is_recovery_lock(res->lockname.name,
1784 res->lockname.len)) 1867 res->lockname.len)) {
1868 spin_lock(&res->spinlock);
1869 list_for_each_entry(lock, &res->granted, list) {
1870 if (lock->ml.node == dead_node) {
1871 mlog(0, "AHA! there was "
1872 "a $RECOVERY lock for dead "
1873 "node %u (%s)!\n",
1874 dead_node, dlm->name);
1875 list_del_init(&lock->list);
1876 dlm_lock_put(lock);
1877 break;
1878 }
1879 }
1880 spin_unlock(&res->spinlock);
1785 continue; 1881 continue;
1786 1882 }
1787 spin_lock(&res->spinlock); 1883 spin_lock(&res->spinlock);
1788 /* zero the lvb if necessary */ 1884 /* zero the lvb if necessary */
1789 dlm_revalidate_lvb(dlm, res, dead_node); 1885 dlm_revalidate_lvb(dlm, res, dead_node);
@@ -1869,12 +1965,9 @@ void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
1869 return; 1965 return;
1870 1966
1871 spin_lock(&dlm->spinlock); 1967 spin_lock(&dlm->spinlock);
1872
1873 set_bit(idx, dlm->live_nodes_map); 1968 set_bit(idx, dlm->live_nodes_map);
1874 1969 /* do NOT notify mle attached to the heartbeat events.
1875 /* notify any mles attached to the heartbeat events */ 1970 * new nodes are not interesting in mastery until joined. */
1876 dlm_hb_event_notify_attached(dlm, idx, 1);
1877
1878 spin_unlock(&dlm->spinlock); 1971 spin_unlock(&dlm->spinlock);
1879 1972
1880 dlm_put(dlm); 1973 dlm_put(dlm);
@@ -1897,7 +1990,18 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
1897 mlog(0, "unlockast for recovery lock fired!\n"); 1990 mlog(0, "unlockast for recovery lock fired!\n");
1898} 1991}
1899 1992
1900 1993/*
1994 * dlm_pick_recovery_master will continually attempt to use
1995 * dlmlock() on the special "$RECOVERY" lockres with the
1996 * LKM_NOQUEUE flag to get an EX. every thread that enters
1997 * this function on each node racing to become the recovery
1998 * master will not stop attempting this until either:
1999 * a) this node gets the EX (and becomes the recovery master),
2000 * or b) dlm->reco.new_master gets set to some nodenum
2001 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
2002 * so each time a recovery master is needed, the entire cluster
2003 * will sync at this point. if the new master dies, that will
2004 * be detected in dlm_do_recovery */
1901static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) 2005static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
1902{ 2006{
1903 enum dlm_status ret; 2007 enum dlm_status ret;
@@ -1906,23 +2010,45 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
1906 2010
1907 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", 2011 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
1908 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); 2012 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
1909retry: 2013again:
1910 memset(&lksb, 0, sizeof(lksb)); 2014 memset(&lksb, 0, sizeof(lksb));
1911 2015
1912 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, 2016 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
1913 DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast); 2017 DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
1914 2018
2019 mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
2020 dlm->name, ret, lksb.status);
2021
1915 if (ret == DLM_NORMAL) { 2022 if (ret == DLM_NORMAL) {
1916 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", 2023 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
1917 dlm->name, dlm->node_num); 2024 dlm->name, dlm->node_num);
1918 /* I am master, send message to all nodes saying 2025
1919 * that I am beginning a recovery session */ 2026 /* got the EX lock. check to see if another node
1920 status = dlm_send_begin_reco_message(dlm, 2027 * just became the reco master */
1921 dlm->reco.dead_node); 2028 if (dlm_reco_master_ready(dlm)) {
2029 mlog(0, "%s: got reco EX lock, but %u will "
2030 "do the recovery\n", dlm->name,
2031 dlm->reco.new_master);
2032 status = -EEXIST;
2033 } else {
2034 status = dlm_send_begin_reco_message(dlm,
2035 dlm->reco.dead_node);
2036 /* this always succeeds */
2037 BUG_ON(status);
2038
2039 /* set the new_master to this node */
2040 spin_lock(&dlm->spinlock);
2041 dlm->reco.new_master = dlm->node_num;
2042 spin_unlock(&dlm->spinlock);
2043 }
1922 2044
1923 /* recovery lock is a special case. ast will not get fired, 2045 /* recovery lock is a special case. ast will not get fired,
1924 * so just go ahead and unlock it. */ 2046 * so just go ahead and unlock it. */
1925 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); 2047 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
2048 if (ret == DLM_DENIED) {
2049 mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
2050 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
2051 }
1926 if (ret != DLM_NORMAL) { 2052 if (ret != DLM_NORMAL) {
1927 /* this would really suck. this could only happen 2053 /* this would really suck. this could only happen
1928 * if there was a network error during the unlock 2054 * if there was a network error during the unlock
@@ -1930,20 +2056,42 @@ retry:
1930 * is actually "done" and the lock structure is 2056 * is actually "done" and the lock structure is
1931 * even freed. we can continue, but only 2057 * even freed. we can continue, but only
1932 * because this specific lock name is special. */ 2058 * because this specific lock name is special. */
1933 mlog(0, "dlmunlock returned %d\n", ret); 2059 mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
1934 }
1935
1936 if (status < 0) {
1937 mlog(0, "failed to send recovery message. "
1938 "must retry with new node map.\n");
1939 goto retry;
1940 } 2060 }
1941 } else if (ret == DLM_NOTQUEUED) { 2061 } else if (ret == DLM_NOTQUEUED) {
1942 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", 2062 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
1943 dlm->name, dlm->node_num); 2063 dlm->name, dlm->node_num);
1944 /* another node is master. wait on 2064 /* another node is master. wait on
1945 * reco.new_master != O2NM_INVALID_NODE_NUM */ 2065 * reco.new_master != O2NM_INVALID_NODE_NUM
2066 * for at most one second */
2067 wait_event_timeout(dlm->dlm_reco_thread_wq,
2068 dlm_reco_master_ready(dlm),
2069 msecs_to_jiffies(1000));
2070 if (!dlm_reco_master_ready(dlm)) {
2071 mlog(0, "%s: reco master taking awhile\n",
2072 dlm->name);
2073 goto again;
2074 }
2075 /* another node has informed this one that it is reco master */
2076 mlog(0, "%s: reco master %u is ready to recover %u\n",
2077 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
1946 status = -EEXIST; 2078 status = -EEXIST;
2079 } else {
2080 struct dlm_lock_resource *res;
2081
2082 /* dlmlock returned something other than NOTQUEUED or NORMAL */
2083 mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
2084 "lksb.status=%s\n", dlm->name, dlm_errname(ret),
2085 dlm_errname(lksb.status));
2086 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
2087 DLM_RECOVERY_LOCK_NAME_LEN);
2088 if (res) {
2089 dlm_print_one_lock_resource(res);
2090 dlm_lockres_put(res);
2091 } else {
2092 mlog(ML_ERROR, "recovery lock not found\n");
2093 }
2094 BUG();
1947 } 2095 }
1948 2096
1949 return status; 2097 return status;
@@ -1982,7 +2130,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
1982 mlog(0, "not sending begin reco to self\n"); 2130 mlog(0, "not sending begin reco to self\n");
1983 continue; 2131 continue;
1984 } 2132 }
1985 2133retry:
1986 ret = -EINVAL; 2134 ret = -EINVAL;
1987 mlog(0, "attempting to send begin reco msg to %d\n", 2135 mlog(0, "attempting to send begin reco msg to %d\n",
1988 nodenum); 2136 nodenum);
@@ -1991,8 +2139,17 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
1991 /* negative status is handled ok by caller here */ 2139 /* negative status is handled ok by caller here */
1992 if (ret >= 0) 2140 if (ret >= 0)
1993 ret = status; 2141 ret = status;
2142 if (dlm_is_host_down(ret)) {
2143 /* node is down. not involved in recovery
2144 * so just keep going */
2145 mlog(0, "%s: node %u was down when sending "
2146 "begin reco msg (%d)\n", dlm->name, nodenum, ret);
2147 ret = 0;
2148 }
1994 if (ret < 0) { 2149 if (ret < 0) {
1995 struct dlm_lock_resource *res; 2150 struct dlm_lock_resource *res;
2151 /* this is now a serious problem, possibly ENOMEM
2152 * in the network stack. must retry */
1996 mlog_errno(ret); 2153 mlog_errno(ret);
1997 mlog(ML_ERROR, "begin reco of dlm %s to node %u " 2154 mlog(ML_ERROR, "begin reco of dlm %s to node %u "
1998 " returned %d\n", dlm->name, nodenum, ret); 2155 " returned %d\n", dlm->name, nodenum, ret);
@@ -2004,7 +2161,10 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2004 } else { 2161 } else {
2005 mlog(ML_ERROR, "recovery lock not found\n"); 2162 mlog(ML_ERROR, "recovery lock not found\n");
2006 } 2163 }
2007 break; 2164 /* sleep for a bit in hopes that we can avoid
2165 * another ENOMEM */
2166 msleep(100);
2167 goto retry;
2008 } 2168 }
2009 } 2169 }
2010 2170
@@ -2027,19 +2187,34 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2027 2187
2028 spin_lock(&dlm->spinlock); 2188 spin_lock(&dlm->spinlock);
2029 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2189 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2030 mlog(0, "new_master already set to %u!\n", 2190 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
2031 dlm->reco.new_master); 2191 mlog(0, "%s: new_master %u died, changing "
2192 "to %u\n", dlm->name, dlm->reco.new_master,
2193 br->node_idx);
2194 } else {
2195 mlog(0, "%s: new_master %u NOT DEAD, changing "
2196 "to %u\n", dlm->name, dlm->reco.new_master,
2197 br->node_idx);
2198 /* may not have seen the new master as dead yet */
2199 }
2032 } 2200 }
2033 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { 2201 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
2034 mlog(0, "dead_node already set to %u!\n", 2202 mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
2035 dlm->reco.dead_node); 2203 "node %u changing it to %u\n", dlm->name,
2204 dlm->reco.dead_node, br->node_idx, br->dead_node);
2036 } 2205 }
2037 dlm->reco.new_master = br->node_idx; 2206 dlm->reco.new_master = br->node_idx;
2038 dlm->reco.dead_node = br->dead_node; 2207 dlm->reco.dead_node = br->dead_node;
2039 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2208 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2040 mlog(ML_ERROR, "recovery master %u sees %u as dead, but this " 2209 mlog(0, "recovery master %u sees %u as dead, but this "
2041 "node has not yet. marking %u as dead\n", 2210 "node has not yet. marking %u as dead\n",
2042 br->node_idx, br->dead_node, br->dead_node); 2211 br->node_idx, br->dead_node, br->dead_node);
2212 if (!test_bit(br->dead_node, dlm->domain_map) ||
2213 !test_bit(br->dead_node, dlm->live_nodes_map))
2214 mlog(0, "%u not in domain/live_nodes map "
2215 "so setting it in reco map manually\n",
2216 br->dead_node);
2217 set_bit(br->dead_node, dlm->recovery_map);
2043 __dlm_hb_node_down(dlm, br->dead_node); 2218 __dlm_hb_node_down(dlm, br->dead_node);
2044 } 2219 }
2045 spin_unlock(&dlm->spinlock); 2220 spin_unlock(&dlm->spinlock);