aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c142
1 files changed, 85 insertions, 57 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 2082daf083d8..3915b8e14146 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3638 dlm_put_lkb(lkb); 3638 dlm_put_lkb(lkb);
3639} 3639}
3640 3640
3641int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) 3641static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3642{ 3642{
3643 struct dlm_message *ms = (struct dlm_message *) hd;
3644 struct dlm_ls *ls;
3645 int error = 0;
3646
3647 if (!recovery)
3648 dlm_message_in(ms);
3649
3650 ls = dlm_find_lockspace_global(hd->h_lockspace);
3651 if (!ls) {
3652 log_print("drop message %d from %d for unknown lockspace %d",
3653 ms->m_type, nodeid, hd->h_lockspace);
3654 return -EINVAL;
3655 }
3656
3657 /* recovery may have just ended leaving a bunch of backed-up requests
3658 in the requestqueue; wait while dlm_recoverd clears them */
3659
3660 if (!recovery)
3661 dlm_wait_requestqueue(ls);
3662
3663 /* recovery may have just started while there were a bunch of
3664 in-flight requests -- save them in requestqueue to be processed
3665 after recovery. we can't let dlm_recvd block on the recovery
3666 lock. if dlm_recoverd is calling this function to clear the
3667 requestqueue, it needs to be interrupted (-EINTR) if another
3668 recovery operation is starting. */
3669
3670 while (1) {
3671 if (dlm_locking_stopped(ls)) {
3672 if (recovery) {
3673 error = -EINTR;
3674 goto out;
3675 }
3676 error = dlm_add_requestqueue(ls, nodeid, hd);
3677 if (error == -EAGAIN)
3678 continue;
3679 else {
3680 error = -EINTR;
3681 goto out;
3682 }
3683 }
3684
3685 if (dlm_lock_recovery_try(ls))
3686 break;
3687 schedule();
3688 }
3689
3690 switch (ms->m_type) { 3643 switch (ms->m_type) {
3691 3644
3692 /* messages sent to a master node */ 3645 /* messages sent to a master node */
@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3761 log_error(ls, "unknown message type %d", ms->m_type); 3714 log_error(ls, "unknown message type %d", ms->m_type);
3762 } 3715 }
3763 3716
3764 dlm_unlock_recovery(ls);
3765 out:
3766 dlm_put_lockspace(ls);
3767 dlm_astd_wake(); 3717 dlm_astd_wake();
3768 return error;
3769} 3718}
3770 3719
3720/* If the lockspace is in recovery mode (locking stopped), then normal
3721 messages are saved on the requestqueue for processing after recovery is
3722 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
3723 messages off the requestqueue before we process new ones. This occurs right
3724 after recovery completes when we transition from saving all messages on
3725 requestqueue, to processing all the saved messages, to processing new
3726 messages as they arrive. */
3771 3727
3772/* 3728static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3773 * Recovery related 3729 int nodeid)
3774 */ 3730{
3731 if (dlm_locking_stopped(ls)) {
3732 dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
3733 } else {
3734 dlm_wait_requestqueue(ls);
3735 _receive_message(ls, ms);
3736 }
3737}
3738
3739/* This is called by dlm_recoverd to process messages that were saved on
3740 the requestqueue. */
3741
3742void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3743{
3744 _receive_message(ls, ms);
3745}
3746
3747/* This is called by the midcomms layer when something is received for
3748 the lockspace. It could be either a MSG (normal message sent as part of
3749 standard locking activity) or an RCOM (recovery message sent as part of
3750 lockspace recovery). */
3751
3752void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
3753{
3754 struct dlm_message *ms = (struct dlm_message *) hd;
3755 struct dlm_rcom *rc = (struct dlm_rcom *) hd;
3756 struct dlm_ls *ls;
3757 int type = 0;
3758
3759 switch (hd->h_cmd) {
3760 case DLM_MSG:
3761 dlm_message_in(ms);
3762 type = ms->m_type;
3763 break;
3764 case DLM_RCOM:
3765 dlm_rcom_in(rc);
3766 type = rc->rc_type;
3767 break;
3768 default:
3769 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3770 return;
3771 }
3772
3773 if (hd->h_nodeid != nodeid) {
3774 log_print("invalid h_nodeid %d from %d lockspace %x",
3775 hd->h_nodeid, nodeid, hd->h_lockspace);
3776 return;
3777 }
3778
3779 ls = dlm_find_lockspace_global(hd->h_lockspace);
3780 if (!ls) {
3781 log_print("invalid h_lockspace %x from %d cmd %d type %d",
3782 hd->h_lockspace, nodeid, hd->h_cmd, type);
3783
3784 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3785 dlm_send_ls_not_ready(nodeid, rc);
3786 return;
3787 }
3788
3789 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3790 be inactive (in this ls) before transitioning to recovery mode */
3791
3792 down_read(&ls->ls_recv_active);
3793 if (hd->h_cmd == DLM_MSG)
3794 dlm_receive_message(ls, ms, nodeid);
3795 else
3796 dlm_receive_rcom(ls, rc, nodeid);
3797 up_read(&ls->ls_recv_active);
3798
3799 dlm_put_lockspace(ls);
3800}
3775 3801
3776static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) 3802static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3777{ 3803{
@@ -4429,7 +4455,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4429 4455
4430 if (lvb_in && ua->lksb.sb_lvbptr) 4456 if (lvb_in && ua->lksb.sb_lvbptr)
4431 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 4457 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4432 ua->castparam = ua_tmp->castparam; 4458 if (ua_tmp->castparam)
4459 ua->castparam = ua_tmp->castparam;
4433 ua->user_lksb = ua_tmp->user_lksb; 4460 ua->user_lksb = ua_tmp->user_lksb;
4434 4461
4435 error = set_unlock_args(flags, ua, &args); 4462 error = set_unlock_args(flags, ua, &args);
@@ -4474,7 +4501,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4474 goto out; 4501 goto out;
4475 4502
4476 ua = (struct dlm_user_args *)lkb->lkb_astparam; 4503 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4477 ua->castparam = ua_tmp->castparam; 4504 if (ua_tmp->castparam)
4505 ua->castparam = ua_tmp->castparam;
4478 ua->user_lksb = ua_tmp->user_lksb; 4506 ua->user_lksb = ua_tmp->user_lksb;
4479 4507
4480 error = set_unlock_args(flags, ua, &args); 4508 error = set_unlock_args(flags, ua, &args);