aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorJoel Becker <joel.becker@oracle.com>2008-02-01 17:45:08 -0500
committerMark Fasheh <mfasheh@suse.com>2008-04-18 11:56:04 -0400
commitde551246e7bc5558371c3427889a8db1b8cc60f4 (patch)
tree31ae24280b8c3517434a894c32e9aa5faae2a173 /fs
parent0abd6d1803b01c741430af270026d1d95a103d9c (diff)
ocfs2: Remove CANCELGRANT from the view of dlmglue.
o2dlm has the non-standard behavior of providing a cancel callback (unlock_ast) even when the cancel has failed (the locking operation succeeded without canceling). This is called CANCELGRANT after the status code sent to the callback. fs/dlm does not provide this callback, so dlmglue must be changed to live without it. o2dlm_unlock_ast_wrapper() in stackglue now ignores CANCELGRANT calls. Because dlmglue no longer sees CANCELGRANT, ocfs2_unlock_ast() no longer needs to check for it. ocfs2_locking_ast() must catch that a cancel was tried and clear the cancel state. Making these changes opens up a locking race. dlmglue uses the OCFS2_LOCK_BUSY flag to ensure only one thread is calling the dlm at any one time. But dlmglue must unlock the lockres before calling into the dlm. In the small window of time between unlocking the lockres and calling the dlm, the downconvert thread can try to cancel the lock. The downconvert thread is checking the OCFS2_LOCK_BUSY flag - it doesn't know that ocfs2_dlm_lock() has not yet been called. Because ocfs2_dlm_lock() has not yet been called, the cancel operation will just be a no-op. There's nothing to cancel. With CANCELGRANT, dlmglue uses the CANCELGRANT callback to clear up the cancel state. When it comes around again, it will retry the cancel. Eventually, the first thread will have called into ocfs2_dlm_lock(), and either the lock or the cancel will succeed. The downconvert thread can then do its downconvert. Without CANCELGRANT, there is nothing to clean up the cancellation state. The downconvert thread does not know to retry its operations. More importantly, the original lock may be blocking on the other node that is trying to cancel us. With neither able to make progress, the ast is never called and the cancellation state is never cleaned up that way. dlmglue is deadlocked. The OCFS2_LOCK_PENDING flag is introduced to remedy this window. It is set at the same time OCFS2_LOCK_BUSY is.
Thus, the downconvert thread can check whether the lock is cancelable. If not, it just loops around to try again. Once ocfs2_dlm_lock() is called, the thread then clears OCFS2_LOCK_PENDING and wakes the downconvert thread. Now, if the downconvert thread finds the lock BUSY, it can safely try to cancel it. Whether the cancel works or not, the state will be properly set and the lock processing can continue. Signed-off-by: Joel Becker <joel.becker@oracle.com> Signed-off-by: Mark Fasheh <mfasheh@suse.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/ocfs2/dlmglue.c199
-rw-r--r--fs/ocfs2/ocfs2.h4
-rw-r--r--fs/ocfs2/stackglue.c40
3 files changed, 188 insertions, 55 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index c7653bb343e1..295c47f7aba2 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -311,12 +311,13 @@ static int ocfs2_inode_lock_update(struct inode *inode,
311 struct buffer_head **bh); 311 struct buffer_head **bh);
312static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 312static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
313static inline int ocfs2_highest_compat_lock_level(int level); 313static inline int ocfs2_highest_compat_lock_level(int level);
314static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 314static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
315 int new_level); 315 int new_level);
316static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 316static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
317 struct ocfs2_lock_res *lockres, 317 struct ocfs2_lock_res *lockres,
318 int new_level, 318 int new_level,
319 int lvb); 319 int lvb,
320 unsigned int generation);
320static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 321static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
321 struct ocfs2_lock_res *lockres); 322 struct ocfs2_lock_res *lockres);
322static int ocfs2_cancel_convert(struct ocfs2_super *osb, 323static int ocfs2_cancel_convert(struct ocfs2_super *osb,
@@ -736,6 +737,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
736 return needs_downconvert; 737 return needs_downconvert;
737} 738}
738 739
740/*
741 * OCFS2_LOCK_PENDING and l_pending_gen.
742 *
743 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
744 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
745 * for more details on the race.
746 *
747 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
748 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
749 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
750 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
751 * the caller is going to try to clear PENDING again. If nothing else is
752 * happening, __lockres_clear_pending() sees PENDING is unset and does
753 * nothing.
754 *
755 * But what if another path (eg downconvert thread) has just started a
756 * new locking action? The other path has re-set PENDING. Our path
757 * cannot clear PENDING, because that will re-open the original race
758 * window.
759 *
760 * [Example]
761 *
762 * ocfs2_meta_lock()
763 *  ocfs2_cluster_lock()
764 *   set BUSY
765 *   set PENDING
766 *   drop l_lock
767 *   ocfs2_dlm_lock()
768 *    ocfs2_locking_ast()          ocfs2_downconvert_thread()
769 *     clear PENDING                ocfs2_unblock_lock()
770 *                                   take l_lock
771 *                                   !BUSY
772 *                                   ocfs2_prepare_downconvert()
773 *                                    set BUSY
774 *                                    set PENDING
775 *                                   drop l_lock
776 *   take l_lock
777 *   clear PENDING
778 *   drop l_lock
779 *                      <window>
780 *                                   ocfs2_dlm_lock()
781 *
782 * So as you can see, we now have a window where l_lock is not held,
783 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
784 *
785 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
786 * set by ocfs2_prepare_downconvert(). That wasn't nice.
787 *
788 * To solve this we introduce l_pending_gen. A call to
789 * lockres_clear_pending() will only do so when it is passed a generation
790 * number that matches the lockres. lockres_set_pending() will return the
791 * current generation number. When ocfs2_cluster_lock() goes to clear
792 * PENDING, it passes the generation it got from set_pending(). In our
793 * example above, the generation numbers will *not* match. Thus,
794 * ocfs2_cluster_lock() will not clear the PENDING set by
795 * ocfs2_prepare_downconvert().
796 */
797
798/* Unlocked version for ocfs2_locking_ast() */
799static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
800 unsigned int generation,
801 struct ocfs2_super *osb)
802{
803 assert_spin_locked(&lockres->l_lock);
804
805 /*
806 * The ast and locking functions can race us here. The winner
807 * will clear pending, the loser will not.
808 */
809 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
810 (lockres->l_pending_gen != generation))
811 return;
812
813 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
814 lockres->l_pending_gen++;
815
816 /*
817 * The downconvert thread may have skipped us because we
818 * were PENDING. Wake it up.
819 */
820 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
821 ocfs2_wake_downconvert_thread(osb);
822}
823
824/* Locked version for callers of ocfs2_dlm_lock() */
825static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
826 unsigned int generation,
827 struct ocfs2_super *osb)
828{
829 unsigned long flags;
830
831 spin_lock_irqsave(&lockres->l_lock, flags);
832 __lockres_clear_pending(lockres, generation, osb);
833 spin_unlock_irqrestore(&lockres->l_lock, flags);
834}
835
836static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
837{
838 assert_spin_locked(&lockres->l_lock);
839 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
840
841 lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
842
843 return lockres->l_pending_gen;
844}
845
846
739static void ocfs2_blocking_ast(void *opaque, int level) 847static void ocfs2_blocking_ast(void *opaque, int level)
740{ 848{
741 struct ocfs2_lock_res *lockres = opaque; 849 struct ocfs2_lock_res *lockres = opaque;
@@ -770,6 +878,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
770static void ocfs2_locking_ast(void *opaque) 878static void ocfs2_locking_ast(void *opaque)
771{ 879{
772 struct ocfs2_lock_res *lockres = opaque; 880 struct ocfs2_lock_res *lockres = opaque;
881 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
773 unsigned long flags; 882 unsigned long flags;
774 883
775 spin_lock_irqsave(&lockres->l_lock, flags); 884 spin_lock_irqsave(&lockres->l_lock, flags);
@@ -805,6 +914,18 @@ static void ocfs2_locking_ast(void *opaque)
805 * can catch it. */ 914 * can catch it. */
806 lockres->l_action = OCFS2_AST_INVALID; 915 lockres->l_action = OCFS2_AST_INVALID;
807 916
917 /* Did we try to cancel this lock? Clear that state */
918 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
919 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
920
921 /*
922 * We may have beaten the locking functions here. We certainly
923 * know that dlm_lock() has been called :-)
924 * Because we can't have two lock calls in flight at once, we
925 * can use lockres->l_pending_gen.
926 */
927 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);
928
808 wake_up(&lockres->l_event); 929 wake_up(&lockres->l_event);
809 spin_unlock_irqrestore(&lockres->l_lock, flags); 930 spin_unlock_irqrestore(&lockres->l_lock, flags);
810} 931}
@@ -838,6 +959,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
838{ 959{
839 int ret = 0; 960 int ret = 0;
840 unsigned long flags; 961 unsigned long flags;
962 unsigned int gen;
841 963
842 mlog_entry_void(); 964 mlog_entry_void();
843 965
@@ -854,6 +976,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
854 lockres->l_action = OCFS2_AST_ATTACH; 976 lockres->l_action = OCFS2_AST_ATTACH;
855 lockres->l_requested = level; 977 lockres->l_requested = level;
856 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 978 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
979 gen = lockres_set_pending(lockres);
857 spin_unlock_irqrestore(&lockres->l_lock, flags); 980 spin_unlock_irqrestore(&lockres->l_lock, flags);
858 981
859 ret = ocfs2_dlm_lock(osb->cconn, 982 ret = ocfs2_dlm_lock(osb->cconn,
@@ -863,6 +986,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
863 lockres->l_name, 986 lockres->l_name,
864 OCFS2_LOCK_ID_MAX_LEN - 1, 987 OCFS2_LOCK_ID_MAX_LEN - 1,
865 lockres); 988 lockres);
989 lockres_clear_pending(lockres, gen, osb);
866 if (ret) { 990 if (ret) {
867 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 991 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
868 ocfs2_recover_from_dlm_error(lockres, 1); 992 ocfs2_recover_from_dlm_error(lockres, 1);
@@ -988,6 +1112,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
988 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1112 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
989 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1113 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
990 unsigned long flags; 1114 unsigned long flags;
1115 unsigned int gen;
991 1116
992 mlog_entry_void(); 1117 mlog_entry_void();
993 1118
@@ -1046,6 +1171,7 @@ again:
1046 1171
1047 lockres->l_requested = level; 1172 lockres->l_requested = level;
1048 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1173 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1174 gen = lockres_set_pending(lockres);
1049 spin_unlock_irqrestore(&lockres->l_lock, flags); 1175 spin_unlock_irqrestore(&lockres->l_lock, flags);
1050 1176
1051 BUG_ON(level == DLM_LOCK_IV); 1177 BUG_ON(level == DLM_LOCK_IV);
@@ -1062,6 +1188,7 @@ again:
1062 lockres->l_name, 1188 lockres->l_name,
1063 OCFS2_LOCK_ID_MAX_LEN - 1, 1189 OCFS2_LOCK_ID_MAX_LEN - 1,
1064 lockres); 1190 lockres);
1191 lockres_clear_pending(lockres, gen, osb);
1065 if (ret) { 1192 if (ret) {
1066 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1193 if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1067 (ret != -EAGAIN)) { 1194 (ret != -EAGAIN)) {
@@ -1506,6 +1633,7 @@ out:
1506void ocfs2_file_unlock(struct file *file) 1633void ocfs2_file_unlock(struct file *file)
1507{ 1634{
1508 int ret; 1635 int ret;
1636 unsigned int gen;
1509 unsigned long flags; 1637 unsigned long flags;
1510 struct ocfs2_file_private *fp = file->private_data; 1638 struct ocfs2_file_private *fp = file->private_data;
1511 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1639 struct ocfs2_lock_res *lockres = &fp->fp_flock;
@@ -1531,11 +1659,11 @@ void ocfs2_file_unlock(struct file *file)
1531 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1659 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1532 lockres->l_blocking = DLM_LOCK_EX; 1660 lockres->l_blocking = DLM_LOCK_EX;
1533 1661
1534 ocfs2_prepare_downconvert(lockres, LKM_NLMODE); 1662 gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
1535 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1663 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1536 spin_unlock_irqrestore(&lockres->l_lock, flags); 1664 spin_unlock_irqrestore(&lockres->l_lock, flags);
1537 1665
1538 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); 1666 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
1539 if (ret) { 1667 if (ret) {
1540 mlog_errno(ret); 1668 mlog_errno(ret);
1541 return; 1669 return;
@@ -2555,23 +2683,7 @@ static void ocfs2_unlock_ast(void *opaque, int error)
2555 lockres->l_unlock_action); 2683 lockres->l_unlock_action);
2556 2684
2557 spin_lock_irqsave(&lockres->l_lock, flags); 2685 spin_lock_irqsave(&lockres->l_lock, flags);
2558 /* We tried to cancel a convert request, but it was already 2686 if (error) {
2559 * granted. All we want to do here is clear our unlock
2560 * state. The wake_up call done at the bottom is redundant
2561 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2562 * hurt anything anyway */
2563 if (error == -DLM_ECANCEL &&
2564 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2565 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2566
2567 /* We don't clear the busy flag in this case as it
2568 * should have been cleared by the ast which the dlm
2569 * has called. */
2570 goto complete_unlock;
2571 }
2572
2573 /* DLM_EUNLOCK is the success code for unlock */
2574 if (error != -DLM_EUNLOCK) {
2575 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2687 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
2576 "unlock_action %d\n", error, lockres->l_name, 2688 "unlock_action %d\n", error, lockres->l_name,
2577 lockres->l_unlock_action); 2689 lockres->l_unlock_action);
@@ -2592,7 +2704,6 @@ static void ocfs2_unlock_ast(void *opaque, int error)
2592 } 2704 }
2593 2705
2594 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2706 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2595complete_unlock:
2596 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2707 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2597 spin_unlock_irqrestore(&lockres->l_lock, flags); 2708 spin_unlock_irqrestore(&lockres->l_lock, flags);
2598 2709
@@ -2768,8 +2879,8 @@ int ocfs2_drop_inode_locks(struct inode *inode)
2768 return status; 2879 return status;
2769} 2880}
2770 2881
2771static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2882static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2772 int new_level) 2883 int new_level)
2773{ 2884{
2774 assert_spin_locked(&lockres->l_lock); 2885 assert_spin_locked(&lockres->l_lock);
2775 2886
@@ -2787,12 +2898,14 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2787 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2898 lockres->l_action = OCFS2_AST_DOWNCONVERT;
2788 lockres->l_requested = new_level; 2899 lockres->l_requested = new_level;
2789 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2900 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2901 return lockres_set_pending(lockres);
2790} 2902}
2791 2903
2792static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2904static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2793 struct ocfs2_lock_res *lockres, 2905 struct ocfs2_lock_res *lockres,
2794 int new_level, 2906 int new_level,
2795 int lvb) 2907 int lvb,
2908 unsigned int generation)
2796{ 2909{
2797 int ret; 2910 int ret;
2798 u32 dlm_flags = DLM_LKF_CONVERT; 2911 u32 dlm_flags = DLM_LKF_CONVERT;
@@ -2809,6 +2922,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2809 lockres->l_name, 2922 lockres->l_name,
2810 OCFS2_LOCK_ID_MAX_LEN - 1, 2923 OCFS2_LOCK_ID_MAX_LEN - 1,
2811 lockres); 2924 lockres);
2925 lockres_clear_pending(lockres, generation, osb);
2812 if (ret) { 2926 if (ret) {
2813 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2927 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2814 ocfs2_recover_from_dlm_error(lockres, 1); 2928 ocfs2_recover_from_dlm_error(lockres, 1);
@@ -2883,6 +2997,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2883 int new_level; 2997 int new_level;
2884 int ret = 0; 2998 int ret = 0;
2885 int set_lvb = 0; 2999 int set_lvb = 0;
3000 unsigned int gen;
2886 3001
2887 mlog_entry_void(); 3002 mlog_entry_void();
2888 3003
@@ -2892,6 +3007,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2892 3007
2893recheck: 3008recheck:
2894 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3009 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3010 /* XXX
3011 * This is a *big* race. The OCFS2_LOCK_PENDING flag
3012 * exists entirely for one reason - another thread has set
3013 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3014 *
3015 * If we do ocfs2_cancel_convert() before the other thread
3016 * calls dlm_lock(), our cancel will do nothing. We will
3017 * get no ast, and we will have no way of knowing the
3018 * cancel failed. Meanwhile, the other thread will call
3019 * into dlm_lock() and wait...forever.
3020 *
3021 * Why forever? Because another node has asked for the
3022 * lock first; that's why we're here in unblock_lock().
3023 *
3024 * The solution is OCFS2_LOCK_PENDING. When PENDING is
3025 * set, we just requeue the unblock. Only when the other
3026 * thread has called dlm_lock() and cleared PENDING will
3027 * we then cancel their request.
3028 *
3029 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3030 * at the same time they set OCFS2_LOCK_BUSY. They must
3031 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3032 */
3033 if (lockres->l_flags & OCFS2_LOCK_PENDING)
3034 goto leave_requeue;
3035
2895 ctl->requeue = 1; 3036 ctl->requeue = 1;
2896 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3037 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2897 spin_unlock_irqrestore(&lockres->l_lock, flags); 3038 spin_unlock_irqrestore(&lockres->l_lock, flags);
@@ -2971,9 +3112,11 @@ downconvert:
2971 lockres->l_ops->set_lvb(lockres); 3112 lockres->l_ops->set_lvb(lockres);
2972 } 3113 }
2973 3114
2974 ocfs2_prepare_downconvert(lockres, new_level); 3115 gen = ocfs2_prepare_downconvert(lockres, new_level);
2975 spin_unlock_irqrestore(&lockres->l_lock, flags); 3116 spin_unlock_irqrestore(&lockres->l_lock, flags);
2976 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 3117 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3118 gen);
3119
2977leave: 3120leave:
2978 mlog_exit(ret); 3121 mlog_exit(ret);
2979 return ret; 3122 return ret;
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 31dc28b48392..af929eca5412 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -98,6 +98,9 @@ enum ocfs2_unlock_action {
98 * dropped. */ 98 * dropped. */
99#define OCFS2_LOCK_QUEUED (0x00000100) /* queued for downconvert */ 99#define OCFS2_LOCK_QUEUED (0x00000100) /* queued for downconvert */
100#define OCFS2_LOCK_NOCACHE (0x00000200) /* don't use a holder count */ 100#define OCFS2_LOCK_NOCACHE (0x00000200) /* don't use a holder count */
101#define OCFS2_LOCK_PENDING (0x00000400) /* This lockres is pending a
102 call to dlm_lock. Only
103 exists with BUSY set. */
101 104
102struct ocfs2_lock_res_ops; 105struct ocfs2_lock_res_ops;
103 106
@@ -124,6 +127,7 @@ struct ocfs2_lock_res {
124 enum ocfs2_unlock_action l_unlock_action; 127 enum ocfs2_unlock_action l_unlock_action;
125 int l_requested; 128 int l_requested;
126 int l_blocking; 129 int l_blocking;
130 unsigned int l_pending_gen;
127 131
128 wait_queue_head_t l_event; 132 wait_queue_head_t l_event;
129 133
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index 670fa945c212..abdb9f6f4cc9 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -104,8 +104,8 @@ static int flags_to_o2dlm(u32 flags)
104 * 104 *
105 * DLM_NORMAL: 0 105 * DLM_NORMAL: 0
106 * DLM_NOTQUEUED: -EAGAIN 106 * DLM_NOTQUEUED: -EAGAIN
107 * DLM_CANCELGRANT: -DLM_ECANCEL 107 * DLM_CANCELGRANT: -EBUSY
108 * DLM_CANCEL: -DLM_EUNLOCK 108 * DLM_CANCEL: -DLM_ECANCEL
109 */ 109 */
110/* Keep in sync with dlmapi.h */ 110/* Keep in sync with dlmapi.h */
111static int status_map[] = { 111static int status_map[] = {
@@ -113,13 +113,13 @@ static int status_map[] = {
113 [DLM_GRANTED] = -EINVAL, 113 [DLM_GRANTED] = -EINVAL,
114 [DLM_DENIED] = -EACCES, 114 [DLM_DENIED] = -EACCES,
115 [DLM_DENIED_NOLOCKS] = -EACCES, 115 [DLM_DENIED_NOLOCKS] = -EACCES,
116 [DLM_WORKING] = -EBUSY, 116 [DLM_WORKING] = -EACCES,
117 [DLM_BLOCKED] = -EINVAL, 117 [DLM_BLOCKED] = -EINVAL,
118 [DLM_BLOCKED_ORPHAN] = -EINVAL, 118 [DLM_BLOCKED_ORPHAN] = -EINVAL,
119 [DLM_DENIED_GRACE_PERIOD] = -EACCES, 119 [DLM_DENIED_GRACE_PERIOD] = -EACCES,
120 [DLM_SYSERR] = -ENOMEM, /* It is what it is */ 120 [DLM_SYSERR] = -ENOMEM, /* It is what it is */
121 [DLM_NOSUPPORT] = -EPROTO, 121 [DLM_NOSUPPORT] = -EPROTO,
122 [DLM_CANCELGRANT] = -DLM_ECANCEL, /* Cancel after grant */ 122 [DLM_CANCELGRANT] = -EBUSY, /* Cancel after grant */
123 [DLM_IVLOCKID] = -EINVAL, 123 [DLM_IVLOCKID] = -EINVAL,
124 [DLM_SYNC] = -EINVAL, 124 [DLM_SYNC] = -EINVAL,
125 [DLM_BADTYPE] = -EINVAL, 125 [DLM_BADTYPE] = -EINVAL,
@@ -137,7 +137,7 @@ static int status_map[] = {
137 [DLM_VALNOTVALID] = -EINVAL, 137 [DLM_VALNOTVALID] = -EINVAL,
138 [DLM_REJECTED] = -EPERM, 138 [DLM_REJECTED] = -EPERM,
139 [DLM_ABORT] = -EINVAL, 139 [DLM_ABORT] = -EINVAL,
140 [DLM_CANCEL] = -DLM_EUNLOCK, /* Successful cancel */ 140 [DLM_CANCEL] = -DLM_ECANCEL, /* Successful cancel */
141 [DLM_IVRESHANDLE] = -EINVAL, 141 [DLM_IVRESHANDLE] = -EINVAL,
142 [DLM_DEADLOCK] = -EDEADLK, 142 [DLM_DEADLOCK] = -EDEADLK,
143 [DLM_DENIED_NOASTS] = -EINVAL, 143 [DLM_DENIED_NOASTS] = -EINVAL,
@@ -152,6 +152,7 @@ static int status_map[] = {
152 [DLM_MIGRATING] = -ERESTART, 152 [DLM_MIGRATING] = -ERESTART,
153 [DLM_MAXSTATS] = -EINVAL, 153 [DLM_MAXSTATS] = -EINVAL,
154}; 154};
155
155static int dlm_status_to_errno(enum dlm_status status) 156static int dlm_status_to_errno(enum dlm_status status)
156{ 157{
157 BUG_ON(status > (sizeof(status_map) / sizeof(status_map[0]))); 158 BUG_ON(status > (sizeof(status_map) / sizeof(status_map[0])));
@@ -175,38 +176,23 @@ static void o2dlm_blocking_ast_wrapper(void *astarg, int level)
175 176
176static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status) 177static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
177{ 178{
178 int error; 179 int error = dlm_status_to_errno(status);
179 180
180 BUG_ON(lproto == NULL); 181 BUG_ON(lproto == NULL);
181 182
182 /* 183 /*
183 * XXX: CANCEL values are sketchy.
184 *
185 * Currently we have preserved the o2dlm paradigm. You can get
186 * unlock_ast() whether the cancel succeeded or not.
187 *
188 * First, we're going to pass DLM_EUNLOCK just like fs/dlm does for
189 * successful unlocks. That is a clean behavior.
190 *
191 * In o2dlm, you can get both the lock_ast() for the lock being 184 * In o2dlm, you can get both the lock_ast() for the lock being
192 * granted and the unlock_ast() for the CANCEL failing. A 185 * granted and the unlock_ast() for the CANCEL failing. A
193 * successful cancel sends DLM_NORMAL here. If the 186 * successful cancel sends DLM_NORMAL here. If the
194 * lock grant happened before the cancel arrived, you get 187 * lock grant happened before the cancel arrived, you get
195 * DLM_CANCELGRANT. For now, we'll use DLM_ECANCEL to signify 188 * DLM_CANCELGRANT.
196 * CANCELGRANT - the CANCEL was supposed to happen but didn't. We
197 * can then use DLM_EUNLOCK to signify a successful CANCEL -
198 * effectively, the CANCEL caused the lock to roll back.
199 * 189 *
200 * In the future, we will likely move the o2dlm to send only one 190 * There's no need for the double-ast. If we see DLM_CANCELGRANT,
201 * ast - either unlock_ast() for a successful CANCEL or lock_ast() 191 * we just ignore it. We expect the lock_ast() to handle the
202 * when the grant succeeds. At that point, we'll send DLM_ECANCEL 192 * granted lock.
203 * for all cancel results (CANCELGRANT will no longer exist).
204 */ 193 */
205 error = dlm_status_to_errno(status); 194 if (status == DLM_CANCELGRANT)
206 195 return;
207 /* Successful unlock is DLM_EUNLOCK */
208 if (!error)
209 error = -DLM_EUNLOCK;
210 196
211 lproto->lp_unlock_ast(astarg, error); 197 lproto->lp_unlock_ast(astarg, error);
212} 198}