aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c655
1 files changed, 405 insertions, 250 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 351130c9b734..394d25a131a5 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -27,18 +27,11 @@
27#include <linux/slab.h> 27#include <linux/slab.h>
28#include <linux/highmem.h> 28#include <linux/highmem.h>
29#include <linux/mm.h> 29#include <linux/mm.h>
30#include <linux/crc32.h>
31#include <linux/kthread.h> 30#include <linux/kthread.h>
32#include <linux/pagemap.h> 31#include <linux/pagemap.h>
33#include <linux/debugfs.h> 32#include <linux/debugfs.h>
34#include <linux/seq_file.h> 33#include <linux/seq_file.h>
35 34
36#include <cluster/heartbeat.h>
37#include <cluster/nodemanager.h>
38#include <cluster/tcp.h>
39
40#include <dlm/dlmapi.h>
41
42#define MLOG_MASK_PREFIX ML_DLM_GLUE 35#define MLOG_MASK_PREFIX ML_DLM_GLUE
43#include <cluster/masklog.h> 36#include <cluster/masklog.h>
44 37
@@ -53,6 +46,7 @@
53#include "heartbeat.h" 46#include "heartbeat.h"
54#include "inode.h" 47#include "inode.h"
55#include "journal.h" 48#include "journal.h"
49#include "stackglue.h"
56#include "slot_map.h" 50#include "slot_map.h"
57#include "super.h" 51#include "super.h"
58#include "uptodate.h" 52#include "uptodate.h"
@@ -113,7 +107,8 @@ static void ocfs2_dump_meta_lvb_info(u64 level,
113 unsigned int line, 107 unsigned int line,
114 struct ocfs2_lock_res *lockres) 108 struct ocfs2_lock_res *lockres)
115{ 109{
116 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 110 struct ocfs2_meta_lvb *lvb =
111 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
117 112
118 mlog(level, "LVB information for %s (called from %s:%u):\n", 113 mlog(level, "LVB information for %s (called from %s:%u):\n",
119 lockres->l_name, function, line); 114 lockres->l_name, function, line);
@@ -259,31 +254,6 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
259 .flags = 0, 254 .flags = 0,
260}; 255};
261 256
262/*
263 * This is the filesystem locking protocol version.
264 *
265 * Whenever the filesystem does new things with locks (adds or removes a
266 * lock, orders them differently, does different things underneath a lock),
267 * the version must be changed. The protocol is negotiated when joining
268 * the dlm domain. A node may join the domain if its major version is
269 * identical to all other nodes and its minor version is greater than
270 * or equal to all other nodes. When its minor version is greater than
271 * the other nodes, it will run at the minor version specified by the
272 * other nodes.
273 *
274 * If a locking change is made that will not be compatible with older
275 * versions, the major number must be increased and the minor version set
276 * to zero. If a change merely adds a behavior that can be disabled when
277 * speaking to older versions, the minor version must be increased. If a
278 * change adds a fully backwards compatible change (eg, LVB changes that
279 * are just ignored by older versions), the version does not need to be
280 * updated.
281 */
282const struct dlm_protocol_version ocfs2_locking_protocol = {
283 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
284 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
285};
286
287static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 257static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
288{ 258{
289 return lockres->l_type == OCFS2_LOCK_TYPE_META || 259 return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -316,7 +286,7 @@ static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *l
316static int ocfs2_lock_create(struct ocfs2_super *osb, 286static int ocfs2_lock_create(struct ocfs2_super *osb,
317 struct ocfs2_lock_res *lockres, 287 struct ocfs2_lock_res *lockres,
318 int level, 288 int level,
319 int dlm_flags); 289 u32 dlm_flags);
320static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 290static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
321 int wanted); 291 int wanted);
322static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 292static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
@@ -330,10 +300,9 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
330 struct ocfs2_lock_res *lockres); 300 struct ocfs2_lock_res *lockres);
331static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 301static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
332 int convert); 302 int convert);
333#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ 303#define ocfs2_log_dlm_error(_func, _err, _lockres) do { \
334 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 304 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
335 "resource %s: %s\n", dlm_errname(_stat), _func, \ 305 _err, _func, _lockres->l_name); \
336 _lockres->l_name, dlm_errmsg(_stat)); \
337} while (0) 306} while (0)
338static int ocfs2_downconvert_thread(void *arg); 307static int ocfs2_downconvert_thread(void *arg);
339static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 308static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
@@ -342,12 +311,13 @@ static int ocfs2_inode_lock_update(struct inode *inode,
342 struct buffer_head **bh); 311 struct buffer_head **bh);
343static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 312static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
344static inline int ocfs2_highest_compat_lock_level(int level); 313static inline int ocfs2_highest_compat_lock_level(int level);
345static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 314static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
346 int new_level); 315 int new_level);
347static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 316static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
348 struct ocfs2_lock_res *lockres, 317 struct ocfs2_lock_res *lockres,
349 int new_level, 318 int new_level,
350 int lvb); 319 int lvb,
320 unsigned int generation);
351static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 321static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
352 struct ocfs2_lock_res *lockres); 322 struct ocfs2_lock_res *lockres);
353static int ocfs2_cancel_convert(struct ocfs2_super *osb, 323static int ocfs2_cancel_convert(struct ocfs2_super *osb,
@@ -406,9 +376,9 @@ static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
406 res->l_ops = ops; 376 res->l_ops = ops;
407 res->l_priv = priv; 377 res->l_priv = priv;
408 378
409 res->l_level = LKM_IVMODE; 379 res->l_level = DLM_LOCK_IV;
410 res->l_requested = LKM_IVMODE; 380 res->l_requested = DLM_LOCK_IV;
411 res->l_blocking = LKM_IVMODE; 381 res->l_blocking = DLM_LOCK_IV;
412 res->l_action = OCFS2_AST_INVALID; 382 res->l_action = OCFS2_AST_INVALID;
413 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 383 res->l_unlock_action = OCFS2_UNLOCK_INVALID;
414 384
@@ -604,10 +574,10 @@ static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
604 BUG_ON(!lockres); 574 BUG_ON(!lockres);
605 575
606 switch(level) { 576 switch(level) {
607 case LKM_EXMODE: 577 case DLM_LOCK_EX:
608 lockres->l_ex_holders++; 578 lockres->l_ex_holders++;
609 break; 579 break;
610 case LKM_PRMODE: 580 case DLM_LOCK_PR:
611 lockres->l_ro_holders++; 581 lockres->l_ro_holders++;
612 break; 582 break;
613 default: 583 default:
@@ -625,11 +595,11 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
625 BUG_ON(!lockres); 595 BUG_ON(!lockres);
626 596
627 switch(level) { 597 switch(level) {
628 case LKM_EXMODE: 598 case DLM_LOCK_EX:
629 BUG_ON(!lockres->l_ex_holders); 599 BUG_ON(!lockres->l_ex_holders);
630 lockres->l_ex_holders--; 600 lockres->l_ex_holders--;
631 break; 601 break;
632 case LKM_PRMODE: 602 case DLM_LOCK_PR:
633 BUG_ON(!lockres->l_ro_holders); 603 BUG_ON(!lockres->l_ro_holders);
634 lockres->l_ro_holders--; 604 lockres->l_ro_holders--;
635 break; 605 break;
@@ -644,12 +614,12 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
644 * lock types are added. */ 614 * lock types are added. */
645static inline int ocfs2_highest_compat_lock_level(int level) 615static inline int ocfs2_highest_compat_lock_level(int level)
646{ 616{
647 int new_level = LKM_EXMODE; 617 int new_level = DLM_LOCK_EX;
648 618
649 if (level == LKM_EXMODE) 619 if (level == DLM_LOCK_EX)
650 new_level = LKM_NLMODE; 620 new_level = DLM_LOCK_NL;
651 else if (level == LKM_PRMODE) 621 else if (level == DLM_LOCK_PR)
652 new_level = LKM_PRMODE; 622 new_level = DLM_LOCK_PR;
653 return new_level; 623 return new_level;
654} 624}
655 625
@@ -688,12 +658,12 @@ static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res
688 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 658 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
689 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 659 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
690 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 660 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
691 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 661 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
692 662
693 lockres->l_level = lockres->l_requested; 663 lockres->l_level = lockres->l_requested;
694 if (lockres->l_level <= 664 if (lockres->l_level <=
695 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 665 ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
696 lockres->l_blocking = LKM_NLMODE; 666 lockres->l_blocking = DLM_LOCK_NL;
697 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 667 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
698 } 668 }
699 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 669 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
@@ -712,7 +682,7 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
712 * information is already up to data. Convert from NL to 682 * information is already up to data. Convert from NL to
713 * *anything* however should mark ourselves as needing an 683 * *anything* however should mark ourselves as needing an
714 * update */ 684 * update */
715 if (lockres->l_level == LKM_NLMODE && 685 if (lockres->l_level == DLM_LOCK_NL &&
716 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 686 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
717 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 687 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
718 688
@@ -729,7 +699,7 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc
729 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 699 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
730 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 700 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
731 701
732 if (lockres->l_requested > LKM_NLMODE && 702 if (lockres->l_requested > DLM_LOCK_NL &&
733 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 703 !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
734 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 704 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
735 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 705 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
@@ -767,6 +737,113 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
767 return needs_downconvert; 737 return needs_downconvert;
768} 738}
769 739
740/*
741 * OCFS2_LOCK_PENDING and l_pending_gen.
742 *
743 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
744 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
745 * for more details on the race.
746 *
747 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
748 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
749 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
750 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
751 * the caller is going to try to clear PENDING again. If nothing else is
752 * happening, __lockres_clear_pending() sees PENDING is unset and does
753 * nothing.
754 *
755 * But what if another path (eg downconvert thread) has just started a
756 * new locking action? The other path has re-set PENDING. Our path
757 * cannot clear PENDING, because that will re-open the original race
758 * window.
759 *
760 * [Example]
761 *
762 * ocfs2_meta_lock()
763 * ocfs2_cluster_lock()
764 * set BUSY
765 * set PENDING
766 * drop l_lock
767 * ocfs2_dlm_lock()
768 * ocfs2_locking_ast() ocfs2_downconvert_thread()
769 * clear PENDING ocfs2_unblock_lock()
770 * take_l_lock
771 * !BUSY
772 * ocfs2_prepare_downconvert()
773 * set BUSY
774 * set PENDING
775 * drop l_lock
776 * take l_lock
777 * clear PENDING
778 * drop l_lock
779 * <window>
780 * ocfs2_dlm_lock()
781 *
782 * So as you can see, we now have a window where l_lock is not held,
783 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
784 *
785 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
786 * set by ocfs2_prepare_downconvert(). That wasn't nice.
787 *
788 * To solve this we introduce l_pending_gen. A call to
789 * lockres_clear_pending() will only do so when it is passed a generation
790 * number that matches the lockres. lockres_set_pending() will return the
791 * current generation number. When ocfs2_cluster_lock() goes to clear
792 * PENDING, it passes the generation it got from set_pending(). In our
793 * example above, the generation numbers will *not* match. Thus,
794 * ocfs2_cluster_lock() will not clear the PENDING set by
795 * ocfs2_prepare_downconvert().
796 */
797
798/* Unlocked version for ocfs2_locking_ast() */
799static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
800 unsigned int generation,
801 struct ocfs2_super *osb)
802{
803 assert_spin_locked(&lockres->l_lock);
804
805 /*
806 * The ast and locking functions can race us here. The winner
807 * will clear pending, the loser will not.
808 */
809 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
810 (lockres->l_pending_gen != generation))
811 return;
812
813 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
814 lockres->l_pending_gen++;
815
816 /*
817 * The downconvert thread may have skipped us because we
818 * were PENDING. Wake it up.
819 */
820 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
821 ocfs2_wake_downconvert_thread(osb);
822}
823
824/* Locked version for callers of ocfs2_dlm_lock() */
825static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
826 unsigned int generation,
827 struct ocfs2_super *osb)
828{
829 unsigned long flags;
830
831 spin_lock_irqsave(&lockres->l_lock, flags);
832 __lockres_clear_pending(lockres, generation, osb);
833 spin_unlock_irqrestore(&lockres->l_lock, flags);
834}
835
836static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
837{
838 assert_spin_locked(&lockres->l_lock);
839 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
840
841 lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
842
843 return lockres->l_pending_gen;
844}
845
846
770static void ocfs2_blocking_ast(void *opaque, int level) 847static void ocfs2_blocking_ast(void *opaque, int level)
771{ 848{
772 struct ocfs2_lock_res *lockres = opaque; 849 struct ocfs2_lock_res *lockres = opaque;
@@ -774,7 +851,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
774 int needs_downconvert; 851 int needs_downconvert;
775 unsigned long flags; 852 unsigned long flags;
776 853
777 BUG_ON(level <= LKM_NLMODE); 854 BUG_ON(level <= DLM_LOCK_NL);
778 855
779 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 856 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
780 lockres->l_name, level, lockres->l_level, 857 lockres->l_name, level, lockres->l_level,
@@ -801,14 +878,22 @@ static void ocfs2_blocking_ast(void *opaque, int level)
801static void ocfs2_locking_ast(void *opaque) 878static void ocfs2_locking_ast(void *opaque)
802{ 879{
803 struct ocfs2_lock_res *lockres = opaque; 880 struct ocfs2_lock_res *lockres = opaque;
804 struct dlm_lockstatus *lksb = &lockres->l_lksb; 881 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
805 unsigned long flags; 882 unsigned long flags;
883 int status;
806 884
807 spin_lock_irqsave(&lockres->l_lock, flags); 885 spin_lock_irqsave(&lockres->l_lock, flags);
808 886
809 if (lksb->status != DLM_NORMAL) { 887 status = ocfs2_dlm_lock_status(&lockres->l_lksb);
810 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", 888
811 lockres->l_name, lksb->status); 889 if (status == -EAGAIN) {
890 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
891 goto out;
892 }
893
894 if (status) {
895 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
896 lockres->l_name, status);
812 spin_unlock_irqrestore(&lockres->l_lock, flags); 897 spin_unlock_irqrestore(&lockres->l_lock, flags);
813 return; 898 return;
814 } 899 }
@@ -831,11 +916,23 @@ static void ocfs2_locking_ast(void *opaque)
831 lockres->l_unlock_action); 916 lockres->l_unlock_action);
832 BUG(); 917 BUG();
833 } 918 }
834 919out:
835 /* set it to something invalid so if we get called again we 920 /* set it to something invalid so if we get called again we
836 * can catch it. */ 921 * can catch it. */
837 lockres->l_action = OCFS2_AST_INVALID; 922 lockres->l_action = OCFS2_AST_INVALID;
838 923
924 /* Did we try to cancel this lock? Clear that state */
925 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
926 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
927
928 /*
929 * We may have beaten the locking functions here. We certainly
930 * know that dlm_lock() has been called :-)
931 * Because we can't have two lock calls in flight at once, we
932 * can use lockres->l_pending_gen.
933 */
934 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);
935
839 wake_up(&lockres->l_event); 936 wake_up(&lockres->l_event);
840 spin_unlock_irqrestore(&lockres->l_lock, flags); 937 spin_unlock_irqrestore(&lockres->l_lock, flags);
841} 938}
@@ -865,15 +962,15 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
865static int ocfs2_lock_create(struct ocfs2_super *osb, 962static int ocfs2_lock_create(struct ocfs2_super *osb,
866 struct ocfs2_lock_res *lockres, 963 struct ocfs2_lock_res *lockres,
867 int level, 964 int level,
868 int dlm_flags) 965 u32 dlm_flags)
869{ 966{
870 int ret = 0; 967 int ret = 0;
871 enum dlm_status status = DLM_NORMAL;
872 unsigned long flags; 968 unsigned long flags;
969 unsigned int gen;
873 970
874 mlog_entry_void(); 971 mlog_entry_void();
875 972
876 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, 973 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
877 dlm_flags); 974 dlm_flags);
878 975
879 spin_lock_irqsave(&lockres->l_lock, flags); 976 spin_lock_irqsave(&lockres->l_lock, flags);
@@ -886,24 +983,23 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
886 lockres->l_action = OCFS2_AST_ATTACH; 983 lockres->l_action = OCFS2_AST_ATTACH;
887 lockres->l_requested = level; 984 lockres->l_requested = level;
888 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 985 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
986 gen = lockres_set_pending(lockres);
889 spin_unlock_irqrestore(&lockres->l_lock, flags); 987 spin_unlock_irqrestore(&lockres->l_lock, flags);
890 988
891 status = dlmlock(osb->dlm, 989 ret = ocfs2_dlm_lock(osb->cconn,
892 level, 990 level,
893 &lockres->l_lksb, 991 &lockres->l_lksb,
894 dlm_flags, 992 dlm_flags,
895 lockres->l_name, 993 lockres->l_name,
896 OCFS2_LOCK_ID_MAX_LEN - 1, 994 OCFS2_LOCK_ID_MAX_LEN - 1,
897 ocfs2_locking_ast, 995 lockres);
898 lockres, 996 lockres_clear_pending(lockres, gen, osb);
899 ocfs2_blocking_ast); 997 if (ret) {
900 if (status != DLM_NORMAL) { 998 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
901 ocfs2_log_dlm_error("dlmlock", status, lockres);
902 ret = -EINVAL;
903 ocfs2_recover_from_dlm_error(lockres, 1); 999 ocfs2_recover_from_dlm_error(lockres, 1);
904 } 1000 }
905 1001
906 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); 1002 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
907 1003
908bail: 1004bail:
909 mlog_exit(ret); 1005 mlog_exit(ret);
@@ -1016,21 +1112,22 @@ static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1016static int ocfs2_cluster_lock(struct ocfs2_super *osb, 1112static int ocfs2_cluster_lock(struct ocfs2_super *osb,
1017 struct ocfs2_lock_res *lockres, 1113 struct ocfs2_lock_res *lockres,
1018 int level, 1114 int level,
1019 int lkm_flags, 1115 u32 lkm_flags,
1020 int arg_flags) 1116 int arg_flags)
1021{ 1117{
1022 struct ocfs2_mask_waiter mw; 1118 struct ocfs2_mask_waiter mw;
1023 enum dlm_status status;
1024 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1119 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1025 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1120 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1026 unsigned long flags; 1121 unsigned long flags;
1122 unsigned int gen;
1123 int noqueue_attempted = 0;
1027 1124
1028 mlog_entry_void(); 1125 mlog_entry_void();
1029 1126
1030 ocfs2_init_mask_waiter(&mw); 1127 ocfs2_init_mask_waiter(&mw);
1031 1128
1032 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1129 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1033 lkm_flags |= LKM_VALBLK; 1130 lkm_flags |= DLM_LKF_VALBLK;
1034 1131
1035again: 1132again:
1036 wait = 0; 1133 wait = 0;
@@ -1068,52 +1165,56 @@ again:
1068 } 1165 }
1069 1166
1070 if (level > lockres->l_level) { 1167 if (level > lockres->l_level) {
1168 if (noqueue_attempted > 0) {
1169 ret = -EAGAIN;
1170 goto unlock;
1171 }
1172 if (lkm_flags & DLM_LKF_NOQUEUE)
1173 noqueue_attempted = 1;
1174
1071 if (lockres->l_action != OCFS2_AST_INVALID) 1175 if (lockres->l_action != OCFS2_AST_INVALID)
1072 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1176 mlog(ML_ERROR, "lockres %s has action %u pending\n",
1073 lockres->l_name, lockres->l_action); 1177 lockres->l_name, lockres->l_action);
1074 1178
1075 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1179 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1076 lockres->l_action = OCFS2_AST_ATTACH; 1180 lockres->l_action = OCFS2_AST_ATTACH;
1077 lkm_flags &= ~LKM_CONVERT; 1181 lkm_flags &= ~DLM_LKF_CONVERT;
1078 } else { 1182 } else {
1079 lockres->l_action = OCFS2_AST_CONVERT; 1183 lockres->l_action = OCFS2_AST_CONVERT;
1080 lkm_flags |= LKM_CONVERT; 1184 lkm_flags |= DLM_LKF_CONVERT;
1081 } 1185 }
1082 1186
1083 lockres->l_requested = level; 1187 lockres->l_requested = level;
1084 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1188 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1189 gen = lockres_set_pending(lockres);
1085 spin_unlock_irqrestore(&lockres->l_lock, flags); 1190 spin_unlock_irqrestore(&lockres->l_lock, flags);
1086 1191
1087 BUG_ON(level == LKM_IVMODE); 1192 BUG_ON(level == DLM_LOCK_IV);
1088 BUG_ON(level == LKM_NLMODE); 1193 BUG_ON(level == DLM_LOCK_NL);
1089 1194
1090 mlog(0, "lock %s, convert from %d to level = %d\n", 1195 mlog(0, "lock %s, convert from %d to level = %d\n",
1091 lockres->l_name, lockres->l_level, level); 1196 lockres->l_name, lockres->l_level, level);
1092 1197
1093 /* call dlm_lock to upgrade lock now */ 1198 /* call dlm_lock to upgrade lock now */
1094 status = dlmlock(osb->dlm, 1199 ret = ocfs2_dlm_lock(osb->cconn,
1095 level, 1200 level,
1096 &lockres->l_lksb, 1201 &lockres->l_lksb,
1097 lkm_flags, 1202 lkm_flags,
1098 lockres->l_name, 1203 lockres->l_name,
1099 OCFS2_LOCK_ID_MAX_LEN - 1, 1204 OCFS2_LOCK_ID_MAX_LEN - 1,
1100 ocfs2_locking_ast, 1205 lockres);
1101 lockres, 1206 lockres_clear_pending(lockres, gen, osb);
1102 ocfs2_blocking_ast); 1207 if (ret) {
1103 if (status != DLM_NORMAL) { 1208 if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1104 if ((lkm_flags & LKM_NOQUEUE) && 1209 (ret != -EAGAIN)) {
1105 (status == DLM_NOTQUEUED)) 1210 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1106 ret = -EAGAIN; 1211 ret, lockres);
1107 else {
1108 ocfs2_log_dlm_error("dlmlock", status,
1109 lockres);
1110 ret = -EINVAL;
1111 } 1212 }
1112 ocfs2_recover_from_dlm_error(lockres, 1); 1213 ocfs2_recover_from_dlm_error(lockres, 1);
1113 goto out; 1214 goto out;
1114 } 1215 }
1115 1216
1116 mlog(0, "lock %s, successfull return from dlmlock\n", 1217 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
1117 lockres->l_name); 1218 lockres->l_name);
1118 1219
1119 /* At this point we've gone inside the dlm and need to 1220 /* At this point we've gone inside the dlm and need to
@@ -1177,9 +1278,9 @@ static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1177 int ex, 1278 int ex,
1178 int local) 1279 int local)
1179{ 1280{
1180 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1281 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1181 unsigned long flags; 1282 unsigned long flags;
1182 int lkm_flags = local ? LKM_LOCAL : 0; 1283 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1183 1284
1184 spin_lock_irqsave(&lockres->l_lock, flags); 1285 spin_lock_irqsave(&lockres->l_lock, flags);
1185 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1286 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
@@ -1222,7 +1323,7 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
1222 } 1323 }
1223 1324
1224 /* 1325 /*
1225 * We don't want to use LKM_LOCAL on a meta data lock as they 1326 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1226 * don't use a generation in their lock names. 1327 * don't use a generation in their lock names.
1227 */ 1328 */
1228 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1329 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
@@ -1261,7 +1362,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)
1261 1362
1262 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1363 lockres = &OCFS2_I(inode)->ip_rw_lockres;
1263 1364
1264 level = write ? LKM_EXMODE : LKM_PRMODE; 1365 level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1265 1366
1266 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1367 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1267 0); 1368 0);
@@ -1274,7 +1375,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)
1274 1375
1275void ocfs2_rw_unlock(struct inode *inode, int write) 1376void ocfs2_rw_unlock(struct inode *inode, int write)
1276{ 1377{
1277 int level = write ? LKM_EXMODE : LKM_PRMODE; 1378 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1278 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1379 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1279 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1380 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1280 1381
@@ -1312,7 +1413,7 @@ int ocfs2_open_lock(struct inode *inode)
1312 lockres = &OCFS2_I(inode)->ip_open_lockres; 1413 lockres = &OCFS2_I(inode)->ip_open_lockres;
1313 1414
1314 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1415 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1315 LKM_PRMODE, 0, 0); 1416 DLM_LOCK_PR, 0, 0);
1316 if (status < 0) 1417 if (status < 0)
1317 mlog_errno(status); 1418 mlog_errno(status);
1318 1419
@@ -1340,16 +1441,16 @@ int ocfs2_try_open_lock(struct inode *inode, int write)
1340 1441
1341 lockres = &OCFS2_I(inode)->ip_open_lockres; 1442 lockres = &OCFS2_I(inode)->ip_open_lockres;
1342 1443
1343 level = write ? LKM_EXMODE : LKM_PRMODE; 1444 level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1344 1445
1345 /* 1446 /*
1346 * The file system may already holding a PRMODE/EXMODE open lock. 1447 * The file system may already holding a PRMODE/EXMODE open lock.
1347 * Since we pass LKM_NOQUEUE, the request won't block waiting on 1448 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1348 * other nodes and the -EAGAIN will indicate to the caller that 1449 * other nodes and the -EAGAIN will indicate to the caller that
1349 * this inode is still in use. 1450 * this inode is still in use.
1350 */ 1451 */
1351 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1452 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1352 level, LKM_NOQUEUE, 0); 1453 level, DLM_LKF_NOQUEUE, 0);
1353 1454
1354out: 1455out:
1355 mlog_exit(status); 1456 mlog_exit(status);
@@ -1374,10 +1475,10 @@ void ocfs2_open_unlock(struct inode *inode)
1374 1475
1375 if(lockres->l_ro_holders) 1476 if(lockres->l_ro_holders)
1376 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1477 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1377 LKM_PRMODE); 1478 DLM_LOCK_PR);
1378 if(lockres->l_ex_holders) 1479 if(lockres->l_ex_holders)
1379 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1480 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1380 LKM_EXMODE); 1481 DLM_LOCK_EX);
1381 1482
1382out: 1483out:
1383 mlog_exit_void(); 1484 mlog_exit_void();
@@ -1464,7 +1565,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
1464 ocfs2_init_mask_waiter(&mw); 1565 ocfs2_init_mask_waiter(&mw);
1465 1566
1466 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1567 if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1467 (lockres->l_level > LKM_NLMODE)) { 1568 (lockres->l_level > DLM_LOCK_NL)) {
1468 mlog(ML_ERROR, 1569 mlog(ML_ERROR,
1469 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1570 "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1470 "level: %u\n", lockres->l_name, lockres->l_flags, 1571 "level: %u\n", lockres->l_name, lockres->l_flags,
@@ -1503,14 +1604,12 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
1503 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1604 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1504 spin_unlock_irqrestore(&lockres->l_lock, flags); 1605 spin_unlock_irqrestore(&lockres->l_lock, flags);
1505 1606
1506 ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags, 1607 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1507 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1608 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1508 ocfs2_locking_ast, lockres, ocfs2_blocking_ast); 1609 lockres);
1509 if (ret != DLM_NORMAL) { 1610 if (ret) {
1510 if (trylock && ret == DLM_NOTQUEUED) 1611 if (!trylock || (ret != -EAGAIN)) {
1511 ret = -EAGAIN; 1612 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1512 else {
1513 ocfs2_log_dlm_error("dlmlock", ret, lockres);
1514 ret = -EINVAL; 1613 ret = -EINVAL;
1515 } 1614 }
1516 1615
@@ -1537,6 +1636,10 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
1537 * to just bubble sucess back up to the user. 1636 * to just bubble sucess back up to the user.
1538 */ 1637 */
1539 ret = ocfs2_flock_handle_signal(lockres, level); 1638 ret = ocfs2_flock_handle_signal(lockres, level);
1639 } else if (!ret && (level > lockres->l_level)) {
1640 /* Trylock failed asynchronously */
1641 BUG_ON(!trylock);
1642 ret = -EAGAIN;
1540 } 1643 }
1541 1644
1542out: 1645out:
@@ -1549,6 +1652,7 @@ out:
1549void ocfs2_file_unlock(struct file *file) 1652void ocfs2_file_unlock(struct file *file)
1550{ 1653{
1551 int ret; 1654 int ret;
1655 unsigned int gen;
1552 unsigned long flags; 1656 unsigned long flags;
1553 struct ocfs2_file_private *fp = file->private_data; 1657 struct ocfs2_file_private *fp = file->private_data;
1554 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1658 struct ocfs2_lock_res *lockres = &fp->fp_flock;
@@ -1572,13 +1676,13 @@ void ocfs2_file_unlock(struct file *file)
1572 * Fake a blocking ast for the downconvert code. 1676 * Fake a blocking ast for the downconvert code.
1573 */ 1677 */
1574 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1678 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1575 lockres->l_blocking = LKM_EXMODE; 1679 lockres->l_blocking = DLM_LOCK_EX;
1576 1680
1577 ocfs2_prepare_downconvert(lockres, LKM_NLMODE); 1681 gen = ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
1578 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1682 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1579 spin_unlock_irqrestore(&lockres->l_lock, flags); 1683 spin_unlock_irqrestore(&lockres->l_lock, flags);
1580 1684
1581 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); 1685 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0, gen);
1582 if (ret) { 1686 if (ret) {
1583 mlog_errno(ret); 1687 mlog_errno(ret);
1584 return; 1688 return;
@@ -1601,11 +1705,11 @@ static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1601 * condition. */ 1705 * condition. */
1602 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1706 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1603 switch(lockres->l_blocking) { 1707 switch(lockres->l_blocking) {
1604 case LKM_EXMODE: 1708 case DLM_LOCK_EX:
1605 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1709 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1606 kick = 1; 1710 kick = 1;
1607 break; 1711 break;
1608 case LKM_PRMODE: 1712 case DLM_LOCK_PR:
1609 if (!lockres->l_ex_holders) 1713 if (!lockres->l_ex_holders)
1610 kick = 1; 1714 kick = 1;
1611 break; 1715 break;
@@ -1648,7 +1752,7 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1648 1752
1649 mlog_entry_void(); 1753 mlog_entry_void();
1650 1754
1651 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1755 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1652 1756
1653 /* 1757 /*
1654 * Invalidate the LVB of a deleted inode - this way other 1758 * Invalidate the LVB of a deleted inode - this way other
@@ -1700,7 +1804,7 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1700 1804
1701 mlog_meta_lvb(0, lockres); 1805 mlog_meta_lvb(0, lockres);
1702 1806
1703 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1807 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1704 1808
1705 /* We're safe here without the lockres lock... */ 1809 /* We're safe here without the lockres lock... */
1706 spin_lock(&oi->ip_lock); 1810 spin_lock(&oi->ip_lock);
@@ -1735,7 +1839,8 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1735static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1839static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1736 struct ocfs2_lock_res *lockres) 1840 struct ocfs2_lock_res *lockres)
1737{ 1841{
1738 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1842 struct ocfs2_meta_lvb *lvb =
1843 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1739 1844
1740 if (lvb->lvb_version == OCFS2_LVB_VERSION 1845 if (lvb->lvb_version == OCFS2_LVB_VERSION
1741 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1846 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
@@ -1923,7 +2028,8 @@ int ocfs2_inode_lock_full(struct inode *inode,
1923 int ex, 2028 int ex,
1924 int arg_flags) 2029 int arg_flags)
1925{ 2030{
1926 int status, level, dlm_flags, acquired; 2031 int status, level, acquired;
2032 u32 dlm_flags;
1927 struct ocfs2_lock_res *lockres = NULL; 2033 struct ocfs2_lock_res *lockres = NULL;
1928 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2034 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1929 struct buffer_head *local_bh = NULL; 2035 struct buffer_head *local_bh = NULL;
@@ -1950,14 +2056,13 @@ int ocfs2_inode_lock_full(struct inode *inode,
1950 goto local; 2056 goto local;
1951 2057
1952 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2058 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1953 wait_event(osb->recovery_event, 2059 ocfs2_wait_for_recovery(osb);
1954 ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1955 2060
1956 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2061 lockres = &OCFS2_I(inode)->ip_inode_lockres;
1957 level = ex ? LKM_EXMODE : LKM_PRMODE; 2062 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1958 dlm_flags = 0; 2063 dlm_flags = 0;
1959 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2064 if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1960 dlm_flags |= LKM_NOQUEUE; 2065 dlm_flags |= DLM_LKF_NOQUEUE;
1961 2066
1962 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 2067 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1963 if (status < 0) { 2068 if (status < 0) {
@@ -1974,8 +2079,7 @@ int ocfs2_inode_lock_full(struct inode *inode,
1974 * committed to owning this lock so we don't allow signals to 2079 * committed to owning this lock so we don't allow signals to
1975 * abort the operation. */ 2080 * abort the operation. */
1976 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2081 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1977 wait_event(osb->recovery_event, 2082 ocfs2_wait_for_recovery(osb);
1978 ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1979 2083
1980local: 2084local:
1981 /* 2085 /*
@@ -2109,7 +2213,7 @@ int ocfs2_inode_lock_atime(struct inode *inode,
2109void ocfs2_inode_unlock(struct inode *inode, 2213void ocfs2_inode_unlock(struct inode *inode,
2110 int ex) 2214 int ex)
2111{ 2215{
2112 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2216 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2113 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2217 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2114 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2218 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2115 2219
@@ -2130,10 +2234,8 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
2130 int ex) 2234 int ex)
2131{ 2235{
2132 int status = 0; 2236 int status = 0;
2133 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2237 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2134 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2238 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2135 struct buffer_head *bh;
2136 struct ocfs2_slot_info *si = osb->slot_info;
2137 2239
2138 mlog_entry_void(); 2240 mlog_entry_void();
2139 2241
@@ -2159,11 +2261,7 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
2159 goto bail; 2261 goto bail;
2160 } 2262 }
2161 if (status) { 2263 if (status) {
2162 bh = si->si_bh; 2264 status = ocfs2_refresh_slot_info(osb);
2163 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
2164 si->si_inode);
2165 if (status == 0)
2166 ocfs2_update_slot_info(si);
2167 2265
2168 ocfs2_complete_lock_res_refresh(lockres, status); 2266 ocfs2_complete_lock_res_refresh(lockres, status);
2169 2267
@@ -2178,7 +2276,7 @@ bail:
2178void ocfs2_super_unlock(struct ocfs2_super *osb, 2276void ocfs2_super_unlock(struct ocfs2_super *osb,
2179 int ex) 2277 int ex)
2180{ 2278{
2181 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2279 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2182 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2280 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2183 2281
2184 if (!ocfs2_mount_local(osb)) 2282 if (!ocfs2_mount_local(osb))
@@ -2196,7 +2294,7 @@ int ocfs2_rename_lock(struct ocfs2_super *osb)
2196 if (ocfs2_mount_local(osb)) 2294 if (ocfs2_mount_local(osb))
2197 return 0; 2295 return 0;
2198 2296
2199 status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0); 2297 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2200 if (status < 0) 2298 if (status < 0)
2201 mlog_errno(status); 2299 mlog_errno(status);
2202 2300
@@ -2208,13 +2306,13 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
2208 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2306 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2209 2307
2210 if (!ocfs2_mount_local(osb)) 2308 if (!ocfs2_mount_local(osb))
2211 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); 2309 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2212} 2310}
2213 2311
2214int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2312int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2215{ 2313{
2216 int ret; 2314 int ret;
2217 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2315 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2218 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2316 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2219 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2317 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2220 2318
@@ -2235,7 +2333,7 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2235 2333
2236void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2334void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2237{ 2335{
2238 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2336 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2239 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2337 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2240 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2338 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2241 2339
@@ -2400,7 +2498,7 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2400 lockres->l_blocking); 2498 lockres->l_blocking);
2401 2499
2402 /* Dump the raw LVB */ 2500 /* Dump the raw LVB */
2403 lvb = lockres->l_lksb.lvb; 2501 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2404 for(i = 0; i < DLM_LVB_LEN; i++) 2502 for(i = 0; i < DLM_LVB_LEN; i++)
2405 seq_printf(m, "0x%x\t", lvb[i]); 2503 seq_printf(m, "0x%x\t", lvb[i]);
2406 2504
@@ -2409,7 +2507,7 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2409 return 0; 2507 return 0;
2410} 2508}
2411 2509
2412static struct seq_operations ocfs2_dlm_seq_ops = { 2510static const struct seq_operations ocfs2_dlm_seq_ops = {
2413 .start = ocfs2_dlm_seq_start, 2511 .start = ocfs2_dlm_seq_start,
2414 .stop = ocfs2_dlm_seq_stop, 2512 .stop = ocfs2_dlm_seq_stop,
2415 .next = ocfs2_dlm_seq_next, 2513 .next = ocfs2_dlm_seq_next,
@@ -2504,13 +2602,14 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2504int ocfs2_dlm_init(struct ocfs2_super *osb) 2602int ocfs2_dlm_init(struct ocfs2_super *osb)
2505{ 2603{
2506 int status = 0; 2604 int status = 0;
2507 u32 dlm_key; 2605 struct ocfs2_cluster_connection *conn = NULL;
2508 struct dlm_ctxt *dlm = NULL;
2509 2606
2510 mlog_entry_void(); 2607 mlog_entry_void();
2511 2608
2512 if (ocfs2_mount_local(osb)) 2609 if (ocfs2_mount_local(osb)) {
2610 osb->node_num = 0;
2513 goto local; 2611 goto local;
2612 }
2514 2613
2515 status = ocfs2_dlm_init_debug(osb); 2614 status = ocfs2_dlm_init_debug(osb);
2516 if (status < 0) { 2615 if (status < 0) {
@@ -2527,26 +2626,31 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
2527 goto bail; 2626 goto bail;
2528 } 2627 }
2529 2628
2530 /* used by the dlm code to make message headers unique, each
2531 * node in this domain must agree on this. */
2532 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2533
2534 /* for now, uuid == domain */ 2629 /* for now, uuid == domain */
2535 dlm = dlm_register_domain(osb->uuid_str, dlm_key, 2630 status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2536 &osb->osb_locking_proto); 2631 osb->uuid_str,
2537 if (IS_ERR(dlm)) { 2632 strlen(osb->uuid_str),
2538 status = PTR_ERR(dlm); 2633 ocfs2_do_node_down, osb,
2634 &conn);
2635 if (status) {
2539 mlog_errno(status); 2636 mlog_errno(status);
2540 goto bail; 2637 goto bail;
2541 } 2638 }
2542 2639
2543 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); 2640 status = ocfs2_cluster_this_node(&osb->node_num);
2641 if (status < 0) {
2642 mlog_errno(status);
2643 mlog(ML_ERROR,
2644 "could not find this host's node number\n");
2645 ocfs2_cluster_disconnect(conn, 0);
2646 goto bail;
2647 }
2544 2648
2545local: 2649local:
2546 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2650 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2547 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2651 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2548 2652
2549 osb->dlm = dlm; 2653 osb->cconn = conn;
2550 2654
2551 status = 0; 2655 status = 0;
2552bail: 2656bail:
@@ -2560,14 +2664,19 @@ bail:
2560 return status; 2664 return status;
2561} 2665}
2562 2666
2563void ocfs2_dlm_shutdown(struct ocfs2_super *osb) 2667void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2668 int hangup_pending)
2564{ 2669{
2565 mlog_entry_void(); 2670 mlog_entry_void();
2566 2671
2567 dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2568
2569 ocfs2_drop_osb_locks(osb); 2672 ocfs2_drop_osb_locks(osb);
2570 2673
2674 /*
2675 * Now that we have dropped all locks and ocfs2_dismount_volume()
2676 * has disabled recovery, the DLM won't be talking to us. It's
2677 * safe to tear things down before disconnecting the cluster.
2678 */
2679
2571 if (osb->dc_task) { 2680 if (osb->dc_task) {
2572 kthread_stop(osb->dc_task); 2681 kthread_stop(osb->dc_task);
2573 osb->dc_task = NULL; 2682 osb->dc_task = NULL;
@@ -2576,15 +2685,15 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2576 ocfs2_lock_res_free(&osb->osb_super_lockres); 2685 ocfs2_lock_res_free(&osb->osb_super_lockres);
2577 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2686 ocfs2_lock_res_free(&osb->osb_rename_lockres);
2578 2687
2579 dlm_unregister_domain(osb->dlm); 2688 ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
2580 osb->dlm = NULL; 2689 osb->cconn = NULL;
2581 2690
2582 ocfs2_dlm_shutdown_debug(osb); 2691 ocfs2_dlm_shutdown_debug(osb);
2583 2692
2584 mlog_exit_void(); 2693 mlog_exit_void();
2585} 2694}
2586 2695
2587static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) 2696static void ocfs2_unlock_ast(void *opaque, int error)
2588{ 2697{
2589 struct ocfs2_lock_res *lockres = opaque; 2698 struct ocfs2_lock_res *lockres = opaque;
2590 unsigned long flags; 2699 unsigned long flags;
@@ -2595,24 +2704,9 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2595 lockres->l_unlock_action); 2704 lockres->l_unlock_action);
2596 2705
2597 spin_lock_irqsave(&lockres->l_lock, flags); 2706 spin_lock_irqsave(&lockres->l_lock, flags);
2598 /* We tried to cancel a convert request, but it was already 2707 if (error) {
2599 * granted. All we want to do here is clear our unlock 2708 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
2600 * state. The wake_up call done at the bottom is redundant 2709 "unlock_action %d\n", error, lockres->l_name,
2601 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2602 * hurt anything anyway */
2603 if (status == DLM_CANCELGRANT &&
2604 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2605 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2606
2607 /* We don't clear the busy flag in this case as it
2608 * should have been cleared by the ast which the dlm
2609 * has called. */
2610 goto complete_unlock;
2611 }
2612
2613 if (status != DLM_NORMAL) {
2614 mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2615 "unlock_action %d\n", status, lockres->l_name,
2616 lockres->l_unlock_action); 2710 lockres->l_unlock_action);
2617 spin_unlock_irqrestore(&lockres->l_lock, flags); 2711 spin_unlock_irqrestore(&lockres->l_lock, flags);
2618 return; 2712 return;
@@ -2624,14 +2718,13 @@ static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2624 lockres->l_action = OCFS2_AST_INVALID; 2718 lockres->l_action = OCFS2_AST_INVALID;
2625 break; 2719 break;
2626 case OCFS2_UNLOCK_DROP_LOCK: 2720 case OCFS2_UNLOCK_DROP_LOCK:
2627 lockres->l_level = LKM_IVMODE; 2721 lockres->l_level = DLM_LOCK_IV;
2628 break; 2722 break;
2629 default: 2723 default:
2630 BUG(); 2724 BUG();
2631 } 2725 }
2632 2726
2633 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2727 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2634complete_unlock:
2635 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2728 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2636 spin_unlock_irqrestore(&lockres->l_lock, flags); 2729 spin_unlock_irqrestore(&lockres->l_lock, flags);
2637 2730
@@ -2643,16 +2736,16 @@ complete_unlock:
2643static int ocfs2_drop_lock(struct ocfs2_super *osb, 2736static int ocfs2_drop_lock(struct ocfs2_super *osb,
2644 struct ocfs2_lock_res *lockres) 2737 struct ocfs2_lock_res *lockres)
2645{ 2738{
2646 enum dlm_status status; 2739 int ret;
2647 unsigned long flags; 2740 unsigned long flags;
2648 int lkm_flags = 0; 2741 u32 lkm_flags = 0;
2649 2742
2650 /* We didn't get anywhere near actually using this lockres. */ 2743 /* We didn't get anywhere near actually using this lockres. */
2651 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2744 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2652 goto out; 2745 goto out;
2653 2746
2654 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2747 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2655 lkm_flags |= LKM_VALBLK; 2748 lkm_flags |= DLM_LKF_VALBLK;
2656 2749
2657 spin_lock_irqsave(&lockres->l_lock, flags); 2750 spin_lock_irqsave(&lockres->l_lock, flags);
2658 2751
@@ -2678,7 +2771,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
2678 2771
2679 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2772 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2680 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2773 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2681 lockres->l_level == LKM_EXMODE && 2774 lockres->l_level == DLM_LOCK_EX &&
2682 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2775 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2683 lockres->l_ops->set_lvb(lockres); 2776 lockres->l_ops->set_lvb(lockres);
2684 } 2777 }
@@ -2707,15 +2800,15 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
2707 2800
2708 mlog(0, "lock %s\n", lockres->l_name); 2801 mlog(0, "lock %s\n", lockres->l_name);
2709 2802
2710 status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2803 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
2711 ocfs2_unlock_ast, lockres); 2804 lockres);
2712 if (status != DLM_NORMAL) { 2805 if (ret) {
2713 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2806 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
2714 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2807 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2715 dlm_print_one_lock(lockres->l_lksb.lockid); 2808 ocfs2_dlm_dump_lksb(&lockres->l_lksb);
2716 BUG(); 2809 BUG();
2717 } 2810 }
2718 mlog(0, "lock %s, successfull return from dlmunlock\n", 2811 mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
2719 lockres->l_name); 2812 lockres->l_name);
2720 2813
2721 ocfs2_wait_on_busy_lock(lockres); 2814 ocfs2_wait_on_busy_lock(lockres);
@@ -2806,15 +2899,15 @@ int ocfs2_drop_inode_locks(struct inode *inode)
2806 return status; 2899 return status;
2807} 2900}
2808 2901
2809static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2902static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2810 int new_level) 2903 int new_level)
2811{ 2904{
2812 assert_spin_locked(&lockres->l_lock); 2905 assert_spin_locked(&lockres->l_lock);
2813 2906
2814 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 2907 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
2815 2908
2816 if (lockres->l_level <= new_level) { 2909 if (lockres->l_level <= new_level) {
2817 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n", 2910 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
2818 lockres->l_level, new_level); 2911 lockres->l_level, new_level);
2819 BUG(); 2912 BUG();
2820 } 2913 }
@@ -2825,33 +2918,33 @@ static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2825 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2918 lockres->l_action = OCFS2_AST_DOWNCONVERT;
2826 lockres->l_requested = new_level; 2919 lockres->l_requested = new_level;
2827 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2920 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2921 return lockres_set_pending(lockres);
2828} 2922}
2829 2923
2830static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2924static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2831 struct ocfs2_lock_res *lockres, 2925 struct ocfs2_lock_res *lockres,
2832 int new_level, 2926 int new_level,
2833 int lvb) 2927 int lvb,
2928 unsigned int generation)
2834{ 2929{
2835 int ret, dlm_flags = LKM_CONVERT; 2930 int ret;
2836 enum dlm_status status; 2931 u32 dlm_flags = DLM_LKF_CONVERT;
2837 2932
2838 mlog_entry_void(); 2933 mlog_entry_void();
2839 2934
2840 if (lvb) 2935 if (lvb)
2841 dlm_flags |= LKM_VALBLK; 2936 dlm_flags |= DLM_LKF_VALBLK;
2842 2937
2843 status = dlmlock(osb->dlm, 2938 ret = ocfs2_dlm_lock(osb->cconn,
2844 new_level, 2939 new_level,
2845 &lockres->l_lksb, 2940 &lockres->l_lksb,
2846 dlm_flags, 2941 dlm_flags,
2847 lockres->l_name, 2942 lockres->l_name,
2848 OCFS2_LOCK_ID_MAX_LEN - 1, 2943 OCFS2_LOCK_ID_MAX_LEN - 1,
2849 ocfs2_locking_ast, 2944 lockres);
2850 lockres, 2945 lockres_clear_pending(lockres, generation, osb);
2851 ocfs2_blocking_ast); 2946 if (ret) {
2852 if (status != DLM_NORMAL) { 2947 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2853 ocfs2_log_dlm_error("dlmlock", status, lockres);
2854 ret = -EINVAL;
2855 ocfs2_recover_from_dlm_error(lockres, 1); 2948 ocfs2_recover_from_dlm_error(lockres, 1);
2856 goto bail; 2949 goto bail;
2857 } 2950 }
@@ -2862,7 +2955,7 @@ bail:
2862 return ret; 2955 return ret;
2863} 2956}
2864 2957
2865/* returns 1 when the caller should unlock and call dlmunlock */ 2958/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
2866static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2959static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2867 struct ocfs2_lock_res *lockres) 2960 struct ocfs2_lock_res *lockres)
2868{ 2961{
@@ -2898,24 +2991,18 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2898 struct ocfs2_lock_res *lockres) 2991 struct ocfs2_lock_res *lockres)
2899{ 2992{
2900 int ret; 2993 int ret;
2901 enum dlm_status status;
2902 2994
2903 mlog_entry_void(); 2995 mlog_entry_void();
2904 mlog(0, "lock %s\n", lockres->l_name); 2996 mlog(0, "lock %s\n", lockres->l_name);
2905 2997
2906 ret = 0; 2998 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
2907 status = dlmunlock(osb->dlm, 2999 DLM_LKF_CANCEL, lockres);
2908 &lockres->l_lksb, 3000 if (ret) {
2909 LKM_CANCEL, 3001 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
2910 ocfs2_unlock_ast,
2911 lockres);
2912 if (status != DLM_NORMAL) {
2913 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2914 ret = -EINVAL;
2915 ocfs2_recover_from_dlm_error(lockres, 0); 3002 ocfs2_recover_from_dlm_error(lockres, 0);
2916 } 3003 }
2917 3004
2918 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name); 3005 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
2919 3006
2920 mlog_exit(ret); 3007 mlog_exit(ret);
2921 return ret; 3008 return ret;
@@ -2930,6 +3017,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2930 int new_level; 3017 int new_level;
2931 int ret = 0; 3018 int ret = 0;
2932 int set_lvb = 0; 3019 int set_lvb = 0;
3020 unsigned int gen;
2933 3021
2934 mlog_entry_void(); 3022 mlog_entry_void();
2935 3023
@@ -2939,6 +3027,32 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2939 3027
2940recheck: 3028recheck:
2941 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3029 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3030 /* XXX
3031 * This is a *big* race. The OCFS2_LOCK_PENDING flag
3032 * exists entirely for one reason - another thread has set
3033 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3034 *
3035 * If we do ocfs2_cancel_convert() before the other thread
3036 * calls dlm_lock(), our cancel will do nothing. We will
3037 * get no ast, and we will have no way of knowing the
3038 * cancel failed. Meanwhile, the other thread will call
3039 * into dlm_lock() and wait...forever.
3040 *
3041 * Why forever? Because another node has asked for the
3042 * lock first; that's why we're here in unblock_lock().
3043 *
3044 * The solution is OCFS2_LOCK_PENDING. When PENDING is
3045 * set, we just requeue the unblock. Only when the other
3046 * thread has called dlm_lock() and cleared PENDING will
3047 * we then cancel their request.
3048 *
3049 * All callers of dlm_lock() must set OCFS2_DLM_PENDING
3050 * at the same time they set OCFS2_DLM_BUSY. They must
3051 * clear OCFS2_DLM_PENDING after dlm_lock() returns.
3052 */
3053 if (lockres->l_flags & OCFS2_LOCK_PENDING)
3054 goto leave_requeue;
3055
2942 ctl->requeue = 1; 3056 ctl->requeue = 1;
2943 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3057 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2944 spin_unlock_irqrestore(&lockres->l_lock, flags); 3058 spin_unlock_irqrestore(&lockres->l_lock, flags);
@@ -2952,13 +3066,13 @@ recheck:
2952 3066
2953 /* if we're blocking an exclusive and we have *any* holders, 3067 /* if we're blocking an exclusive and we have *any* holders,
2954 * then requeue. */ 3068 * then requeue. */
2955 if ((lockres->l_blocking == LKM_EXMODE) 3069 if ((lockres->l_blocking == DLM_LOCK_EX)
2956 && (lockres->l_ex_holders || lockres->l_ro_holders)) 3070 && (lockres->l_ex_holders || lockres->l_ro_holders))
2957 goto leave_requeue; 3071 goto leave_requeue;
2958 3072
2959 /* If it's a PR we're blocking, then only 3073 /* If it's a PR we're blocking, then only
2960 * requeue if we've got any EX holders */ 3074 * requeue if we've got any EX holders */
2961 if (lockres->l_blocking == LKM_PRMODE && 3075 if (lockres->l_blocking == DLM_LOCK_PR &&
2962 lockres->l_ex_holders) 3076 lockres->l_ex_holders)
2963 goto leave_requeue; 3077 goto leave_requeue;
2964 3078
@@ -3005,7 +3119,7 @@ downconvert:
3005 ctl->requeue = 0; 3119 ctl->requeue = 0;
3006 3120
3007 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3121 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3008 if (lockres->l_level == LKM_EXMODE) 3122 if (lockres->l_level == DLM_LOCK_EX)
3009 set_lvb = 1; 3123 set_lvb = 1;
3010 3124
3011 /* 3125 /*
@@ -3018,9 +3132,11 @@ downconvert:
3018 lockres->l_ops->set_lvb(lockres); 3132 lockres->l_ops->set_lvb(lockres);
3019 } 3133 }
3020 3134
3021 ocfs2_prepare_downconvert(lockres, new_level); 3135 gen = ocfs2_prepare_downconvert(lockres, new_level);
3022 spin_unlock_irqrestore(&lockres->l_lock, flags); 3136 spin_unlock_irqrestore(&lockres->l_lock, flags);
3023 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 3137 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3138 gen);
3139
3024leave: 3140leave:
3025 mlog_exit(ret); 3141 mlog_exit(ret);
3026 return ret; 3142 return ret;
@@ -3042,7 +3158,7 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3042 inode = ocfs2_lock_res_inode(lockres); 3158 inode = ocfs2_lock_res_inode(lockres);
3043 mapping = inode->i_mapping; 3159 mapping = inode->i_mapping;
3044 3160
3045 if (S_ISREG(inode->i_mode)) 3161 if (!S_ISREG(inode->i_mode))
3046 goto out; 3162 goto out;
3047 3163
3048 /* 3164 /*
@@ -3059,7 +3175,7 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3059 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3175 (unsigned long long)OCFS2_I(inode)->ip_blkno);
3060 } 3176 }
3061 sync_mapping_buffers(mapping); 3177 sync_mapping_buffers(mapping);
3062 if (blocking == LKM_EXMODE) { 3178 if (blocking == DLM_LOCK_EX) {
3063 truncate_inode_pages(mapping, 0); 3179 truncate_inode_pages(mapping, 0);
3064 } else { 3180 } else {
3065 /* We only need to wait on the I/O if we're not also 3181 /* We only need to wait on the I/O if we're not also
@@ -3080,8 +3196,8 @@ static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3080 struct inode *inode = ocfs2_lock_res_inode(lockres); 3196 struct inode *inode = ocfs2_lock_res_inode(lockres);
3081 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3197 int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3082 3198
3083 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE); 3199 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3084 BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed); 3200 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3085 3201
3086 if (checkpointed) 3202 if (checkpointed)
3087 return 1; 3203 return 1;
@@ -3145,7 +3261,7 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3145 * valid. The downconvert code will retain a PR for this node, 3261 * valid. The downconvert code will retain a PR for this node,
3146 * so there's no further work to do. 3262 * so there's no further work to do.
3147 */ 3263 */
3148 if (blocking == LKM_PRMODE) 3264 if (blocking == DLM_LOCK_PR)
3149 return UNBLOCK_CONTINUE; 3265 return UNBLOCK_CONTINUE;
3150 3266
3151 /* 3267 /*
@@ -3219,8 +3335,47 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3219 return UNBLOCK_CONTINUE_POST; 3335 return UNBLOCK_CONTINUE_POST;
3220} 3336}
3221 3337
3222void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3338/*
3223 struct ocfs2_lock_res *lockres) 3339 * This is the filesystem locking protocol. It provides the lock handling
3340 * hooks for the underlying DLM. It has a maximum version number.
3341 * The version number allows interoperability with systems running at
3342 * the same major number and an equal or smaller minor number.
3343 *
3344 * Whenever the filesystem does new things with locks (adds or removes a
3345 * lock, orders them differently, does different things underneath a lock),
3346 * the version must be changed. The protocol is negotiated when joining
3347 * the dlm domain. A node may join the domain if its major version is
3348 * identical to all other nodes and its minor version is greater than
3349 * or equal to all other nodes. When its minor version is greater than
3350 * the other nodes, it will run at the minor version specified by the
3351 * other nodes.
3352 *
3353 * If a locking change is made that will not be compatible with older
3354 * versions, the major number must be increased and the minor version set
3355 * to zero. If a change merely adds a behavior that can be disabled when
3356 * speaking to older versions, the minor version must be increased. If a
3357 * change adds a fully backwards compatible change (eg, LVB changes that
3358 * are just ignored by older versions), the version does not need to be
3359 * updated.
3360 */
3361static struct ocfs2_locking_protocol lproto = {
3362 .lp_max_version = {
3363 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3364 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3365 },
3366 .lp_lock_ast = ocfs2_locking_ast,
3367 .lp_blocking_ast = ocfs2_blocking_ast,
3368 .lp_unlock_ast = ocfs2_unlock_ast,
3369};
3370
3371void ocfs2_set_locking_protocol(void)
3372{
3373 ocfs2_stack_glue_set_locking_protocol(&lproto);
3374}
3375
3376
3377static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3378 struct ocfs2_lock_res *lockres)
3224{ 3379{
3225 int status; 3380 int status;
3226 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3381 struct ocfs2_unblock_ctl ctl = {0, 0,};
@@ -3356,7 +3511,7 @@ static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3356 return should_wake; 3511 return should_wake;
3357} 3512}
3358 3513
3359int ocfs2_downconvert_thread(void *arg) 3514static int ocfs2_downconvert_thread(void *arg)
3360{ 3515{
3361 int status = 0; 3516 int status = 0;
3362 struct ocfs2_super *osb = arg; 3517 struct ocfs2_super *osb = arg;