aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c199
1 files changed, 171 insertions, 28 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index c7653bb343e1..295c47f7aba2 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -311,12 +311,13 @@ static int ocfs2_inode_lock_update(struct inode *inode,
311 struct buffer_head **bh); 311 struct buffer_head **bh);
312static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 312static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
313static inline int ocfs2_highest_compat_lock_level(int level); 313static inline int ocfs2_highest_compat_lock_level(int level);
314static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 314static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
315 int new_level); 315 int new_level);
316static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 316static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
317 struct ocfs2_lock_res *lockres, 317 struct ocfs2_lock_res *lockres,
318 int new_level, 318 int new_level,
319 int lvb); 319 int lvb,
320 unsigned int generation);
320static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 321static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
321 struct ocfs2_lock_res *lockres); 322 struct ocfs2_lock_res *lockres);
322static int ocfs2_cancel_convert(struct ocfs2_super *osb, 323static int ocfs2_cancel_convert(struct ocfs2_super *osb,
@@ -736,6 +737,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
736 return needs_downconvert; 737 return needs_downconvert;
737} 738}
738 739
740/*
741 * OCFS2_LOCK_PENDING and l_pending_gen.
742 *
743 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
744 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
745 * for more details on the race.
746 *
747 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
748 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
749 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
750 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
751 * the caller is going to try to clear PENDING again. If nothing else is
752 * happening, __lockres_clear_pending() sees PENDING is unset and does
753 * nothing.
754 *
755 * But what if another path (eg downconvert thread) has just started a
756 * new locking action? The other path has re-set PENDING. Our path
757 * cannot clear PENDING, because that will re-open the original race
758 * window.
759 *
760 * [Example]
761 *
762 * ocfs2_meta_lock()
763 * ocfs2_cluster_lock()
764 * set BUSY
765 * set PENDING
766 * drop l_lock
767 * ocfs2_dlm_lock()
768 * ocfs2_locking_ast() ocfs2_downconvert_thread()
769 * clear PENDING ocfs2_unblock_lock()
770 * take_l_lock
771 * !BUSY
772 * ocfs2_prepare_downconvert()
773 * set BUSY
774 * set PENDING
775 * drop l_lock
776 * take l_lock
777 * clear PENDING
778 * drop l_lock
779 * <window>
780 * ocfs2_dlm_lock()
781 *
782 * So as you can see, we now have a window where l_lock is not held,
783 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
784 *
785 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
786 * set by ocfs2_prepare_downconvert(). That wasn't nice.
787 *
788 * To solve this we introduce l_pending_gen. A call to
789 * lockres_clear_pending() will only do so when it is passed a generation
790 * number that matches the lockres. lockres_set_pending() will return the
791 * current generation number. When ocfs2_cluster_lock() goes to clear
792 * PENDING, it passes the generation it got from set_pending(). In our
793 * example above, the generation numbers will *not* match. Thus,
794 * ocfs2_cluster_lock() will not clear the PENDING set by
795 * ocfs2_prepare_downconvert().
796 */
797
798/* Unlocked version for ocfs2_locking_ast() */
799static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
800 unsigned int generation,
801 struct ocfs2_super *osb)
802{
803 assert_spin_locked(&lockres->l_lock);
804
805 /*
806 * The ast and locking functions can race us here. The winner
807 * will clear pending, the loser will not.
808 */
809 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
810 (lockres->l_pending_gen != generation))
811 return;
812
813 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
814 lockres->l_pending_gen++;
815
816 /*
817 * The downconvert thread may have skipped us because we
818 * were PENDING. Wake it up.
819 */
820 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
821 ocfs2_wake_downconvert_thread(osb);
822}
823
824/* Locked version for callers of ocfs2_dlm_lock() */
825static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
826 unsigned int generation,
827 struct ocfs2_super *osb)
828{
829 unsigned long flags;
830
831 spin_lock_irqsave(&lockres->l_lock, flags);
832 __lockres_clear_pending(lockres, generation, osb);
833 spin_unlock_irqrestore(&lockres->l_lock, flags);
834}
835
836static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
837{
838 assert_spin_locked(&lockres->l_lock);
839 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
840
841 lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
842
843 return lockres->l_pending_gen;
844}
845
846
739static void ocfs2_blocking_ast(void *opaque, int level) 847static void ocfs2_blocking_ast(void *opaque, int level)
740{ 848{
741 struct ocfs2_lock_res *lockres = opaque; 849 struct ocfs2_lock_res *lockres = opaque;
@@ -770,6 +878,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
770static void ocfs2_locking_ast(void *opaque) 878static void ocfs2_locking_ast(void *opaque)
771{ 879{
772 struct ocfs2_lock_res *lockres = opaque; 880 struct ocfs2_lock_res *lockres = opaque;
881 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
773 unsigned long flags; 882 unsigned long flags;
774 883
775 spin_lock_irqsave(&lockres->l_lock, flags); 884 spin_lock_irqsave(&lockres->l_lock, flags);
@@ -805,6 +914,18 @@ static void ocfs2_locking_ast(void *opaque)
805 * can catch it. */ 914 * can catch it. */
806 lockres->l_action = OCFS2_AST_INVALID; 915 lockres->l_action = OCFS2_AST_INVALID;
807 916
917 /* Did we try to cancel this lock? Clear that state */
918 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
919 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
920
921 /*
922 * We may have beaten the locking functions here. We certainly
923 * know that dlm_lock() has been called :-)
924 * Because we can't have two lock calls in flight at once, we
925 * can use lockres->l_pending_gen.
926 */
927 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);
928
808 wake_up(&lockres->l_event); 929 wake_up(&lockres->l_event);
809 spin_unlock_irqrestore(&lockres->l_lock, flags); 930 spin_unlock_irqrestore(&lockres->l_lock, flags);
810} 931}
@@ -838,6 +959,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
838{ 959{
839 int ret = 0; 960 int ret = 0;
840 unsigned long flags; 961 unsigned long flags;
962 unsigned int gen;
841 963
842 mlog_entry_void(); 964 mlog_entry_void();
843 965
@@ -854,6 +976,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
854 lockres->l_action = OCFS2_AST_ATTACH; 976 lockres->l_action = OCFS2_AST_ATTACH;
855 lockres->l_requested = level; 977 lockres->l_requested = level;
856 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 978 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
979 gen = lockres_set_pending(lockres);
857 spin_unlock_irqrestore(&lockres->l_lock, flags); 980 spin_unlock_irqrestore(&lockres->l_lock, flags);
858 981
859 ret = ocfs2_dlm_lock(osb->cconn, 982 ret = ocfs2_dlm_lock(osb->cconn,
@@ -863,6 +986,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
863 lockres->l_name, 986 lockres->l_name,
864 OCFS2_LOCK_ID_MAX_LEN - 1, 987 OCFS2_LOCK_ID_MAX_LEN - 1,
865 lockres); 988 lockres);
989 lockres_clear_pending(lockres, gen, osb);
866 if (ret) { 990 if (ret) {
867 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 991 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
868 ocfs2_recover_from_dlm_error(lockres, 1); 992 ocfs2_recover_from_dlm_error(lockres, 1);
@@ -988,6 +1112,7 @@ static int ocfs2_cluster_lock(struct ocfs2_super *osb,
988 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1112 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
989 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1113 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
990 unsigned long flags; 1114 unsigned long flags;
1115 unsigned int gen;
991 1116
992 mlog_entry_void(); 1117 mlog_entry_void();
993 1118
@@ -1046,6 +1171,7 @@ again:
1046 1171
1047 lockres->l_requested = level; 1172 lockres->l_requested = level;
1048 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1173 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1174 gen = lockres_set_pending(lockres);
1049 spin_unlock_irqrestore(&lockres->l_lock, flags); 1175 spin_unlock_irqrestore(&lockres->l_lock, flags);
1050 1176
1051 BUG_ON(level == DLM_LOCK_IV); 1177 BUG_ON(level == DLM_LOCK_IV);
@@ -1062,6 +1188,7 @@ again:
1062 lockres->l_name, 1188 lockres->l_name,
1063 OCFS2_LOCK_ID_MAX_LEN - 1, 1189 OCFS2_LOCK_ID_MAX_LEN - 1,
1064 lockres); 1190 lockres);
1191 lockres_clear_pending(lockres, gen, osb);
1065 if (ret) { 1192 if (ret) {
1066 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1193 if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1067 (ret != -EAGAIN)) { 1194 (ret != -EAGAIN)) {
@@ -1506,6 +1633,7 @@ out:
1506void ocfs2_file_unlock(struct file *file) 1633void ocfs2_file_unlock(struct file *file)
1507{ 1634{
1508 int ret; 1635 int ret;
1636 unsigned int gen;
1509 unsigned long flags; 1637 unsigned long flags;
1510 struct ocfs2_file_private *fp = file->private_data; 1638 struct ocfs2_file_private *fp = file->private_data;
1511 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1639 struct ocfs2_lock_res *lockres = &fp->fp_flock;
@@ -1531,11 +1659,11 @@ void ocfs2_file_unlock(struct file *file)
1531 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1659 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1532 lockres->l_blocking = DLM_LOCK_EX; 1660 lockres->l_blocking = DLM_LOCK_EX;
1533 1661
1534 ocfs2_prepare_downconvert(lockres, LKM_NLMODE); 1662 gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
1535 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1663 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1536 spin_unlock_irqrestore(&lockres->l_lock, flags); 1664 spin_unlock_irqrestore(&lockres->l_lock, flags);
1537 1665
1538 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); 1666 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
1539 if (ret) { 1667 if (ret) {
1540 mlog_errno(ret); 1668 mlog_errno(ret);
1541 return; 1669 return;
@@ -2555,23 +2683,7 @@ static void ocfs2_unlock_ast(void *opaque, int error)
2555 lockres->l_unlock_action); 2683 lockres->l_unlock_action);
2556 2684
2557 spin_lock_irqsave(&lockres->l_lock, flags); 2685 spin_lock_irqsave(&lockres->l_lock, flags);
2558 /* We tried to cancel a convert request, but it was already 2686 if (error) {
2559 * granted. All we want to do here is clear our unlock
2560 * state. The wake_up call done at the bottom is redundant
2561 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2562 * hurt anything anyway */
2563 if (error == -DLM_ECANCEL &&
2564 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2565 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2566
2567 /* We don't clear the busy flag in this case as it
2568 * should have been cleared by the ast which the dlm
2569 * has called. */
2570 goto complete_unlock;
2571 }
2572
2573 /* DLM_EUNLOCK is the success code for unlock */
2574 if (error != -DLM_EUNLOCK) {
2575 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2687 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
2576 "unlock_action %d\n", error, lockres->l_name, 2688 "unlock_action %d\n", error, lockres->l_name,
2577 lockres->l_unlock_action); 2689 lockres->l_unlock_action);
@@ -2592,7 +2704,6 @@ static void ocfs2_unlock_ast(void *opaque, int error)
2592 } 2704 }
2593 2705
2594 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2706 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2595complete_unlock:
2596 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2707 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2597 spin_unlock_irqrestore(&lockres->l_lock, flags); 2708 spin_unlock_irqrestore(&lockres->l_lock, flags);
2598 2709
@@ -2768,8 +2879,8 @@ int ocfs2_drop_inode_locks(struct inode *inode)
2768 return status; 2879 return status;
2769} 2880}
2770 2881
2771static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2882static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2772 int new_level) 2883 int new_level)
2773{ 2884{
2774 assert_spin_locked(&lockres->l_lock); 2885 assert_spin_locked(&lockres->l_lock);
2775 2886
@@ -2787,12 +2898,14 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2787 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2898 lockres->l_action = OCFS2_AST_DOWNCONVERT;
2788 lockres->l_requested = new_level; 2899 lockres->l_requested = new_level;
2789 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2900 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2901 return lockres_set_pending(lockres);
2790} 2902}
2791 2903
2792static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2904static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2793 struct ocfs2_lock_res *lockres, 2905 struct ocfs2_lock_res *lockres,
2794 int new_level, 2906 int new_level,
2795 int lvb) 2907 int lvb,
2908 unsigned int generation)
2796{ 2909{
2797 int ret; 2910 int ret;
2798 u32 dlm_flags = DLM_LKF_CONVERT; 2911 u32 dlm_flags = DLM_LKF_CONVERT;
@@ -2809,6 +2922,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2809 lockres->l_name, 2922 lockres->l_name,
2810 OCFS2_LOCK_ID_MAX_LEN - 1, 2923 OCFS2_LOCK_ID_MAX_LEN - 1,
2811 lockres); 2924 lockres);
2925 lockres_clear_pending(lockres, generation, osb);
2812 if (ret) { 2926 if (ret) {
2813 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2927 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2814 ocfs2_recover_from_dlm_error(lockres, 1); 2928 ocfs2_recover_from_dlm_error(lockres, 1);
@@ -2883,6 +2997,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2883 int new_level; 2997 int new_level;
2884 int ret = 0; 2998 int ret = 0;
2885 int set_lvb = 0; 2999 int set_lvb = 0;
3000 unsigned int gen;
2886 3001
2887 mlog_entry_void(); 3002 mlog_entry_void();
2888 3003
@@ -2892,6 +3007,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2892 3007
2893recheck: 3008recheck:
2894 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3009 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3010 /* XXX
3011 * This is a *big* race. The OCFS2_LOCK_PENDING flag
3012 * exists entirely for one reason - another thread has set
3013 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3014 *
3015 * If we do ocfs2_cancel_convert() before the other thread
3016 * calls dlm_lock(), our cancel will do nothing. We will
3017 * get no ast, and we will have no way of knowing the
3018 * cancel failed. Meanwhile, the other thread will call
3019 * into dlm_lock() and wait...forever.
3020 *
3021 * Why forever? Because another node has asked for the
3022 * lock first; that's why we're here in unblock_lock().
3023 *
3024 * The solution is OCFS2_LOCK_PENDING. When PENDING is
3025 * set, we just requeue the unblock. Only when the other
3026 * thread has called dlm_lock() and cleared PENDING will
3027 * we then cancel their request.
3028 *
3029 * All callers of dlm_lock() must set OCFS2_DLM_PENDING
3030 * at the same time they set OCFS2_DLM_BUSY. They must
3031 * clear OCFS2_DLM_PENDING after dlm_lock() returns.
3032 */
3033 if (lockres->l_flags & OCFS2_LOCK_PENDING)
3034 goto leave_requeue;
3035
2895 ctl->requeue = 1; 3036 ctl->requeue = 1;
2896 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3037 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2897 spin_unlock_irqrestore(&lockres->l_lock, flags); 3038 spin_unlock_irqrestore(&lockres->l_lock, flags);
@@ -2971,9 +3112,11 @@ downconvert:
2971 lockres->l_ops->set_lvb(lockres); 3112 lockres->l_ops->set_lvb(lockres);
2972 } 3113 }
2973 3114
2974 ocfs2_prepare_downconvert(lockres, new_level); 3115 gen = ocfs2_prepare_downconvert(lockres, new_level);
2975 spin_unlock_irqrestore(&lockres->l_lock, flags); 3116 spin_unlock_irqrestore(&lockres->l_lock, flags);
2976 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 3117 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3118 gen);
3119
2977leave: 3120leave:
2978 mlog_exit(ret); 3121 mlog_exit(ret);
2979 return ret; 3122 return ret;