aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmrecovery.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c593
1 files changed, 454 insertions, 139 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 805cbabac051..29b2845f370d 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -98,8 +98,8 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
98 98
99static u64 dlm_get_next_mig_cookie(void); 99static u64 dlm_get_next_mig_cookie(void);
100 100
101static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED; 101static DEFINE_SPINLOCK(dlm_reco_state_lock);
102static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED; 102static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
103static u64 dlm_mig_cookie = 1; 103static u64 dlm_mig_cookie = 1;
104 104
105static u64 dlm_get_next_mig_cookie(void) 105static u64 dlm_get_next_mig_cookie(void)
@@ -115,12 +115,37 @@ static u64 dlm_get_next_mig_cookie(void)
115 return c; 115 return c;
116} 116}
117 117
118static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
119 u8 dead_node)
120{
121 assert_spin_locked(&dlm->spinlock);
122 if (dlm->reco.dead_node != dead_node)
123 mlog(0, "%s: changing dead_node from %u to %u\n",
124 dlm->name, dlm->reco.dead_node, dead_node);
125 dlm->reco.dead_node = dead_node;
126}
127
128static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
129 u8 master)
130{
131 assert_spin_locked(&dlm->spinlock);
132 mlog(0, "%s: changing new_master from %u to %u\n",
133 dlm->name, dlm->reco.new_master, master);
134 dlm->reco.new_master = master;
135}
136
137static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
138{
139 assert_spin_locked(&dlm->spinlock);
140 clear_bit(dlm->reco.dead_node, dlm->recovery_map);
141 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
142 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
143}
144
118static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) 145static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
119{ 146{
120 spin_lock(&dlm->spinlock); 147 spin_lock(&dlm->spinlock);
121 clear_bit(dlm->reco.dead_node, dlm->recovery_map); 148 __dlm_reset_recovery(dlm);
122 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
123 dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
124 spin_unlock(&dlm->spinlock); 149 spin_unlock(&dlm->spinlock);
125} 150}
126 151
@@ -132,12 +157,21 @@ void dlm_dispatch_work(void *data)
132 struct list_head *iter, *iter2; 157 struct list_head *iter, *iter2;
133 struct dlm_work_item *item; 158 struct dlm_work_item *item;
134 dlm_workfunc_t *workfunc; 159 dlm_workfunc_t *workfunc;
160 int tot=0;
161
162 if (!dlm_joined(dlm))
163 return;
135 164
136 spin_lock(&dlm->work_lock); 165 spin_lock(&dlm->work_lock);
137 list_splice_init(&dlm->work_list, &tmp_list); 166 list_splice_init(&dlm->work_list, &tmp_list);
138 spin_unlock(&dlm->work_lock); 167 spin_unlock(&dlm->work_lock);
139 168
140 list_for_each_safe(iter, iter2, &tmp_list) { 169 list_for_each_safe(iter, iter2, &tmp_list) {
170 tot++;
171 }
172 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
173
174 list_for_each_safe(iter, iter2, &tmp_list) {
141 item = list_entry(iter, struct dlm_work_item, list); 175 item = list_entry(iter, struct dlm_work_item, list);
142 workfunc = item->func; 176 workfunc = item->func;
143 list_del_init(&item->list); 177 list_del_init(&item->list);
@@ -220,6 +254,52 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
220 * 254 *
221 */ 255 */
222 256
257static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
258{
259 struct dlm_reco_node_data *ndata;
260 struct dlm_lock_resource *res;
261
262 mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
263 dlm->name, dlm->dlm_reco_thread_task->pid,
264 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
265 dlm->reco.dead_node, dlm->reco.new_master);
266
267 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
268 char *st = "unknown";
269 switch (ndata->state) {
270 case DLM_RECO_NODE_DATA_INIT:
271 st = "init";
272 break;
273 case DLM_RECO_NODE_DATA_REQUESTING:
274 st = "requesting";
275 break;
276 case DLM_RECO_NODE_DATA_DEAD:
277 st = "dead";
278 break;
279 case DLM_RECO_NODE_DATA_RECEIVING:
280 st = "receiving";
281 break;
282 case DLM_RECO_NODE_DATA_REQUESTED:
283 st = "requested";
284 break;
285 case DLM_RECO_NODE_DATA_DONE:
286 st = "done";
287 break;
288 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
289 st = "finalize-sent";
290 break;
291 default:
292 st = "bad";
293 break;
294 }
295 mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
296 dlm->name, ndata->node_num, st);
297 }
298 list_for_each_entry(res, &dlm->reco.resources, recovering) {
299 mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
300 dlm->name, res->lockname.len, res->lockname.name);
301 }
302}
223 303
224#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) 304#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
225 305
@@ -267,11 +347,23 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
267{ 347{
268 int dead; 348 int dead;
269 spin_lock(&dlm->spinlock); 349 spin_lock(&dlm->spinlock);
270 dead = test_bit(node, dlm->domain_map); 350 dead = !test_bit(node, dlm->domain_map);
271 spin_unlock(&dlm->spinlock); 351 spin_unlock(&dlm->spinlock);
272 return dead; 352 return dead;
273} 353}
274 354
355/* returns true if node is no longer in the domain
356 * could be dead or just not joined */
357static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
358{
359 int recovered;
360 spin_lock(&dlm->spinlock);
361 recovered = !test_bit(node, dlm->recovery_map);
362 spin_unlock(&dlm->spinlock);
363 return recovered;
364}
365
366
275int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 367int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
276{ 368{
277 if (timeout) { 369 if (timeout) {
@@ -290,6 +382,24 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
290 return 0; 382 return 0;
291} 383}
292 384
385int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
386{
387 if (timeout) {
388 mlog(0, "%s: waiting %dms for notification of "
389 "recovery of node %u\n", dlm->name, timeout, node);
390 wait_event_timeout(dlm->dlm_reco_thread_wq,
391 dlm_is_node_recovered(dlm, node),
392 msecs_to_jiffies(timeout));
393 } else {
394 mlog(0, "%s: waiting indefinitely for notification "
395 "of recovery of node %u\n", dlm->name, node);
396 wait_event(dlm->dlm_reco_thread_wq,
397 dlm_is_node_recovered(dlm, node));
398 }
399 /* for now, return 0 */
400 return 0;
401}
402
293/* callers of the top-level api calls (dlmlock/dlmunlock) should 403/* callers of the top-level api calls (dlmlock/dlmunlock) should
294 * block on the dlm->reco.event when recovery is in progress. 404 * block on the dlm->reco.event when recovery is in progress.
295 * the dlm recovery thread will set this state when it begins 405 * the dlm recovery thread will set this state when it begins
@@ -308,6 +418,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm)
308 418
309void dlm_wait_for_recovery(struct dlm_ctxt *dlm) 419void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
310{ 420{
421 if (dlm_in_recovery(dlm)) {
422 mlog(0, "%s: reco thread %d in recovery: "
423 "state=%d, master=%u, dead=%u\n",
424 dlm->name, dlm->dlm_reco_thread_task->pid,
425 dlm->reco.state, dlm->reco.new_master,
426 dlm->reco.dead_node);
427 }
311 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); 428 wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
312} 429}
313 430
@@ -341,7 +458,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
341 mlog(0, "new master %u died while recovering %u!\n", 458 mlog(0, "new master %u died while recovering %u!\n",
342 dlm->reco.new_master, dlm->reco.dead_node); 459 dlm->reco.new_master, dlm->reco.dead_node);
343 /* unset the new_master, leave dead_node */ 460 /* unset the new_master, leave dead_node */
344 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 461 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
345 } 462 }
346 463
347 /* select a target to recover */ 464 /* select a target to recover */
@@ -350,14 +467,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
350 467
351 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0); 468 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
352 if (bit >= O2NM_MAX_NODES || bit < 0) 469 if (bit >= O2NM_MAX_NODES || bit < 0)
353 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 470 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
354 else 471 else
355 dlm->reco.dead_node = bit; 472 dlm_set_reco_dead_node(dlm, bit);
356 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { 473 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
357 /* BUG? */ 474 /* BUG? */
358 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", 475 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
359 dlm->reco.dead_node); 476 dlm->reco.dead_node);
360 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 477 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
361 } 478 }
362 479
363 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 480 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
@@ -366,7 +483,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
366 /* return to main thread loop and sleep. */ 483 /* return to main thread loop and sleep. */
367 return 0; 484 return 0;
368 } 485 }
369 mlog(0, "recovery thread found node %u in the recovery map!\n", 486 mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
487 dlm->name, dlm->dlm_reco_thread_task->pid,
370 dlm->reco.dead_node); 488 dlm->reco.dead_node);
371 spin_unlock(&dlm->spinlock); 489 spin_unlock(&dlm->spinlock);
372 490
@@ -389,8 +507,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
389 } 507 }
390 mlog(0, "another node will master this recovery session.\n"); 508 mlog(0, "another node will master this recovery session.\n");
391 } 509 }
392 mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n", 510 mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
393 dlm->name, dlm->reco.new_master, 511 dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,
394 dlm->node_num, dlm->reco.dead_node); 512 dlm->node_num, dlm->reco.dead_node);
395 513
396 /* it is safe to start everything back up here 514 /* it is safe to start everything back up here
@@ -402,11 +520,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
402 return 0; 520 return 0;
403 521
404master_here: 522master_here:
405 mlog(0, "mastering recovery of %s:%u here(this=%u)!\n", 523 mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n",
524 dlm->dlm_reco_thread_task->pid,
406 dlm->name, dlm->reco.dead_node, dlm->node_num); 525 dlm->name, dlm->reco.dead_node, dlm->node_num);
407 526
408 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 527 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
409 if (status < 0) { 528 if (status < 0) {
529 /* we should never hit this anymore */
410 mlog(ML_ERROR, "error %d remastering locks for node %u, " 530 mlog(ML_ERROR, "error %d remastering locks for node %u, "
411 "retrying.\n", status, dlm->reco.dead_node); 531 "retrying.\n", status, dlm->reco.dead_node);
412 /* yield a bit to allow any final network messages 532 /* yield a bit to allow any final network messages
@@ -433,9 +553,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
433 int destroy = 0; 553 int destroy = 0;
434 int pass = 0; 554 int pass = 0;
435 555
436 status = dlm_init_recovery_area(dlm, dead_node); 556 do {
437 if (status < 0) 557 /* we have become recovery master. there is no escaping
438 goto leave; 558 * this, so just keep trying until we get it. */
559 status = dlm_init_recovery_area(dlm, dead_node);
560 if (status < 0) {
561 mlog(ML_ERROR, "%s: failed to alloc recovery area, "
562 "retrying\n", dlm->name);
563 msleep(1000);
564 }
565 } while (status != 0);
439 566
440 /* safe to access the node data list without a lock, since this 567 /* safe to access the node data list without a lock, since this
441 * process is the only one to change the list */ 568 * process is the only one to change the list */
@@ -452,16 +579,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
452 continue; 579 continue;
453 } 580 }
454 581
455 status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); 582 do {
456 if (status < 0) { 583 status = dlm_request_all_locks(dlm, ndata->node_num,
457 mlog_errno(status); 584 dead_node);
458 if (dlm_is_host_down(status)) 585 if (status < 0) {
459 ndata->state = DLM_RECO_NODE_DATA_DEAD; 586 mlog_errno(status);
460 else { 587 if (dlm_is_host_down(status)) {
461 destroy = 1; 588 /* node died, ignore it for recovery */
462 goto leave; 589 status = 0;
590 ndata->state = DLM_RECO_NODE_DATA_DEAD;
591 /* wait for the domain map to catch up
592 * with the network state. */
593 wait_event_timeout(dlm->dlm_reco_thread_wq,
594 dlm_is_node_dead(dlm,
595 ndata->node_num),
596 msecs_to_jiffies(1000));
597 mlog(0, "waited 1 sec for %u, "
598 "dead? %s\n", ndata->node_num,
599 dlm_is_node_dead(dlm, ndata->node_num) ?
600 "yes" : "no");
601 } else {
602 /* -ENOMEM on the other node */
603 mlog(0, "%s: node %u returned "
604 "%d during recovery, retrying "
605 "after a short wait\n",
606 dlm->name, ndata->node_num,
607 status);
608 msleep(100);
609 }
463 } 610 }
464 } 611 } while (status != 0);
465 612
466 switch (ndata->state) { 613 switch (ndata->state) {
467 case DLM_RECO_NODE_DATA_INIT: 614 case DLM_RECO_NODE_DATA_INIT:
@@ -473,10 +620,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
473 mlog(0, "node %u died after requesting " 620 mlog(0, "node %u died after requesting "
474 "recovery info for node %u\n", 621 "recovery info for node %u\n",
475 ndata->node_num, dead_node); 622 ndata->node_num, dead_node);
476 // start all over 623 /* fine. don't need this node's info.
477 destroy = 1; 624 * continue without it. */
478 status = -EAGAIN; 625 break;
479 goto leave;
480 case DLM_RECO_NODE_DATA_REQUESTING: 626 case DLM_RECO_NODE_DATA_REQUESTING:
481 ndata->state = DLM_RECO_NODE_DATA_REQUESTED; 627 ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
482 mlog(0, "now receiving recovery data from " 628 mlog(0, "now receiving recovery data from "
@@ -520,35 +666,26 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
520 BUG(); 666 BUG();
521 break; 667 break;
522 case DLM_RECO_NODE_DATA_DEAD: 668 case DLM_RECO_NODE_DATA_DEAD:
523 mlog(ML_NOTICE, "node %u died after " 669 mlog(0, "node %u died after "
524 "requesting recovery info for " 670 "requesting recovery info for "
525 "node %u\n", ndata->node_num, 671 "node %u\n", ndata->node_num,
526 dead_node); 672 dead_node);
527 spin_unlock(&dlm_reco_state_lock); 673 break;
528 // start all over
529 destroy = 1;
530 status = -EAGAIN;
531 /* instead of spinning like crazy here,
532 * wait for the domain map to catch up
533 * with the network state. otherwise this
534 * can be hit hundreds of times before
535 * the node is really seen as dead. */
536 wait_event_timeout(dlm->dlm_reco_thread_wq,
537 dlm_is_node_dead(dlm,
538 ndata->node_num),
539 msecs_to_jiffies(1000));
540 mlog(0, "waited 1 sec for %u, "
541 "dead? %s\n", ndata->node_num,
542 dlm_is_node_dead(dlm, ndata->node_num) ?
543 "yes" : "no");
544 goto leave;
545 case DLM_RECO_NODE_DATA_RECEIVING: 674 case DLM_RECO_NODE_DATA_RECEIVING:
546 case DLM_RECO_NODE_DATA_REQUESTED: 675 case DLM_RECO_NODE_DATA_REQUESTED:
676 mlog(0, "%s: node %u still in state %s\n",
677 dlm->name, ndata->node_num,
678 ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
679 "receiving" : "requested");
547 all_nodes_done = 0; 680 all_nodes_done = 0;
548 break; 681 break;
549 case DLM_RECO_NODE_DATA_DONE: 682 case DLM_RECO_NODE_DATA_DONE:
683 mlog(0, "%s: node %u state is done\n",
684 dlm->name, ndata->node_num);
550 break; 685 break;
551 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 686 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
687 mlog(0, "%s: node %u state is finalize\n",
688 dlm->name, ndata->node_num);
552 break; 689 break;
553 } 690 }
554 } 691 }
@@ -578,7 +715,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
578 jiffies, dlm->reco.dead_node, 715 jiffies, dlm->reco.dead_node,
579 dlm->node_num, dlm->reco.new_master); 716 dlm->node_num, dlm->reco.new_master);
580 destroy = 1; 717 destroy = 1;
581 status = ret; 718 status = 0;
582 /* rescan everything marked dirty along the way */ 719 /* rescan everything marked dirty along the way */
583 dlm_kick_thread(dlm, NULL); 720 dlm_kick_thread(dlm, NULL);
584 break; 721 break;
@@ -591,7 +728,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
591 728
592 } 729 }
593 730
594leave:
595 if (destroy) 731 if (destroy)
596 dlm_destroy_recovery_area(dlm, dead_node); 732 dlm_destroy_recovery_area(dlm, dead_node);
597 733
@@ -617,7 +753,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
617 } 753 }
618 BUG_ON(num == dead_node); 754 BUG_ON(num == dead_node);
619 755
620 ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL); 756 ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
621 if (!ndata) { 757 if (!ndata) {
622 dlm_destroy_recovery_area(dlm, dead_node); 758 dlm_destroy_recovery_area(dlm, dead_node);
623 return -ENOMEM; 759 return -ENOMEM;
@@ -691,16 +827,25 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
691 if (!dlm_grab(dlm)) 827 if (!dlm_grab(dlm))
692 return -EINVAL; 828 return -EINVAL;
693 829
830 if (lr->dead_node != dlm->reco.dead_node) {
831 mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
832 "dead_node is %u\n", dlm->name, lr->node_idx,
833 lr->dead_node, dlm->reco.dead_node);
834 dlm_print_reco_node_status(dlm);
835 /* this is a hack */
836 dlm_put(dlm);
837 return -ENOMEM;
838 }
694 BUG_ON(lr->dead_node != dlm->reco.dead_node); 839 BUG_ON(lr->dead_node != dlm->reco.dead_node);
695 840
696 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 841 item = kcalloc(1, sizeof(*item), GFP_NOFS);
697 if (!item) { 842 if (!item) {
698 dlm_put(dlm); 843 dlm_put(dlm);
699 return -ENOMEM; 844 return -ENOMEM;
700 } 845 }
701 846
702 /* this will get freed by dlm_request_all_locks_worker */ 847 /* this will get freed by dlm_request_all_locks_worker */
703 buf = (char *) __get_free_page(GFP_KERNEL); 848 buf = (char *) __get_free_page(GFP_NOFS);
704 if (!buf) { 849 if (!buf) {
705 kfree(item); 850 kfree(item);
706 dlm_put(dlm); 851 dlm_put(dlm);
@@ -715,7 +860,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
715 spin_lock(&dlm->work_lock); 860 spin_lock(&dlm->work_lock);
716 list_add_tail(&item->list, &dlm->work_list); 861 list_add_tail(&item->list, &dlm->work_list);
717 spin_unlock(&dlm->work_lock); 862 spin_unlock(&dlm->work_lock);
718 schedule_work(&dlm->dispatched_work); 863 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
719 864
720 dlm_put(dlm); 865 dlm_put(dlm);
721 return 0; 866 return 0;
@@ -730,32 +875,34 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
730 struct list_head *iter; 875 struct list_head *iter;
731 int ret; 876 int ret;
732 u8 dead_node, reco_master; 877 u8 dead_node, reco_master;
878 int skip_all_done = 0;
733 879
734 dlm = item->dlm; 880 dlm = item->dlm;
735 dead_node = item->u.ral.dead_node; 881 dead_node = item->u.ral.dead_node;
736 reco_master = item->u.ral.reco_master; 882 reco_master = item->u.ral.reco_master;
737 mres = (struct dlm_migratable_lockres *)data; 883 mres = (struct dlm_migratable_lockres *)data;
738 884
885 mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
886 dlm->name, dead_node, reco_master);
887
739 if (dead_node != dlm->reco.dead_node || 888 if (dead_node != dlm->reco.dead_node ||
740 reco_master != dlm->reco.new_master) { 889 reco_master != dlm->reco.new_master) {
741 /* show extra debug info if the recovery state is messed */ 890 /* worker could have been created before the recovery master
742 mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " 891 * died. if so, do not continue, but do not error. */
743 "request(dead=%u, master=%u)\n", 892 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
744 dlm->name, dlm->reco.dead_node, dlm->reco.new_master, 893 mlog(ML_NOTICE, "%s: will not send recovery state, "
745 dead_node, reco_master); 894 "recovery master %u died, thread=(dead=%u,mas=%u)"
746 mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " 895 " current=(dead=%u,mas=%u)\n", dlm->name,
747 "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", 896 reco_master, dead_node, reco_master,
748 dlm->name, mres->lockname_len, mres->lockname, mres->master, 897 dlm->reco.dead_node, dlm->reco.new_master);
749 mres->num_locks, mres->total_locks, mres->flags, 898 } else {
750 dlm_get_lock_cookie_node(mres->ml[0].cookie), 899 mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
751 dlm_get_lock_cookie_seq(mres->ml[0].cookie), 900 "master=%u), request(dead=%u, master=%u)\n",
752 mres->ml[0].list, mres->ml[0].flags, 901 dlm->name, dlm->reco.dead_node,
753 mres->ml[0].type, mres->ml[0].convert_type, 902 dlm->reco.new_master, dead_node, reco_master);
754 mres->ml[0].highest_blocked, mres->ml[0].node); 903 }
755 BUG(); 904 goto leave;
756 } 905 }
757 BUG_ON(dead_node != dlm->reco.dead_node);
758 BUG_ON(reco_master != dlm->reco.new_master);
759 906
760 /* lock resources should have already been moved to the 907 /* lock resources should have already been moved to the
761 * dlm->reco.resources list. now move items from that list 908 * dlm->reco.resources list. now move items from that list
@@ -766,12 +913,20 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
766 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); 913 dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
767 914
768 /* now we can begin blasting lockreses without the dlm lock */ 915 /* now we can begin blasting lockreses without the dlm lock */
916
917 /* any errors returned will be due to the new_master dying,
918 * the dlm_reco_thread should detect this */
769 list_for_each(iter, &resources) { 919 list_for_each(iter, &resources) {
770 res = list_entry (iter, struct dlm_lock_resource, recovering); 920 res = list_entry (iter, struct dlm_lock_resource, recovering);
771 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, 921 ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
772 DLM_MRES_RECOVERY); 922 DLM_MRES_RECOVERY);
773 if (ret < 0) 923 if (ret < 0) {
774 mlog_errno(ret); 924 mlog(ML_ERROR, "%s: node %u went down while sending "
925 "recovery state for dead node %u, ret=%d\n", dlm->name,
926 reco_master, dead_node, ret);
927 skip_all_done = 1;
928 break;
929 }
775 } 930 }
776 931
777 /* move the resources back to the list */ 932 /* move the resources back to the list */
@@ -779,10 +934,15 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
779 list_splice_init(&resources, &dlm->reco.resources); 934 list_splice_init(&resources, &dlm->reco.resources);
780 spin_unlock(&dlm->spinlock); 935 spin_unlock(&dlm->spinlock);
781 936
782 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); 937 if (!skip_all_done) {
783 if (ret < 0) 938 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
784 mlog_errno(ret); 939 if (ret < 0) {
785 940 mlog(ML_ERROR, "%s: node %u went down while sending "
941 "recovery all-done for dead node %u, ret=%d\n",
942 dlm->name, reco_master, dead_node, ret);
943 }
944 }
945leave:
786 free_page((unsigned long)data); 946 free_page((unsigned long)data);
787} 947}
788 948
@@ -801,8 +961,14 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
801 961
802 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 962 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
803 sizeof(done_msg), send_to, &tmpret); 963 sizeof(done_msg), send_to, &tmpret);
804 /* negative status is ignored by the caller */ 964 if (ret < 0) {
805 if (ret >= 0) 965 if (!dlm_is_host_down(ret)) {
966 mlog_errno(ret);
967 mlog(ML_ERROR, "%s: unknown error sending data-done "
968 "to %u\n", dlm->name, send_to);
969 BUG();
970 }
971 } else
806 ret = tmpret; 972 ret = tmpret;
807 return ret; 973 return ret;
808} 974}
@@ -822,7 +988,11 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
822 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " 988 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
823 "node_idx=%u, this node=%u\n", done->dead_node, 989 "node_idx=%u, this node=%u\n", done->dead_node,
824 dlm->reco.dead_node, done->node_idx, dlm->node_num); 990 dlm->reco.dead_node, done->node_idx, dlm->node_num);
825 BUG_ON(done->dead_node != dlm->reco.dead_node); 991
992 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
993 "Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
994 "node_idx=%u, this node=%u\n", done->dead_node,
995 dlm->reco.dead_node, done->node_idx, dlm->node_num);
826 996
827 spin_lock(&dlm_reco_state_lock); 997 spin_lock(&dlm_reco_state_lock);
828 list_for_each(iter, &dlm->reco.node_data) { 998 list_for_each(iter, &dlm->reco.node_data) {
@@ -905,13 +1075,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
905 mlog(0, "found lockres owned by dead node while " 1075 mlog(0, "found lockres owned by dead node while "
906 "doing recovery for node %u. sending it.\n", 1076 "doing recovery for node %u. sending it.\n",
907 dead_node); 1077 dead_node);
908 list_del_init(&res->recovering); 1078 list_move_tail(&res->recovering, list);
909 list_add_tail(&res->recovering, list);
910 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 1079 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
911 mlog(0, "found UNKNOWN owner while doing recovery " 1080 mlog(0, "found UNKNOWN owner while doing recovery "
912 "for node %u. sending it.\n", dead_node); 1081 "for node %u. sending it.\n", dead_node);
913 list_del_init(&res->recovering); 1082 list_move_tail(&res->recovering, list);
914 list_add_tail(&res->recovering, list);
915 } 1083 }
916 } 1084 }
917 spin_unlock(&dlm->spinlock); 1085 spin_unlock(&dlm->spinlock);
@@ -1023,8 +1191,9 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1023 ml->type == LKM_PRMODE) { 1191 ml->type == LKM_PRMODE) {
1024 /* if it is already set, this had better be a PR 1192 /* if it is already set, this had better be a PR
1025 * and it has to match */ 1193 * and it has to match */
1026 if (mres->lvb[0] && (ml->type == LKM_EXMODE || 1194 if (!dlm_lvb_is_empty(mres->lvb) &&
1027 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { 1195 (ml->type == LKM_EXMODE ||
1196 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
1028 mlog(ML_ERROR, "mismatched lvbs!\n"); 1197 mlog(ML_ERROR, "mismatched lvbs!\n");
1029 __dlm_print_one_lock_resource(lock->lockres); 1198 __dlm_print_one_lock_resource(lock->lockres);
1030 BUG(); 1199 BUG();
@@ -1083,22 +1252,25 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1083 * we must send it immediately. */ 1252 * we must send it immediately. */
1084 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, 1253 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
1085 res, total_locks); 1254 res, total_locks);
1086 if (ret < 0) { 1255 if (ret < 0)
1087 // TODO 1256 goto error;
1088 mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
1089 "returned %d, TODO\n", ret);
1090 BUG();
1091 }
1092 } 1257 }
1093 } 1258 }
1094 /* flush any remaining locks */ 1259 /* flush any remaining locks */
1095 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1260 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1096 if (ret < 0) { 1261 if (ret < 0)
1097 // TODO 1262 goto error;
1098 mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, " 1263 return ret;
1099 "TODO\n", ret); 1264
1265error:
1266 mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
1267 dlm->name, ret);
1268 if (!dlm_is_host_down(ret))
1100 BUG(); 1269 BUG();
1101 } 1270 mlog(0, "%s: node %u went down while sending %s "
1271 "lockres %.*s\n", dlm->name, send_to,
1272 flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
1273 res->lockname.len, res->lockname.name);
1102 return ret; 1274 return ret;
1103} 1275}
1104 1276
@@ -1146,8 +1318,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1146 mlog(0, "all done flag. all lockres data received!\n"); 1318 mlog(0, "all done flag. all lockres data received!\n");
1147 1319
1148 ret = -ENOMEM; 1320 ret = -ENOMEM;
1149 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL); 1321 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
1150 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 1322 item = kcalloc(1, sizeof(*item), GFP_NOFS);
1151 if (!buf || !item) 1323 if (!buf || !item)
1152 goto leave; 1324 goto leave;
1153 1325
@@ -1238,7 +1410,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1238 spin_lock(&dlm->work_lock); 1410 spin_lock(&dlm->work_lock);
1239 list_add_tail(&item->list, &dlm->work_list); 1411 list_add_tail(&item->list, &dlm->work_list);
1240 spin_unlock(&dlm->work_lock); 1412 spin_unlock(&dlm->work_lock);
1241 schedule_work(&dlm->dispatched_work); 1413 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1242 1414
1243leave: 1415leave:
1244 dlm_put(dlm); 1416 dlm_put(dlm);
@@ -1406,6 +1578,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
1406 struct dlm_ctxt *dlm = data; 1578 struct dlm_ctxt *dlm = data;
1407 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1579 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
1408 struct dlm_lock_resource *res = NULL; 1580 struct dlm_lock_resource *res = NULL;
1581 unsigned int hash;
1409 int master = DLM_LOCK_RES_OWNER_UNKNOWN; 1582 int master = DLM_LOCK_RES_OWNER_UNKNOWN;
1410 u32 flags = DLM_ASSERT_MASTER_REQUERY; 1583 u32 flags = DLM_ASSERT_MASTER_REQUERY;
1411 1584
@@ -1415,8 +1588,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
1415 return master; 1588 return master;
1416 } 1589 }
1417 1590
1591 hash = dlm_lockid_hash(req->name, req->namelen);
1592
1418 spin_lock(&dlm->spinlock); 1593 spin_lock(&dlm->spinlock);
1419 res = __dlm_lookup_lockres(dlm, req->name, req->namelen); 1594 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
1420 if (res) { 1595 if (res) {
1421 spin_lock(&res->spinlock); 1596 spin_lock(&res->spinlock);
1422 master = res->owner; 1597 master = res->owner;
@@ -1483,7 +1658,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1483 struct dlm_lock *newlock = NULL; 1658 struct dlm_lock *newlock = NULL;
1484 struct dlm_lockstatus *lksb = NULL; 1659 struct dlm_lockstatus *lksb = NULL;
1485 int ret = 0; 1660 int ret = 0;
1486 int i; 1661 int i, bad;
1487 struct list_head *iter; 1662 struct list_head *iter;
1488 struct dlm_lock *lock = NULL; 1663 struct dlm_lock *lock = NULL;
1489 1664
@@ -1529,8 +1704,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1529 1704
1530 /* move the lock to its proper place */ 1705 /* move the lock to its proper place */
1531 /* do not alter lock refcount. switching lists. */ 1706 /* do not alter lock refcount. switching lists. */
1532 list_del_init(&lock->list); 1707 list_move_tail(&lock->list, queue);
1533 list_add_tail(&lock->list, queue);
1534 spin_unlock(&res->spinlock); 1708 spin_unlock(&res->spinlock);
1535 1709
1536 mlog(0, "just reordered a local lock!\n"); 1710 mlog(0, "just reordered a local lock!\n");
@@ -1553,28 +1727,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1553 } 1727 }
1554 lksb->flags |= (ml->flags & 1728 lksb->flags |= (ml->flags &
1555 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); 1729 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
1556 1730
1557 if (mres->lvb[0]) { 1731 if (ml->type == LKM_NLMODE)
1732 goto skip_lvb;
1733
1734 if (!dlm_lvb_is_empty(mres->lvb)) {
1558 if (lksb->flags & DLM_LKSB_PUT_LVB) { 1735 if (lksb->flags & DLM_LKSB_PUT_LVB) {
1559 /* other node was trying to update 1736 /* other node was trying to update
1560 * lvb when node died. recreate the 1737 * lvb when node died. recreate the
1561 * lksb with the updated lvb. */ 1738 * lksb with the updated lvb. */
1562 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); 1739 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
1740 /* the lock resource lvb update must happen
1741 * NOW, before the spinlock is dropped.
1742 * we no longer wait for the AST to update
1743 * the lvb. */
1744 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1563 } else { 1745 } else {
1564 /* otherwise, the node is sending its 1746 /* otherwise, the node is sending its
1565 * most recent valid lvb info */ 1747 * most recent valid lvb info */
1566 BUG_ON(ml->type != LKM_EXMODE && 1748 BUG_ON(ml->type != LKM_EXMODE &&
1567 ml->type != LKM_PRMODE); 1749 ml->type != LKM_PRMODE);
1568 if (res->lvb[0] && (ml->type == LKM_EXMODE || 1750 if (!dlm_lvb_is_empty(res->lvb) &&
1569 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { 1751 (ml->type == LKM_EXMODE ||
1570 mlog(ML_ERROR, "received bad lvb!\n"); 1752 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
1571 __dlm_print_one_lock_resource(res); 1753 int i;
1572 BUG(); 1754 mlog(ML_ERROR, "%s:%.*s: received bad "
1755 "lvb! type=%d\n", dlm->name,
1756 res->lockname.len,
1757 res->lockname.name, ml->type);
1758 printk("lockres lvb=[");
1759 for (i=0; i<DLM_LVB_LEN; i++)
1760 printk("%02x", res->lvb[i]);
1761 printk("]\nmigrated lvb=[");
1762 for (i=0; i<DLM_LVB_LEN; i++)
1763 printk("%02x", mres->lvb[i]);
1764 printk("]\n");
1765 dlm_print_one_lock_resource(res);
1766 BUG();
1573 } 1767 }
1574 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1768 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1575 } 1769 }
1576 } 1770 }
1577 1771skip_lvb:
1578 1772
1579 /* NOTE: 1773 /* NOTE:
1580 * wrt lock queue ordering and recovery: 1774 * wrt lock queue ordering and recovery:
@@ -1592,9 +1786,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1592 * relative to each other, but clearly *not* 1786 * relative to each other, but clearly *not*
1593 * preserved relative to locks from other nodes. 1787 * preserved relative to locks from other nodes.
1594 */ 1788 */
1789 bad = 0;
1595 spin_lock(&res->spinlock); 1790 spin_lock(&res->spinlock);
1596 dlm_lock_get(newlock); 1791 list_for_each_entry(lock, queue, list) {
1597 list_add_tail(&newlock->list, queue); 1792 if (lock->ml.cookie == ml->cookie) {
1793 u64 c = lock->ml.cookie;
1794 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
1795 "exists on this lockres!\n", dlm->name,
1796 res->lockname.len, res->lockname.name,
1797 dlm_get_lock_cookie_node(c),
1798 dlm_get_lock_cookie_seq(c));
1799
1800 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
1801 "node=%u, cookie=%u:%llu, queue=%d\n",
1802 ml->type, ml->convert_type, ml->node,
1803 dlm_get_lock_cookie_node(ml->cookie),
1804 dlm_get_lock_cookie_seq(ml->cookie),
1805 ml->list);
1806
1807 __dlm_print_one_lock_resource(res);
1808 bad = 1;
1809 break;
1810 }
1811 }
1812 if (!bad) {
1813 dlm_lock_get(newlock);
1814 list_add_tail(&newlock->list, queue);
1815 }
1598 spin_unlock(&res->spinlock); 1816 spin_unlock(&res->spinlock);
1599 } 1817 }
1600 mlog(0, "done running all the locks\n"); 1818 mlog(0, "done running all the locks\n");
@@ -1618,8 +1836,14 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
1618 struct dlm_lock *lock; 1836 struct dlm_lock *lock;
1619 1837
1620 res->state |= DLM_LOCK_RES_RECOVERING; 1838 res->state |= DLM_LOCK_RES_RECOVERING;
1621 if (!list_empty(&res->recovering)) 1839 if (!list_empty(&res->recovering)) {
1840 mlog(0,
1841 "Recovering res %s:%.*s, is already on recovery list!\n",
1842 dlm->name, res->lockname.len, res->lockname.name);
1622 list_del_init(&res->recovering); 1843 list_del_init(&res->recovering);
1844 }
1845 /* We need to hold a reference while on the recovery list */
1846 dlm_lockres_get(res);
1623 list_add_tail(&res->recovering, &dlm->reco.resources); 1847 list_add_tail(&res->recovering, &dlm->reco.resources);
1624 1848
1625 /* find any pending locks and put them back on proper list */ 1849 /* find any pending locks and put them back on proper list */
@@ -1708,9 +1932,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1708 spin_lock(&res->spinlock); 1932 spin_lock(&res->spinlock);
1709 dlm_change_lockres_owner(dlm, res, new_master); 1933 dlm_change_lockres_owner(dlm, res, new_master);
1710 res->state &= ~DLM_LOCK_RES_RECOVERING; 1934 res->state &= ~DLM_LOCK_RES_RECOVERING;
1711 __dlm_dirty_lockres(dlm, res); 1935 if (!__dlm_lockres_unused(res))
1936 __dlm_dirty_lockres(dlm, res);
1712 spin_unlock(&res->spinlock); 1937 spin_unlock(&res->spinlock);
1713 wake_up(&res->wq); 1938 wake_up(&res->wq);
1939 dlm_lockres_put(res);
1714 } 1940 }
1715 } 1941 }
1716 1942
@@ -1719,7 +1945,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1719 * the RECOVERING state and set the owner 1945 * the RECOVERING state and set the owner
1720 * if necessary */ 1946 * if necessary */
1721 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 1947 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
1722 bucket = &(dlm->lockres_hash[i]); 1948 bucket = dlm_lockres_hash(dlm, i);
1723 hlist_for_each_entry(res, hash_iter, bucket, hash_node) { 1949 hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
1724 if (res->state & DLM_LOCK_RES_RECOVERING) { 1950 if (res->state & DLM_LOCK_RES_RECOVERING) {
1725 if (res->owner == dead_node) { 1951 if (res->owner == dead_node) {
@@ -1743,11 +1969,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1743 dlm->name, res->lockname.len, 1969 dlm->name, res->lockname.len,
1744 res->lockname.name, res->owner); 1970 res->lockname.name, res->owner);
1745 list_del_init(&res->recovering); 1971 list_del_init(&res->recovering);
1972 dlm_lockres_put(res);
1746 } 1973 }
1747 spin_lock(&res->spinlock); 1974 spin_lock(&res->spinlock);
1748 dlm_change_lockres_owner(dlm, res, new_master); 1975 dlm_change_lockres_owner(dlm, res, new_master);
1749 res->state &= ~DLM_LOCK_RES_RECOVERING; 1976 res->state &= ~DLM_LOCK_RES_RECOVERING;
1750 __dlm_dirty_lockres(dlm, res); 1977 if (!__dlm_lockres_unused(res))
1978 __dlm_dirty_lockres(dlm, res);
1751 spin_unlock(&res->spinlock); 1979 spin_unlock(&res->spinlock);
1752 wake_up(&res->wq); 1980 wake_up(&res->wq);
1753 } 1981 }
@@ -1884,7 +2112,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1884 * need to be fired as a result. 2112 * need to be fired as a result.
1885 */ 2113 */
1886 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2114 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
1887 bucket = &(dlm->lockres_hash[i]); 2115 bucket = dlm_lockres_hash(dlm, i);
1888 hlist_for_each_entry(res, iter, bucket, hash_node) { 2116 hlist_for_each_entry(res, iter, bucket, hash_node) {
1889 /* always prune any $RECOVERY entries for dead nodes, 2117 /* always prune any $RECOVERY entries for dead nodes,
1890 * otherwise hangs can occur during later recovery */ 2118 * otherwise hangs can occur during later recovery */
@@ -1924,6 +2152,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
1924{ 2152{
1925 assert_spin_locked(&dlm->spinlock); 2153 assert_spin_locked(&dlm->spinlock);
1926 2154
2155 if (dlm->reco.new_master == idx) {
2156 mlog(0, "%s: recovery master %d just died\n",
2157 dlm->name, idx);
2158 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2159 /* finalize1 was reached, so it is safe to clear
2160 * the new_master and dead_node. that recovery
2161 * is complete. */
2162 mlog(0, "%s: dead master %d had reached "
2163 "finalize1 state, clearing\n", dlm->name, idx);
2164 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2165 __dlm_reset_recovery(dlm);
2166 }
2167 }
2168
1927 /* check to see if the node is already considered dead */ 2169 /* check to see if the node is already considered dead */
1928 if (!test_bit(idx, dlm->live_nodes_map)) { 2170 if (!test_bit(idx, dlm->live_nodes_map)) {
1929 mlog(0, "for domain %s, node %d is already dead. " 2171 mlog(0, "for domain %s, node %d is already dead. "
@@ -2087,7 +2329,7 @@ again:
2087 2329
2088 /* set the new_master to this node */ 2330 /* set the new_master to this node */
2089 spin_lock(&dlm->spinlock); 2331 spin_lock(&dlm->spinlock);
2090 dlm->reco.new_master = dlm->node_num; 2332 dlm_set_reco_master(dlm, dlm->node_num);
2091 spin_unlock(&dlm->spinlock); 2333 spin_unlock(&dlm->spinlock);
2092 } 2334 }
2093 2335
@@ -2125,6 +2367,10 @@ again:
2125 mlog(0, "%s: reco master %u is ready to recover %u\n", 2367 mlog(0, "%s: reco master %u is ready to recover %u\n",
2126 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); 2368 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
2127 status = -EEXIST; 2369 status = -EEXIST;
2370 } else if (ret == DLM_RECOVERING) {
2371 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
2372 dlm->name, dlm->node_num);
2373 goto again;
2128 } else { 2374 } else {
2129 struct dlm_lock_resource *res; 2375 struct dlm_lock_resource *res;
2130 2376
@@ -2156,7 +2402,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2156 2402
2157 mlog_entry("%u\n", dead_node); 2403 mlog_entry("%u\n", dead_node);
2158 2404
2159 mlog(0, "dead node is %u\n", dead_node); 2405 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
2160 2406
2161 spin_lock(&dlm->spinlock); 2407 spin_lock(&dlm->spinlock);
2162 dlm_node_iter_init(dlm->domain_map, &iter); 2408 dlm_node_iter_init(dlm->domain_map, &iter);
@@ -2214,6 +2460,14 @@ retry:
2214 * another ENOMEM */ 2460 * another ENOMEM */
2215 msleep(100); 2461 msleep(100);
2216 goto retry; 2462 goto retry;
2463 } else if (ret == EAGAIN) {
2464 mlog(0, "%s: trying to start recovery of node "
2465 "%u, but node %u is waiting for last recovery "
2466 "to complete, backoff for a bit\n", dlm->name,
2467 dead_node, nodenum);
2468 /* TODO Look into replacing msleep with cond_resched() */
2469 msleep(100);
2470 goto retry;
2217 } 2471 }
2218 } 2472 }
2219 2473
@@ -2229,8 +2483,20 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2229 if (!dlm_grab(dlm)) 2483 if (!dlm_grab(dlm))
2230 return 0; 2484 return 0;
2231 2485
2232 mlog(0, "node %u wants to recover node %u\n", 2486 spin_lock(&dlm->spinlock);
2233 br->node_idx, br->dead_node); 2487 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2488 mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
2489 "but this node is in finalize state, waiting on finalize2\n",
2490 dlm->name, br->node_idx, br->dead_node,
2491 dlm->reco.dead_node, dlm->reco.new_master);
2492 spin_unlock(&dlm->spinlock);
2493 return EAGAIN;
2494 }
2495 spin_unlock(&dlm->spinlock);
2496
2497 mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
2498 dlm->name, br->node_idx, br->dead_node,
2499 dlm->reco.dead_node, dlm->reco.new_master);
2234 2500
2235 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); 2501 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
2236 2502
@@ -2252,8 +2518,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2252 "node %u changing it to %u\n", dlm->name, 2518 "node %u changing it to %u\n", dlm->name,
2253 dlm->reco.dead_node, br->node_idx, br->dead_node); 2519 dlm->reco.dead_node, br->node_idx, br->dead_node);
2254 } 2520 }
2255 dlm->reco.new_master = br->node_idx; 2521 dlm_set_reco_master(dlm, br->node_idx);
2256 dlm->reco.dead_node = br->dead_node; 2522 dlm_set_reco_dead_node(dlm, br->dead_node);
2257 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2523 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2258 mlog(0, "recovery master %u sees %u as dead, but this " 2524 mlog(0, "recovery master %u sees %u as dead, but this "
2259 "node has not yet. marking %u as dead\n", 2525 "node has not yet. marking %u as dead\n",
@@ -2272,10 +2538,16 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2272 spin_unlock(&dlm->spinlock); 2538 spin_unlock(&dlm->spinlock);
2273 2539
2274 dlm_kick_recovery_thread(dlm); 2540 dlm_kick_recovery_thread(dlm);
2541
2542 mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
2543 dlm->name, br->node_idx, br->dead_node,
2544 dlm->reco.dead_node, dlm->reco.new_master);
2545
2275 dlm_put(dlm); 2546 dlm_put(dlm);
2276 return 0; 2547 return 0;
2277} 2548}
2278 2549
2550#define DLM_FINALIZE_STAGE2 0x01
2279static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) 2551static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2280{ 2552{
2281 int ret = 0; 2553 int ret = 0;
@@ -2283,25 +2555,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2283 struct dlm_node_iter iter; 2555 struct dlm_node_iter iter;
2284 int nodenum; 2556 int nodenum;
2285 int status; 2557 int status;
2558 int stage = 1;
2286 2559
2287 mlog(0, "finishing recovery for node %s:%u\n", 2560 mlog(0, "finishing recovery for node %s:%u, "
2288 dlm->name, dlm->reco.dead_node); 2561 "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
2289 2562
2290 spin_lock(&dlm->spinlock); 2563 spin_lock(&dlm->spinlock);
2291 dlm_node_iter_init(dlm->domain_map, &iter); 2564 dlm_node_iter_init(dlm->domain_map, &iter);
2292 spin_unlock(&dlm->spinlock); 2565 spin_unlock(&dlm->spinlock);
2293 2566
2567stage2:
2294 memset(&fr, 0, sizeof(fr)); 2568 memset(&fr, 0, sizeof(fr));
2295 fr.node_idx = dlm->node_num; 2569 fr.node_idx = dlm->node_num;
2296 fr.dead_node = dlm->reco.dead_node; 2570 fr.dead_node = dlm->reco.dead_node;
2571 if (stage == 2)
2572 fr.flags |= DLM_FINALIZE_STAGE2;
2297 2573
2298 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2574 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2299 if (nodenum == dlm->node_num) 2575 if (nodenum == dlm->node_num)
2300 continue; 2576 continue;
2301 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, 2577 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
2302 &fr, sizeof(fr), nodenum, &status); 2578 &fr, sizeof(fr), nodenum, &status);
2303 if (ret >= 0) { 2579 if (ret >= 0)
2304 ret = status; 2580 ret = status;
2581 if (ret < 0) {
2582 mlog_errno(ret);
2305 if (dlm_is_host_down(ret)) { 2583 if (dlm_is_host_down(ret)) {
2306 /* this has no effect on this recovery 2584 /* this has no effect on this recovery
2307 * session, so set the status to zero to 2585 * session, so set the status to zero to
@@ -2309,13 +2587,17 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2309 mlog(ML_ERROR, "node %u went down after this " 2587 mlog(ML_ERROR, "node %u went down after this "
2310 "node finished recovery.\n", nodenum); 2588 "node finished recovery.\n", nodenum);
2311 ret = 0; 2589 ret = 0;
2590 continue;
2312 } 2591 }
2313 }
2314 if (ret < 0) {
2315 mlog_errno(ret);
2316 break; 2592 break;
2317 } 2593 }
2318 } 2594 }
2595 if (stage == 1) {
2596 /* reset the node_iter back to the top and send finalize2 */
2597 iter.curnode = -1;
2598 stage = 2;
2599 goto stage2;
2600 }
2319 2601
2320 return ret; 2602 return ret;
2321} 2603}
@@ -2324,14 +2606,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2324{ 2606{
2325 struct dlm_ctxt *dlm = data; 2607 struct dlm_ctxt *dlm = data;
2326 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2608 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
2609 int stage = 1;
2327 2610
2328 /* ok to return 0, domain has gone away */ 2611 /* ok to return 0, domain has gone away */
2329 if (!dlm_grab(dlm)) 2612 if (!dlm_grab(dlm))
2330 return 0; 2613 return 0;
2331 2614
2332 mlog(0, "node %u finalizing recovery of node %u\n", 2615 if (fr->flags & DLM_FINALIZE_STAGE2)
2333 fr->node_idx, fr->dead_node); 2616 stage = 2;
2334 2617
2618 mlog(0, "%s: node %u finalizing recovery stage%d of "
2619 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
2620 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
2621
2335 spin_lock(&dlm->spinlock); 2622 spin_lock(&dlm->spinlock);
2336 2623
2337 if (dlm->reco.new_master != fr->node_idx) { 2624 if (dlm->reco.new_master != fr->node_idx) {
@@ -2347,13 +2634,41 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2347 BUG(); 2634 BUG();
2348 } 2635 }
2349 2636
2350 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); 2637 switch (stage) {
2351 2638 case 1:
2352 spin_unlock(&dlm->spinlock); 2639 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
2640 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2641 mlog(ML_ERROR, "%s: received finalize1 from "
2642 "new master %u for dead node %u, but "
2643 "this node has already received it!\n",
2644 dlm->name, fr->node_idx, fr->dead_node);
2645 dlm_print_reco_node_status(dlm);
2646 BUG();
2647 }
2648 dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
2649 spin_unlock(&dlm->spinlock);
2650 break;
2651 case 2:
2652 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
2653 mlog(ML_ERROR, "%s: received finalize2 from "
2654 "new master %u for dead node %u, but "
2655 "this node did not have finalize1!\n",
2656 dlm->name, fr->node_idx, fr->dead_node);
2657 dlm_print_reco_node_status(dlm);
2658 BUG();
2659 }
2660 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2661 spin_unlock(&dlm->spinlock);
2662 dlm_reset_recovery(dlm);
2663 dlm_kick_recovery_thread(dlm);
2664 break;
2665 default:
2666 BUG();
2667 }
2353 2668
2354 dlm_reset_recovery(dlm); 2669 mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
2670 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
2355 2671
2356 dlm_kick_recovery_thread(dlm);
2357 dlm_put(dlm); 2672 dlm_put(dlm);
2358 return 0; 2673 return 0;
2359} 2674}