aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmrecovery.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c580
1 files changed, 449 insertions, 131 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 9962190e7416..da399013516f 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -115,12 +115,37 @@ static u64 dlm_get_next_mig_cookie(void)
115 return c; 115 return c;
116} 116}
117 117
/*
 * Set dlm->reco.dead_node to dead_node.
 * The caller must already hold dlm->spinlock (asserted below).
 * A message is emitted through mlog() only when the value actually
 * changes; the assignment itself is unconditional.
 */
118static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
119 u8 dead_node)
120{
121 assert_spin_locked(&dlm->spinlock);
122 if (dlm->reco.dead_node != dead_node)
123 mlog(0, "%s: changing dead_node from %u to %u\n",
124 dlm->name, dlm->reco.dead_node, dead_node);
125 dlm->reco.dead_node = dead_node;
126}
127
/*
 * Set dlm->reco.new_master to master.
 * The caller must already hold dlm->spinlock (asserted below).
 * Unlike dlm_set_reco_dead_node(), the mlog() call here is not guarded
 * by a comparison, so it fires even when the old and new values match.
 */
128static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
129 u8 master)
130{
131 assert_spin_locked(&dlm->spinlock);
132 mlog(0, "%s: changing new_master from %u to %u\n",
133 dlm->name, dlm->reco.new_master, master);
134 dlm->reco.new_master = master;
135}
136
/*
 * Reset the recovery bookkeeping: clear the bit for the current
 * dlm->reco.dead_node in dlm->recovery_map, then set both
 * reco.dead_node and reco.new_master to O2NM_INVALID_NODE_NUM.
 * The caller must already hold dlm->spinlock (asserted below).
 */
137static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
138{
139 assert_spin_locked(&dlm->spinlock);
/* must run before dead_node is overwritten below: it is the bit index */
140 clear_bit(dlm->reco.dead_node, dlm->recovery_map);
141 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
142 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
143}
144
118static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) 145static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
119{ 146{
120 spin_lock(&dlm->spinlock); 147 spin_lock(&dlm->spinlock);
121 clear_bit(dlm->reco.dead_node, dlm->recovery_map); 148 __dlm_reset_recovery(dlm);
122 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
123 dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
124 spin_unlock(&dlm->spinlock); 149 spin_unlock(&dlm->spinlock);
125} 150}
126 151
@@ -132,12 +157,21 @@ void dlm_dispatch_work(void *data)
132 struct list_head *iter, *iter2; 157 struct list_head *iter, *iter2;
133 struct dlm_work_item *item; 158 struct dlm_work_item *item;
134 dlm_workfunc_t *workfunc; 159 dlm_workfunc_t *workfunc;
160 int tot=0;
161
162 if (!dlm_joined(dlm))
163 return;
135 164
136 spin_lock(&dlm->work_lock); 165 spin_lock(&dlm->work_lock);
137 list_splice_init(&dlm->work_list, &tmp_list); 166 list_splice_init(&dlm->work_list, &tmp_list);
138 spin_unlock(&dlm->work_lock); 167 spin_unlock(&dlm->work_lock);
139 168
140 list_for_each_safe(iter, iter2, &tmp_list) { 169 list_for_each_safe(iter, iter2, &tmp_list) {
170 tot++;
171 }
172 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
173
174 list_for_each_safe(iter, iter2, &tmp_list) {
141 item = list_entry(iter, struct dlm_work_item, list); 175 item = list_entry(iter, struct dlm_work_item, list);
142 workfunc = item->func; 176 workfunc = item->func;
143 list_del_init(&item->list); 177 list_del_init(&item->list);
@@ -220,6 +254,52 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
220 * 254 *
221 */ 255 */
222 256
/*
 * Dump the current recovery state through mlog(ML_NOTICE, ...):
 *   - one summary line (recovery thread pid, ACTIVE/inactive, dead node,
 *     recovery master),
 *   - one line per entry on dlm->reco.node_data with its state name,
 *   - one line per lock resource on dlm->reco.resources.
 *
 * NOTE(review): this function takes no lock while reading dlm->reco.*
 * or walking the two lists -- confirm that callers hold whatever locks
 * protect them.
 */
257static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
258{
259 struct dlm_reco_node_data *ndata;
260 struct dlm_lock_resource *res;
261
262 mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
263 dlm->name, dlm->dlm_reco_thread_task->pid,
264 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
265 dlm->reco.dead_node, dlm->reco.new_master);
266
267 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
/* "unknown" is never printed: every switch arm, including default,
 * reassigns st before the mlog() below */
268 char *st = "unknown";
269 switch (ndata->state) {
270 case DLM_RECO_NODE_DATA_INIT:
271 st = "init";
272 break;
273 case DLM_RECO_NODE_DATA_REQUESTING:
274 st = "requesting";
275 break;
276 case DLM_RECO_NODE_DATA_DEAD:
277 st = "dead";
278 break;
279 case DLM_RECO_NODE_DATA_RECEIVING:
280 st = "receiving";
281 break;
282 case DLM_RECO_NODE_DATA_REQUESTED:
283 st = "requested";
284 break;
285 case DLM_RECO_NODE_DATA_DONE:
286 st = "done";
287 break;
288 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
289 st = "finalize-sent";
290 break;
291 default:
292 st = "bad";
293 break;
294 }
295 mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
296 dlm->name, ndata->node_num, st);
297 }
298 list_for_each_entry(res, &dlm->reco.resources, recovering) {
299 mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
300 dlm->name, res->lockname.len, res->lockname.name);
301 }
302}
223 303
224#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) 304#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
225 305
@@ -267,11 +347,23 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
267{ 347{
268 int dead; 348 int dead;
269 spin_lock(&dlm->spinlock); 349 spin_lock(&dlm->spinlock);
270 dead = test_bit(node, dlm->domain_map); 350 dead = !test_bit(node, dlm->domain_map);
271 spin_unlock(&dlm->spinlock); 351 spin_unlock(&dlm->spinlock);
272 return dead; 352 return dead;
273} 353}
274 354
355/* returns true if node's bit is clear in dlm->recovery_map,
356 * i.e. the node is not (or is no longer) marked as needing recovery */
357static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
358{
359 int recovered;
/* sample the bit under dlm->spinlock */
360 spin_lock(&dlm->spinlock);
361 recovered = !test_bit(node, dlm->recovery_map);
362 spin_unlock(&dlm->spinlock);
363 return recovered;
364}
365
366
275int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 367int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
276{ 368{
277 if (timeout) { 369 if (timeout) {
@@ -290,6 +382,24 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
290 return 0; 382 return 0;
291} 383}
292 384
/*
 * Sleep on dlm->dlm_reco_thread_wq until dlm_is_node_recovered(dlm, node)
 * becomes true.
 *
 * timeout: upper bound in milliseconds (converted with msecs_to_jiffies);
 *          0 means wait with no time limit.
 *
 * Always returns 0. The result of wait_event_timeout() is discarded, so
 * a caller cannot tell an expired timeout from a completed recovery.
 */
385int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
386{
387 if (timeout) {
388 mlog(0, "%s: waiting %dms for notification of "
389 "recovery of node %u\n", dlm->name, timeout, node);
390 wait_event_timeout(dlm->dlm_reco_thread_wq,
391 dlm_is_node_recovered(dlm, node),
392 msecs_to_jiffies(timeout));
393 } else {
394 mlog(0, "%s: waiting indefinitely for notification "
395 "of recovery of node %u\n", dlm->name, node);
396 wait_event(dlm->dlm_reco_thread_wq,
397 dlm_is_node_recovered(dlm, node));
398 }
399 /* for now, return 0 */
400 return 0;
401}
402
293/* callers of the top-level api calls (dlmlock/dlmunlock) should 403/* callers of the top-level api calls (dlmlock/dlmunlock) should
294 * block on the dlm->reco.event when recovery is in progress. 404 * block on the dlm->reco.event when recovery is in progress.
295 * the dlm recovery thread will set this state when it begins 405 * the dlm recovery thread will set this state when it begins
@@ -308,6 +418,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm)
308 418
309void dlm_wait_for_recovery(struct dlm_ctxt *dlm) 419void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
310{ 420{
421 if (dlm_in_recovery(dlm)) {
422 mlog(0, "%s: reco thread %d in recovery: "
423 "state=%d, master=%u, dead=%u\n",
424 dlm->name, dlm->dlm_reco_thread_task->pid,
425 dlm->reco.state, dlm->reco.new_master,
426 dlm->reco.dead_node);
427 }
311 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); 428 wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
312} 429}
313 430
@@ -341,7 +458,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
341 mlog(0, "new master %u died while recovering %u!\n", 458 mlog(0, "new master %u died while recovering %u!\n",
342 dlm->reco.new_master, dlm->reco.dead_node); 459 dlm->reco.new_master, dlm->reco.dead_node);
343 /* unset the new_master, leave dead_node */ 460 /* unset the new_master, leave dead_node */
344 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 461 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
345 } 462 }
346 463
347 /* select a target to recover */ 464 /* select a target to recover */
@@ -350,14 +467,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
350 467
351 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0); 468 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
352 if (bit >= O2NM_MAX_NODES || bit < 0) 469 if (bit >= O2NM_MAX_NODES || bit < 0)
353 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 470 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
354 else 471 else
355 dlm->reco.dead_node = bit; 472 dlm_set_reco_dead_node(dlm, bit);
356 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { 473 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
357 /* BUG? */ 474 /* BUG? */
358 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", 475 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
359 dlm->reco.dead_node); 476 dlm->reco.dead_node);
360 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 477 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
361 } 478 }
362 479
363 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 480 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
@@ -366,7 +483,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
366 /* return to main thread loop and sleep. */ 483 /* return to main thread loop and sleep. */
367 return 0; 484 return 0;
368 } 485 }
369 mlog(0, "recovery thread found node %u in the recovery map!\n", 486 mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
487 dlm->name, dlm->dlm_reco_thread_task->pid,
370 dlm->reco.dead_node); 488 dlm->reco.dead_node);
371 spin_unlock(&dlm->spinlock); 489 spin_unlock(&dlm->spinlock);
372 490
@@ -389,8 +507,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
389 } 507 }
390 mlog(0, "another node will master this recovery session.\n"); 508 mlog(0, "another node will master this recovery session.\n");
391 } 509 }
392 mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n", 510 mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
393 dlm->name, dlm->reco.new_master, 511 dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,
394 dlm->node_num, dlm->reco.dead_node); 512 dlm->node_num, dlm->reco.dead_node);
395 513
396 /* it is safe to start everything back up here 514 /* it is safe to start everything back up here
@@ -402,11 +520,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
402 return 0; 520 return 0;
403 521
404master_here: 522master_here:
405 mlog(0, "mastering recovery of %s:%u here(this=%u)!\n", 523 mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n",
524 dlm->dlm_reco_thread_task->pid,
406 dlm->name, dlm->reco.dead_node, dlm->node_num); 525 dlm->name, dlm->reco.dead_node, dlm->node_num);
407 526
408 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 527 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
409 if (status < 0) { 528 if (status < 0) {
529 /* we should never hit this anymore */
410 mlog(ML_ERROR, "error %d remastering locks for node %u, " 530 mlog(ML_ERROR, "error %d remastering locks for node %u, "
411 "retrying.\n", status, dlm->reco.dead_node); 531 "retrying.\n", status, dlm->reco.dead_node);
412 /* yield a bit to allow any final network messages 532 /* yield a bit to allow any final network messages
@@ -433,9 +553,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
433 int destroy = 0; 553 int destroy = 0;
434 int pass = 0; 554 int pass = 0;
435 555
436 status = dlm_init_recovery_area(dlm, dead_node); 556 do {
437 if (status < 0) 557 /* we have become recovery master. there is no escaping
438 goto leave; 558 * this, so just keep trying until we get it. */
559 status = dlm_init_recovery_area(dlm, dead_node);
560 if (status < 0) {
561 mlog(ML_ERROR, "%s: failed to alloc recovery area, "
562 "retrying\n", dlm->name);
563 msleep(1000);
564 }
565 } while (status != 0);
439 566
440 /* safe to access the node data list without a lock, since this 567 /* safe to access the node data list without a lock, since this
441 * process is the only one to change the list */ 568 * process is the only one to change the list */
@@ -452,16 +579,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
452 continue; 579 continue;
453 } 580 }
454 581
455 status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); 582 do {
456 if (status < 0) { 583 status = dlm_request_all_locks(dlm, ndata->node_num,
457 mlog_errno(status); 584 dead_node);
458 if (dlm_is_host_down(status)) 585 if (status < 0) {
459 ndata->state = DLM_RECO_NODE_DATA_DEAD; 586 mlog_errno(status);
460 else { 587 if (dlm_is_host_down(status)) {
461 destroy = 1; 588 /* node died, ignore it for recovery */
462 goto leave; 589 status = 0;
590 ndata->state = DLM_RECO_NODE_DATA_DEAD;
591 /* wait for the domain map to catch up
592 * with the network state. */
593 wait_event_timeout(dlm->dlm_reco_thread_wq,
594 dlm_is_node_dead(dlm,
595 ndata->node_num),
596 msecs_to_jiffies(1000));
597 mlog(0, "waited 1 sec for %u, "
598 "dead? %s\n", ndata->node_num,
599 dlm_is_node_dead(dlm, ndata->node_num) ?
600 "yes" : "no");
601 } else {
602 /* -ENOMEM on the other node */
603 mlog(0, "%s: node %u returned "
604 "%d during recovery, retrying "
605 "after a short wait\n",
606 dlm->name, ndata->node_num,
607 status);
608 msleep(100);
609 }
463 } 610 }
464 } 611 } while (status != 0);
465 612
466 switch (ndata->state) { 613 switch (ndata->state) {
467 case DLM_RECO_NODE_DATA_INIT: 614 case DLM_RECO_NODE_DATA_INIT:
@@ -473,10 +620,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
473 mlog(0, "node %u died after requesting " 620 mlog(0, "node %u died after requesting "
474 "recovery info for node %u\n", 621 "recovery info for node %u\n",
475 ndata->node_num, dead_node); 622 ndata->node_num, dead_node);
476 // start all over 623 /* fine. don't need this node's info.
477 destroy = 1; 624 * continue without it. */
478 status = -EAGAIN; 625 break;
479 goto leave;
480 case DLM_RECO_NODE_DATA_REQUESTING: 626 case DLM_RECO_NODE_DATA_REQUESTING:
481 ndata->state = DLM_RECO_NODE_DATA_REQUESTED; 627 ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
482 mlog(0, "now receiving recovery data from " 628 mlog(0, "now receiving recovery data from "
@@ -520,35 +666,26 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
520 BUG(); 666 BUG();
521 break; 667 break;
522 case DLM_RECO_NODE_DATA_DEAD: 668 case DLM_RECO_NODE_DATA_DEAD:
523 mlog(ML_NOTICE, "node %u died after " 669 mlog(0, "node %u died after "
524 "requesting recovery info for " 670 "requesting recovery info for "
525 "node %u\n", ndata->node_num, 671 "node %u\n", ndata->node_num,
526 dead_node); 672 dead_node);
527 spin_unlock(&dlm_reco_state_lock); 673 break;
528 // start all over
529 destroy = 1;
530 status = -EAGAIN;
531 /* instead of spinning like crazy here,
532 * wait for the domain map to catch up
533 * with the network state. otherwise this
534 * can be hit hundreds of times before
535 * the node is really seen as dead. */
536 wait_event_timeout(dlm->dlm_reco_thread_wq,
537 dlm_is_node_dead(dlm,
538 ndata->node_num),
539 msecs_to_jiffies(1000));
540 mlog(0, "waited 1 sec for %u, "
541 "dead? %s\n", ndata->node_num,
542 dlm_is_node_dead(dlm, ndata->node_num) ?
543 "yes" : "no");
544 goto leave;
545 case DLM_RECO_NODE_DATA_RECEIVING: 674 case DLM_RECO_NODE_DATA_RECEIVING:
546 case DLM_RECO_NODE_DATA_REQUESTED: 675 case DLM_RECO_NODE_DATA_REQUESTED:
676 mlog(0, "%s: node %u still in state %s\n",
677 dlm->name, ndata->node_num,
678 ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
679 "receiving" : "requested");
547 all_nodes_done = 0; 680 all_nodes_done = 0;
548 break; 681 break;
549 case DLM_RECO_NODE_DATA_DONE: 682 case DLM_RECO_NODE_DATA_DONE:
683 mlog(0, "%s: node %u state is done\n",
684 dlm->name, ndata->node_num);
550 break; 685 break;
551 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 686 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
687 mlog(0, "%s: node %u state is finalize\n",
688 dlm->name, ndata->node_num);
552 break; 689 break;
553 } 690 }
554 } 691 }
@@ -578,7 +715,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
578 jiffies, dlm->reco.dead_node, 715 jiffies, dlm->reco.dead_node,
579 dlm->node_num, dlm->reco.new_master); 716 dlm->node_num, dlm->reco.new_master);
580 destroy = 1; 717 destroy = 1;
581 status = ret; 718 status = 0;
582 /* rescan everything marked dirty along the way */ 719 /* rescan everything marked dirty along the way */
583 dlm_kick_thread(dlm, NULL); 720 dlm_kick_thread(dlm, NULL);
584 break; 721 break;
@@ -591,7 +728,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
591 728
592 } 729 }
593 730
594leave:
595 if (destroy) 731 if (destroy)
596 dlm_destroy_recovery_area(dlm, dead_node); 732 dlm_destroy_recovery_area(dlm, dead_node);
597 733
@@ -617,7 +753,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
617 } 753 }
618 BUG_ON(num == dead_node); 754 BUG_ON(num == dead_node);
619 755
620 ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL); 756 ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
621 if (!ndata) { 757 if (!ndata) {
622 dlm_destroy_recovery_area(dlm, dead_node); 758 dlm_destroy_recovery_area(dlm, dead_node);
623 return -ENOMEM; 759 return -ENOMEM;
@@ -691,16 +827,25 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
691 if (!dlm_grab(dlm)) 827 if (!dlm_grab(dlm))
692 return -EINVAL; 828 return -EINVAL;
693 829
830 if (lr->dead_node != dlm->reco.dead_node) {
831 mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
832 "dead_node is %u\n", dlm->name, lr->node_idx,
833 lr->dead_node, dlm->reco.dead_node);
834 dlm_print_reco_node_status(dlm);
835 /* this is a hack */
836 dlm_put(dlm);
837 return -ENOMEM;
838 }
694 BUG_ON(lr->dead_node != dlm->reco.dead_node); 839 BUG_ON(lr->dead_node != dlm->reco.dead_node);
695 840
696 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 841 item = kcalloc(1, sizeof(*item), GFP_NOFS);
697 if (!item) { 842 if (!item) {
698 dlm_put(dlm); 843 dlm_put(dlm);
699 return -ENOMEM; 844 return -ENOMEM;
700 } 845 }
701 846
702 /* this will get freed by dlm_request_all_locks_worker */ 847 /* this will get freed by dlm_request_all_locks_worker */
703 buf = (char *) __get_free_page(GFP_KERNEL); 848 buf = (char *) __get_free_page(GFP_NOFS);
704 if (!buf) { 849 if (!buf) {
705 kfree(item); 850 kfree(item);
706 dlm_put(dlm); 851 dlm_put(dlm);
@@ -715,7 +860,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
715 spin_lock(&dlm->work_lock); 860 spin_lock(&dlm->work_lock);
716 list_add_tail(&item->list, &dlm->work_list); 861 list_add_tail(&item->list, &dlm->work_list);
717 spin_unlock(&dlm->work_lock); 862 spin_unlock(&dlm->work_lock);
718 schedule_work(&dlm->dispatched_work); 863 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
719 864
720 dlm_put(dlm); 865 dlm_put(dlm);
721 return 0; 866 return 0;
@@ -730,32 +875,34 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
730 struct list_head *iter; 875 struct list_head *iter;
731 int ret; 876 int ret;
732 u8 dead_node, reco_master; 877 u8 dead_node, reco_master;
878 int skip_all_done = 0;
733 879
734 dlm = item->dlm; 880 dlm = item->dlm;
735 dead_node = item->u.ral.dead_node; 881 dead_node = item->u.ral.dead_node;
736 reco_master = item->u.ral.reco_master; 882 reco_master = item->u.ral.reco_master;
737 mres = (struct dlm_migratable_lockres *)data; 883 mres = (struct dlm_migratable_lockres *)data;
738 884
885 mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
886 dlm->name, dead_node, reco_master);
887
739 if (dead_node != dlm->reco.dead_node || 888 if (dead_node != dlm->reco.dead_node ||
740 reco_master != dlm->reco.new_master) { 889 reco_master != dlm->reco.new_master) {
741 /* show extra debug info if the recovery state is messed */ 890 /* worker could have been created before the recovery master
742 mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " 891 * died. if so, do not continue, but do not error. */
743 "request(dead=%u, master=%u)\n", 892 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
744 dlm->name, dlm->reco.dead_node, dlm->reco.new_master, 893 mlog(ML_NOTICE, "%s: will not send recovery state, "
745 dead_node, reco_master); 894 "recovery master %u died, thread=(dead=%u,mas=%u)"
746 mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " 895 " current=(dead=%u,mas=%u)\n", dlm->name,
747 "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", 896 reco_master, dead_node, reco_master,
748 dlm->name, mres->lockname_len, mres->lockname, mres->master, 897 dlm->reco.dead_node, dlm->reco.new_master);
749 mres->num_locks, mres->total_locks, mres->flags, 898 } else {
750 dlm_get_lock_cookie_node(mres->ml[0].cookie), 899 mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
751 dlm_get_lock_cookie_seq(mres->ml[0].cookie), 900 "master=%u), request(dead=%u, master=%u)\n",
752 mres->ml[0].list, mres->ml[0].flags, 901 dlm->name, dlm->reco.dead_node,
753 mres->ml[0].type, mres->ml[0].convert_type, 902 dlm->reco.new_master, dead_node, reco_master);
754 mres->ml[0].highest_blocked, mres->ml[0].node); 903 }
755 BUG(); 904 goto leave;
756 } 905 }
757 BUG_ON(dead_node != dlm->reco.dead_node);
758 BUG_ON(reco_master != dlm->reco.new_master);
759 906
760 /* lock resources should have already been moved to the 907 /* lock resources should have already been moved to the
761 * dlm->reco.resources list. now move items from that list 908 * dlm->reco.resources list. now move items from that list
@@ -766,12 +913,20 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
766 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); 913 dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
767 914
768 /* now we can begin blasting lockreses without the dlm lock */ 915 /* now we can begin blasting lockreses without the dlm lock */
916
917 /* any errors returned will be due to the new_master dying,
918 * the dlm_reco_thread should detect this */
769 list_for_each(iter, &resources) { 919 list_for_each(iter, &resources) {
770 res = list_entry (iter, struct dlm_lock_resource, recovering); 920 res = list_entry (iter, struct dlm_lock_resource, recovering);
771 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, 921 ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
772 DLM_MRES_RECOVERY); 922 DLM_MRES_RECOVERY);
773 if (ret < 0) 923 if (ret < 0) {
774 mlog_errno(ret); 924 mlog(ML_ERROR, "%s: node %u went down while sending "
925 "recovery state for dead node %u, ret=%d\n", dlm->name,
926 reco_master, dead_node, ret);
927 skip_all_done = 1;
928 break;
929 }
775 } 930 }
776 931
777 /* move the resources back to the list */ 932 /* move the resources back to the list */
@@ -779,10 +934,15 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
779 list_splice_init(&resources, &dlm->reco.resources); 934 list_splice_init(&resources, &dlm->reco.resources);
780 spin_unlock(&dlm->spinlock); 935 spin_unlock(&dlm->spinlock);
781 936
782 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); 937 if (!skip_all_done) {
783 if (ret < 0) 938 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
784 mlog_errno(ret); 939 if (ret < 0) {
785 940 mlog(ML_ERROR, "%s: node %u went down while sending "
941 "recovery all-done for dead node %u, ret=%d\n",
942 dlm->name, reco_master, dead_node, ret);
943 }
944 }
945leave:
786 free_page((unsigned long)data); 946 free_page((unsigned long)data);
787} 947}
788 948
@@ -801,8 +961,14 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
801 961
802 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 962 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
803 sizeof(done_msg), send_to, &tmpret); 963 sizeof(done_msg), send_to, &tmpret);
804 /* negative status is ignored by the caller */ 964 if (ret < 0) {
805 if (ret >= 0) 965 if (!dlm_is_host_down(ret)) {
966 mlog_errno(ret);
967 mlog(ML_ERROR, "%s: unknown error sending data-done "
968 "to %u\n", dlm->name, send_to);
969 BUG();
970 }
971 } else
806 ret = tmpret; 972 ret = tmpret;
807 return ret; 973 return ret;
808} 974}
@@ -822,7 +988,11 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
822 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " 988 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
823 "node_idx=%u, this node=%u\n", done->dead_node, 989 "node_idx=%u, this node=%u\n", done->dead_node,
824 dlm->reco.dead_node, done->node_idx, dlm->node_num); 990 dlm->reco.dead_node, done->node_idx, dlm->node_num);
825 BUG_ON(done->dead_node != dlm->reco.dead_node); 991
992 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
993 "Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
994 "node_idx=%u, this node=%u\n", done->dead_node,
995 dlm->reco.dead_node, done->node_idx, dlm->node_num);
826 996
827 spin_lock(&dlm_reco_state_lock); 997 spin_lock(&dlm_reco_state_lock);
828 list_for_each(iter, &dlm->reco.node_data) { 998 list_for_each(iter, &dlm->reco.node_data) {
@@ -1021,8 +1191,9 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1021 ml->type == LKM_PRMODE) { 1191 ml->type == LKM_PRMODE) {
1022 /* if it is already set, this had better be a PR 1192 /* if it is already set, this had better be a PR
1023 * and it has to match */ 1193 * and it has to match */
1024 if (mres->lvb[0] && (ml->type == LKM_EXMODE || 1194 if (!dlm_lvb_is_empty(mres->lvb) &&
1025 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { 1195 (ml->type == LKM_EXMODE ||
1196 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
1026 mlog(ML_ERROR, "mismatched lvbs!\n"); 1197 mlog(ML_ERROR, "mismatched lvbs!\n");
1027 __dlm_print_one_lock_resource(lock->lockres); 1198 __dlm_print_one_lock_resource(lock->lockres);
1028 BUG(); 1199 BUG();
@@ -1081,22 +1252,25 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1081 * we must send it immediately. */ 1252 * we must send it immediately. */
1082 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, 1253 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
1083 res, total_locks); 1254 res, total_locks);
1084 if (ret < 0) { 1255 if (ret < 0)
1085 // TODO 1256 goto error;
1086 mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
1087 "returned %d, TODO\n", ret);
1088 BUG();
1089 }
1090 } 1257 }
1091 } 1258 }
1092 /* flush any remaining locks */ 1259 /* flush any remaining locks */
1093 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1260 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1094 if (ret < 0) { 1261 if (ret < 0)
1095 // TODO 1262 goto error;
1096 mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, " 1263 return ret;
1097 "TODO\n", ret); 1264
1265error:
1266 mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
1267 dlm->name, ret);
1268 if (!dlm_is_host_down(ret))
1098 BUG(); 1269 BUG();
1099 } 1270 mlog(0, "%s: node %u went down while sending %s "
1271 "lockres %.*s\n", dlm->name, send_to,
1272 flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
1273 res->lockname.len, res->lockname.name);
1100 return ret; 1274 return ret;
1101} 1275}
1102 1276
@@ -1144,8 +1318,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1144 mlog(0, "all done flag. all lockres data received!\n"); 1318 mlog(0, "all done flag. all lockres data received!\n");
1145 1319
1146 ret = -ENOMEM; 1320 ret = -ENOMEM;
1147 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL); 1321 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
1148 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 1322 item = kcalloc(1, sizeof(*item), GFP_NOFS);
1149 if (!buf || !item) 1323 if (!buf || !item)
1150 goto leave; 1324 goto leave;
1151 1325
@@ -1236,7 +1410,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1236 spin_lock(&dlm->work_lock); 1410 spin_lock(&dlm->work_lock);
1237 list_add_tail(&item->list, &dlm->work_list); 1411 list_add_tail(&item->list, &dlm->work_list);
1238 spin_unlock(&dlm->work_lock); 1412 spin_unlock(&dlm->work_lock);
1239 schedule_work(&dlm->dispatched_work); 1413 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1240 1414
1241leave: 1415leave:
1242 dlm_put(dlm); 1416 dlm_put(dlm);
@@ -1404,6 +1578,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
1404 struct dlm_ctxt *dlm = data; 1578 struct dlm_ctxt *dlm = data;
1405 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1579 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
1406 struct dlm_lock_resource *res = NULL; 1580 struct dlm_lock_resource *res = NULL;
1581 unsigned int hash;
1407 int master = DLM_LOCK_RES_OWNER_UNKNOWN; 1582 int master = DLM_LOCK_RES_OWNER_UNKNOWN;
1408 u32 flags = DLM_ASSERT_MASTER_REQUERY; 1583 u32 flags = DLM_ASSERT_MASTER_REQUERY;
1409 1584
@@ -1413,8 +1588,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
1413 return master; 1588 return master;
1414 } 1589 }
1415 1590
1591 hash = dlm_lockid_hash(req->name, req->namelen);
1592
1416 spin_lock(&dlm->spinlock); 1593 spin_lock(&dlm->spinlock);
1417 res = __dlm_lookup_lockres(dlm, req->name, req->namelen); 1594 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
1418 if (res) { 1595 if (res) {
1419 spin_lock(&res->spinlock); 1596 spin_lock(&res->spinlock);
1420 master = res->owner; 1597 master = res->owner;
@@ -1481,7 +1658,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1481 struct dlm_lock *newlock = NULL; 1658 struct dlm_lock *newlock = NULL;
1482 struct dlm_lockstatus *lksb = NULL; 1659 struct dlm_lockstatus *lksb = NULL;
1483 int ret = 0; 1660 int ret = 0;
1484 int i; 1661 int i, bad;
1485 struct list_head *iter; 1662 struct list_head *iter;
1486 struct dlm_lock *lock = NULL; 1663 struct dlm_lock *lock = NULL;
1487 1664
@@ -1550,28 +1727,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1550 } 1727 }
1551 lksb->flags |= (ml->flags & 1728 lksb->flags |= (ml->flags &
1552 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); 1729 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
1553 1730
1554 if (mres->lvb[0]) { 1731 if (ml->type == LKM_NLMODE)
1732 goto skip_lvb;
1733
1734 if (!dlm_lvb_is_empty(mres->lvb)) {
1555 if (lksb->flags & DLM_LKSB_PUT_LVB) { 1735 if (lksb->flags & DLM_LKSB_PUT_LVB) {
1556 /* other node was trying to update 1736 /* other node was trying to update
1557 * lvb when node died. recreate the 1737 * lvb when node died. recreate the
1558 * lksb with the updated lvb. */ 1738 * lksb with the updated lvb. */
1559 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); 1739 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
1740 /* the lock resource lvb update must happen
1741 * NOW, before the spinlock is dropped.
1742 * we no longer wait for the AST to update
1743 * the lvb. */
1744 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1560 } else { 1745 } else {
1561 /* otherwise, the node is sending its 1746 /* otherwise, the node is sending its
1562 * most recent valid lvb info */ 1747 * most recent valid lvb info */
1563 BUG_ON(ml->type != LKM_EXMODE && 1748 BUG_ON(ml->type != LKM_EXMODE &&
1564 ml->type != LKM_PRMODE); 1749 ml->type != LKM_PRMODE);
1565 if (res->lvb[0] && (ml->type == LKM_EXMODE || 1750 if (!dlm_lvb_is_empty(res->lvb) &&
1566 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { 1751 (ml->type == LKM_EXMODE ||
1567 mlog(ML_ERROR, "received bad lvb!\n"); 1752 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
1568 __dlm_print_one_lock_resource(res); 1753 int i;
1569 BUG(); 1754 mlog(ML_ERROR, "%s:%.*s: received bad "
1755 "lvb! type=%d\n", dlm->name,
1756 res->lockname.len,
1757 res->lockname.name, ml->type);
1758 printk("lockres lvb=[");
1759 for (i=0; i<DLM_LVB_LEN; i++)
1760 printk("%02x", res->lvb[i]);
1761 printk("]\nmigrated lvb=[");
1762 for (i=0; i<DLM_LVB_LEN; i++)
1763 printk("%02x", mres->lvb[i]);
1764 printk("]\n");
1765 dlm_print_one_lock_resource(res);
1766 BUG();
1570 } 1767 }
1571 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1768 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1572 } 1769 }
1573 } 1770 }
1574 1771skip_lvb:
1575 1772
1576 /* NOTE: 1773 /* NOTE:
1577 * wrt lock queue ordering and recovery: 1774 * wrt lock queue ordering and recovery:
@@ -1589,9 +1786,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1589 * relative to each other, but clearly *not* 1786 * relative to each other, but clearly *not*
1590 * preserved relative to locks from other nodes. 1787 * preserved relative to locks from other nodes.
1591 */ 1788 */
1789 bad = 0;
1592 spin_lock(&res->spinlock); 1790 spin_lock(&res->spinlock);
1593 dlm_lock_get(newlock); 1791 list_for_each_entry(lock, queue, list) {
1594 list_add_tail(&newlock->list, queue); 1792 if (lock->ml.cookie == ml->cookie) {
1793 u64 c = lock->ml.cookie;
1794 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
1795 "exists on this lockres!\n", dlm->name,
1796 res->lockname.len, res->lockname.name,
1797 dlm_get_lock_cookie_node(c),
1798 dlm_get_lock_cookie_seq(c));
1799
1800 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
1801 "node=%u, cookie=%u:%llu, queue=%d\n",
1802 ml->type, ml->convert_type, ml->node,
1803 dlm_get_lock_cookie_node(ml->cookie),
1804 dlm_get_lock_cookie_seq(ml->cookie),
1805 ml->list);
1806
1807 __dlm_print_one_lock_resource(res);
1808 bad = 1;
1809 break;
1810 }
1811 }
1812 if (!bad) {
1813 dlm_lock_get(newlock);
1814 list_add_tail(&newlock->list, queue);
1815 }
1595 spin_unlock(&res->spinlock); 1816 spin_unlock(&res->spinlock);
1596 } 1817 }
1597 mlog(0, "done running all the locks\n"); 1818 mlog(0, "done running all the locks\n");
@@ -1615,8 +1836,14 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
1615 struct dlm_lock *lock; 1836 struct dlm_lock *lock;
1616 1837
1617 res->state |= DLM_LOCK_RES_RECOVERING; 1838 res->state |= DLM_LOCK_RES_RECOVERING;
1618 if (!list_empty(&res->recovering)) 1839 if (!list_empty(&res->recovering)) {
1840 mlog(0,
1841 "Recovering res %s:%.*s, is already on recovery list!\n",
1842 dlm->name, res->lockname.len, res->lockname.name);
1619 list_del_init(&res->recovering); 1843 list_del_init(&res->recovering);
1844 }
1845 /* We need to hold a reference while on the recovery list */
1846 dlm_lockres_get(res);
1620 list_add_tail(&res->recovering, &dlm->reco.resources); 1847 list_add_tail(&res->recovering, &dlm->reco.resources);
1621 1848
1622 /* find any pending locks and put them back on proper list */ 1849 /* find any pending locks and put them back on proper list */
@@ -1705,9 +1932,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1705 spin_lock(&res->spinlock); 1932 spin_lock(&res->spinlock);
1706 dlm_change_lockres_owner(dlm, res, new_master); 1933 dlm_change_lockres_owner(dlm, res, new_master);
1707 res->state &= ~DLM_LOCK_RES_RECOVERING; 1934 res->state &= ~DLM_LOCK_RES_RECOVERING;
1708 __dlm_dirty_lockres(dlm, res); 1935 if (!__dlm_lockres_unused(res))
1936 __dlm_dirty_lockres(dlm, res);
1709 spin_unlock(&res->spinlock); 1937 spin_unlock(&res->spinlock);
1710 wake_up(&res->wq); 1938 wake_up(&res->wq);
1939 dlm_lockres_put(res);
1711 } 1940 }
1712 } 1941 }
1713 1942
@@ -1716,7 +1945,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1716 * the RECOVERING state and set the owner 1945 * the RECOVERING state and set the owner
1717 * if necessary */ 1946 * if necessary */
1718 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 1947 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
1719 bucket = &(dlm->lockres_hash[i]); 1948 bucket = dlm_lockres_hash(dlm, i);
1720 hlist_for_each_entry(res, hash_iter, bucket, hash_node) { 1949 hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
1721 if (res->state & DLM_LOCK_RES_RECOVERING) { 1950 if (res->state & DLM_LOCK_RES_RECOVERING) {
1722 if (res->owner == dead_node) { 1951 if (res->owner == dead_node) {
@@ -1740,11 +1969,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1740 dlm->name, res->lockname.len, 1969 dlm->name, res->lockname.len,
1741 res->lockname.name, res->owner); 1970 res->lockname.name, res->owner);
1742 list_del_init(&res->recovering); 1971 list_del_init(&res->recovering);
1972 dlm_lockres_put(res);
1743 } 1973 }
1744 spin_lock(&res->spinlock); 1974 spin_lock(&res->spinlock);
1745 dlm_change_lockres_owner(dlm, res, new_master); 1975 dlm_change_lockres_owner(dlm, res, new_master);
1746 res->state &= ~DLM_LOCK_RES_RECOVERING; 1976 res->state &= ~DLM_LOCK_RES_RECOVERING;
1747 __dlm_dirty_lockres(dlm, res); 1977 if (!__dlm_lockres_unused(res))
1978 __dlm_dirty_lockres(dlm, res);
1748 spin_unlock(&res->spinlock); 1979 spin_unlock(&res->spinlock);
1749 wake_up(&res->wq); 1980 wake_up(&res->wq);
1750 } 1981 }
@@ -1881,7 +2112,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1881 * need to be fired as a result. 2112 * need to be fired as a result.
1882 */ 2113 */
1883 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2114 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
1884 bucket = &(dlm->lockres_hash[i]); 2115 bucket = dlm_lockres_hash(dlm, i);
1885 hlist_for_each_entry(res, iter, bucket, hash_node) { 2116 hlist_for_each_entry(res, iter, bucket, hash_node) {
1886 /* always prune any $RECOVERY entries for dead nodes, 2117 /* always prune any $RECOVERY entries for dead nodes,
1887 * otherwise hangs can occur during later recovery */ 2118 * otherwise hangs can occur during later recovery */
@@ -1921,6 +2152,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
1921{ 2152{
1922 assert_spin_locked(&dlm->spinlock); 2153 assert_spin_locked(&dlm->spinlock);
1923 2154
2155 if (dlm->reco.new_master == idx) {
2156 mlog(0, "%s: recovery master %d just died\n",
2157 dlm->name, idx);
2158 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2159 /* finalize1 was reached, so it is safe to clear
2160 * the new_master and dead_node. that recovery
2161 * is complete. */
2162 mlog(0, "%s: dead master %d had reached "
2163 "finalize1 state, clearing\n", dlm->name, idx);
2164 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2165 __dlm_reset_recovery(dlm);
2166 }
2167 }
2168
1924 /* check to see if the node is already considered dead */ 2169 /* check to see if the node is already considered dead */
1925 if (!test_bit(idx, dlm->live_nodes_map)) { 2170 if (!test_bit(idx, dlm->live_nodes_map)) {
1926 mlog(0, "for domain %s, node %d is already dead. " 2171 mlog(0, "for domain %s, node %d is already dead. "
@@ -2084,7 +2329,7 @@ again:
2084 2329
2085 /* set the new_master to this node */ 2330 /* set the new_master to this node */
2086 spin_lock(&dlm->spinlock); 2331 spin_lock(&dlm->spinlock);
2087 dlm->reco.new_master = dlm->node_num; 2332 dlm_set_reco_master(dlm, dlm->node_num);
2088 spin_unlock(&dlm->spinlock); 2333 spin_unlock(&dlm->spinlock);
2089 } 2334 }
2090 2335
@@ -2122,6 +2367,10 @@ again:
2122 mlog(0, "%s: reco master %u is ready to recover %u\n", 2367 mlog(0, "%s: reco master %u is ready to recover %u\n",
2123 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); 2368 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
2124 status = -EEXIST; 2369 status = -EEXIST;
2370 } else if (ret == DLM_RECOVERING) {
2371 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
2372 dlm->name, dlm->node_num);
2373 goto again;
2125 } else { 2374 } else {
2126 struct dlm_lock_resource *res; 2375 struct dlm_lock_resource *res;
2127 2376
@@ -2153,7 +2402,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2153 2402
2154 mlog_entry("%u\n", dead_node); 2403 mlog_entry("%u\n", dead_node);
2155 2404
2156 mlog(0, "dead node is %u\n", dead_node); 2405 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
2157 2406
2158 spin_lock(&dlm->spinlock); 2407 spin_lock(&dlm->spinlock);
2159 dlm_node_iter_init(dlm->domain_map, &iter); 2408 dlm_node_iter_init(dlm->domain_map, &iter);
@@ -2211,6 +2460,14 @@ retry:
2211 * another ENOMEM */ 2460 * another ENOMEM */
2212 msleep(100); 2461 msleep(100);
2213 goto retry; 2462 goto retry;
2463 } else if (ret == EAGAIN) {
2464 mlog(0, "%s: trying to start recovery of node "
2465 "%u, but node %u is waiting for last recovery "
2466 "to complete, backoff for a bit\n", dlm->name,
2467 dead_node, nodenum);
2468 /* TODO Look into replacing msleep with cond_resched() */
2469 msleep(100);
2470 goto retry;
2214 } 2471 }
2215 } 2472 }
2216 2473
@@ -2226,8 +2483,20 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2226 if (!dlm_grab(dlm)) 2483 if (!dlm_grab(dlm))
2227 return 0; 2484 return 0;
2228 2485
2229 mlog(0, "node %u wants to recover node %u\n", 2486 spin_lock(&dlm->spinlock);
2230 br->node_idx, br->dead_node); 2487 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2488 mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
2489 "but this node is in finalize state, waiting on finalize2\n",
2490 dlm->name, br->node_idx, br->dead_node,
2491 dlm->reco.dead_node, dlm->reco.new_master);
2492 spin_unlock(&dlm->spinlock);
2493 return EAGAIN;
2494 }
2495 spin_unlock(&dlm->spinlock);
2496
2497 mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
2498 dlm->name, br->node_idx, br->dead_node,
2499 dlm->reco.dead_node, dlm->reco.new_master);
2231 2500
2232 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); 2501 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
2233 2502
@@ -2249,8 +2518,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2249 "node %u changing it to %u\n", dlm->name, 2518 "node %u changing it to %u\n", dlm->name,
2250 dlm->reco.dead_node, br->node_idx, br->dead_node); 2519 dlm->reco.dead_node, br->node_idx, br->dead_node);
2251 } 2520 }
2252 dlm->reco.new_master = br->node_idx; 2521 dlm_set_reco_master(dlm, br->node_idx);
2253 dlm->reco.dead_node = br->dead_node; 2522 dlm_set_reco_dead_node(dlm, br->dead_node);
2254 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2523 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2255 mlog(0, "recovery master %u sees %u as dead, but this " 2524 mlog(0, "recovery master %u sees %u as dead, but this "
2256 "node has not yet. marking %u as dead\n", 2525 "node has not yet. marking %u as dead\n",
@@ -2269,10 +2538,16 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2269 spin_unlock(&dlm->spinlock); 2538 spin_unlock(&dlm->spinlock);
2270 2539
2271 dlm_kick_recovery_thread(dlm); 2540 dlm_kick_recovery_thread(dlm);
2541
2542 mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
2543 dlm->name, br->node_idx, br->dead_node,
2544 dlm->reco.dead_node, dlm->reco.new_master);
2545
2272 dlm_put(dlm); 2546 dlm_put(dlm);
2273 return 0; 2547 return 0;
2274} 2548}
2275 2549
2550#define DLM_FINALIZE_STAGE2 0x01
2276static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) 2551static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2277{ 2552{
2278 int ret = 0; 2553 int ret = 0;
@@ -2280,25 +2555,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2280 struct dlm_node_iter iter; 2555 struct dlm_node_iter iter;
2281 int nodenum; 2556 int nodenum;
2282 int status; 2557 int status;
2558 int stage = 1;
2283 2559
2284 mlog(0, "finishing recovery for node %s:%u\n", 2560 mlog(0, "finishing recovery for node %s:%u, "
2285 dlm->name, dlm->reco.dead_node); 2561 "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
2286 2562
2287 spin_lock(&dlm->spinlock); 2563 spin_lock(&dlm->spinlock);
2288 dlm_node_iter_init(dlm->domain_map, &iter); 2564 dlm_node_iter_init(dlm->domain_map, &iter);
2289 spin_unlock(&dlm->spinlock); 2565 spin_unlock(&dlm->spinlock);
2290 2566
2567stage2:
2291 memset(&fr, 0, sizeof(fr)); 2568 memset(&fr, 0, sizeof(fr));
2292 fr.node_idx = dlm->node_num; 2569 fr.node_idx = dlm->node_num;
2293 fr.dead_node = dlm->reco.dead_node; 2570 fr.dead_node = dlm->reco.dead_node;
2571 if (stage == 2)
2572 fr.flags |= DLM_FINALIZE_STAGE2;
2294 2573
2295 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2574 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2296 if (nodenum == dlm->node_num) 2575 if (nodenum == dlm->node_num)
2297 continue; 2576 continue;
2298 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, 2577 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
2299 &fr, sizeof(fr), nodenum, &status); 2578 &fr, sizeof(fr), nodenum, &status);
2300 if (ret >= 0) { 2579 if (ret >= 0)
2301 ret = status; 2580 ret = status;
2581 if (ret < 0) {
2582 mlog_errno(ret);
2302 if (dlm_is_host_down(ret)) { 2583 if (dlm_is_host_down(ret)) {
2303 /* this has no effect on this recovery 2584 /* this has no effect on this recovery
2304 * session, so set the status to zero to 2585 * session, so set the status to zero to
@@ -2306,13 +2587,17 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2306 mlog(ML_ERROR, "node %u went down after this " 2587 mlog(ML_ERROR, "node %u went down after this "
2307 "node finished recovery.\n", nodenum); 2588 "node finished recovery.\n", nodenum);
2308 ret = 0; 2589 ret = 0;
2590 continue;
2309 } 2591 }
2310 }
2311 if (ret < 0) {
2312 mlog_errno(ret);
2313 break; 2592 break;
2314 } 2593 }
2315 } 2594 }
2595 if (stage == 1) {
2596 /* reset the node_iter back to the top and send finalize2 */
2597 iter.curnode = -1;
2598 stage = 2;
2599 goto stage2;
2600 }
2316 2601
2317 return ret; 2602 return ret;
2318} 2603}
@@ -2321,14 +2606,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2321{ 2606{
2322 struct dlm_ctxt *dlm = data; 2607 struct dlm_ctxt *dlm = data;
2323 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2608 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
2609 int stage = 1;
2324 2610
2325 /* ok to return 0, domain has gone away */ 2611 /* ok to return 0, domain has gone away */
2326 if (!dlm_grab(dlm)) 2612 if (!dlm_grab(dlm))
2327 return 0; 2613 return 0;
2328 2614
2329 mlog(0, "node %u finalizing recovery of node %u\n", 2615 if (fr->flags & DLM_FINALIZE_STAGE2)
2330 fr->node_idx, fr->dead_node); 2616 stage = 2;
2331 2617
2618 mlog(0, "%s: node %u finalizing recovery stage%d of "
2619 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
2620 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
2621
2332 spin_lock(&dlm->spinlock); 2622 spin_lock(&dlm->spinlock);
2333 2623
2334 if (dlm->reco.new_master != fr->node_idx) { 2624 if (dlm->reco.new_master != fr->node_idx) {
@@ -2344,13 +2634,41 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2344 BUG(); 2634 BUG();
2345 } 2635 }
2346 2636
2347 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); 2637 switch (stage) {
2348 2638 case 1:
2349 spin_unlock(&dlm->spinlock); 2639 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
2640 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2641 mlog(ML_ERROR, "%s: received finalize1 from "
2642 "new master %u for dead node %u, but "
2643 "this node has already received it!\n",
2644 dlm->name, fr->node_idx, fr->dead_node);
2645 dlm_print_reco_node_status(dlm);
2646 BUG();
2647 }
2648 dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
2649 spin_unlock(&dlm->spinlock);
2650 break;
2651 case 2:
2652 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
2653 mlog(ML_ERROR, "%s: received finalize2 from "
2654 "new master %u for dead node %u, but "
2655 "this node did not have finalize1!\n",
2656 dlm->name, fr->node_idx, fr->dead_node);
2657 dlm_print_reco_node_status(dlm);
2658 BUG();
2659 }
2660 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2661 spin_unlock(&dlm->spinlock);
2662 dlm_reset_recovery(dlm);
2663 dlm_kick_recovery_thread(dlm);
2664 break;
2665 default:
2666 BUG();
2667 }
2350 2668
2351 dlm_reset_recovery(dlm); 2669 mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
2670 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
2352 2671
2353 dlm_kick_recovery_thread(dlm);
2354 dlm_put(dlm); 2672 dlm_put(dlm);
2355 return 0; 2673 return 0;
2356} 2674}