about summary refs log tree commit diff stats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
author David Teigland <teigland@redhat.com> 2012-04-23 17:36:01 -0400
committer David Teigland <teigland@redhat.com> 2012-04-26 16:41:46 -0400
commit 6d40c4a708e0e996fd9c60d4093aebba5fe1f749 (patch)
tree 85c2c602d5c18c3a4c94e525114af4f449751c10 /fs/dlm/lock.c
parent 57638bf3aa64facd9eba0e018b5773f5d2da6c2b (diff)
dlm: improve error and debug messages
Change some existing error/debug messages to collect more useful information, and add some new error/debug messages to address recently found problems.

Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r-- fs/dlm/lock.c 241
1 files changed, 156 insertions, 85 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 49926f1df23e..f3ba70301a45 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,11 +160,11 @@ static const int __quecvt_compat_matrix[8][8] = {
160 160
161void dlm_print_lkb(struct dlm_lkb *lkb) 161void dlm_print_lkb(struct dlm_lkb *lkb)
162{ 162{
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" 163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 " status %d rqmode %d grmode %d wait_type %d\n", 164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type); 167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
168} 168}
169 169
170static void dlm_print_rsb(struct dlm_rsb *r) 170static void dlm_print_rsb(struct dlm_rsb *r)
@@ -589,6 +589,23 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
589 return error; 589 return error;
590} 590}
591 591
592static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
593{
594 struct rb_node *n;
595 struct dlm_rsb *r;
596 int i;
597
598 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
599 spin_lock(&ls->ls_rsbtbl[i].lock);
600 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
601 r = rb_entry(n, struct dlm_rsb, res_hashnode);
602 if (r->res_hash == hash)
603 dlm_dump_rsb(r);
604 }
605 spin_unlock(&ls->ls_rsbtbl[i].lock);
606 }
607}
608
592/* This is only called to add a reference when the code already holds 609/* This is only called to add a reference when the code already holds
593 a valid reference to the rsb, so there's no need for locking. */ 610 a valid reference to the rsb, so there's no need for locking. */
594 611
@@ -1067,8 +1084,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1067 goto out_del; 1084 goto out_del;
1068 } 1085 }
1069 1086
1070 log_error(ls, "remwait error %x reply %d flags %x no wait_type", 1087 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1071 lkb->lkb_id, mstype, lkb->lkb_flags); 1088 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1089 mstype, lkb->lkb_flags);
1072 return -1; 1090 return -1;
1073 1091
1074 out_del: 1092 out_del:
@@ -3375,7 +3393,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3375 return error; 3393 return error;
3376} 3394}
3377 3395
3378static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) 3396static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3379{ 3397{
3380 struct dlm_lkb *lkb; 3398 struct dlm_lkb *lkb;
3381 struct dlm_rsb *r; 3399 struct dlm_rsb *r;
@@ -3415,14 +3433,15 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3415 error = 0; 3433 error = 0;
3416 if (error) 3434 if (error)
3417 dlm_put_lkb(lkb); 3435 dlm_put_lkb(lkb);
3418 return; 3436 return 0;
3419 3437
3420 fail: 3438 fail:
3421 setup_stub_lkb(ls, ms); 3439 setup_stub_lkb(ls, ms);
3422 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3440 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3441 return error;
3423} 3442}
3424 3443
3425static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 3444static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3426{ 3445{
3427 struct dlm_lkb *lkb; 3446 struct dlm_lkb *lkb;
3428 struct dlm_rsb *r; 3447 struct dlm_rsb *r;
@@ -3432,6 +3451,14 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3432 if (error) 3451 if (error)
3433 goto fail; 3452 goto fail;
3434 3453
3454 if (lkb->lkb_remid != ms->m_lkid) {
3455 log_error(ls, "receive_convert %x remid %x remote %d %x",
3456 lkb->lkb_id, lkb->lkb_remid,
3457 ms->m_header.h_nodeid, ms->m_lkid);
3458 error = -ENOENT;
3459 goto fail;
3460 }
3461
3435 r = lkb->lkb_resource; 3462 r = lkb->lkb_resource;
3436 3463
3437 hold_rsb(r); 3464 hold_rsb(r);
@@ -3459,14 +3486,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3459 unlock_rsb(r); 3486 unlock_rsb(r);
3460 put_rsb(r); 3487 put_rsb(r);
3461 dlm_put_lkb(lkb); 3488 dlm_put_lkb(lkb);
3462 return; 3489 return 0;
3463 3490
3464 fail: 3491 fail:
3465 setup_stub_lkb(ls, ms); 3492 setup_stub_lkb(ls, ms);
3466 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3493 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3494 return error;
3467} 3495}
3468 3496
3469static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 3497static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3470{ 3498{
3471 struct dlm_lkb *lkb; 3499 struct dlm_lkb *lkb;
3472 struct dlm_rsb *r; 3500 struct dlm_rsb *r;
@@ -3476,6 +3504,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3476 if (error) 3504 if (error)
3477 goto fail; 3505 goto fail;
3478 3506
3507 if (lkb->lkb_remid != ms->m_lkid) {
3508 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3509 lkb->lkb_id, lkb->lkb_remid,
3510 ms->m_header.h_nodeid, ms->m_lkid);
3511 error = -ENOENT;
3512 goto fail;
3513 }
3514
3479 r = lkb->lkb_resource; 3515 r = lkb->lkb_resource;
3480 3516
3481 hold_rsb(r); 3517 hold_rsb(r);
@@ -3500,14 +3536,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3500 unlock_rsb(r); 3536 unlock_rsb(r);
3501 put_rsb(r); 3537 put_rsb(r);
3502 dlm_put_lkb(lkb); 3538 dlm_put_lkb(lkb);
3503 return; 3539 return 0;
3504 3540
3505 fail: 3541 fail:
3506 setup_stub_lkb(ls, ms); 3542 setup_stub_lkb(ls, ms);
3507 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3543 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3544 return error;
3508} 3545}
3509 3546
3510static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 3547static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3511{ 3548{
3512 struct dlm_lkb *lkb; 3549 struct dlm_lkb *lkb;
3513 struct dlm_rsb *r; 3550 struct dlm_rsb *r;
@@ -3535,25 +3572,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3535 unlock_rsb(r); 3572 unlock_rsb(r);
3536 put_rsb(r); 3573 put_rsb(r);
3537 dlm_put_lkb(lkb); 3574 dlm_put_lkb(lkb);
3538 return; 3575 return 0;
3539 3576
3540 fail: 3577 fail:
3541 setup_stub_lkb(ls, ms); 3578 setup_stub_lkb(ls, ms);
3542 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3579 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3580 return error;
3543} 3581}
3544 3582
3545static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 3583static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3546{ 3584{
3547 struct dlm_lkb *lkb; 3585 struct dlm_lkb *lkb;
3548 struct dlm_rsb *r; 3586 struct dlm_rsb *r;
3549 int error; 3587 int error;
3550 3588
3551 error = find_lkb(ls, ms->m_remid, &lkb); 3589 error = find_lkb(ls, ms->m_remid, &lkb);
3552 if (error) { 3590 if (error)
3553 log_debug(ls, "receive_grant from %d no lkb %x", 3591 return error;
3554 ms->m_header.h_nodeid, ms->m_remid);
3555 return;
3556 }
3557 3592
3558 r = lkb->lkb_resource; 3593 r = lkb->lkb_resource;
3559 3594
@@ -3573,20 +3608,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3573 unlock_rsb(r); 3608 unlock_rsb(r);
3574 put_rsb(r); 3609 put_rsb(r);
3575 dlm_put_lkb(lkb); 3610 dlm_put_lkb(lkb);
3611 return 0;
3576} 3612}
3577 3613
3578static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 3614static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3579{ 3615{
3580 struct dlm_lkb *lkb; 3616 struct dlm_lkb *lkb;
3581 struct dlm_rsb *r; 3617 struct dlm_rsb *r;
3582 int error; 3618 int error;
3583 3619
3584 error = find_lkb(ls, ms->m_remid, &lkb); 3620 error = find_lkb(ls, ms->m_remid, &lkb);
3585 if (error) { 3621 if (error)
3586 log_debug(ls, "receive_bast from %d no lkb %x", 3622 return error;
3587 ms->m_header.h_nodeid, ms->m_remid);
3588 return;
3589 }
3590 3623
3591 r = lkb->lkb_resource; 3624 r = lkb->lkb_resource;
3592 3625
@@ -3602,6 +3635,7 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3602 unlock_rsb(r); 3635 unlock_rsb(r);
3603 put_rsb(r); 3636 put_rsb(r);
3604 dlm_put_lkb(lkb); 3637 dlm_put_lkb(lkb);
3638 return 0;
3605} 3639}
3606 3640
3607static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 3641static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3656,18 +3690,15 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3656 do_purge(ls, ms->m_nodeid, ms->m_pid); 3690 do_purge(ls, ms->m_nodeid, ms->m_pid);
3657} 3691}
3658 3692
3659static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 3693static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3660{ 3694{
3661 struct dlm_lkb *lkb; 3695 struct dlm_lkb *lkb;
3662 struct dlm_rsb *r; 3696 struct dlm_rsb *r;
3663 int error, mstype, result; 3697 int error, mstype, result;
3664 3698
3665 error = find_lkb(ls, ms->m_remid, &lkb); 3699 error = find_lkb(ls, ms->m_remid, &lkb);
3666 if (error) { 3700 if (error)
3667 log_debug(ls, "receive_request_reply from %d no lkb %x", 3701 return error;
3668 ms->m_header.h_nodeid, ms->m_remid);
3669 return;
3670 }
3671 3702
3672 r = lkb->lkb_resource; 3703 r = lkb->lkb_resource;
3673 hold_rsb(r); 3704 hold_rsb(r);
@@ -3758,6 +3789,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3758 unlock_rsb(r); 3789 unlock_rsb(r);
3759 put_rsb(r); 3790 put_rsb(r);
3760 dlm_put_lkb(lkb); 3791 dlm_put_lkb(lkb);
3792 return 0;
3761} 3793}
3762 3794
3763static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3795static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3796,8 +3828,11 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3796 break; 3828 break;
3797 3829
3798 default: 3830 default:
3799 log_error(r->res_ls, "receive_convert_reply %x error %d", 3831 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3800 lkb->lkb_id, ms->m_result); 3832 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3833 ms->m_result);
3834 dlm_print_rsb(r);
3835 dlm_print_lkb(lkb);
3801 } 3836 }
3802} 3837}
3803 3838
@@ -3824,20 +3859,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3824 put_rsb(r); 3859 put_rsb(r);
3825} 3860}
3826 3861
3827static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 3862static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3828{ 3863{
3829 struct dlm_lkb *lkb; 3864 struct dlm_lkb *lkb;
3830 int error; 3865 int error;
3831 3866
3832 error = find_lkb(ls, ms->m_remid, &lkb); 3867 error = find_lkb(ls, ms->m_remid, &lkb);
3833 if (error) { 3868 if (error)
3834 log_debug(ls, "receive_convert_reply from %d no lkb %x", 3869 return error;
3835 ms->m_header.h_nodeid, ms->m_remid);
3836 return;
3837 }
3838 3870
3839 _receive_convert_reply(lkb, ms); 3871 _receive_convert_reply(lkb, ms);
3840 dlm_put_lkb(lkb); 3872 dlm_put_lkb(lkb);
3873 return 0;
3841} 3874}
3842 3875
3843static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3876static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3876,20 +3909,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3876 put_rsb(r); 3909 put_rsb(r);
3877} 3910}
3878 3911
3879static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 3912static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3880{ 3913{
3881 struct dlm_lkb *lkb; 3914 struct dlm_lkb *lkb;
3882 int error; 3915 int error;
3883 3916
3884 error = find_lkb(ls, ms->m_remid, &lkb); 3917 error = find_lkb(ls, ms->m_remid, &lkb);
3885 if (error) { 3918 if (error)
3886 log_debug(ls, "receive_unlock_reply from %d no lkb %x", 3919 return error;
3887 ms->m_header.h_nodeid, ms->m_remid);
3888 return;
3889 }
3890 3920
3891 _receive_unlock_reply(lkb, ms); 3921 _receive_unlock_reply(lkb, ms);
3892 dlm_put_lkb(lkb); 3922 dlm_put_lkb(lkb);
3923 return 0;
3893} 3924}
3894 3925
3895static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3926static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3928,20 +3959,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3928 put_rsb(r); 3959 put_rsb(r);
3929} 3960}
3930 3961
3931static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 3962static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3932{ 3963{
3933 struct dlm_lkb *lkb; 3964 struct dlm_lkb *lkb;
3934 int error; 3965 int error;
3935 3966
3936 error = find_lkb(ls, ms->m_remid, &lkb); 3967 error = find_lkb(ls, ms->m_remid, &lkb);
3937 if (error) { 3968 if (error)
3938 log_debug(ls, "receive_cancel_reply from %d no lkb %x", 3969 return error;
3939 ms->m_header.h_nodeid, ms->m_remid);
3940 return;
3941 }
3942 3970
3943 _receive_cancel_reply(lkb, ms); 3971 _receive_cancel_reply(lkb, ms);
3944 dlm_put_lkb(lkb); 3972 dlm_put_lkb(lkb);
3973 return 0;
3945} 3974}
3946 3975
3947static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 3976static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3952,7 +3981,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3952 3981
3953 error = find_lkb(ls, ms->m_lkid, &lkb); 3982 error = find_lkb(ls, ms->m_lkid, &lkb);
3954 if (error) { 3983 if (error) {
3955 log_error(ls, "receive_lookup_reply no lkb"); 3984 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
3956 return; 3985 return;
3957 } 3986 }
3958 3987
@@ -3996,8 +4025,11 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3996 dlm_put_lkb(lkb); 4025 dlm_put_lkb(lkb);
3997} 4026}
3998 4027
3999static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) 4028static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4029 uint32_t saved_seq)
4000{ 4030{
4031 int error = 0, noent = 0;
4032
4001 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4033 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4002 log_debug(ls, "ignore non-member message %d from %d %x %x %d", 4034 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4003 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4035 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
@@ -4010,47 +4042,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4010 /* messages sent to a master node */ 4042 /* messages sent to a master node */
4011 4043
4012 case DLM_MSG_REQUEST: 4044 case DLM_MSG_REQUEST:
4013 receive_request(ls, ms); 4045 error = receive_request(ls, ms);
4014 break; 4046 break;
4015 4047
4016 case DLM_MSG_CONVERT: 4048 case DLM_MSG_CONVERT:
4017 receive_convert(ls, ms); 4049 error = receive_convert(ls, ms);
4018 break; 4050 break;
4019 4051
4020 case DLM_MSG_UNLOCK: 4052 case DLM_MSG_UNLOCK:
4021 receive_unlock(ls, ms); 4053 error = receive_unlock(ls, ms);
4022 break; 4054 break;
4023 4055
4024 case DLM_MSG_CANCEL: 4056 case DLM_MSG_CANCEL:
4025 receive_cancel(ls, ms); 4057 noent = 1;
4058 error = receive_cancel(ls, ms);
4026 break; 4059 break;
4027 4060
4028 /* messages sent from a master node (replies to above) */ 4061 /* messages sent from a master node (replies to above) */
4029 4062
4030 case DLM_MSG_REQUEST_REPLY: 4063 case DLM_MSG_REQUEST_REPLY:
4031 receive_request_reply(ls, ms); 4064 error = receive_request_reply(ls, ms);
4032 break; 4065 break;
4033 4066
4034 case DLM_MSG_CONVERT_REPLY: 4067 case DLM_MSG_CONVERT_REPLY:
4035 receive_convert_reply(ls, ms); 4068 error = receive_convert_reply(ls, ms);
4036 break; 4069 break;
4037 4070
4038 case DLM_MSG_UNLOCK_REPLY: 4071 case DLM_MSG_UNLOCK_REPLY:
4039 receive_unlock_reply(ls, ms); 4072 error = receive_unlock_reply(ls, ms);
4040 break; 4073 break;
4041 4074
4042 case DLM_MSG_CANCEL_REPLY: 4075 case DLM_MSG_CANCEL_REPLY:
4043 receive_cancel_reply(ls, ms); 4076 error = receive_cancel_reply(ls, ms);
4044 break; 4077 break;
4045 4078
4046 /* messages sent from a master node (only two types of async msg) */ 4079 /* messages sent from a master node (only two types of async msg) */
4047 4080
4048 case DLM_MSG_GRANT: 4081 case DLM_MSG_GRANT:
4049 receive_grant(ls, ms); 4082 noent = 1;
4083 error = receive_grant(ls, ms);
4050 break; 4084 break;
4051 4085
4052 case DLM_MSG_BAST: 4086 case DLM_MSG_BAST:
4053 receive_bast(ls, ms); 4087 noent = 1;
4088 error = receive_bast(ls, ms);
4054 break; 4089 break;
4055 4090
4056 /* messages sent to a dir node */ 4091 /* messages sent to a dir node */
@@ -4078,6 +4113,30 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4078 default: 4113 default:
4079 log_error(ls, "unknown message type %d", ms->m_type); 4114 log_error(ls, "unknown message type %d", ms->m_type);
4080 } 4115 }
4116
4117 /*
4118 * When checking for ENOENT, we're checking the result of
4119 * find_lkb(m_remid):
4120 *
4121 * The lock id referenced in the message wasn't found. This may
4122 * happen in normal usage for the async messages and cancel, so
4123 * only use log_debug for them.
4124 *
4125 * Other errors are expected and normal.
4126 */
4127
4128 if (error == -ENOENT && noent) {
4129 log_debug(ls, "receive %d no %x remote %d %x seq %u",
4130 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4131 ms->m_lkid, saved_seq);
4132 } else if (error == -ENOENT) {
4133 log_error(ls, "receive %d no %x remote %d %x seq %u",
4134 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4135 ms->m_lkid, saved_seq);
4136
4137 if (ms->m_type == DLM_MSG_CONVERT)
4138 dlm_dump_rsb_hash(ls, ms->m_hash);
4139 }
4081} 4140}
4082 4141
4083/* If the lockspace is in recovery mode (locking stopped), then normal 4142/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4095,16 +4154,17 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4095 dlm_add_requestqueue(ls, nodeid, ms); 4154 dlm_add_requestqueue(ls, nodeid, ms);
4096 } else { 4155 } else {
4097 dlm_wait_requestqueue(ls); 4156 dlm_wait_requestqueue(ls);
4098 _receive_message(ls, ms); 4157 _receive_message(ls, ms, 0);
4099 } 4158 }
4100} 4159}
4101 4160
4102/* This is called by dlm_recoverd to process messages that were saved on 4161/* This is called by dlm_recoverd to process messages that were saved on
4103 the requestqueue. */ 4162 the requestqueue. */
4104 4163
4105void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) 4164void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4165 uint32_t saved_seq)
4106{ 4166{
4107 _receive_message(ls, ms); 4167 _receive_message(ls, ms, saved_seq);
4108} 4168}
4109 4169
4110/* This is called by the midcomms layer when something is received for 4170/* This is called by the midcomms layer when something is received for
@@ -4653,6 +4713,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4653 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4713 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4654 struct dlm_rsb *r; 4714 struct dlm_rsb *r;
4655 struct dlm_lkb *lkb; 4715 struct dlm_lkb *lkb;
4716 uint32_t remid = 0;
4656 int error; 4717 int error;
4657 4718
4658 if (rl->rl_parent_lkid) { 4719 if (rl->rl_parent_lkid) {
@@ -4660,6 +4721,8 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4660 goto out; 4721 goto out;
4661 } 4722 }
4662 4723
4724 remid = le32_to_cpu(rl->rl_lkid);
4725
4663 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 4726 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4664 R_MASTER, &r); 4727 R_MASTER, &r);
4665 if (error) 4728 if (error)
@@ -4667,7 +4730,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4667 4730
4668 lock_rsb(r); 4731 lock_rsb(r);
4669 4732
4670 lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid)); 4733 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
4671 if (lkb) { 4734 if (lkb) {
4672 error = -EEXIST; 4735 error = -EEXIST;
4673 goto out_remid; 4736 goto out_remid;
@@ -4696,9 +4759,9 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4696 unlock_rsb(r); 4759 unlock_rsb(r);
4697 put_rsb(r); 4760 put_rsb(r);
4698 out: 4761 out:
4699 if (error) 4762 if (error && error != -EEXIST)
4700 log_debug(ls, "recover_master_copy %d %x", error, 4763 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
4701 le32_to_cpu(rl->rl_lkid)); 4764 rc->rc_header.h_nodeid, remid, error);
4702 rl->rl_result = cpu_to_le32(error); 4765 rl->rl_result = cpu_to_le32(error);
4703 return error; 4766 return error;
4704} 4767}
@@ -4709,41 +4772,49 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4709 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4772 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4710 struct dlm_rsb *r; 4773 struct dlm_rsb *r;
4711 struct dlm_lkb *lkb; 4774 struct dlm_lkb *lkb;
4712 int error; 4775 uint32_t lkid, remid;
4776 int error, result;
4777
4778 lkid = le32_to_cpu(rl->rl_lkid);
4779 remid = le32_to_cpu(rl->rl_remid);
4780 result = le32_to_cpu(rl->rl_result);
4713 4781
4714 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb); 4782 error = find_lkb(ls, lkid, &lkb);
4715 if (error) { 4783 if (error) {
4716 log_error(ls, "recover_process_copy no lkid %x", 4784 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
4717 le32_to_cpu(rl->rl_lkid)); 4785 lkid, rc->rc_header.h_nodeid, remid, result);
4718 return error; 4786 return error;
4719 } 4787 }
4720 4788
4721 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 4789 if (!is_process_copy(lkb)) {
4722 4790 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
4723 error = le32_to_cpu(rl->rl_result); 4791 lkid, rc->rc_header.h_nodeid, remid, result);
4792 dlm_print_lkb(lkb);
4793 return -EINVAL;
4794 }
4724 4795
4725 r = lkb->lkb_resource; 4796 r = lkb->lkb_resource;
4726 hold_rsb(r); 4797 hold_rsb(r);
4727 lock_rsb(r); 4798 lock_rsb(r);
4728 4799
4729 switch (error) { 4800 switch (result) {
4730 case -EBADR: 4801 case -EBADR:
4731 /* There's a chance the new master received our lock before 4802 /* There's a chance the new master received our lock before
4732 dlm_recover_master_reply(), this wouldn't happen if we did 4803 dlm_recover_master_reply(), this wouldn't happen if we did
4733 a barrier between recover_masters and recover_locks. */ 4804 a barrier between recover_masters and recover_locks. */
4734 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, 4805
4735 (unsigned long)r, r->res_name); 4806 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
4807 lkid, rc->rc_header.h_nodeid, remid, result);
4808
4736 dlm_send_rcom_lock(r, lkb); 4809 dlm_send_rcom_lock(r, lkb);
4737 goto out; 4810 goto out;
4738 case -EEXIST: 4811 case -EEXIST:
4739 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4740 /* fall through */
4741 case 0: 4812 case 0:
4742 lkb->lkb_remid = le32_to_cpu(rl->rl_remid); 4813 lkb->lkb_remid = remid;
4743 break; 4814 break;
4744 default: 4815 default:
4745 log_error(ls, "dlm_recover_process_copy unknown error %d %x", 4816 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
4746 error, lkb->lkb_id); 4817 lkid, rc->rc_header.h_nodeid, remid, result);
4747 } 4818 }
4748 4819
4749 /* an ack for dlm_recover_locks() which waits for replies from 4820 /* an ack for dlm_recover_locks() which waits for replies from