aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
authorJonathan Herman <hermanjl@cs.unc.edu>2013-01-17 16:15:55 -0500
committerJonathan Herman <hermanjl@cs.unc.edu>2013-01-17 16:15:55 -0500
commit8dea78da5cee153b8af9c07a2745f6c55057fe12 (patch)
treea8f4d49d63b1ecc92f2fddceba0655b2472c5bd9 /fs/ocfs2/dlm
parent406089d01562f1e2bf9f089fd7637009ebaad589 (diff)
Patched in Tegra support.
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmast.c2
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h62
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c1
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c46
-rw-r--r--fs/ocfs2/dlm/dlmlock.c54
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c175
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c164
-rw-r--r--fs/ocfs2/dlm/dlmthread.c16
8 files changed, 276 insertions, 244 deletions
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index fbec0be6232..3a3ed4bb794 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -293,7 +293,7 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
293 struct dlm_proxy_ast *past = (struct dlm_proxy_ast *) msg->buf; 293 struct dlm_proxy_ast *past = (struct dlm_proxy_ast *) msg->buf;
294 char *name; 294 char *name;
295 struct list_head *iter, *head=NULL; 295 struct list_head *iter, *head=NULL;
296 __be64 cookie; 296 u64 cookie;
297 u32 flags; 297 u32 flags;
298 u8 node; 298 u8 node;
299 299
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index de854cca12a..d602abb51b6 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -679,7 +679,7 @@ struct dlm_query_join_packet {
679}; 679};
680 680
681union dlm_query_join_response { 681union dlm_query_join_response {
682 __be32 intval; 682 u32 intval;
683 struct dlm_query_join_packet packet; 683 struct dlm_query_join_packet packet;
684}; 684};
685 685
@@ -755,8 +755,8 @@ struct dlm_query_region {
755struct dlm_node_info { 755struct dlm_node_info {
756 u8 ni_nodenum; 756 u8 ni_nodenum;
757 u8 pad1; 757 u8 pad1;
758 __be16 ni_ipv4_port; 758 u16 ni_ipv4_port;
759 __be32 ni_ipv4_address; 759 u32 ni_ipv4_address;
760}; 760};
761 761
762struct dlm_query_nodeinfo { 762struct dlm_query_nodeinfo {
@@ -859,8 +859,8 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
859void dlm_wait_for_recovery(struct dlm_ctxt *dlm); 859void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
860void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); 860void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
861int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); 861int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
862void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); 862int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
863void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); 863int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout);
864 864
865void dlm_put(struct dlm_ctxt *dlm); 865void dlm_put(struct dlm_ctxt *dlm);
866struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); 866struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
@@ -877,8 +877,9 @@ static inline void dlm_lockres_get(struct dlm_lock_resource *res)
877 kref_get(&res->refs); 877 kref_get(&res->refs);
878} 878}
879void dlm_lockres_put(struct dlm_lock_resource *res); 879void dlm_lockres_put(struct dlm_lock_resource *res);
880void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); 880void __dlm_unhash_lockres(struct dlm_lock_resource *res);
881void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); 881void __dlm_insert_lockres(struct dlm_ctxt *dlm,
882 struct dlm_lock_resource *res);
882struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, 883struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
883 const char *name, 884 const char *name,
884 unsigned int len, 885 unsigned int len,
@@ -901,15 +902,46 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
901 const char *name, 902 const char *name,
902 unsigned int namelen); 903 unsigned int namelen);
903 904
904void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm, 905#define dlm_lockres_set_refmap_bit(bit,res) \
905 struct dlm_lock_resource *res, int bit); 906 __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
906void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm, 907#define dlm_lockres_clear_refmap_bit(bit,res) \
907 struct dlm_lock_resource *res, int bit); 908 __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
908 909
909void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, 910static inline void __dlm_lockres_set_refmap_bit(int bit,
910 struct dlm_lock_resource *res); 911 struct dlm_lock_resource *res,
911void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, 912 const char *file,
912 struct dlm_lock_resource *res); 913 int line)
914{
915 //printk("%s:%d:%.*s: setting bit %d\n", file, line,
916 // res->lockname.len, res->lockname.name, bit);
917 set_bit(bit, res->refmap);
918}
919
920static inline void __dlm_lockres_clear_refmap_bit(int bit,
921 struct dlm_lock_resource *res,
922 const char *file,
923 int line)
924{
925 //printk("%s:%d:%.*s: clearing bit %d\n", file, line,
926 // res->lockname.len, res->lockname.name, bit);
927 clear_bit(bit, res->refmap);
928}
929
930void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
931 struct dlm_lock_resource *res,
932 const char *file,
933 int line);
934void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
935 struct dlm_lock_resource *res,
936 int new_lockres,
937 const char *file,
938 int line);
939#define dlm_lockres_drop_inflight_ref(d,r) \
940 __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
941#define dlm_lockres_grab_inflight_ref(d,r) \
942 __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
943#define dlm_lockres_grab_inflight_ref_new(d,r) \
944 __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
913 945
914void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 946void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
915void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 947void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 0e28e242226..56f82cb912e 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -30,7 +30,6 @@
30#include <linux/sysctl.h> 30#include <linux/sysctl.h>
31#include <linux/spinlock.h> 31#include <linux/spinlock.h>
32#include <linux/debugfs.h> 32#include <linux/debugfs.h>
33#include <linux/export.h>
34 33
35#include "cluster/heartbeat.h" 34#include "cluster/heartbeat.h"
36#include "cluster/nodemanager.h" 35#include "cluster/nodemanager.h"
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 9e89d70df33..6ed6b95dcf9 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -157,18 +157,16 @@ static int dlm_protocol_compare(struct dlm_protocol_version *existing,
157 157
158static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 158static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
159 159
160void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 160void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
161{ 161{
162 if (hlist_unhashed(&res->hash_node)) 162 if (!hlist_unhashed(&lockres->hash_node)) {
163 return; 163 hlist_del_init(&lockres->hash_node);
164 164 dlm_lockres_put(lockres);
165 mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len, 165 }
166 res->lockname.name);
167 hlist_del_init(&res->hash_node);
168 dlm_lockres_put(res);
169} 166}
170 167
171void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 168void __dlm_insert_lockres(struct dlm_ctxt *dlm,
169 struct dlm_lock_resource *res)
172{ 170{
173 struct hlist_head *bucket; 171 struct hlist_head *bucket;
174 struct qstr *q; 172 struct qstr *q;
@@ -182,9 +180,6 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
182 dlm_lockres_get(res); 180 dlm_lockres_get(res);
183 181
184 hlist_add_head(&res->hash_node, bucket); 182 hlist_add_head(&res->hash_node, bucket);
185
186 mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
187 res->lockname.name);
188} 183}
189 184
190struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, 185struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
@@ -544,17 +539,17 @@ again:
544 539
545static void __dlm_print_nodes(struct dlm_ctxt *dlm) 540static void __dlm_print_nodes(struct dlm_ctxt *dlm)
546{ 541{
547 int node = -1, num = 0; 542 int node = -1;
548 543
549 assert_spin_locked(&dlm->spinlock); 544 assert_spin_locked(&dlm->spinlock);
550 545
551 printk("( "); 546 printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name);
547
552 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 548 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
553 node + 1)) < O2NM_MAX_NODES) { 549 node + 1)) < O2NM_MAX_NODES) {
554 printk("%d ", node); 550 printk("%d ", node);
555 ++num;
556 } 551 }
557 printk(") %u nodes\n", num); 552 printk("\n");
558} 553}
559 554
560static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 555static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -571,10 +566,11 @@ static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
571 566
572 node = exit_msg->node_idx; 567 node = exit_msg->node_idx;
573 568
569 printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name);
570
574 spin_lock(&dlm->spinlock); 571 spin_lock(&dlm->spinlock);
575 clear_bit(node, dlm->domain_map); 572 clear_bit(node, dlm->domain_map);
576 clear_bit(node, dlm->exit_domain_map); 573 clear_bit(node, dlm->exit_domain_map);
577 printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
578 __dlm_print_nodes(dlm); 574 __dlm_print_nodes(dlm);
579 575
580 /* notify anything attached to the heartbeat events */ 576 /* notify anything attached to the heartbeat events */
@@ -759,7 +755,6 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
759 755
760 dlm_mark_domain_leaving(dlm); 756 dlm_mark_domain_leaving(dlm);
761 dlm_leave_domain(dlm); 757 dlm_leave_domain(dlm);
762 printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
763 dlm_force_free_mles(dlm); 758 dlm_force_free_mles(dlm);
764 dlm_complete_dlm_shutdown(dlm); 759 dlm_complete_dlm_shutdown(dlm);
765 } 760 }
@@ -818,7 +813,7 @@ static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
818 union dlm_query_join_response response; 813 union dlm_query_join_response response;
819 814
820 response.packet = *packet; 815 response.packet = *packet;
821 *wire = be32_to_cpu(response.intval); 816 *wire = cpu_to_be32(response.intval);
822} 817}
823 818
824static void dlm_query_join_wire_to_packet(u32 wire, 819static void dlm_query_join_wire_to_packet(u32 wire,
@@ -975,7 +970,7 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
975 clear_bit(assert->node_idx, dlm->exit_domain_map); 970 clear_bit(assert->node_idx, dlm->exit_domain_map);
976 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 971 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
977 972
978 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ", 973 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n",
979 assert->node_idx, dlm->name); 974 assert->node_idx, dlm->name);
980 __dlm_print_nodes(dlm); 975 __dlm_print_nodes(dlm);
981 976
@@ -1706,10 +1701,8 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1706bail: 1701bail:
1707 spin_lock(&dlm->spinlock); 1702 spin_lock(&dlm->spinlock);
1708 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); 1703 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1709 if (!status) { 1704 if (!status)
1710 printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
1711 __dlm_print_nodes(dlm); 1705 __dlm_print_nodes(dlm);
1712 }
1713 spin_unlock(&dlm->spinlock); 1706 spin_unlock(&dlm->spinlock);
1714 1707
1715 if (ctxt) { 1708 if (ctxt) {
@@ -2138,6 +2131,13 @@ struct dlm_ctxt * dlm_register_domain(const char *domain,
2138 goto leave; 2131 goto leave;
2139 } 2132 }
2140 2133
2134 if (!o2hb_check_local_node_heartbeating()) {
2135 mlog(ML_ERROR, "the local node has not been configured, or is "
2136 "not heartbeating\n");
2137 ret = -EPROTO;
2138 goto leave;
2139 }
2140
2141 mlog(0, "register called for domain \"%s\"\n", domain); 2141 mlog(0, "register called for domain \"%s\"\n", domain);
2142 2142
2143retry: 2143retry:
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 975810b9849..8d39e0fd66f 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -183,6 +183,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
183 kick_thread = 1; 183 kick_thread = 1;
184 } 184 }
185 } 185 }
186 /* reduce the inflight count, this may result in the lockres
187 * being purged below during calc_usage */
188 if (lock->ml.node == dlm->node_num)
189 dlm_lockres_drop_inflight_ref(dlm, res);
186 190
187 spin_unlock(&res->spinlock); 191 spin_unlock(&res->spinlock);
188 wake_up(&res->wq); 192 wake_up(&res->wq);
@@ -227,16 +231,10 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
227 lock->ml.type, res->lockname.len, 231 lock->ml.type, res->lockname.len,
228 res->lockname.name, flags); 232 res->lockname.name, flags);
229 233
230 /*
231 * Wait if resource is getting recovered, remastered, etc.
232 * If the resource was remastered and new owner is self, then exit.
233 */
234 spin_lock(&res->spinlock); 234 spin_lock(&res->spinlock);
235
236 /* will exit this call with spinlock held */
235 __dlm_wait_on_lockres(res); 237 __dlm_wait_on_lockres(res);
236 if (res->owner == dlm->node_num) {
237 spin_unlock(&res->spinlock);
238 return DLM_RECOVERING;
239 }
240 res->state |= DLM_LOCK_RES_IN_PROGRESS; 238 res->state |= DLM_LOCK_RES_IN_PROGRESS;
241 239
242 /* add lock to local (secondary) queue */ 240 /* add lock to local (secondary) queue */
@@ -321,23 +319,27 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
321 tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, 319 tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
322 sizeof(create), res->owner, &status); 320 sizeof(create), res->owner, &status);
323 if (tmpret >= 0) { 321 if (tmpret >= 0) {
324 ret = status; 322 // successfully sent and received
323 ret = status; // this is already a dlm_status
325 if (ret == DLM_REJECTED) { 324 if (ret == DLM_REJECTED) {
326 mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer " 325 mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres "
327 "owned by node %u. That node is coming back up " 326 "no longer owned by %u. that node is coming back "
328 "currently.\n", dlm->name, create.namelen, 327 "up currently.\n", dlm->name, create.namelen,
329 create.name, res->owner); 328 create.name, res->owner);
330 dlm_print_one_lock_resource(res); 329 dlm_print_one_lock_resource(res);
331 BUG(); 330 BUG();
332 } 331 }
333 } else { 332 } else {
334 mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to " 333 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
335 "node %u\n", dlm->name, create.namelen, create.name, 334 "node %u\n", tmpret, DLM_CREATE_LOCK_MSG, dlm->key,
336 tmpret, res->owner); 335 res->owner);
337 if (dlm_is_host_down(tmpret)) 336 if (dlm_is_host_down(tmpret)) {
338 ret = DLM_RECOVERING; 337 ret = DLM_RECOVERING;
339 else 338 mlog(0, "node %u died so returning DLM_RECOVERING "
339 "from lock message!\n", res->owner);
340 } else {
340 ret = dlm_err_to_dlm_status(tmpret); 341 ret = dlm_err_to_dlm_status(tmpret);
342 }
341 } 343 }
342 344
343 return ret; 345 return ret;
@@ -438,7 +440,7 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
438 /* zero memory only if kernel-allocated */ 440 /* zero memory only if kernel-allocated */
439 lksb = kzalloc(sizeof(*lksb), GFP_NOFS); 441 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
440 if (!lksb) { 442 if (!lksb) {
441 kmem_cache_free(dlm_lock_cache, lock); 443 kfree(lock);
442 return NULL; 444 return NULL;
443 } 445 }
444 kernel_allocated = 1; 446 kernel_allocated = 1;
@@ -716,10 +718,18 @@ retry_lock:
716 718
717 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 719 if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
718 status == DLM_FORWARD) { 720 status == DLM_FORWARD) {
721 mlog(0, "retrying lock with migration/"
722 "recovery/in progress\n");
719 msleep(100); 723 msleep(100);
724 /* no waiting for dlm_reco_thread */
720 if (recovery) { 725 if (recovery) {
721 if (status != DLM_RECOVERING) 726 if (status != DLM_RECOVERING)
722 goto retry_lock; 727 goto retry_lock;
728
729 mlog(0, "%s: got RECOVERING "
730 "for $RECOVERY lock, master "
731 "was %u\n", dlm->name,
732 res->owner);
723 /* wait to see the node go down, then 733 /* wait to see the node go down, then
724 * drop down and allow the lockres to 734 * drop down and allow the lockres to
725 * get cleaned up. need to remaster. */ 735 * get cleaned up. need to remaster. */
@@ -731,14 +741,6 @@ retry_lock:
731 } 741 }
732 } 742 }
733 743
734 /* Inflight taken in dlm_get_lock_resource() is dropped here */
735 spin_lock(&res->spinlock);
736 dlm_lockres_drop_inflight_ref(dlm, res);
737 spin_unlock(&res->spinlock);
738
739 dlm_lockres_calc_usage(dlm, res);
740 dlm_kick_thread(dlm, res);
741
742 if (status != DLM_NORMAL) { 744 if (status != DLM_NORMAL) {
743 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 745 lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
744 if (status != DLM_NOTQUEUED) 746 if (status != DLM_NOTQUEUED)
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 005261c333b..11eefb8c12e 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -631,54 +631,39 @@ error:
631 return NULL; 631 return NULL;
632} 632}
633 633
634void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm, 634void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
635 struct dlm_lock_resource *res, int bit) 635 struct dlm_lock_resource *res,
636 int new_lockres,
637 const char *file,
638 int line)
636{ 639{
637 assert_spin_locked(&res->spinlock); 640 if (!new_lockres)
638 641 assert_spin_locked(&res->spinlock);
639 mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
640 res->lockname.name, bit, __builtin_return_address(0));
641
642 set_bit(bit, res->refmap);
643}
644
645void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
646 struct dlm_lock_resource *res, int bit)
647{
648 assert_spin_locked(&res->spinlock);
649
650 mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
651 res->lockname.name, bit, __builtin_return_address(0));
652
653 clear_bit(bit, res->refmap);
654}
655
656
657void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
658 struct dlm_lock_resource *res)
659{
660 assert_spin_locked(&res->spinlock);
661 642
643 if (!test_bit(dlm->node_num, res->refmap)) {
644 BUG_ON(res->inflight_locks != 0);
645 dlm_lockres_set_refmap_bit(dlm->node_num, res);
646 }
662 res->inflight_locks++; 647 res->inflight_locks++;
663 648 mlog(0, "%s:%.*s: inflight++: now %u\n",
664 mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name, 649 dlm->name, res->lockname.len, res->lockname.name,
665 res->lockname.len, res->lockname.name, res->inflight_locks, 650 res->inflight_locks);
666 __builtin_return_address(0));
667} 651}
668 652
669void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, 653void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
670 struct dlm_lock_resource *res) 654 struct dlm_lock_resource *res,
655 const char *file,
656 int line)
671{ 657{
672 assert_spin_locked(&res->spinlock); 658 assert_spin_locked(&res->spinlock);
673 659
674 BUG_ON(res->inflight_locks == 0); 660 BUG_ON(res->inflight_locks == 0);
675
676 res->inflight_locks--; 661 res->inflight_locks--;
677 662 mlog(0, "%s:%.*s: inflight--: now %u\n",
678 mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name, 663 dlm->name, res->lockname.len, res->lockname.name,
679 res->lockname.len, res->lockname.name, res->inflight_locks, 664 res->inflight_locks);
680 __builtin_return_address(0)); 665 if (res->inflight_locks == 0)
681 666 dlm_lockres_clear_refmap_bit(dlm->node_num, res);
682 wake_up(&res->wq); 667 wake_up(&res->wq);
683} 668}
684 669
@@ -712,6 +697,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
712 unsigned int hash; 697 unsigned int hash;
713 int tries = 0; 698 int tries = 0;
714 int bit, wait_on_recovery = 0; 699 int bit, wait_on_recovery = 0;
700 int drop_inflight_if_nonlocal = 0;
715 701
716 BUG_ON(!lockid); 702 BUG_ON(!lockid);
717 703
@@ -723,33 +709,36 @@ lookup:
723 spin_lock(&dlm->spinlock); 709 spin_lock(&dlm->spinlock);
724 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); 710 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
725 if (tmpres) { 711 if (tmpres) {
712 int dropping_ref = 0;
713
726 spin_unlock(&dlm->spinlock); 714 spin_unlock(&dlm->spinlock);
715
727 spin_lock(&tmpres->spinlock); 716 spin_lock(&tmpres->spinlock);
728 /* Wait on the thread that is mastering the resource */ 717 /* We wait for the other thread that is mastering the resource */
729 if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 718 if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
730 __dlm_wait_on_lockres(tmpres); 719 __dlm_wait_on_lockres(tmpres);
731 BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN); 720 BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
732 spin_unlock(&tmpres->spinlock);
733 dlm_lockres_put(tmpres);
734 tmpres = NULL;
735 goto lookup;
736 } 721 }
737 722
738 /* Wait on the resource purge to complete before continuing */ 723 if (tmpres->owner == dlm->node_num) {
739 if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) { 724 BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
740 BUG_ON(tmpres->owner == dlm->node_num); 725 dlm_lockres_grab_inflight_ref(dlm, tmpres);
741 __dlm_wait_on_lockres_flags(tmpres, 726 } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
742 DLM_LOCK_RES_DROPPING_REF); 727 dropping_ref = 1;
728 spin_unlock(&tmpres->spinlock);
729
730 /* wait until done messaging the master, drop our ref to allow
731 * the lockres to be purged, start over. */
732 if (dropping_ref) {
733 spin_lock(&tmpres->spinlock);
734 __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
743 spin_unlock(&tmpres->spinlock); 735 spin_unlock(&tmpres->spinlock);
744 dlm_lockres_put(tmpres); 736 dlm_lockres_put(tmpres);
745 tmpres = NULL; 737 tmpres = NULL;
746 goto lookup; 738 goto lookup;
747 } 739 }
748 740
749 /* Grab inflight ref to pin the resource */ 741 mlog(0, "found in hash!\n");
750 dlm_lockres_grab_inflight_ref(dlm, tmpres);
751
752 spin_unlock(&tmpres->spinlock);
753 if (res) 742 if (res)
754 dlm_lockres_put(res); 743 dlm_lockres_put(res);
755 res = tmpres; 744 res = tmpres;
@@ -840,8 +829,8 @@ lookup:
840 * but they might own this lockres. wait on them. */ 829 * but they might own this lockres. wait on them. */
841 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); 830 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
842 if (bit < O2NM_MAX_NODES) { 831 if (bit < O2NM_MAX_NODES) {
843 mlog(0, "%s: res %.*s, At least one node (%d) " 832 mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
844 "to recover before lock mastery can begin\n", 833 "recover before lock mastery can begin\n",
845 dlm->name, namelen, (char *)lockid, bit); 834 dlm->name, namelen, (char *)lockid, bit);
846 wait_on_recovery = 1; 835 wait_on_recovery = 1;
847 } 836 }
@@ -854,11 +843,12 @@ lookup:
854 843
855 /* finally add the lockres to its hash bucket */ 844 /* finally add the lockres to its hash bucket */
856 __dlm_insert_lockres(dlm, res); 845 __dlm_insert_lockres(dlm, res);
846 /* since this lockres is new it doesn't not require the spinlock */
847 dlm_lockres_grab_inflight_ref_new(dlm, res);
857 848
858 /* Grab inflight ref to pin the resource */ 849 /* if this node does not become the master make sure to drop
859 spin_lock(&res->spinlock); 850 * this inflight reference below */
860 dlm_lockres_grab_inflight_ref(dlm, res); 851 drop_inflight_if_nonlocal = 1;
861 spin_unlock(&res->spinlock);
862 852
863 /* get an extra ref on the mle in case this is a BLOCK 853 /* get an extra ref on the mle in case this is a BLOCK
864 * if so, the creator of the BLOCK may try to put the last 854 * if so, the creator of the BLOCK may try to put the last
@@ -874,8 +864,8 @@ redo_request:
874 * dlm spinlock would be detectable be a change on the mle, 864 * dlm spinlock would be detectable be a change on the mle,
875 * so we only need to clear out the recovery map once. */ 865 * so we only need to clear out the recovery map once. */
876 if (dlm_is_recovery_lock(lockid, namelen)) { 866 if (dlm_is_recovery_lock(lockid, namelen)) {
877 mlog(0, "%s: Recovery map is not empty, but must " 867 mlog(ML_NOTICE, "%s: recovery map is not empty, but "
878 "master $RECOVERY lock now\n", dlm->name); 868 "must master $RECOVERY lock now\n", dlm->name);
879 if (!dlm_pre_master_reco_lockres(dlm, res)) 869 if (!dlm_pre_master_reco_lockres(dlm, res))
880 wait_on_recovery = 0; 870 wait_on_recovery = 0;
881 else { 871 else {
@@ -893,8 +883,8 @@ redo_request:
893 spin_lock(&dlm->spinlock); 883 spin_lock(&dlm->spinlock);
894 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); 884 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
895 if (bit < O2NM_MAX_NODES) { 885 if (bit < O2NM_MAX_NODES) {
896 mlog(0, "%s: res %.*s, At least one node (%d) " 886 mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
897 "to recover before lock mastery can begin\n", 887 "recover before lock mastery can begin\n",
898 dlm->name, namelen, (char *)lockid, bit); 888 dlm->name, namelen, (char *)lockid, bit);
899 wait_on_recovery = 1; 889 wait_on_recovery = 1;
900 } else 890 } else
@@ -923,8 +913,8 @@ redo_request:
923 * yet, keep going until it does. this is how the 913 * yet, keep going until it does. this is how the
924 * master will know that asserts are needed back to 914 * master will know that asserts are needed back to
925 * the lower nodes. */ 915 * the lower nodes. */
926 mlog(0, "%s: res %.*s, Requests only up to %u but " 916 mlog(0, "%s:%.*s: requests only up to %u but master "
927 "master is %u, keep going\n", dlm->name, namelen, 917 "is %u, keep going\n", dlm->name, namelen,
928 lockid, nodenum, mle->master); 918 lockid, nodenum, mle->master);
929 } 919 }
930 } 920 }
@@ -934,12 +924,13 @@ wait:
934 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); 924 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
935 if (ret < 0) { 925 if (ret < 0) {
936 wait_on_recovery = 1; 926 wait_on_recovery = 1;
937 mlog(0, "%s: res %.*s, Node map changed, redo the master " 927 mlog(0, "%s:%.*s: node map changed, redo the "
938 "request now, blocked=%d\n", dlm->name, res->lockname.len, 928 "master request now, blocked=%d\n",
929 dlm->name, res->lockname.len,
939 res->lockname.name, blocked); 930 res->lockname.name, blocked);
940 if (++tries > 20) { 931 if (++tries > 20) {
941 mlog(ML_ERROR, "%s: res %.*s, Spinning on " 932 mlog(ML_ERROR, "%s:%.*s: spinning on "
942 "dlm_wait_for_lock_mastery, blocked = %d\n", 933 "dlm_wait_for_lock_mastery, blocked=%d\n",
943 dlm->name, res->lockname.len, 934 dlm->name, res->lockname.len,
944 res->lockname.name, blocked); 935 res->lockname.name, blocked);
945 dlm_print_one_lock_resource(res); 936 dlm_print_one_lock_resource(res);
@@ -949,8 +940,7 @@ wait:
949 goto redo_request; 940 goto redo_request;
950 } 941 }
951 942
952 mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len, 943 mlog(0, "lockres mastered by %u\n", res->owner);
953 res->lockname.name, res->owner);
954 /* make sure we never continue without this */ 944 /* make sure we never continue without this */
955 BUG_ON(res->owner == O2NM_MAX_NODES); 945 BUG_ON(res->owner == O2NM_MAX_NODES);
956 946
@@ -962,6 +952,8 @@ wait:
962 952
963wake_waiters: 953wake_waiters:
964 spin_lock(&res->spinlock); 954 spin_lock(&res->spinlock);
955 if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
956 dlm_lockres_drop_inflight_ref(dlm, res);
965 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 957 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
966 spin_unlock(&res->spinlock); 958 spin_unlock(&res->spinlock);
967 wake_up(&res->wq); 959 wake_up(&res->wq);
@@ -1434,7 +1426,9 @@ way_up_top:
1434 } 1426 }
1435 1427
1436 if (res->owner == dlm->node_num) { 1428 if (res->owner == dlm->node_num) {
1437 dlm_lockres_set_refmap_bit(dlm, res, request->node_idx); 1429 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1430 dlm->name, namelen, name, request->node_idx);
1431 dlm_lockres_set_refmap_bit(request->node_idx, res);
1438 spin_unlock(&res->spinlock); 1432 spin_unlock(&res->spinlock);
1439 response = DLM_MASTER_RESP_YES; 1433 response = DLM_MASTER_RESP_YES;
1440 if (mle) 1434 if (mle)
@@ -1499,8 +1493,10 @@ way_up_top:
1499 * go back and clean the mles on any 1493 * go back and clean the mles on any
1500 * other nodes */ 1494 * other nodes */
1501 dispatch_assert = 1; 1495 dispatch_assert = 1;
1502 dlm_lockres_set_refmap_bit(dlm, res, 1496 dlm_lockres_set_refmap_bit(request->node_idx, res);
1503 request->node_idx); 1497 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1498 dlm->name, namelen, name,
1499 request->node_idx);
1504 } else 1500 } else
1505 response = DLM_MASTER_RESP_NO; 1501 response = DLM_MASTER_RESP_NO;
1506 } else { 1502 } else {
@@ -1706,7 +1702,7 @@ again:
1706 "lockres, set the bit in the refmap\n", 1702 "lockres, set the bit in the refmap\n",
1707 namelen, lockname, to); 1703 namelen, lockname, to);
1708 spin_lock(&res->spinlock); 1704 spin_lock(&res->spinlock);
1709 dlm_lockres_set_refmap_bit(dlm, res, to); 1705 dlm_lockres_set_refmap_bit(to, res);
1710 spin_unlock(&res->spinlock); 1706 spin_unlock(&res->spinlock);
1711 } 1707 }
1712 } 1708 }
@@ -2191,6 +2187,8 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2191 namelen = res->lockname.len; 2187 namelen = res->lockname.len;
2192 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 2188 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2193 2189
2190 mlog(0, "%s:%.*s: sending deref to %d\n",
2191 dlm->name, namelen, lockname, res->owner);
2194 memset(&deref, 0, sizeof(deref)); 2192 memset(&deref, 0, sizeof(deref));
2195 deref.node_idx = dlm->node_num; 2193 deref.node_idx = dlm->node_num;
2196 deref.namelen = namelen; 2194 deref.namelen = namelen;
@@ -2199,12 +2197,14 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2199 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, 2197 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2200 &deref, sizeof(deref), res->owner, &r); 2198 &deref, sizeof(deref), res->owner, &r);
2201 if (ret < 0) 2199 if (ret < 0)
2202 mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n", 2200 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
2203 dlm->name, namelen, lockname, ret, res->owner); 2201 "node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key,
2202 res->owner);
2204 else if (r < 0) { 2203 else if (r < 0) {
2205 /* BAD. other node says I did not have a ref. */ 2204 /* BAD. other node says I did not have a ref. */
2206 mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n", 2205 mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2207 dlm->name, namelen, lockname, res->owner, r); 2206 "(master=%u) got %d.\n", dlm->name, namelen,
2207 lockname, res->owner, r);
2208 dlm_print_one_lock_resource(res); 2208 dlm_print_one_lock_resource(res);
2209 BUG(); 2209 BUG();
2210 } 2210 }
@@ -2260,7 +2260,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2260 else { 2260 else {
2261 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); 2261 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2262 if (test_bit(node, res->refmap)) { 2262 if (test_bit(node, res->refmap)) {
2263 dlm_lockres_clear_refmap_bit(dlm, res, node); 2263 dlm_lockres_clear_refmap_bit(node, res);
2264 cleared = 1; 2264 cleared = 1;
2265 } 2265 }
2266 } 2266 }
@@ -2320,7 +2320,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2320 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); 2320 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2321 if (test_bit(node, res->refmap)) { 2321 if (test_bit(node, res->refmap)) {
2322 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 2322 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2323 dlm_lockres_clear_refmap_bit(dlm, res, node); 2323 dlm_lockres_clear_refmap_bit(node, res);
2324 cleared = 1; 2324 cleared = 1;
2325 } 2325 }
2326 spin_unlock(&res->spinlock); 2326 spin_unlock(&res->spinlock);
@@ -2802,8 +2802,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2802 BUG_ON(!list_empty(&lock->bast_list)); 2802 BUG_ON(!list_empty(&lock->bast_list));
2803 BUG_ON(lock->ast_pending); 2803 BUG_ON(lock->ast_pending);
2804 BUG_ON(lock->bast_pending); 2804 BUG_ON(lock->bast_pending);
2805 dlm_lockres_clear_refmap_bit(dlm, res, 2805 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2806 lock->ml.node);
2807 list_del_init(&lock->list); 2806 list_del_init(&lock->list);
2808 dlm_lock_put(lock); 2807 dlm_lock_put(lock);
2809 /* In a normal unlock, we would have added a 2808 /* In a normal unlock, we would have added a
@@ -2824,7 +2823,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2824 mlog(0, "%s:%.*s: node %u had a ref to this " 2823 mlog(0, "%s:%.*s: node %u had a ref to this "
2825 "migrating lockres, clearing\n", dlm->name, 2824 "migrating lockres, clearing\n", dlm->name,
2826 res->lockname.len, res->lockname.name, bit); 2825 res->lockname.len, res->lockname.name, bit);
2827 dlm_lockres_clear_refmap_bit(dlm, res, bit); 2826 dlm_lockres_clear_refmap_bit(bit, res);
2828 } 2827 }
2829 bit++; 2828 bit++;
2830 } 2829 }
@@ -2917,9 +2916,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2917 &migrate, sizeof(migrate), nodenum, 2916 &migrate, sizeof(migrate), nodenum,
2918 &status); 2917 &status);
2919 if (ret < 0) { 2918 if (ret < 0) {
2920 mlog(ML_ERROR, "%s: res %.*s, Error %d send " 2919 mlog(ML_ERROR, "Error %d when sending message %u (key "
2921 "MIGRATE_REQUEST to node %u\n", dlm->name, 2920 "0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG,
2922 migrate.namelen, migrate.name, ret, nodenum); 2921 dlm->key, nodenum);
2923 if (!dlm_is_host_down(ret)) { 2922 if (!dlm_is_host_down(ret)) {
2924 mlog(ML_ERROR, "unhandled error=%d!\n", ret); 2923 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2925 BUG(); 2924 BUG();
@@ -2938,7 +2937,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2938 dlm->name, res->lockname.len, res->lockname.name, 2937 dlm->name, res->lockname.len, res->lockname.name,
2939 nodenum); 2938 nodenum);
2940 spin_lock(&res->spinlock); 2939 spin_lock(&res->spinlock);
2941 dlm_lockres_set_refmap_bit(dlm, res, nodenum); 2940 dlm_lockres_set_refmap_bit(nodenum, res);
2942 spin_unlock(&res->spinlock); 2941 spin_unlock(&res->spinlock);
2943 } 2942 }
2944 } 2943 }
@@ -3272,7 +3271,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3272 * mastery reference here since old_master will briefly have 3271 * mastery reference here since old_master will briefly have
3273 * a reference after the migration completes */ 3272 * a reference after the migration completes */
3274 spin_lock(&res->spinlock); 3273 spin_lock(&res->spinlock);
3275 dlm_lockres_set_refmap_bit(dlm, res, old_master); 3274 dlm_lockres_set_refmap_bit(old_master, res);
3276 spin_unlock(&res->spinlock); 3275 spin_unlock(&res->spinlock);
3277 3276
3278 mlog(0, "now time to do a migrate request to other nodes\n"); 3277 mlog(0, "now time to do a migrate request to other nodes\n");
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 01ebfd0bdad..7efab6d28a2 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -362,38 +362,40 @@ static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
362} 362}
363 363
364 364
365void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 365int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
366{ 366{
367 if (dlm_is_node_dead(dlm, node)) 367 if (timeout) {
368 return; 368 mlog(ML_NOTICE, "%s: waiting %dms for notification of "
369 369 "death of node %u\n", dlm->name, timeout, node);
370 printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
371 "domain %s\n", node, dlm->name);
372
373 if (timeout)
374 wait_event_timeout(dlm->dlm_reco_thread_wq, 370 wait_event_timeout(dlm->dlm_reco_thread_wq,
375 dlm_is_node_dead(dlm, node), 371 dlm_is_node_dead(dlm, node),
376 msecs_to_jiffies(timeout)); 372 msecs_to_jiffies(timeout));
377 else 373 } else {
374 mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
375 "of death of node %u\n", dlm->name, node);
378 wait_event(dlm->dlm_reco_thread_wq, 376 wait_event(dlm->dlm_reco_thread_wq,
379 dlm_is_node_dead(dlm, node)); 377 dlm_is_node_dead(dlm, node));
378 }
379 /* for now, return 0 */
380 return 0;
380} 381}
381 382
382void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) 383int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
383{ 384{
384 if (dlm_is_node_recovered(dlm, node)) 385 if (timeout) {
385 return; 386 mlog(0, "%s: waiting %dms for notification of "
386 387 "recovery of node %u\n", dlm->name, timeout, node);
387 printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
388 "domain %s\n", node, dlm->name);
389
390 if (timeout)
391 wait_event_timeout(dlm->dlm_reco_thread_wq, 388 wait_event_timeout(dlm->dlm_reco_thread_wq,
392 dlm_is_node_recovered(dlm, node), 389 dlm_is_node_recovered(dlm, node),
393 msecs_to_jiffies(timeout)); 390 msecs_to_jiffies(timeout));
394 else 391 } else {
392 mlog(0, "%s: waiting indefinitely for notification "
393 "of recovery of node %u\n", dlm->name, node);
395 wait_event(dlm->dlm_reco_thread_wq, 394 wait_event(dlm->dlm_reco_thread_wq,
396 dlm_is_node_recovered(dlm, node)); 395 dlm_is_node_recovered(dlm, node));
396 }
397 /* for now, return 0 */
398 return 0;
397} 399}
398 400
399/* callers of the top-level api calls (dlmlock/dlmunlock) should 401/* callers of the top-level api calls (dlmlock/dlmunlock) should
@@ -428,8 +430,6 @@ static void dlm_begin_recovery(struct dlm_ctxt *dlm)
428{ 430{
429 spin_lock(&dlm->spinlock); 431 spin_lock(&dlm->spinlock);
430 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); 432 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
431 printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
432 dlm->name, dlm->reco.dead_node);
433 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; 433 dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
434 spin_unlock(&dlm->spinlock); 434 spin_unlock(&dlm->spinlock);
435} 435}
@@ -440,18 +440,9 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
440 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); 440 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
441 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; 441 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
442 spin_unlock(&dlm->spinlock); 442 spin_unlock(&dlm->spinlock);
443 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
444 wake_up(&dlm->reco.event); 443 wake_up(&dlm->reco.event);
445} 444}
446 445
447static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
448{
449 printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the "
450 "dead node %u in domain %s\n", dlm->reco.new_master,
451 (dlm->node_num == dlm->reco.new_master ? "me" : "he"),
452 dlm->reco.dead_node, dlm->name);
453}
454
455static int dlm_do_recovery(struct dlm_ctxt *dlm) 446static int dlm_do_recovery(struct dlm_ctxt *dlm)
456{ 447{
457 int status = 0; 448 int status = 0;
@@ -514,8 +505,9 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
514 } 505 }
515 mlog(0, "another node will master this recovery session.\n"); 506 mlog(0, "another node will master this recovery session.\n");
516 } 507 }
517 508 mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
518 dlm_print_recovery_master(dlm); 509 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master,
510 dlm->node_num, dlm->reco.dead_node);
519 511
520 /* it is safe to start everything back up here 512 /* it is safe to start everything back up here
521 * because all of the dead node's lock resources 513 * because all of the dead node's lock resources
@@ -526,13 +518,15 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
526 return 0; 518 return 0;
527 519
528master_here: 520master_here:
529 dlm_print_recovery_master(dlm); 521 mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node "
522 "%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task),
523 dlm->node_num, dlm->reco.dead_node, dlm->name);
530 524
531 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 525 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
532 if (status < 0) { 526 if (status < 0) {
533 /* we should never hit this anymore */ 527 /* we should never hit this anymore */
534 mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, " 528 mlog(ML_ERROR, "error %d remastering locks for node %u, "
535 "retrying.\n", dlm->name, status, dlm->reco.dead_node); 529 "retrying.\n", status, dlm->reco.dead_node);
536 /* yield a bit to allow any final network messages 530 /* yield a bit to allow any final network messages
537 * to get handled on remaining nodes */ 531 * to get handled on remaining nodes */
538 msleep(100); 532 msleep(100);
@@ -573,7 +567,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
573 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); 567 BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
574 ndata->state = DLM_RECO_NODE_DATA_REQUESTING; 568 ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
575 569
576 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, 570 mlog(0, "requesting lock info from node %u\n",
577 ndata->node_num); 571 ndata->node_num);
578 572
579 if (ndata->node_num == dlm->node_num) { 573 if (ndata->node_num == dlm->node_num) {
@@ -646,7 +640,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
646 spin_unlock(&dlm_reco_state_lock); 640 spin_unlock(&dlm_reco_state_lock);
647 } 641 }
648 642
649 mlog(0, "%s: Done requesting all lock info\n", dlm->name); 643 mlog(0, "done requesting all lock info\n");
650 644
651 /* nodes should be sending reco data now 645 /* nodes should be sending reco data now
652 * just need to wait */ 646 * just need to wait */
@@ -808,9 +802,10 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
808 802
809 /* negative status is handled by caller */ 803 /* negative status is handled by caller */
810 if (ret < 0) 804 if (ret < 0)
811 mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u " 805 mlog(ML_ERROR, "Error %d when sending message %u (key "
812 "to recover dead node %u\n", dlm->name, ret, 806 "0x%x) to node %u\n", ret, DLM_LOCK_REQUEST_MSG,
813 request_from, dead_node); 807 dlm->key, request_from);
808
814 // return from here, then 809 // return from here, then
815 // sleep until all received or error 810 // sleep until all received or error
816 return ret; 811 return ret;
@@ -961,9 +956,9 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
961 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 956 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
962 sizeof(done_msg), send_to, &tmpret); 957 sizeof(done_msg), send_to, &tmpret);
963 if (ret < 0) { 958 if (ret < 0) {
964 mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u " 959 mlog(ML_ERROR, "Error %d when sending message %u (key "
965 "to recover dead node %u\n", dlm->name, ret, send_to, 960 "0x%x) to node %u\n", ret, DLM_RECO_DATA_DONE_MSG,
966 dead_node); 961 dlm->key, send_to);
967 if (!dlm_is_host_down(ret)) { 962 if (!dlm_is_host_down(ret)) {
968 BUG(); 963 BUG();
969 } 964 }
@@ -1132,11 +1127,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
1132 if (ret < 0) { 1127 if (ret < 0) {
1133 /* XXX: negative status is not handled. 1128 /* XXX: negative status is not handled.
1134 * this will end up killing this node. */ 1129 * this will end up killing this node. */
1135 mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to " 1130 mlog(ML_ERROR, "Error %d when sending message %u (key "
1136 "node %u (%s)\n", dlm->name, mres->lockname_len, 1131 "0x%x) to node %u\n", ret, DLM_MIG_LOCKRES_MSG,
1137 mres->lockname, ret, send_to, 1132 dlm->key, send_to);
1138 (orig_flags & DLM_MRES_MIGRATION ?
1139 "migration" : "recovery"));
1140 } else { 1133 } else {
1141 /* might get an -ENOMEM back here */ 1134 /* might get an -ENOMEM back here */
1142 ret = status; 1135 ret = status;
@@ -1774,7 +1767,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1774 dlm->name, mres->lockname_len, mres->lockname, 1767 dlm->name, mres->lockname_len, mres->lockname,
1775 from); 1768 from);
1776 spin_lock(&res->spinlock); 1769 spin_lock(&res->spinlock);
1777 dlm_lockres_set_refmap_bit(dlm, res, from); 1770 dlm_lockres_set_refmap_bit(from, res);
1778 spin_unlock(&res->spinlock); 1771 spin_unlock(&res->spinlock);
1779 added++; 1772 added++;
1780 break; 1773 break;
@@ -1972,7 +1965,7 @@ skip_lvb:
1972 mlog(0, "%s:%.*s: added lock for node %u, " 1965 mlog(0, "%s:%.*s: added lock for node %u, "
1973 "setting refmap bit\n", dlm->name, 1966 "setting refmap bit\n", dlm->name,
1974 res->lockname.len, res->lockname.name, ml->node); 1967 res->lockname.len, res->lockname.name, ml->node);
1975 dlm_lockres_set_refmap_bit(dlm, res, ml->node); 1968 dlm_lockres_set_refmap_bit(ml->node, res);
1976 added++; 1969 added++;
1977 } 1970 }
1978 spin_unlock(&res->spinlock); 1971 spin_unlock(&res->spinlock);
@@ -2091,9 +2084,6 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
2091 2084
2092 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { 2085 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
2093 if (res->owner == dead_node) { 2086 if (res->owner == dead_node) {
2094 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
2095 dlm->name, res->lockname.len, res->lockname.name,
2096 res->owner, new_master);
2097 list_del_init(&res->recovering); 2087 list_del_init(&res->recovering);
2098 spin_lock(&res->spinlock); 2088 spin_lock(&res->spinlock);
2099 /* new_master has our reference from 2089 /* new_master has our reference from
@@ -2115,30 +2105,40 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
2115 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2105 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
2116 bucket = dlm_lockres_hash(dlm, i); 2106 bucket = dlm_lockres_hash(dlm, i);
2117 hlist_for_each_entry(res, hash_iter, bucket, hash_node) { 2107 hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
2118 if (!(res->state & DLM_LOCK_RES_RECOVERING)) 2108 if (res->state & DLM_LOCK_RES_RECOVERING) {
2119 continue; 2109 if (res->owner == dead_node) {
2120 2110 mlog(0, "(this=%u) res %.*s owner=%u "
2121 if (res->owner != dead_node && 2111 "was not on recovering list, but "
2122 res->owner != dlm->node_num) 2112 "clearing state anyway\n",
2123 continue; 2113 dlm->node_num, res->lockname.len,
2114 res->lockname.name, new_master);
2115 } else if (res->owner == dlm->node_num) {
2116 mlog(0, "(this=%u) res %.*s owner=%u "
2117 "was not on recovering list, "
2118 "owner is THIS node, clearing\n",
2119 dlm->node_num, res->lockname.len,
2120 res->lockname.name, new_master);
2121 } else
2122 continue;
2124 2123
2125 if (!list_empty(&res->recovering)) { 2124 if (!list_empty(&res->recovering)) {
2126 list_del_init(&res->recovering); 2125 mlog(0, "%s:%.*s: lockres was "
2127 dlm_lockres_put(res); 2126 "marked RECOVERING, owner=%u\n",
2127 dlm->name, res->lockname.len,
2128 res->lockname.name, res->owner);
2129 list_del_init(&res->recovering);
2130 dlm_lockres_put(res);
2131 }
2132 spin_lock(&res->spinlock);
2133 /* new_master has our reference from
2134 * the lock state sent during recovery */
2135 dlm_change_lockres_owner(dlm, res, new_master);
2136 res->state &= ~DLM_LOCK_RES_RECOVERING;
2137 if (__dlm_lockres_has_locks(res))
2138 __dlm_dirty_lockres(dlm, res);
2139 spin_unlock(&res->spinlock);
2140 wake_up(&res->wq);
2128 } 2141 }
2129
2130 /* new_master has our reference from
2131 * the lock state sent during recovery */
2132 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
2133 dlm->name, res->lockname.len, res->lockname.name,
2134 res->owner, new_master);
2135 spin_lock(&res->spinlock);
2136 dlm_change_lockres_owner(dlm, res, new_master);
2137 res->state &= ~DLM_LOCK_RES_RECOVERING;
2138 if (__dlm_lockres_has_locks(res))
2139 __dlm_dirty_lockres(dlm, res);
2140 spin_unlock(&res->spinlock);
2141 wake_up(&res->wq);
2142 } 2142 }
2143 } 2143 }
2144} 2144}
@@ -2252,12 +2252,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2252 res->lockname.len, res->lockname.name, freed, dead_node); 2252 res->lockname.len, res->lockname.name, freed, dead_node);
2253 __dlm_print_one_lock_resource(res); 2253 __dlm_print_one_lock_resource(res);
2254 } 2254 }
2255 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2255 dlm_lockres_clear_refmap_bit(dead_node, res);
2256 } else if (test_bit(dead_node, res->refmap)) { 2256 } else if (test_bit(dead_node, res->refmap)) {
2257 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2257 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2258 "no locks and had not purged before dying\n", dlm->name, 2258 "no locks and had not purged before dying\n", dlm->name,
2259 res->lockname.len, res->lockname.name, dead_node); 2259 res->lockname.len, res->lockname.name, dead_node);
2260 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); 2260 dlm_lockres_clear_refmap_bit(dead_node, res);
2261 } 2261 }
2262 2262
2263 /* do not kick thread yet */ 2263 /* do not kick thread yet */
@@ -2324,9 +2324,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2324 dlm_revalidate_lvb(dlm, res, dead_node); 2324 dlm_revalidate_lvb(dlm, res, dead_node);
2325 if (res->owner == dead_node) { 2325 if (res->owner == dead_node) {
2326 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 2326 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
2327 mlog(ML_NOTICE, "%s: res %.*s, Skip " 2327 mlog(ML_NOTICE, "Ignore %.*s for "
2328 "recovery as it is being freed\n", 2328 "recovery as it is being freed\n",
2329 dlm->name, res->lockname.len, 2329 res->lockname.len,
2330 res->lockname.name); 2330 res->lockname.name);
2331 } else 2331 } else
2332 dlm_move_lockres_to_recovery_list(dlm, 2332 dlm_move_lockres_to_recovery_list(dlm,
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index e73c833fc2a..1d6d1d22c47 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -94,26 +94,24 @@ int __dlm_lockres_unused(struct dlm_lock_resource *res)
94{ 94{
95 int bit; 95 int bit;
96 96
97 assert_spin_locked(&res->spinlock);
98
99 if (__dlm_lockres_has_locks(res)) 97 if (__dlm_lockres_has_locks(res))
100 return 0; 98 return 0;
101 99
102 /* Locks are in the process of being created */
103 if (res->inflight_locks)
104 return 0;
105
106 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) 100 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
107 return 0; 101 return 0;
108 102
109 if (res->state & DLM_LOCK_RES_RECOVERING) 103 if (res->state & DLM_LOCK_RES_RECOVERING)
110 return 0; 104 return 0;
111 105
112 /* Another node has this resource with this node as the master */
113 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); 106 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
114 if (bit < O2NM_MAX_NODES) 107 if (bit < O2NM_MAX_NODES)
115 return 0; 108 return 0;
116 109
110 /*
111 * since the bit for dlm->node_num is not set, inflight_locks better
112 * be zero
113 */
114 BUG_ON(res->inflight_locks != 0);
117 return 1; 115 return 1;
118} 116}
119 117
@@ -187,6 +185,8 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm,
187 /* clear our bit from the master's refmap, ignore errors */ 185 /* clear our bit from the master's refmap, ignore errors */
188 ret = dlm_drop_lockres_ref(dlm, res); 186 ret = dlm_drop_lockres_ref(dlm, res);
189 if (ret < 0) { 187 if (ret < 0) {
188 mlog(ML_ERROR, "%s: deref %.*s failed %d\n", dlm->name,
189 res->lockname.len, res->lockname.name, ret);
190 if (!dlm_is_host_down(ret)) 190 if (!dlm_is_host_down(ret))
191 BUG(); 191 BUG();
192 } 192 }
@@ -209,7 +209,7 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm,
209 BUG(); 209 BUG();
210 } 210 }
211 211
212 __dlm_unhash_lockres(dlm, res); 212 __dlm_unhash_lockres(res);
213 213
214 /* lockres is not in the hash now. drop the flag and wake up 214 /* lockres is not in the hash now. drop the flag and wake up
215 * any processes waiting in dlm_get_lock_resource. */ 215 * any processes waiting in dlm_get_lock_resource. */