aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/recover.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/recover.c')
-rw-r--r--fs/dlm/recover.c140
1 files changed, 77 insertions, 63 deletions
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4dac6bb..3c025fe49ad3 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
361 * rsb's to consider. 361 * rsb's to consider.
362 */ 362 */
363 363
364static void set_new_master(struct dlm_rsb *r, int nodeid) 364static void set_new_master(struct dlm_rsb *r)
365{ 365{
366 r->res_nodeid = nodeid;
367 set_master_lkbs(r); 366 set_master_lkbs(r);
368 rsb_set_flag(r, RSB_NEW_MASTER); 367 rsb_set_flag(r, RSB_NEW_MASTER);
369 rsb_set_flag(r, RSB_NEW_MASTER2); 368 rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
372/* 371/*
373 * We do async lookups on rsb's that need new masters. The rsb's 372 * We do async lookups on rsb's that need new masters. The rsb's
374 * waiting for a lookup reply are kept on the recover_list. 373 * waiting for a lookup reply are kept on the recover_list.
374 *
375 * Another node recovering the master may have sent us a rcom lookup,
376 * and our dlm_master_lookup() set it as the new master, along with
377 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
378 * equals our_nodeid below).
375 */ 379 */
376 380
377static int recover_master(struct dlm_rsb *r) 381static int recover_master(struct dlm_rsb *r, unsigned int *count)
378{ 382{
379 struct dlm_ls *ls = r->res_ls; 383 struct dlm_ls *ls = r->res_ls;
380 int error, ret_nodeid; 384 int our_nodeid, dir_nodeid;
381 int our_nodeid = dlm_our_nodeid(); 385 int is_removed = 0;
382 int dir_nodeid = dlm_dir_nodeid(r); 386 int error;
387
388 if (is_master(r))
389 return 0;
390
391 is_removed = dlm_is_removed(ls, r->res_nodeid);
392
393 if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
394 return 0;
395
396 our_nodeid = dlm_our_nodeid();
397 dir_nodeid = dlm_dir_nodeid(r);
383 398
384 if (dir_nodeid == our_nodeid) { 399 if (dir_nodeid == our_nodeid) {
385 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 400 if (is_removed) {
386 r->res_length, &ret_nodeid); 401 r->res_master_nodeid = our_nodeid;
387 if (error) 402 r->res_nodeid = 0;
388 log_error(ls, "recover dir lookup error %d", error); 403 }
389 404
390 if (ret_nodeid == our_nodeid) 405 /* set master of lkbs to ourself when is_removed, or to
391 ret_nodeid = 0; 406 another new master which we set along with NEW_MASTER
392 lock_rsb(r); 407 in dlm_master_lookup */
393 set_new_master(r, ret_nodeid); 408 set_new_master(r);
394 unlock_rsb(r); 409 error = 0;
395 } else { 410 } else {
396 recover_list_add(r); 411 recover_list_add(r);
397 error = dlm_send_rcom_lookup(r, dir_nodeid); 412 error = dlm_send_rcom_lookup(r, dir_nodeid);
398 } 413 }
399 414
415 (*count)++;
400 return error; 416 return error;
401} 417}
402 418
@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r)
415 * resent. 431 * resent.
416 */ 432 */
417 433
418static int recover_master_static(struct dlm_rsb *r) 434static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
419{ 435{
420 int dir_nodeid = dlm_dir_nodeid(r); 436 int dir_nodeid = dlm_dir_nodeid(r);
421 int new_master = dir_nodeid; 437 int new_master = dir_nodeid;
@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r)
423 if (dir_nodeid == dlm_our_nodeid()) 439 if (dir_nodeid == dlm_our_nodeid())
424 new_master = 0; 440 new_master = 0;
425 441
426 lock_rsb(r);
427 dlm_purge_mstcpy_locks(r); 442 dlm_purge_mstcpy_locks(r);
428 set_new_master(r, new_master); 443 r->res_master_nodeid = dir_nodeid;
429 unlock_rsb(r); 444 r->res_nodeid = new_master;
430 return 1; 445 set_new_master(r);
446 (*count)++;
447 return 0;
431} 448}
432 449
433/* 450/*
@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r)
443int dlm_recover_masters(struct dlm_ls *ls) 460int dlm_recover_masters(struct dlm_ls *ls)
444{ 461{
445 struct dlm_rsb *r; 462 struct dlm_rsb *r;
446 int error = 0, count = 0; 463 unsigned int total = 0;
464 unsigned int count = 0;
465 int nodir = dlm_no_directory(ls);
466 int error;
447 467
448 log_debug(ls, "dlm_recover_masters"); 468 log_debug(ls, "dlm_recover_masters");
449 469
@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls)
455 goto out; 475 goto out;
456 } 476 }
457 477
458 if (dlm_no_directory(ls)) 478 lock_rsb(r);
459 count += recover_master_static(r); 479 if (nodir)
460 else if (!is_master(r) && 480 error = recover_master_static(r, &count);
461 (dlm_is_removed(ls, r->res_nodeid) || 481 else
462 rsb_flag(r, RSB_NEW_MASTER))) { 482 error = recover_master(r, &count);
463 recover_master(r); 483 unlock_rsb(r);
464 count++; 484 cond_resched();
465 } 485 total++;
466 486
467 schedule(); 487 if (error) {
488 up_read(&ls->ls_root_sem);
489 goto out;
490 }
468 } 491 }
469 up_read(&ls->ls_root_sem); 492 up_read(&ls->ls_root_sem);
470 493
471 log_debug(ls, "dlm_recover_masters %d resources", count); 494 log_debug(ls, "dlm_recover_masters %u of %u", count, total);
472 495
473 error = dlm_wait_function(ls, &recover_list_empty); 496 error = dlm_wait_function(ls, &recover_list_empty);
474 out: 497 out:
@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
480int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 503int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
481{ 504{
482 struct dlm_rsb *r; 505 struct dlm_rsb *r;
483 int nodeid; 506 int ret_nodeid, new_master;
484 507
485 r = recover_list_find(ls, rc->rc_id); 508 r = recover_list_find(ls, rc->rc_id);
486 if (!r) { 509 if (!r) {
@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
489 goto out; 512 goto out;
490 } 513 }
491 514
492 nodeid = rc->rc_result; 515 ret_nodeid = rc->rc_result;
493 if (nodeid == dlm_our_nodeid()) 516
494 nodeid = 0; 517 if (ret_nodeid == dlm_our_nodeid())
518 new_master = 0;
519 else
520 new_master = ret_nodeid;
495 521
496 lock_rsb(r); 522 lock_rsb(r);
497 set_new_master(r, nodeid); 523 r->res_master_nodeid = ret_nodeid;
524 r->res_nodeid = new_master;
525 set_new_master(r);
498 unlock_rsb(r); 526 unlock_rsb(r);
499 recover_list_del(r); 527 recover_list_del(r);
500 528
@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
791 dlm_hold_rsb(r); 819 dlm_hold_rsb(r);
792 } 820 }
793 821
794 /* If we're using a directory, add tossed rsbs to the root 822 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
795 list; they'll have entries created in the new directory, 823 log_error(ls, "dlm_create_root_list toss not empty");
796 but no other recovery steps should do anything with them. */
797
798 if (dlm_no_directory(ls)) {
799 spin_unlock(&ls->ls_rsbtbl[i].lock);
800 continue;
801 }
802
803 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
804 r = rb_entry(n, struct dlm_rsb, res_hashnode);
805 list_add(&r->res_root_list, &ls->ls_root_list);
806 dlm_hold_rsb(r);
807 }
808 spin_unlock(&ls->ls_rsbtbl[i].lock); 824 spin_unlock(&ls->ls_rsbtbl[i].lock);
809 } 825 }
810 out: 826 out:
@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
824 up_write(&ls->ls_root_sem); 840 up_write(&ls->ls_root_sem);
825} 841}
826 842
827/* If not using a directory, clear the entire toss list, there's no benefit to 843void dlm_clear_toss(struct dlm_ls *ls)
828 caching the master value since it's fixed. If we are using a dir, keep the
829 rsb's we're the master of. Recovery will add them to the root list and from
830 there they'll be entered in the rebuilt directory. */
831
832void dlm_clear_toss_list(struct dlm_ls *ls)
833{ 844{
834 struct rb_node *n, *next; 845 struct rb_node *n, *next;
835 struct dlm_rsb *rsb; 846 struct dlm_rsb *r;
847 unsigned int count = 0;
836 int i; 848 int i;
837 849
838 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 850 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
839 spin_lock(&ls->ls_rsbtbl[i].lock); 851 spin_lock(&ls->ls_rsbtbl[i].lock);
840 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { 852 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
841 next = rb_next(n);; 853 next = rb_next(n);
842 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 854 r = rb_entry(n, struct dlm_rsb, res_hashnode);
843 if (dlm_no_directory(ls) || !is_master(rsb)) { 855 rb_erase(n, &ls->ls_rsbtbl[i].toss);
844 rb_erase(n, &ls->ls_rsbtbl[i].toss); 856 dlm_free_rsb(r);
845 dlm_free_rsb(rsb); 857 count++;
846 }
847 } 858 }
848 spin_unlock(&ls->ls_rsbtbl[i].lock); 859 spin_unlock(&ls->ls_rsbtbl[i].lock);
849 } 860 }
861
862 if (count)
863 log_debug(ls, "dlm_clear_toss %u done", count);
850} 864}
851 865