aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/recover.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2012-05-10 11:18:07 -0400
committerDavid Teigland <teigland@redhat.com>2012-07-16 15:16:19 -0400
commitc04fecb4d9f7753e0cbff7edd03ec68f8721cdce (patch)
treeecd82017d49c7bb03b96a8ad1eb4e9a5bb84409a /fs/dlm/recover.c
parentecc728467fb0c3e350b57fc66ed7585c15be50f5 (diff)
dlm: use rsbtbl as resource directory
Remove the dir hash table (dirtbl), and use the rsb hash table (rsbtbl) as the resource directory. It has always been an unnecessary duplication of information. This improves efficiency by using a single rsbtbl lookup in many cases where both rsbtbl and dirtbl lookups were needed previously. This eliminates the need to handle cases of rsbtbl and dirtbl being out of sync. In many cases there will be memory savings because the dir hash table no longer exists. Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/recover.c')
-rw-r--r--fs/dlm/recover.c140
1 files changed, 77 insertions, 63 deletions
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4dac6bb..3c025fe49ad3 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
361 * rsb's to consider. 361 * rsb's to consider.
362 */ 362 */
363 363
364static void set_new_master(struct dlm_rsb *r, int nodeid) 364static void set_new_master(struct dlm_rsb *r)
365{ 365{
366 r->res_nodeid = nodeid;
367 set_master_lkbs(r); 366 set_master_lkbs(r);
368 rsb_set_flag(r, RSB_NEW_MASTER); 367 rsb_set_flag(r, RSB_NEW_MASTER);
369 rsb_set_flag(r, RSB_NEW_MASTER2); 368 rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
372/* 371/*
373 * We do async lookups on rsb's that need new masters. The rsb's 372 * We do async lookups on rsb's that need new masters. The rsb's
374 * waiting for a lookup reply are kept on the recover_list. 373 * waiting for a lookup reply are kept on the recover_list.
374 *
375 * Another node recovering the master may have sent us a rcom lookup,
376 * and our dlm_master_lookup() set it as the new master, along with
377 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
378 * equals our_nodeid below).
375 */ 379 */
376 380
377static int recover_master(struct dlm_rsb *r) 381static int recover_master(struct dlm_rsb *r, unsigned int *count)
378{ 382{
379 struct dlm_ls *ls = r->res_ls; 383 struct dlm_ls *ls = r->res_ls;
380 int error, ret_nodeid; 384 int our_nodeid, dir_nodeid;
381 int our_nodeid = dlm_our_nodeid(); 385 int is_removed = 0;
382 int dir_nodeid = dlm_dir_nodeid(r); 386 int error;
387
388 if (is_master(r))
389 return 0;
390
391 is_removed = dlm_is_removed(ls, r->res_nodeid);
392
393 if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
394 return 0;
395
396 our_nodeid = dlm_our_nodeid();
397 dir_nodeid = dlm_dir_nodeid(r);
383 398
384 if (dir_nodeid == our_nodeid) { 399 if (dir_nodeid == our_nodeid) {
385 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 400 if (is_removed) {
386 r->res_length, &ret_nodeid); 401 r->res_master_nodeid = our_nodeid;
387 if (error) 402 r->res_nodeid = 0;
388 log_error(ls, "recover dir lookup error %d", error); 403 }
389 404
390 if (ret_nodeid == our_nodeid) 405 /* set master of lkbs to ourself when is_removed, or to
391 ret_nodeid = 0; 406 another new master which we set along with NEW_MASTER
392 lock_rsb(r); 407 in dlm_master_lookup */
393 set_new_master(r, ret_nodeid); 408 set_new_master(r);
394 unlock_rsb(r); 409 error = 0;
395 } else { 410 } else {
396 recover_list_add(r); 411 recover_list_add(r);
397 error = dlm_send_rcom_lookup(r, dir_nodeid); 412 error = dlm_send_rcom_lookup(r, dir_nodeid);
398 } 413 }
399 414
415 (*count)++;
400 return error; 416 return error;
401} 417}
402 418
@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r)
415 * resent. 431 * resent.
416 */ 432 */
417 433
418static int recover_master_static(struct dlm_rsb *r) 434static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
419{ 435{
420 int dir_nodeid = dlm_dir_nodeid(r); 436 int dir_nodeid = dlm_dir_nodeid(r);
421 int new_master = dir_nodeid; 437 int new_master = dir_nodeid;
@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r)
423 if (dir_nodeid == dlm_our_nodeid()) 439 if (dir_nodeid == dlm_our_nodeid())
424 new_master = 0; 440 new_master = 0;
425 441
426 lock_rsb(r);
427 dlm_purge_mstcpy_locks(r); 442 dlm_purge_mstcpy_locks(r);
428 set_new_master(r, new_master); 443 r->res_master_nodeid = dir_nodeid;
429 unlock_rsb(r); 444 r->res_nodeid = new_master;
430 return 1; 445 set_new_master(r);
446 (*count)++;
447 return 0;
431} 448}
432 449
433/* 450/*
@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r)
443int dlm_recover_masters(struct dlm_ls *ls) 460int dlm_recover_masters(struct dlm_ls *ls)
444{ 461{
445 struct dlm_rsb *r; 462 struct dlm_rsb *r;
446 int error = 0, count = 0; 463 unsigned int total = 0;
464 unsigned int count = 0;
465 int nodir = dlm_no_directory(ls);
466 int error;
447 467
448 log_debug(ls, "dlm_recover_masters"); 468 log_debug(ls, "dlm_recover_masters");
449 469
@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls)
455 goto out; 475 goto out;
456 } 476 }
457 477
458 if (dlm_no_directory(ls)) 478 lock_rsb(r);
459 count += recover_master_static(r); 479 if (nodir)
460 else if (!is_master(r) && 480 error = recover_master_static(r, &count);
461 (dlm_is_removed(ls, r->res_nodeid) || 481 else
462 rsb_flag(r, RSB_NEW_MASTER))) { 482 error = recover_master(r, &count);
463 recover_master(r); 483 unlock_rsb(r);
464 count++; 484 cond_resched();
465 } 485 total++;
466 486
467 schedule(); 487 if (error) {
488 up_read(&ls->ls_root_sem);
489 goto out;
490 }
468 } 491 }
469 up_read(&ls->ls_root_sem); 492 up_read(&ls->ls_root_sem);
470 493
471 log_debug(ls, "dlm_recover_masters %d resources", count); 494 log_debug(ls, "dlm_recover_masters %u of %u", count, total);
472 495
473 error = dlm_wait_function(ls, &recover_list_empty); 496 error = dlm_wait_function(ls, &recover_list_empty);
474 out: 497 out:
@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
480int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 503int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
481{ 504{
482 struct dlm_rsb *r; 505 struct dlm_rsb *r;
483 int nodeid; 506 int ret_nodeid, new_master;
484 507
485 r = recover_list_find(ls, rc->rc_id); 508 r = recover_list_find(ls, rc->rc_id);
486 if (!r) { 509 if (!r) {
@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
489 goto out; 512 goto out;
490 } 513 }
491 514
492 nodeid = rc->rc_result; 515 ret_nodeid = rc->rc_result;
493 if (nodeid == dlm_our_nodeid()) 516
494 nodeid = 0; 517 if (ret_nodeid == dlm_our_nodeid())
518 new_master = 0;
519 else
520 new_master = ret_nodeid;
495 521
496 lock_rsb(r); 522 lock_rsb(r);
497 set_new_master(r, nodeid); 523 r->res_master_nodeid = ret_nodeid;
524 r->res_nodeid = new_master;
525 set_new_master(r);
498 unlock_rsb(r); 526 unlock_rsb(r);
499 recover_list_del(r); 527 recover_list_del(r);
500 528
@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
791 dlm_hold_rsb(r); 819 dlm_hold_rsb(r);
792 } 820 }
793 821
794 /* If we're using a directory, add tossed rsbs to the root 822 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
795 list; they'll have entries created in the new directory, 823 log_error(ls, "dlm_create_root_list toss not empty");
796 but no other recovery steps should do anything with them. */
797
798 if (dlm_no_directory(ls)) {
799 spin_unlock(&ls->ls_rsbtbl[i].lock);
800 continue;
801 }
802
803 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
804 r = rb_entry(n, struct dlm_rsb, res_hashnode);
805 list_add(&r->res_root_list, &ls->ls_root_list);
806 dlm_hold_rsb(r);
807 }
808 spin_unlock(&ls->ls_rsbtbl[i].lock); 824 spin_unlock(&ls->ls_rsbtbl[i].lock);
809 } 825 }
810 out: 826 out:
@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
824 up_write(&ls->ls_root_sem); 840 up_write(&ls->ls_root_sem);
825} 841}
826 842
827/* If not using a directory, clear the entire toss list, there's no benefit to 843void dlm_clear_toss(struct dlm_ls *ls)
828 caching the master value since it's fixed. If we are using a dir, keep the
829 rsb's we're the master of. Recovery will add them to the root list and from
830 there they'll be entered in the rebuilt directory. */
831
832void dlm_clear_toss_list(struct dlm_ls *ls)
833{ 844{
834 struct rb_node *n, *next; 845 struct rb_node *n, *next;
835 struct dlm_rsb *rsb; 846 struct dlm_rsb *r;
847 unsigned int count = 0;
836 int i; 848 int i;
837 849
838 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 850 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
839 spin_lock(&ls->ls_rsbtbl[i].lock); 851 spin_lock(&ls->ls_rsbtbl[i].lock);
840 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { 852 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
841 next = rb_next(n);; 853 next = rb_next(n);
842 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 854 r = rb_entry(n, struct dlm_rsb, res_hashnode);
843 if (dlm_no_directory(ls) || !is_master(rsb)) { 855 rb_erase(n, &ls->ls_rsbtbl[i].toss);
844 rb_erase(n, &ls->ls_rsbtbl[i].toss); 856 dlm_free_rsb(r);
845 dlm_free_rsb(rsb); 857 count++;
846 }
847 } 858 }
848 spin_unlock(&ls->ls_rsbtbl[i].lock); 859 spin_unlock(&ls->ls_rsbtbl[i].lock);
849 } 860 }
861
862 if (count)
863 log_debug(ls, "dlm_clear_toss %u done", count);
850} 864}
851 865