aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmrecovery.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c164
1 files changed, 82 insertions, 82 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 7efab6d28a21..01ebfd0bdad7 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -362,40 +362,38 @@ static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
362} 362}
363 363
364 364
365int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 365void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
366{ 366{
367 if (timeout) { 367 if (dlm_is_node_dead(dlm, node))
368 mlog(ML_NOTICE, "%s: waiting %dms for notification of " 368 return;
369 "death of node %u\n", dlm->name, timeout, node); 369
370 printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
371 "domain %s\n", node, dlm->name);
372
373 if (timeout)
370 wait_event_timeout(dlm->dlm_reco_thread_wq, 374 wait_event_timeout(dlm->dlm_reco_thread_wq,
371 dlm_is_node_dead(dlm, node), 375 dlm_is_node_dead(dlm, node),
372 msecs_to_jiffies(timeout)); 376 msecs_to_jiffies(timeout));
373 } else { 377 else
374 mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
375 "of death of node %u\n", dlm->name, node);
376 wait_event(dlm->dlm_reco_thread_wq, 378 wait_event(dlm->dlm_reco_thread_wq,
377 dlm_is_node_dead(dlm, node)); 379 dlm_is_node_dead(dlm, node));
378 }
379 /* for now, return 0 */
380 return 0;
381} 380}
382 381
383int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) 382void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
384{ 383{
385 if (timeout) { 384 if (dlm_is_node_recovered(dlm, node))
386 mlog(0, "%s: waiting %dms for notification of " 385 return;
387 "recovery of node %u\n", dlm->name, timeout, node); 386
387 printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
388 "domain %s\n", node, dlm->name);
389
390 if (timeout)
388 wait_event_timeout(dlm->dlm_reco_thread_wq, 391 wait_event_timeout(dlm->dlm_reco_thread_wq,
389 dlm_is_node_recovered(dlm, node), 392 dlm_is_node_recovered(dlm, node),
390 msecs_to_jiffies(timeout)); 393 msecs_to_jiffies(timeout));
391 } else { 394 else
392 mlog(0, "%s: waiting indefinitely for notification "
393 "of recovery of node %u\n", dlm->name, node);
394 wait_event(dlm->dlm_reco_thread_wq, 395 wait_event(dlm->dlm_reco_thread_wq,
395 dlm_is_node_recovered(dlm, node)); 396 dlm_is_node_recovered(dlm, node));
396 }
397 /* for now, return 0 */
398 return 0;
399} 397}
400 398
401/* callers of the top-level api calls (dlmlock/dlmunlock) should 399/* callers of the top-level api calls (dlmlock/dlmunlock) should
@@ -430,6 +428,8 @@ static void dlm_begin_recovery(struct dlm_ctxt *dlm)
430{ 428{
431 spin_lock(&dlm->spinlock); 429 spin_lock(&dlm->spinlock);
432 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); 430 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
431 printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
432 dlm->name, dlm->reco.dead_node);
433 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; 433 dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
434 spin_unlock(&dlm->spinlock); 434 spin_unlock(&dlm->spinlock);
435} 435}
@@ -440,9 +440,18 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
440 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); 440 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
441 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; 441 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
442 spin_unlock(&dlm->spinlock); 442 spin_unlock(&dlm->spinlock);
443 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
443 wake_up(&dlm->reco.event); 444 wake_up(&dlm->reco.event);
444} 445}
445 446
447static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
448{
449 printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the "
450 "dead node %u in domain %s\n", dlm->reco.new_master,
451 (dlm->node_num == dlm->reco.new_master ? "me" : "he"),
452 dlm->reco.dead_node, dlm->name);
453}
454
446static int dlm_do_recovery(struct dlm_ctxt *dlm) 455static int dlm_do_recovery(struct dlm_ctxt *dlm)
447{ 456{
448 int status = 0; 457 int status = 0;
@@ -505,9 +514,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
505 } 514 }
506 mlog(0, "another node will master this recovery session.\n"); 515 mlog(0, "another node will master this recovery session.\n");
507 } 516 }
508 mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n", 517
509 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master, 518 dlm_print_recovery_master(dlm);
510 dlm->node_num, dlm->reco.dead_node);
511 519
512 /* it is safe to start everything back up here 520 /* it is safe to start everything back up here
513 * because all of the dead node's lock resources 521 * because all of the dead node's lock resources
@@ -518,15 +526,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
518 return 0; 526 return 0;
519 527
520master_here: 528master_here:
521 mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node " 529 dlm_print_recovery_master(dlm);
522 "%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task),
523 dlm->node_num, dlm->reco.dead_node, dlm->name);
524 530
525 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 531 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
526 if (status < 0) { 532 if (status < 0) {
527 /* we should never hit this anymore */ 533 /* we should never hit this anymore */
528 mlog(ML_ERROR, "error %d remastering locks for node %u, " 534 mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, "
529 "retrying.\n", status, dlm->reco.dead_node); 535 "retrying.\n", dlm->name, status, dlm->reco.dead_node);
530 /* yield a bit to allow any final network messages 536 /* yield a bit to allow any final network messages
531 * to get handled on remaining nodes */ 537 * to get handled on remaining nodes */
532 msleep(100); 538 msleep(100);
@@ -567,7 +573,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
567 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); 573 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
568 ndata->state = DLM_RECO_NODE_DATA_REQUESTING; 574 ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
569 575
570 mlog(0, "requesting lock info from node %u\n", 576 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name,
571 ndata->node_num); 577 ndata->node_num);
572 578
573 if (ndata->node_num == dlm->node_num) { 579 if (ndata->node_num == dlm->node_num) {
@@ -640,7 +646,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
640 spin_unlock(&dlm_reco_state_lock); 646 spin_unlock(&dlm_reco_state_lock);
641 } 647 }
642 648
643 mlog(0, "done requesting all lock info\n"); 649 mlog(0, "%s: Done requesting all lock info\n", dlm->name);
644 650
645 /* nodes should be sending reco data now 651 /* nodes should be sending reco data now
646 * just need to wait */ 652 * just need to wait */
@@ -802,10 +808,9 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
802 808
803 /* negative status is handled by caller */ 809 /* negative status is handled by caller */
804 if (ret < 0) 810 if (ret < 0)
805 mlog(ML_ERROR, "Error %d when sending message %u (key " 811 mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u "
806 "0x%x) to node %u\n", ret, DLM_LOCK_REQUEST_MSG, 812 "to recover dead node %u\n", dlm->name, ret,
807 dlm->key, request_from); 813 request_from, dead_node);
808
809 // return from here, then 814 // return from here, then
810 // sleep until all received or error 815 // sleep until all received or error
811 return ret; 816 return ret;
@@ -956,9 +961,9 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
956 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 961 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
957 sizeof(done_msg), send_to, &tmpret); 962 sizeof(done_msg), send_to, &tmpret);
958 if (ret < 0) { 963 if (ret < 0) {
959 mlog(ML_ERROR, "Error %d when sending message %u (key " 964 mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u "
960 "0x%x) to node %u\n", ret, DLM_RECO_DATA_DONE_MSG, 965 "to recover dead node %u\n", dlm->name, ret, send_to,
961 dlm->key, send_to); 966 dead_node);
962 if (!dlm_is_host_down(ret)) { 967 if (!dlm_is_host_down(ret)) {
963 BUG(); 968 BUG();
964 } 969 }
@@ -1127,9 +1132,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
1127 if (ret < 0) { 1132 if (ret < 0) {
1128 /* XXX: negative status is not handled. 1133 /* XXX: negative status is not handled.
1129 * this will end up killing this node. */ 1134 * this will end up killing this node. */
1130 mlog(ML_ERROR, "Error %d when sending message %u (key " 1135 mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to "
1131 "0x%x) to node %u\n", ret, DLM_MIG_LOCKRES_MSG, 1136 "node %u (%s)\n", dlm->name, mres->lockname_len,
1132 dlm->key, send_to); 1137 mres->lockname, ret, send_to,
1138 (orig_flags & DLM_MRES_MIGRATION ?
1139 "migration" : "recovery"));
1133 } else { 1140 } else {
1134 /* might get an -ENOMEM back here */ 1141 /* might get an -ENOMEM back here */
1135 ret = status; 1142 ret = status;
@@ -1767,7 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1767 dlm->name, mres->lockname_len, mres->lockname, 1774 dlm->name, mres->lockname_len, mres->lockname,
1768 from); 1775 from);
1769 spin_lock(&res->spinlock); 1776 spin_lock(&res->spinlock);
1770 dlm_lockres_set_refmap_bit(from, res); 1777 dlm_lockres_set_refmap_bit(dlm, res, from);
1771 spin_unlock(&res->spinlock); 1778 spin_unlock(&res->spinlock);
1772 added++; 1779 added++;
1773 break; 1780 break;
@@ -1965,7 +1972,7 @@ skip_lvb:
1965 mlog(0, "%s:%.*s: added lock for node %u, " 1972 mlog(0, "%s:%.*s: added lock for node %u, "
1966 "setting refmap bit\n", dlm->name, 1973 "setting refmap bit\n", dlm->name,
1967 res->lockname.len, res->lockname.name, ml->node); 1974 res->lockname.len, res->lockname.name, ml->node);
1968 dlm_lockres_set_refmap_bit(ml->node, res); 1975 dlm_lockres_set_refmap_bit(dlm, res, ml->node);
1969 added++; 1976 added++;
1970 } 1977 }
1971 spin_unlock(&res->spinlock); 1978 spin_unlock(&res->spinlock);
@@ -2084,6 +2091,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
2084 2091
2085 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { 2092 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
2086 if (res->owner == dead_node) { 2093 if (res->owner == dead_node) {
2094 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
2095 dlm->name, res->lockname.len, res->lockname.name,
2096 res->owner, new_master);
2087 list_del_init(&res->recovering); 2097 list_del_init(&res->recovering);
2088 spin_lock(&res->spinlock); 2098 spin_lock(&res->spinlock);
2089 /* new_master has our reference from 2099 /* new_master has our reference from
@@ -2105,40 +2115,30 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
2105 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2115 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
2106 bucket = dlm_lockres_hash(dlm, i); 2116 bucket = dlm_lockres_hash(dlm, i);
2107 hlist_for_each_entry(res, hash_iter, bucket, hash_node) { 2117 hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
2108 if (res->state & DLM_LOCK_RES_RECOVERING) { 2118 if (!(res->state & DLM_LOCK_RES_RECOVERING))
2109 if (res->owner == dead_node) { 2119 continue;
2110 mlog(0, "(this=%u) res %.*s owner=%u "
2111 "was not on recovering list, but "
2112 "clearing state anyway\n",
2113 dlm->node_num, res->lockname.len,
2114 res->lockname.name, new_master);
2115 } else if (res->owner == dlm->node_num) {
2116 mlog(0, "(this=%u) res %.*s owner=%u "
2117 "was not on recovering list, "
2118 "owner is THIS node, clearing\n",
2119 dlm->node_num, res->lockname.len,
2120 res->lockname.name, new_master);
2121 } else
2122 continue;
2123 2120
2124 if (!list_empty(&res->recovering)) { 2121 if (res->owner != dead_node &&
2125 mlog(0, "%s:%.*s: lockres was " 2122 res->owner != dlm->node_num)
2126 "marked RECOVERING, owner=%u\n", 2123 continue;
2127 dlm->name, res->lockname.len, 2124
2128 res->lockname.name, res->owner); 2125 if (!list_empty(&res->recovering)) {
2129 list_del_init(&res->recovering); 2126 list_del_init(&res->recovering);
2130 dlm_lockres_put(res); 2127 dlm_lockres_put(res);
2131 }
2132 spin_lock(&res->spinlock);
2133 /* new_master has our reference from
2134 * the lock state sent during recovery */
2135 dlm_change_lockres_owner(dlm, res, new_master);
2136 res->state &= ~DLM_LOCK_RES_RECOVERING;
2137 if (__dlm_lockres_has_locks(res))
2138 __dlm_dirty_lockres(dlm, res);
2139 spin_unlock(&res->spinlock);
2140 wake_up(&res->wq);
2141 } 2128 }
2129
2130 /* new_master has our reference from
2131 * the lock state sent during recovery */
2132 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
2133 dlm->name, res->lockname.len, res->lockname.name,
2134 res->owner, new_master);
2135 spin_lock(&res->spinlock);
2136 dlm_change_lockres_owner(dlm, res, new_master);
2137 res->state &= ~DLM_LOCK_RES_RECOVERING;
2138 if (__dlm_lockres_has_locks(res))
2139 __dlm_dirty_lockres(dlm, res);
2140 spin_unlock(&res->spinlock);
2141 wake_up(&res->wq);
2142 } 2142 }
2143 } 2143 }
2144} 2144}
@@ -2252,12 +2252,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2252 res->lockname.len, res->lockname.name, freed, dead_node); 2252 res->lockname.len, res->lockname.name, freed, dead_node);
2253 __dlm_print_one_lock_resource(res); 2253 __dlm_print_one_lock_resource(res);
2254 } 2254 }
2255 dlm_lockres_clear_refmap_bit(dead_node, res); 2255 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2256 } else if (test_bit(dead_node, res->refmap)) { 2256 } else if (test_bit(dead_node, res->refmap)) {
2257 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2257 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2258 "no locks and had not purged before dying\n", dlm->name, 2258 "no locks and had not purged before dying\n", dlm->name,
2259 res->lockname.len, res->lockname.name, dead_node); 2259 res->lockname.len, res->lockname.name, dead_node);
2260 dlm_lockres_clear_refmap_bit(dead_node, res); 2260 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2261 } 2261 }
2262 2262
2263 /* do not kick thread yet */ 2263 /* do not kick thread yet */
@@ -2324,9 +2324,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2324 dlm_revalidate_lvb(dlm, res, dead_node); 2324 dlm_revalidate_lvb(dlm, res, dead_node);
2325 if (res->owner == dead_node) { 2325 if (res->owner == dead_node) {
2326 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 2326 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
2327 mlog(ML_NOTICE, "Ignore %.*s for " 2327 mlog(ML_NOTICE, "%s: res %.*s, Skip "
2328 "recovery as it is being freed\n", 2328 "recovery as it is being freed\n",
2329 res->lockname.len, 2329 dlm->name, res->lockname.len,
2330 res->lockname.name); 2330 res->lockname.name);
2331 } else 2331 } else
2332 dlm_move_lockres_to_recovery_list(dlm, 2332 dlm_move_lockres_to_recovery_list(dlm,