aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2006-07-12 17:44:04 -0400
committerSteven Whitehouse <swhiteho@redhat.com>2006-07-13 09:25:34 -0400
commit597d0cae0f99f62501e229bed50e8149604015bb (patch)
treeb6cab09ff6fe2246740848164c0a52d5c03136a0 /fs/dlm/lock.c
parent2eb168ca94aba3bcae350ad9b31870955174a218 (diff)
[DLM] dlm: user locks
This changes the way the dlm handles user locks. The core dlm is now aware of user locks so they can be dealt with more efficiently. There is no more dlm_device module which previously managed its own duplicate copy of every user lock. Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c304
1 files changed, 298 insertions, 6 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 5f6963904107..4e222f873b6c 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -55,8 +55,9 @@
55 R: do_xxxx() 55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply() 56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/ 57*/
58 58#include <linux/types.h>
59#include "dlm_internal.h" 59#include "dlm_internal.h"
60#include <linux/dlm_device.h>
60#include "memory.h" 61#include "memory.h"
61#include "lowcomms.h" 62#include "lowcomms.h"
62#include "requestqueue.h" 63#include "requestqueue.h"
@@ -69,6 +70,7 @@
69#include "rcom.h" 70#include "rcom.h"
70#include "recover.h" 71#include "recover.h"
71#include "lvb_table.h" 72#include "lvb_table.h"
73#include "user.h"
72#include "config.h" 74#include "config.h"
73 75
74static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb); 76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
@@ -84,6 +86,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84 struct dlm_message *ms); 86 struct dlm_message *ms);
85static int receive_extralen(struct dlm_message *ms); 87static int receive_extralen(struct dlm_message *ms);
86 88
89#define FAKE_USER_AST (void*)0xff00ff00
90
87/* 91/*
88 * Lock compatibilty matrix - thanks Steve 92 * Lock compatibilty matrix - thanks Steve
89 * UN = Unlocked state. Not really a state, used as a flag 93 * UN = Unlocked state. Not really a state, used as a flag
@@ -152,7 +156,7 @@ static const int __quecvt_compat_matrix[8][8] = {
152 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 156 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
153}; 157};
154 158
155static void dlm_print_lkb(struct dlm_lkb *lkb) 159void dlm_print_lkb(struct dlm_lkb *lkb)
156{ 160{
157 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" 161 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n", 162 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
@@ -291,7 +295,7 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
291 if (len == r->res_length && !memcmp(name, r->res_name, len)) 295 if (len == r->res_length && !memcmp(name, r->res_name, len))
292 goto found; 296 goto found;
293 } 297 }
294 return -ENOENT; 298 return -EBADR;
295 299
296 found: 300 found:
297 if (r->res_nodeid && (flags & R_MASTER)) 301 if (r->res_nodeid && (flags & R_MASTER))
@@ -376,7 +380,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
376 if (!error) 380 if (!error)
377 goto out; 381 goto out;
378 382
379 if (error == -ENOENT && !(flags & R_CREATE)) 383 if (error == -EBADR && !(flags & R_CREATE))
380 goto out; 384 goto out;
381 385
382 /* the rsb was found but wasn't a master copy */ 386 /* the rsb was found but wasn't a master copy */
@@ -920,7 +924,7 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
920 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 924 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
921 return; 925 return;
922 926
923 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 927 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
924 if (b == 1) { 928 if (b == 1) {
925 int len = receive_extralen(ms); 929 int len = receive_extralen(ms);
926 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 930 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
@@ -963,6 +967,8 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
963 lkb->lkb_rqmode = DLM_LOCK_IV; 967 lkb->lkb_rqmode = DLM_LOCK_IV;
964 968
965 switch (lkb->lkb_status) { 969 switch (lkb->lkb_status) {
970 case DLM_LKSTS_GRANTED:
971 break;
966 case DLM_LKSTS_CONVERT: 972 case DLM_LKSTS_CONVERT:
967 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 973 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
968 break; 974 break;
@@ -1727,6 +1733,11 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1727 return -DLM_EUNLOCK; 1733 return -DLM_EUNLOCK;
1728} 1734}
1729 1735
1736/* FIXME: if revert_lock() finds that the lkb is granted, we should
1737 skip the queue_cast(ECANCEL). It indicates that the request/convert
1738 completed (and queued a normal ast) just before the cancel; we don't
1739 want to clobber the sb_result for the normal ast with ECANCEL. */
1740
1730static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 1741static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1731{ 1742{
1732 revert_lock(r, lkb); 1743 revert_lock(r, lkb);
@@ -2739,7 +2750,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2739 confirm_master(r, error); 2750 confirm_master(r, error);
2740 break; 2751 break;
2741 2752
2742 case -ENOENT: 2753 case -EBADR:
2743 case -ENOTBLK: 2754 case -ENOTBLK:
2744 /* find_rsb failed to find rsb or rsb wasn't master */ 2755 /* find_rsb failed to find rsb or rsb wasn't master */
2745 r->res_nodeid = -1; 2756 r->res_nodeid = -1;
@@ -3545,3 +3556,284 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3545 return 0; 3556 return 0;
3546} 3557}
3547 3558
3559int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3560 int mode, uint32_t flags, void *name, unsigned int namelen,
3561 uint32_t parent_lkid)
3562{
3563 struct dlm_lkb *lkb;
3564 struct dlm_args args;
3565 int error;
3566
3567 lock_recovery(ls);
3568
3569 error = create_lkb(ls, &lkb);
3570 if (error) {
3571 kfree(ua);
3572 goto out;
3573 }
3574
3575 if (flags & DLM_LKF_VALBLK) {
3576 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3577 if (!ua->lksb.sb_lvbptr) {
3578 kfree(ua);
3579 __put_lkb(ls, lkb);
3580 error = -ENOMEM;
3581 goto out;
3582 }
3583 }
3584
3585 /* After ua is attached to lkb it will be freed by free_lkb().
3586 When DLM_IFL_USER is set, the dlm knows that this is a userspace
3587 lock and that lkb_astparam is the dlm_user_args structure. */
3588
3589 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3590 FAKE_USER_AST, ua, FAKE_USER_AST, &args);
3591 lkb->lkb_flags |= DLM_IFL_USER;
3592 ua->old_mode = DLM_LOCK_IV;
3593
3594 if (error) {
3595 __put_lkb(ls, lkb);
3596 goto out;
3597 }
3598
3599 error = request_lock(ls, lkb, name, namelen, &args);
3600
3601 switch (error) {
3602 case 0:
3603 break;
3604 case -EINPROGRESS:
3605 error = 0;
3606 break;
3607 case -EAGAIN:
3608 error = 0;
3609 /* fall through */
3610 default:
3611 __put_lkb(ls, lkb);
3612 goto out;
3613 }
3614
3615 /* add this new lkb to the per-process list of locks */
3616 spin_lock(&ua->proc->locks_spin);
3617 kref_get(&lkb->lkb_ref);
3618 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3619 spin_unlock(&ua->proc->locks_spin);
3620 out:
3621 unlock_recovery(ls);
3622 return error;
3623}
3624
3625int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3626 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3627{
3628 struct dlm_lkb *lkb;
3629 struct dlm_args args;
3630 struct dlm_user_args *ua;
3631 int error;
3632
3633 lock_recovery(ls);
3634
3635 error = find_lkb(ls, lkid, &lkb);
3636 if (error)
3637 goto out;
3638
3639 /* user can change the params on its lock when it converts it, or
3640 add an lvb that didn't exist before */
3641
3642 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3643
3644 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3645 ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3646 if (!ua->lksb.sb_lvbptr) {
3647 error = -ENOMEM;
3648 goto out_put;
3649 }
3650 }
3651 if (lvb_in && ua->lksb.sb_lvbptr)
3652 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3653
3654 ua->castparam = ua_tmp->castparam;
3655 ua->castaddr = ua_tmp->castaddr;
3656 ua->bastparam = ua_tmp->bastparam;
3657 ua->bastaddr = ua_tmp->bastaddr;
3658 ua->old_mode = lkb->lkb_grmode;
3659
3660 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
3661 FAKE_USER_AST, &args);
3662 if (error)
3663 goto out_put;
3664
3665 error = convert_lock(ls, lkb, &args);
3666
3667 if (error == -EINPROGRESS || error == -EAGAIN)
3668 error = 0;
3669 out_put:
3670 dlm_put_lkb(lkb);
3671 out:
3672 unlock_recovery(ls);
3673 kfree(ua_tmp);
3674 return error;
3675}
3676
3677int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3678 uint32_t flags, uint32_t lkid, char *lvb_in)
3679{
3680 struct dlm_lkb *lkb;
3681 struct dlm_args args;
3682 struct dlm_user_args *ua;
3683 int error;
3684
3685 lock_recovery(ls);
3686
3687 error = find_lkb(ls, lkid, &lkb);
3688 if (error)
3689 goto out;
3690
3691 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3692
3693 if (lvb_in && ua->lksb.sb_lvbptr)
3694 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3695 ua->castparam = ua_tmp->castparam;
3696
3697 error = set_unlock_args(flags, ua, &args);
3698 if (error)
3699 goto out_put;
3700
3701 error = unlock_lock(ls, lkb, &args);
3702
3703 if (error == -DLM_EUNLOCK)
3704 error = 0;
3705 if (error)
3706 goto out_put;
3707
3708 spin_lock(&ua->proc->locks_spin);
3709 list_del(&lkb->lkb_ownqueue);
3710 spin_unlock(&ua->proc->locks_spin);
3711
3712 /* this removes the reference for the proc->locks list added by
3713 dlm_user_request */
3714 unhold_lkb(lkb);
3715 out_put:
3716 dlm_put_lkb(lkb);
3717 out:
3718 unlock_recovery(ls);
3719 return error;
3720}
3721
3722int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3723 uint32_t flags, uint32_t lkid)
3724{
3725 struct dlm_lkb *lkb;
3726 struct dlm_args args;
3727 struct dlm_user_args *ua;
3728 int error;
3729
3730 lock_recovery(ls);
3731
3732 error = find_lkb(ls, lkid, &lkb);
3733 if (error)
3734 goto out;
3735
3736 ua = (struct dlm_user_args *)lkb->lkb_astparam;
3737 ua->castparam = ua_tmp->castparam;
3738
3739 error = set_unlock_args(flags, ua, &args);
3740 if (error)
3741 goto out_put;
3742
3743 error = cancel_lock(ls, lkb, &args);
3744
3745 if (error == -DLM_ECANCEL)
3746 error = 0;
3747 if (error)
3748 goto out_put;
3749
3750 /* this lkb was removed from the WAITING queue */
3751 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3752 spin_lock(&ua->proc->locks_spin);
3753 list_del(&lkb->lkb_ownqueue);
3754 spin_unlock(&ua->proc->locks_spin);
3755 unhold_lkb(lkb);
3756 }
3757 out_put:
3758 dlm_put_lkb(lkb);
3759 out:
3760 unlock_recovery(ls);
3761 return error;
3762}
3763
3764static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3765{
3766 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3767
3768 if (ua->lksb.sb_lvbptr)
3769 kfree(ua->lksb.sb_lvbptr);
3770 kfree(ua);
3771 lkb->lkb_astparam = (long)NULL;
3772
3773 /* TODO: propogate to master if needed */
3774 return 0;
3775}
3776
3777/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3778 Regardless of what rsb queue the lock is on, it's removed and freed. */
3779
3780static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3781{
3782 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3783 struct dlm_args args;
3784 int error;
3785
3786 /* FIXME: we need to handle the case where the lkb is in limbo
3787 while the rsb is being looked up, currently we assert in
3788 _unlock_lock/is_remote because rsb nodeid is -1. */
3789
3790 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3791
3792 error = unlock_lock(ls, lkb, &args);
3793 if (error == -DLM_EUNLOCK)
3794 error = 0;
3795 return error;
3796}
3797
3798/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3799 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3800 which we clear here. */
3801
3802/* proc CLOSING flag is set so no more device_reads should look at proc->asts
3803 list, and no more device_writes should add lkb's to proc->locks list; so we
3804 shouldn't need to take asts_spin or locks_spin here. this assumes that
3805 device reads/writes/closes are serialized -- FIXME: we may need to serialize
3806 them ourself. */
3807
3808void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3809{
3810 struct dlm_lkb *lkb, *safe;
3811
3812 lock_recovery(ls);
3813 mutex_lock(&ls->ls_clear_proc_locks);
3814
3815 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3816 if (lkb->lkb_ast_type) {
3817 list_del(&lkb->lkb_astqueue);
3818 unhold_lkb(lkb);
3819 }
3820
3821 list_del(&lkb->lkb_ownqueue);
3822
3823 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3824 lkb->lkb_flags |= DLM_IFL_ORPHAN;
3825 orphan_proc_lock(ls, lkb);
3826 } else {
3827 lkb->lkb_flags |= DLM_IFL_DEAD;
3828 unlock_proc_lock(ls, lkb);
3829 }
3830
3831 /* this removes the reference for the proc->locks list
3832 added by dlm_user_request, it may result in the lkb
3833 being freed */
3834
3835 dlm_put_lkb(lkb);
3836 }
3837 mutex_unlock(&ls->ls_clear_proc_locks);
3838 unlock_recovery(ls);
3839}