aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2')
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h5
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h75
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c18
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c117
-rw-r--r--fs/ocfs2/dlm/dlmlock.c4
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c394
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c123
-rw-r--r--fs/ocfs2/dlm/dlmthread.c167
8 files changed, 729 insertions, 174 deletions
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index b700dc9624d1..775c911342f4 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,9 @@
38 * locking semantics of the file system using the protocol. It should 38 * locking semantics of the file system using the protocol. It should
39 * be somewhere else, I'm sure, but right now it isn't. 39 * be somewhere else, I'm sure, but right now it isn't.
40 * 40 *
41 * New in version 6:
42 * - DLM lockres remote refcount fixes.
43 *
41 * New in version 5: 44 * New in version 5:
42 * - Network timeout checking protocol 45 * - Network timeout checking protocol
43 * 46 *
@@ -51,7 +54,7 @@
51 * - full 64 bit i_size in the metadata lock lvbs 54 * - full 64 bit i_size in the metadata lock lvbs
52 * - introduction of "rw" lock and pushing meta/data locking down 55 * - introduction of "rw" lock and pushing meta/data locking down
53 */ 56 */
54#define O2NET_PROTOCOL_VERSION 5ULL 57#define O2NET_PROTOCOL_VERSION 6ULL
55struct o2net_handshake { 58struct o2net_handshake {
56 __be64 protocol_version; 59 __be64 protocol_version;
57 __be64 connector_id; 60 __be64 connector_id;
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 6b6ff76538c5..9fa427119a3c 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -222,6 +222,7 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
222#define DLM_LOCK_RES_DIRTY 0x00000008 222#define DLM_LOCK_RES_DIRTY 0x00000008
223#define DLM_LOCK_RES_IN_PROGRESS 0x00000010 223#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
224#define DLM_LOCK_RES_MIGRATING 0x00000020 224#define DLM_LOCK_RES_MIGRATING 0x00000020
225#define DLM_LOCK_RES_DROPPING_REF 0x00000040
225 226
226/* max milliseconds to wait to sync up a network failure with a node death */ 227/* max milliseconds to wait to sync up a network failure with a node death */
227#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) 228#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
@@ -265,6 +266,8 @@ struct dlm_lock_resource
265 u8 owner; //node which owns the lock resource, or unknown 266 u8 owner; //node which owns the lock resource, or unknown
266 u16 state; 267 u16 state;
267 char lvb[DLM_LVB_LEN]; 268 char lvb[DLM_LVB_LEN];
269 unsigned int inflight_locks;
270 unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
268}; 271};
269 272
270struct dlm_migratable_lock 273struct dlm_migratable_lock
@@ -367,7 +370,7 @@ enum {
367 DLM_CONVERT_LOCK_MSG, /* 504 */ 370 DLM_CONVERT_LOCK_MSG, /* 504 */
368 DLM_PROXY_AST_MSG, /* 505 */ 371 DLM_PROXY_AST_MSG, /* 505 */
369 DLM_UNLOCK_LOCK_MSG, /* 506 */ 372 DLM_UNLOCK_LOCK_MSG, /* 506 */
370 DLM_UNUSED_MSG2, /* 507 */ 373 DLM_DEREF_LOCKRES_MSG, /* 507 */
371 DLM_MIGRATE_REQUEST_MSG, /* 508 */ 374 DLM_MIGRATE_REQUEST_MSG, /* 508 */
372 DLM_MIG_LOCKRES_MSG, /* 509 */ 375 DLM_MIG_LOCKRES_MSG, /* 509 */
373 DLM_QUERY_JOIN_MSG, /* 510 */ 376 DLM_QUERY_JOIN_MSG, /* 510 */
@@ -417,6 +420,9 @@ struct dlm_master_request
417 u8 name[O2NM_MAX_NAME_LEN]; 420 u8 name[O2NM_MAX_NAME_LEN];
418}; 421};
419 422
423#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
424#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002
425
420#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001 426#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
421#define DLM_ASSERT_MASTER_REQUERY 0x00000002 427#define DLM_ASSERT_MASTER_REQUERY 0x00000002
422#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004 428#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
@@ -430,6 +436,8 @@ struct dlm_assert_master
430 u8 name[O2NM_MAX_NAME_LEN]; 436 u8 name[O2NM_MAX_NAME_LEN];
431}; 437};
432 438
439#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001
440
433struct dlm_migrate_request 441struct dlm_migrate_request
434{ 442{
435 u8 master; 443 u8 master;
@@ -648,6 +656,16 @@ struct dlm_finalize_reco
648 __be32 pad2; 656 __be32 pad2;
649}; 657};
650 658
659struct dlm_deref_lockres
660{
661 u32 pad1;
662 u16 pad2;
663 u8 node_idx;
664 u8 namelen;
665
666 u8 name[O2NM_MAX_NAME_LEN];
667};
668
651static inline enum dlm_status 669static inline enum dlm_status
652__dlm_lockres_state_to_status(struct dlm_lock_resource *res) 670__dlm_lockres_state_to_status(struct dlm_lock_resource *res)
653{ 671{
@@ -721,8 +739,8 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
721 struct dlm_lock_resource *res); 739 struct dlm_lock_resource *res);
722void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 740void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
723 struct dlm_lock_resource *res); 741 struct dlm_lock_resource *res);
724void dlm_purge_lockres(struct dlm_ctxt *dlm, 742int dlm_purge_lockres(struct dlm_ctxt *dlm,
725 struct dlm_lock_resource *lockres); 743 struct dlm_lock_resource *lockres);
726static inline void dlm_lockres_get(struct dlm_lock_resource *res) 744static inline void dlm_lockres_get(struct dlm_lock_resource *res)
727{ 745{
728 /* This is called on every lookup, so it might be worth 746 /* This is called on every lookup, so it might be worth
@@ -733,6 +751,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
733void __dlm_unhash_lockres(struct dlm_lock_resource *res); 751void __dlm_unhash_lockres(struct dlm_lock_resource *res);
734void __dlm_insert_lockres(struct dlm_ctxt *dlm, 752void __dlm_insert_lockres(struct dlm_ctxt *dlm,
735 struct dlm_lock_resource *res); 753 struct dlm_lock_resource *res);
754struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
755 const char *name,
756 unsigned int len,
757 unsigned int hash);
736struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 758struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
737 const char *name, 759 const char *name,
738 unsigned int len, 760 unsigned int len,
@@ -753,6 +775,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
753 const char *name, 775 const char *name,
754 unsigned int namelen); 776 unsigned int namelen);
755 777
778#define dlm_lockres_set_refmap_bit(bit,res) \
779 __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
780#define dlm_lockres_clear_refmap_bit(bit,res) \
781 __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
782
783static inline void __dlm_lockres_set_refmap_bit(int bit,
784 struct dlm_lock_resource *res,
785 const char *file,
786 int line)
787{
788 //printk("%s:%d:%.*s: setting bit %d\n", file, line,
789 // res->lockname.len, res->lockname.name, bit);
790 set_bit(bit, res->refmap);
791}
792
793static inline void __dlm_lockres_clear_refmap_bit(int bit,
794 struct dlm_lock_resource *res,
795 const char *file,
796 int line)
797{
798 //printk("%s:%d:%.*s: clearing bit %d\n", file, line,
799 // res->lockname.len, res->lockname.name, bit);
800 clear_bit(bit, res->refmap);
801}
802
803void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
804 struct dlm_lock_resource *res,
805 const char *file,
806 int line);
807void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
808 struct dlm_lock_resource *res,
809 int new_lockres,
810 const char *file,
811 int line);
812#define dlm_lockres_drop_inflight_ref(d,r) \
813 __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
814#define dlm_lockres_grab_inflight_ref(d,r) \
815 __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
816#define dlm_lockres_grab_inflight_ref_new(d,r) \
817 __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
818
756void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 819void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
757void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 820void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
758void dlm_do_local_ast(struct dlm_ctxt *dlm, 821void dlm_do_local_ast(struct dlm_ctxt *dlm,
@@ -805,6 +868,7 @@ int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
805int dlm_migrate_lockres(struct dlm_ctxt *dlm, 868int dlm_migrate_lockres(struct dlm_ctxt *dlm,
806 struct dlm_lock_resource *res, 869 struct dlm_lock_resource *res,
807 u8 target); 870 u8 target);
871int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
808int dlm_finish_migration(struct dlm_ctxt *dlm, 872int dlm_finish_migration(struct dlm_ctxt *dlm,
809 struct dlm_lock_resource *res, 873 struct dlm_lock_resource *res,
810 u8 old_master); 874 u8 old_master);
@@ -814,6 +878,7 @@ void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
814 878
815int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); 879int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
816int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); 880int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
881int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
817int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); 882int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
818int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); 883int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
819int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); 884int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
@@ -856,10 +921,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
856int dlm_init_mle_cache(void); 921int dlm_init_mle_cache(void);
857void dlm_destroy_mle_cache(void); 922void dlm_destroy_mle_cache(void);
858void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up); 923void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
924int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
925 struct dlm_lock_resource *res);
859void dlm_clean_master_list(struct dlm_ctxt *dlm, 926void dlm_clean_master_list(struct dlm_ctxt *dlm,
860 u8 dead_node); 927 u8 dead_node);
861int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); 928int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
862 929int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
863int __dlm_lockres_unused(struct dlm_lock_resource *res); 930int __dlm_lockres_unused(struct dlm_lock_resource *res);
864 931
865static inline const char * dlm_lock_mode_name(int mode) 932static inline const char * dlm_lock_mode_name(int mode)
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 3f6c8d88f7af..1015cc7bf9cb 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res)
53 spin_unlock(&res->spinlock); 53 spin_unlock(&res->spinlock);
54} 54}
55 55
56static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
57{
58 int bit;
59 assert_spin_locked(&res->spinlock);
60
61 mlog(ML_NOTICE, " refmap nodes: [ ");
62 bit = 0;
63 while (1) {
64 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
65 if (bit >= O2NM_MAX_NODES)
66 break;
67 printk("%u ", bit);
68 bit++;
69 }
70 printk("], inflight=%u\n", res->inflight_locks);
71}
72
56void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) 73void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
57{ 74{
58 struct list_head *iter2; 75 struct list_head *iter2;
@@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
65 res->owner, res->state); 82 res->owner, res->state);
66 mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n", 83 mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n",
67 res->last_used, list_empty(&res->purge) ? "no" : "yes"); 84 res->last_used, list_empty(&res->purge) ? "no" : "yes");
85 dlm_print_lockres_refmap(res);
68 mlog(ML_NOTICE, " granted queue: \n"); 86 mlog(ML_NOTICE, " granted queue: \n");
69 list_for_each(iter2, &res->granted) { 87 list_for_each(iter2, &res->granted) {
70 lock = list_entry(iter2, struct dlm_lock, list); 88 lock = list_entry(iter2, struct dlm_lock, list);
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index f0b25f2dd205..3995de360264 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -125,10 +125,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
125 hlist_add_head(&res->hash_node, bucket); 125 hlist_add_head(&res->hash_node, bucket);
126} 126}
127 127
128struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 128struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
129 const char *name, 129 const char *name,
130 unsigned int len, 130 unsigned int len,
131 unsigned int hash) 131 unsigned int hash)
132{ 132{
133 struct hlist_head *bucket; 133 struct hlist_head *bucket;
134 struct hlist_node *list; 134 struct hlist_node *list;
@@ -154,6 +154,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
154 return NULL; 154 return NULL;
155} 155}
156 156
157/* intended to be called by functions which do not care about lock
158 * resources which are being purged (most net _handler functions).
159 * this will return NULL for any lock resource which is found but
160 * currently in the process of dropping its mastery reference.
161 * use __dlm_lookup_lockres_full when you need the lock resource
162 * regardless (e.g. dlm_get_lock_resource) */
163struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
164 const char *name,
165 unsigned int len,
166 unsigned int hash)
167{
168 struct dlm_lock_resource *res = NULL;
169
170 mlog_entry("%.*s\n", len, name);
171
172 assert_spin_locked(&dlm->spinlock);
173
174 res = __dlm_lookup_lockres_full(dlm, name, len, hash);
175 if (res) {
176 spin_lock(&res->spinlock);
177 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
178 spin_unlock(&res->spinlock);
179 dlm_lockres_put(res);
180 return NULL;
181 }
182 spin_unlock(&res->spinlock);
183 }
184
185 return res;
186}
187
157struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 188struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
158 const char *name, 189 const char *name,
159 unsigned int len) 190 unsigned int len)
@@ -330,43 +361,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
330 wake_up(&dlm_domain_events); 361 wake_up(&dlm_domain_events);
331} 362}
332 363
333static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) 364static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
334{ 365{
335 int i; 366 int i, num, n, ret = 0;
336 struct dlm_lock_resource *res; 367 struct dlm_lock_resource *res;
368 struct hlist_node *iter;
369 struct hlist_head *bucket;
370 int dropped;
337 371
338 mlog(0, "Migrating locks from domain %s\n", dlm->name); 372 mlog(0, "Migrating locks from domain %s\n", dlm->name);
339restart: 373
374 num = 0;
340 spin_lock(&dlm->spinlock); 375 spin_lock(&dlm->spinlock);
341 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 376 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
342 while (!hlist_empty(dlm_lockres_hash(dlm, i))) { 377redo_bucket:
343 res = hlist_entry(dlm_lockres_hash(dlm, i)->first, 378 n = 0;
344 struct dlm_lock_resource, hash_node); 379 bucket = dlm_lockres_hash(dlm, i);
345 /* need reference when manually grabbing lockres */ 380 iter = bucket->first;
381 while (iter) {
382 n++;
383 res = hlist_entry(iter, struct dlm_lock_resource,
384 hash_node);
346 dlm_lockres_get(res); 385 dlm_lockres_get(res);
347 /* this should unhash the lockres 386 /* migrate, if necessary. this will drop the dlm
348 * and exit with dlm->spinlock */ 387 * spinlock and retake it if it does migration. */
349 mlog(0, "purging res=%p\n", res); 388 dropped = dlm_empty_lockres(dlm, res);
350 if (dlm_lockres_is_dirty(dlm, res)) { 389
351 /* HACK! this should absolutely go. 390 spin_lock(&res->spinlock);
352 * need to figure out why some empty 391 __dlm_lockres_calc_usage(dlm, res);
353 * lockreses are still marked dirty */ 392 iter = res->hash_node.next;
354 mlog(ML_ERROR, "lockres %.*s dirty!\n", 393 spin_unlock(&res->spinlock);
355 res->lockname.len, res->lockname.name); 394
356
357 spin_unlock(&dlm->spinlock);
358 dlm_kick_thread(dlm, res);
359 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
360 dlm_lockres_put(res);
361 goto restart;
362 }
363 dlm_purge_lockres(dlm, res);
364 dlm_lockres_put(res); 395 dlm_lockres_put(res);
396
397 cond_resched_lock(&dlm->spinlock);
398
399 if (dropped)
400 goto redo_bucket;
365 } 401 }
402 num += n;
403 mlog(0, "%s: touched %d lockreses in bucket %d "
404 "(tot=%d)\n", dlm->name, n, i, num);
366 } 405 }
367 spin_unlock(&dlm->spinlock); 406 spin_unlock(&dlm->spinlock);
368 407 wake_up(&dlm->dlm_thread_wq);
408
409 /* let the dlm thread take care of purging, keep scanning until
410 * nothing remains in the hash */
411 if (num) {
412 mlog(0, "%s: %d lock resources in hash last pass\n",
413 dlm->name, num);
414 ret = -EAGAIN;
415 }
369 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 416 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
417 return ret;
370} 418}
371 419
372static int dlm_no_joining_node(struct dlm_ctxt *dlm) 420static int dlm_no_joining_node(struct dlm_ctxt *dlm)
@@ -571,7 +619,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
571 /* We changed dlm state, notify the thread */ 619 /* We changed dlm state, notify the thread */
572 dlm_kick_thread(dlm, NULL); 620 dlm_kick_thread(dlm, NULL);
573 621
574 dlm_migrate_all_locks(dlm); 622 while (dlm_migrate_all_locks(dlm)) {
623 mlog(0, "%s: more migration to do\n", dlm->name);
624 }
575 dlm_mark_domain_leaving(dlm); 625 dlm_mark_domain_leaving(dlm);
576 dlm_leave_domain(dlm); 626 dlm_leave_domain(dlm);
577 dlm_complete_dlm_shutdown(dlm); 627 dlm_complete_dlm_shutdown(dlm);
@@ -1082,6 +1132,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1082 if (status) 1132 if (status)
1083 goto bail; 1133 goto bail;
1084 1134
1135 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1136 sizeof(struct dlm_deref_lockres),
1137 dlm_deref_lockres_handler,
1138 dlm, &dlm->dlm_domain_handlers);
1139 if (status)
1140 goto bail;
1141
1085 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1142 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1086 sizeof(struct dlm_migrate_request), 1143 sizeof(struct dlm_migrate_request),
1087 dlm_migrate_request_handler, 1144 dlm_migrate_request_handler,
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index e5ca3db197f6..ac91a76b1e78 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
163 kick_thread = 1; 163 kick_thread = 1;
164 } 164 }
165 } 165 }
166 /* reduce the inflight count, this may result in the lockres
167 * being purged below during calc_usage */
168 if (lock->ml.node == dlm->node_num)
169 dlm_lockres_drop_inflight_ref(dlm, res);
166 170
167 spin_unlock(&res->spinlock); 171 spin_unlock(&res->spinlock);
168 wake_up(&res->wq); 172 wake_up(&res->wq);
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0ad872055cb3..4645ec2e0fc3 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -99,9 +99,9 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
99 int idx); 99 int idx);
100 100
101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); 101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 102static int dlm_do_assert_master(struct dlm_ctxt *dlm,
103 unsigned int namelen, void *nodemap, 103 struct dlm_lock_resource *res,
104 u32 flags); 104 void *nodemap, u32 flags);
105 105
106static inline int dlm_mle_equal(struct dlm_ctxt *dlm, 106static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 struct dlm_master_list_entry *mle, 107 struct dlm_master_list_entry *mle,
@@ -237,7 +237,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
237 struct dlm_master_list_entry **mle, 237 struct dlm_master_list_entry **mle,
238 char *name, unsigned int namelen); 238 char *name, unsigned int namelen);
239 239
240static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); 240static int dlm_do_master_request(struct dlm_lock_resource *res,
241 struct dlm_master_list_entry *mle, int to);
241 242
242 243
243static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, 244static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
@@ -687,6 +688,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
687 INIT_LIST_HEAD(&res->purge); 688 INIT_LIST_HEAD(&res->purge);
688 atomic_set(&res->asts_reserved, 0); 689 atomic_set(&res->asts_reserved, 0);
689 res->migration_pending = 0; 690 res->migration_pending = 0;
691 res->inflight_locks = 0;
690 692
691 kref_init(&res->refs); 693 kref_init(&res->refs);
692 694
@@ -700,6 +702,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
700 res->last_used = 0; 702 res->last_used = 0;
701 703
702 memset(res->lvb, 0, DLM_LVB_LEN); 704 memset(res->lvb, 0, DLM_LVB_LEN);
705 memset(res->refmap, 0, sizeof(res->refmap));
703} 706}
704 707
705struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, 708struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
@@ -722,6 +725,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
722 return res; 725 return res;
723} 726}
724 727
728void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
729 struct dlm_lock_resource *res,
730 int new_lockres,
731 const char *file,
732 int line)
733{
734 if (!new_lockres)
735 assert_spin_locked(&res->spinlock);
736
737 if (!test_bit(dlm->node_num, res->refmap)) {
738 BUG_ON(res->inflight_locks != 0);
739 dlm_lockres_set_refmap_bit(dlm->node_num, res);
740 }
741 res->inflight_locks++;
742 mlog(0, "%s:%.*s: inflight++: now %u\n",
743 dlm->name, res->lockname.len, res->lockname.name,
744 res->inflight_locks);
745}
746
747void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
748 struct dlm_lock_resource *res,
749 const char *file,
750 int line)
751{
752 assert_spin_locked(&res->spinlock);
753
754 BUG_ON(res->inflight_locks == 0);
755 res->inflight_locks--;
756 mlog(0, "%s:%.*s: inflight--: now %u\n",
757 dlm->name, res->lockname.len, res->lockname.name,
758 res->inflight_locks);
759 if (res->inflight_locks == 0)
760 dlm_lockres_clear_refmap_bit(dlm->node_num, res);
761 wake_up(&res->wq);
762}
763
725/* 764/*
726 * lookup a lock resource by name. 765 * lookup a lock resource by name.
727 * may already exist in the hashtable. 766 * may already exist in the hashtable.
@@ -752,6 +791,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
752 unsigned int hash; 791 unsigned int hash;
753 int tries = 0; 792 int tries = 0;
754 int bit, wait_on_recovery = 0; 793 int bit, wait_on_recovery = 0;
794 int drop_inflight_if_nonlocal = 0;
755 795
756 BUG_ON(!lockid); 796 BUG_ON(!lockid);
757 797
@@ -761,9 +801,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
761 801
762lookup: 802lookup:
763 spin_lock(&dlm->spinlock); 803 spin_lock(&dlm->spinlock);
764 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); 804 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765 if (tmpres) { 805 if (tmpres) {
806 int dropping_ref = 0;
807
808 spin_lock(&tmpres->spinlock);
809 if (tmpres->owner == dlm->node_num) {
810 BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
811 dlm_lockres_grab_inflight_ref(dlm, tmpres);
812 } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
813 dropping_ref = 1;
814 spin_unlock(&tmpres->spinlock);
766 spin_unlock(&dlm->spinlock); 815 spin_unlock(&dlm->spinlock);
816
817 /* wait until done messaging the master, drop our ref to allow
818 * the lockres to be purged, start over. */
819 if (dropping_ref) {
820 spin_lock(&tmpres->spinlock);
821 __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
822 spin_unlock(&tmpres->spinlock);
823 dlm_lockres_put(tmpres);
824 tmpres = NULL;
825 goto lookup;
826 }
827
767 mlog(0, "found in hash!\n"); 828 mlog(0, "found in hash!\n");
768 if (res) 829 if (res)
769 dlm_lockres_put(res); 830 dlm_lockres_put(res);
@@ -793,6 +854,7 @@ lookup:
793 spin_lock(&res->spinlock); 854 spin_lock(&res->spinlock);
794 dlm_change_lockres_owner(dlm, res, dlm->node_num); 855 dlm_change_lockres_owner(dlm, res, dlm->node_num);
795 __dlm_insert_lockres(dlm, res); 856 __dlm_insert_lockres(dlm, res);
857 dlm_lockres_grab_inflight_ref(dlm, res);
796 spin_unlock(&res->spinlock); 858 spin_unlock(&res->spinlock);
797 spin_unlock(&dlm->spinlock); 859 spin_unlock(&dlm->spinlock);
798 /* lockres still marked IN_PROGRESS */ 860 /* lockres still marked IN_PROGRESS */
@@ -805,29 +867,40 @@ lookup:
805 /* if we found a block, wait for lock to be mastered by another node */ 867 /* if we found a block, wait for lock to be mastered by another node */
806 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); 868 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
807 if (blocked) { 869 if (blocked) {
870 int mig;
808 if (mle->type == DLM_MLE_MASTER) { 871 if (mle->type == DLM_MLE_MASTER) {
809 mlog(ML_ERROR, "master entry for nonexistent lock!\n"); 872 mlog(ML_ERROR, "master entry for nonexistent lock!\n");
810 BUG(); 873 BUG();
811 } else if (mle->type == DLM_MLE_MIGRATION) { 874 }
812 /* migration is in progress! */ 875 mig = (mle->type == DLM_MLE_MIGRATION);
813 /* the good news is that we now know the 876 /* if there is a migration in progress, let the migration
814 * "current" master (mle->master). */ 877 * finish before continuing. we can wait for the absence
815 878 * of the MIGRATION mle: either the migrate finished or
879 * one of the nodes died and the mle was cleaned up.
880 * if there is a BLOCK here, but it already has a master
881 * set, we are too late. the master does not have a ref
882 * for us in the refmap. detach the mle and drop it.
883 * either way, go back to the top and start over. */
884 if (mig || mle->master != O2NM_MAX_NODES) {
885 BUG_ON(mig && mle->master == dlm->node_num);
886 /* we arrived too late. the master does not
887 * have a ref for us. retry. */
888 mlog(0, "%s:%.*s: late on %s\n",
889 dlm->name, namelen, lockid,
890 mig ? "MIGRATION" : "BLOCK");
816 spin_unlock(&dlm->master_lock); 891 spin_unlock(&dlm->master_lock);
817 assert_spin_locked(&dlm->spinlock);
818
819 /* set the lockres owner and hash it */
820 spin_lock(&res->spinlock);
821 dlm_set_lockres_owner(dlm, res, mle->master);
822 __dlm_insert_lockres(dlm, res);
823 spin_unlock(&res->spinlock);
824 spin_unlock(&dlm->spinlock); 892 spin_unlock(&dlm->spinlock);
825 893
826 /* master is known, detach */ 894 /* master is known, detach */
827 dlm_mle_detach_hb_events(dlm, mle); 895 if (!mig)
896 dlm_mle_detach_hb_events(dlm, mle);
828 dlm_put_mle(mle); 897 dlm_put_mle(mle);
829 mle = NULL; 898 mle = NULL;
830 goto wake_waiters; 899 /* this is lame, but we cant wait on either
900 * the mle or lockres waitqueue here */
901 if (mig)
902 msleep(100);
903 goto lookup;
831 } 904 }
832 } else { 905 } else {
833 /* go ahead and try to master lock on this node */ 906 /* go ahead and try to master lock on this node */
@@ -858,6 +931,13 @@ lookup:
858 931
859 /* finally add the lockres to its hash bucket */ 932 /* finally add the lockres to its hash bucket */
860 __dlm_insert_lockres(dlm, res); 933 __dlm_insert_lockres(dlm, res);
934 /* since this lockres is new it doesnt not require the spinlock */
935 dlm_lockres_grab_inflight_ref_new(dlm, res);
936
937 /* if this node does not become the master make sure to drop
938 * this inflight reference below */
939 drop_inflight_if_nonlocal = 1;
940
861 /* get an extra ref on the mle in case this is a BLOCK 941 /* get an extra ref on the mle in case this is a BLOCK
862 * if so, the creator of the BLOCK may try to put the last 942 * if so, the creator of the BLOCK may try to put the last
863 * ref at this time in the assert master handler, so we 943 * ref at this time in the assert master handler, so we
@@ -910,7 +990,7 @@ redo_request:
910 ret = -EINVAL; 990 ret = -EINVAL;
911 dlm_node_iter_init(mle->vote_map, &iter); 991 dlm_node_iter_init(mle->vote_map, &iter);
912 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 992 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
913 ret = dlm_do_master_request(mle, nodenum); 993 ret = dlm_do_master_request(res, mle, nodenum);
914 if (ret < 0) 994 if (ret < 0)
915 mlog_errno(ret); 995 mlog_errno(ret);
916 if (mle->master != O2NM_MAX_NODES) { 996 if (mle->master != O2NM_MAX_NODES) {
@@ -960,6 +1040,8 @@ wait:
960 1040
961wake_waiters: 1041wake_waiters:
962 spin_lock(&res->spinlock); 1042 spin_lock(&res->spinlock);
1043 if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
1044 dlm_lockres_drop_inflight_ref(dlm, res);
963 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1045 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
964 spin_unlock(&res->spinlock); 1046 spin_unlock(&res->spinlock);
965 wake_up(&res->wq); 1047 wake_up(&res->wq);
@@ -998,7 +1080,7 @@ recheck:
998 /* this will cause the master to re-assert across 1080 /* this will cause the master to re-assert across
999 * the whole cluster, freeing up mles */ 1081 * the whole cluster, freeing up mles */
1000 if (res->owner != dlm->node_num) { 1082 if (res->owner != dlm->node_num) {
1001 ret = dlm_do_master_request(mle, res->owner); 1083 ret = dlm_do_master_request(res, mle, res->owner);
1002 if (ret < 0) { 1084 if (ret < 0) {
1003 /* give recovery a chance to run */ 1085 /* give recovery a chance to run */
1004 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 1086 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
@@ -1062,6 +1144,8 @@ recheck:
1062 * now tell other nodes that I am 1144 * now tell other nodes that I am
1063 * mastering this. */ 1145 * mastering this. */
1064 mle->master = dlm->node_num; 1146 mle->master = dlm->node_num;
1147 /* ref was grabbed in get_lock_resource
1148 * will be dropped in dlmlock_master */
1065 assert = 1; 1149 assert = 1;
1066 sleep = 0; 1150 sleep = 0;
1067 } 1151 }
@@ -1087,7 +1171,8 @@ recheck:
1087 (atomic_read(&mle->woken) == 1), 1171 (atomic_read(&mle->woken) == 1),
1088 timeo); 1172 timeo);
1089 if (res->owner == O2NM_MAX_NODES) { 1173 if (res->owner == O2NM_MAX_NODES) {
1090 mlog(0, "waiting again\n"); 1174 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1175 res->lockname.len, res->lockname.name);
1091 goto recheck; 1176 goto recheck;
1092 } 1177 }
1093 mlog(0, "done waiting, master is %u\n", res->owner); 1178 mlog(0, "done waiting, master is %u\n", res->owner);
@@ -1100,8 +1185,7 @@ recheck:
1100 m = dlm->node_num; 1185 m = dlm->node_num;
1101 mlog(0, "about to master %.*s here, this=%u\n", 1186 mlog(0, "about to master %.*s here, this=%u\n",
1102 res->lockname.len, res->lockname.name, m); 1187 res->lockname.len, res->lockname.name, m);
1103 ret = dlm_do_assert_master(dlm, res->lockname.name, 1188 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1104 res->lockname.len, mle->vote_map, 0);
1105 if (ret) { 1189 if (ret) {
1106 /* This is a failure in the network path, 1190 /* This is a failure in the network path,
1107 * not in the response to the assert_master 1191 * not in the response to the assert_master
@@ -1117,6 +1201,8 @@ recheck:
1117 1201
1118 /* set the lockres owner */ 1202 /* set the lockres owner */
1119 spin_lock(&res->spinlock); 1203 spin_lock(&res->spinlock);
1204 /* mastery reference obtained either during
1205 * assert_master_handler or in get_lock_resource */
1120 dlm_change_lockres_owner(dlm, res, m); 1206 dlm_change_lockres_owner(dlm, res, m);
1121 spin_unlock(&res->spinlock); 1207 spin_unlock(&res->spinlock);
1122 1208
@@ -1283,7 +1369,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1283 * 1369 *
1284 */ 1370 */
1285 1371
1286static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) 1372static int dlm_do_master_request(struct dlm_lock_resource *res,
1373 struct dlm_master_list_entry *mle, int to)
1287{ 1374{
1288 struct dlm_ctxt *dlm = mle->dlm; 1375 struct dlm_ctxt *dlm = mle->dlm;
1289 struct dlm_master_request request; 1376 struct dlm_master_request request;
@@ -1339,6 +1426,9 @@ again:
1339 case DLM_MASTER_RESP_YES: 1426 case DLM_MASTER_RESP_YES:
1340 set_bit(to, mle->response_map); 1427 set_bit(to, mle->response_map);
1341 mlog(0, "node %u is the master, response=YES\n", to); 1428 mlog(0, "node %u is the master, response=YES\n", to);
1429 mlog(0, "%s:%.*s: master node %u now knows I have a "
1430 "reference\n", dlm->name, res->lockname.len,
1431 res->lockname.name, to);
1342 mle->master = to; 1432 mle->master = to;
1343 break; 1433 break;
1344 case DLM_MASTER_RESP_NO: 1434 case DLM_MASTER_RESP_NO:
@@ -1428,8 +1518,10 @@ way_up_top:
1428 } 1518 }
1429 1519
1430 if (res->owner == dlm->node_num) { 1520 if (res->owner == dlm->node_num) {
1521 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1522 dlm->name, namelen, name, request->node_idx);
1523 dlm_lockres_set_refmap_bit(request->node_idx, res);
1431 spin_unlock(&res->spinlock); 1524 spin_unlock(&res->spinlock);
1432 // mlog(0, "this node is the master\n");
1433 response = DLM_MASTER_RESP_YES; 1525 response = DLM_MASTER_RESP_YES;
1434 if (mle) 1526 if (mle)
1435 kmem_cache_free(dlm_mle_cache, mle); 1527 kmem_cache_free(dlm_mle_cache, mle);
@@ -1477,7 +1569,6 @@ way_up_top:
1477 mlog(0, "node %u is master, but trying to migrate to " 1569 mlog(0, "node %u is master, but trying to migrate to "
1478 "node %u.\n", tmpmle->master, tmpmle->new_master); 1570 "node %u.\n", tmpmle->master, tmpmle->new_master);
1479 if (tmpmle->master == dlm->node_num) { 1571 if (tmpmle->master == dlm->node_num) {
1480 response = DLM_MASTER_RESP_YES;
1481 mlog(ML_ERROR, "no owner on lockres, but this " 1572 mlog(ML_ERROR, "no owner on lockres, but this "
1482 "node is trying to migrate it to %u?!\n", 1573 "node is trying to migrate it to %u?!\n",
1483 tmpmle->new_master); 1574 tmpmle->new_master);
@@ -1494,6 +1585,10 @@ way_up_top:
1494 * go back and clean the mles on any 1585 * go back and clean the mles on any
1495 * other nodes */ 1586 * other nodes */
1496 dispatch_assert = 1; 1587 dispatch_assert = 1;
1588 dlm_lockres_set_refmap_bit(request->node_idx, res);
1589 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1590 dlm->name, namelen, name,
1591 request->node_idx);
1497 } else 1592 } else
1498 response = DLM_MASTER_RESP_NO; 1593 response = DLM_MASTER_RESP_NO;
1499 } else { 1594 } else {
@@ -1607,15 +1702,17 @@ send_response:
1607 * can periodically run all locks owned by this node 1702 * can periodically run all locks owned by this node
1608 * and re-assert across the cluster... 1703 * and re-assert across the cluster...
1609 */ 1704 */
1610static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 1705int dlm_do_assert_master(struct dlm_ctxt *dlm,
1611 unsigned int namelen, void *nodemap, 1706 struct dlm_lock_resource *res,
1612 u32 flags) 1707 void *nodemap, u32 flags)
1613{ 1708{
1614 struct dlm_assert_master assert; 1709 struct dlm_assert_master assert;
1615 int to, tmpret; 1710 int to, tmpret;
1616 struct dlm_node_iter iter; 1711 struct dlm_node_iter iter;
1617 int ret = 0; 1712 int ret = 0;
1618 int reassert; 1713 int reassert;
1714 const char *lockname = res->lockname.name;
1715 unsigned int namelen = res->lockname.len;
1619 1716
1620 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 1717 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1621again: 1718again:
@@ -1647,6 +1744,7 @@ again:
1647 mlog(0, "link to %d went down!\n", to); 1744 mlog(0, "link to %d went down!\n", to);
1648 /* any nonzero status return will do */ 1745 /* any nonzero status return will do */
1649 ret = tmpret; 1746 ret = tmpret;
1747 r = 0;
1650 } else if (r < 0) { 1748 } else if (r < 0) {
1651 /* ok, something horribly messed. kill thyself. */ 1749 /* ok, something horribly messed. kill thyself. */
1652 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1750 mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1661,12 +1759,29 @@ again:
1661 spin_unlock(&dlm->master_lock); 1759 spin_unlock(&dlm->master_lock);
1662 spin_unlock(&dlm->spinlock); 1760 spin_unlock(&dlm->spinlock);
1663 BUG(); 1761 BUG();
1664 } else if (r == EAGAIN) { 1762 }
1763
1764 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1765 !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1766 mlog(ML_ERROR, "%.*s: very strange, "
1767 "master MLE but no lockres on %u\n",
1768 namelen, lockname, to);
1769 }
1770
1771 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1665 mlog(0, "%.*s: node %u create mles on other " 1772 mlog(0, "%.*s: node %u create mles on other "
1666 "nodes and requests a re-assert\n", 1773 "nodes and requests a re-assert\n",
1667 namelen, lockname, to); 1774 namelen, lockname, to);
1668 reassert = 1; 1775 reassert = 1;
1669 } 1776 }
1777 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1778 mlog(0, "%.*s: node %u has a reference to this "
1779 "lockres, set the bit in the refmap\n",
1780 namelen, lockname, to);
1781 spin_lock(&res->spinlock);
1782 dlm_lockres_set_refmap_bit(to, res);
1783 spin_unlock(&res->spinlock);
1784 }
1670 } 1785 }
1671 1786
1672 if (reassert) 1787 if (reassert)
@@ -1693,7 +1808,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1693 char *name; 1808 char *name;
1694 unsigned int namelen, hash; 1809 unsigned int namelen, hash;
1695 u32 flags; 1810 u32 flags;
1696 int master_request = 0; 1811 int master_request = 0, have_lockres_ref = 0;
1697 int ret = 0; 1812 int ret = 0;
1698 1813
1699 if (!dlm_grab(dlm)) 1814 if (!dlm_grab(dlm))
@@ -1864,6 +1979,7 @@ ok:
1864 dlm_change_lockres_owner(dlm, res, mle->master); 1979 dlm_change_lockres_owner(dlm, res, mle->master);
1865 } 1980 }
1866 spin_unlock(&res->spinlock); 1981 spin_unlock(&res->spinlock);
1982 have_lockres_ref = 1;
1867 } 1983 }
1868 1984
1869 /* master is known, detach if not already detached. 1985 /* master is known, detach if not already detached.
@@ -1918,7 +2034,19 @@ done:
1918 dlm_put(dlm); 2034 dlm_put(dlm);
1919 if (master_request) { 2035 if (master_request) {
1920 mlog(0, "need to tell master to reassert\n"); 2036 mlog(0, "need to tell master to reassert\n");
1921 ret = EAGAIN; // positive. negative would shoot down the node. 2037 /* positive. negative would shoot down the node. */
2038 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2039 if (!have_lockres_ref) {
2040 mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2041 "mle present here for %s:%.*s, but no lockres!\n",
2042 assert->node_idx, dlm->name, namelen, name);
2043 }
2044 }
2045 if (have_lockres_ref) {
2046 /* let the master know we have a reference to the lockres */
2047 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2048 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2049 dlm->name, namelen, name, assert->node_idx);
1922 } 2050 }
1923 return ret; 2051 return ret;
1924 2052
@@ -2023,9 +2151,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2023 * even if one or more nodes die */ 2151 * even if one or more nodes die */
2024 mlog(0, "worker about to master %.*s here, this=%u\n", 2152 mlog(0, "worker about to master %.*s here, this=%u\n",
2025 res->lockname.len, res->lockname.name, dlm->node_num); 2153 res->lockname.len, res->lockname.name, dlm->node_num);
2026 ret = dlm_do_assert_master(dlm, res->lockname.name, 2154 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2027 res->lockname.len,
2028 nodemap, flags);
2029 if (ret < 0) { 2155 if (ret < 0) {
2030 /* no need to restart, we are done */ 2156 /* no need to restart, we are done */
2031 if (!dlm_is_host_down(ret)) 2157 if (!dlm_is_host_down(ret))
@@ -2097,6 +2223,104 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2097 return ret; 2223 return ret;
2098} 2224}
2099 2225
2226/*
2227 * DLM_DEREF_LOCKRES_MSG
2228 */
2229
2230int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2231{
2232 struct dlm_deref_lockres deref;
2233 int ret = 0, r;
2234 const char *lockname;
2235 unsigned int namelen;
2236
2237 lockname = res->lockname.name;
2238 namelen = res->lockname.len;
2239 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2240
2241 mlog(0, "%s:%.*s: sending deref to %d\n",
2242 dlm->name, namelen, lockname, res->owner);
2243 memset(&deref, 0, sizeof(deref));
2244 deref.node_idx = dlm->node_num;
2245 deref.namelen = namelen;
2246 memcpy(deref.name, lockname, namelen);
2247
2248 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2249 &deref, sizeof(deref), res->owner, &r);
2250 if (ret < 0)
2251 mlog_errno(ret);
2252 else if (r < 0) {
2253 /* BAD. other node says I did not have a ref. */
2254 mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2255 "(master=%u) got %d.\n", dlm->name, namelen,
2256 lockname, res->owner, r);
2257 dlm_print_one_lock_resource(res);
2258 BUG();
2259 }
2260 return ret;
2261}
2262
2263int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
2264{
2265 struct dlm_ctxt *dlm = data;
2266 struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2267 struct dlm_lock_resource *res = NULL;
2268 char *name;
2269 unsigned int namelen;
2270 int ret = -EINVAL;
2271 u8 node;
2272 unsigned int hash;
2273
2274 if (!dlm_grab(dlm))
2275 return 0;
2276
2277 name = deref->name;
2278 namelen = deref->namelen;
2279 node = deref->node_idx;
2280
2281 if (namelen > DLM_LOCKID_NAME_MAX) {
2282 mlog(ML_ERROR, "Invalid name length!");
2283 goto done;
2284 }
2285 if (deref->node_idx >= O2NM_MAX_NODES) {
2286 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2287 goto done;
2288 }
2289
2290 hash = dlm_lockid_hash(name, namelen);
2291
2292 spin_lock(&dlm->spinlock);
2293 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2294 if (!res) {
2295 spin_unlock(&dlm->spinlock);
2296 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2297 dlm->name, namelen, name);
2298 goto done;
2299 }
2300 spin_unlock(&dlm->spinlock);
2301
2302 spin_lock(&res->spinlock);
2303 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2304 if (test_bit(node, res->refmap)) {
2305 ret = 0;
2306 dlm_lockres_clear_refmap_bit(node, res);
2307 } else {
2308 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2309 "but it is already dropped!\n", dlm->name, namelen,
2310 name, node);
2311 __dlm_print_one_lock_resource(res);
2312 }
2313 spin_unlock(&res->spinlock);
2314
2315 if (!ret)
2316 dlm_lockres_calc_usage(dlm, res);
2317done:
2318 if (res)
2319 dlm_lockres_put(res);
2320 dlm_put(dlm);
2321 return ret;
2322}
2323
2100 2324
2101/* 2325/*
2102 * DLM_MIGRATE_LOCKRES 2326 * DLM_MIGRATE_LOCKRES
@@ -2376,6 +2600,53 @@ leave:
2376 return ret; 2600 return ret;
2377} 2601}
2378 2602
2603#define DLM_MIGRATION_RETRY_MS 100
2604
2605/* Should be called only after beginning the domain leave process.
2606 * There should not be any remaining locks on nonlocal lock resources,
2607 * and there should be no local locks left on locally mastered resources.
2608 *
2609 * Called with the dlm spinlock held, may drop it to do migration, but
2610 * will re-acquire before exit.
2611 *
2612 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
2613int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2614{
2615 int ret;
2616 int lock_dropped = 0;
2617
2618 if (res->owner != dlm->node_num) {
2619 if (!__dlm_lockres_unused(res)) {
2620 mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2621 "trying to free this but locks remain\n",
2622 dlm->name, res->lockname.len, res->lockname.name);
2623 }
2624 goto leave;
2625 }
2626
2627 /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2628 spin_unlock(&dlm->spinlock);
2629 lock_dropped = 1;
2630 while (1) {
2631 ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2632 if (ret >= 0)
2633 break;
2634 if (ret == -ENOTEMPTY) {
2635 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2636 res->lockname.len, res->lockname.name);
2637 BUG();
2638 }
2639
2640 mlog(0, "lockres %.*s: migrate failed, "
2641 "retrying\n", res->lockname.len,
2642 res->lockname.name);
2643 msleep(DLM_MIGRATION_RETRY_MS);
2644 }
2645 spin_lock(&dlm->spinlock);
2646leave:
2647 return lock_dropped;
2648}
2649
2379int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) 2650int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2380{ 2651{
2381 int ret; 2652 int ret;
@@ -2490,7 +2761,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2490{ 2761{
2491 struct list_head *iter, *iter2; 2762 struct list_head *iter, *iter2;
2492 struct list_head *queue = &res->granted; 2763 struct list_head *queue = &res->granted;
2493 int i; 2764 int i, bit;
2494 struct dlm_lock *lock; 2765 struct dlm_lock *lock;
2495 2766
2496 assert_spin_locked(&res->spinlock); 2767 assert_spin_locked(&res->spinlock);
@@ -2508,12 +2779,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2508 BUG_ON(!list_empty(&lock->bast_list)); 2779 BUG_ON(!list_empty(&lock->bast_list));
2509 BUG_ON(lock->ast_pending); 2780 BUG_ON(lock->ast_pending);
2510 BUG_ON(lock->bast_pending); 2781 BUG_ON(lock->bast_pending);
2782 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2511 list_del_init(&lock->list); 2783 list_del_init(&lock->list);
2512 dlm_lock_put(lock); 2784 dlm_lock_put(lock);
2513 } 2785 }
2514 } 2786 }
2515 queue++; 2787 queue++;
2516 } 2788 }
2789 bit = 0;
2790 while (1) {
2791 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2792 if (bit >= O2NM_MAX_NODES)
2793 break;
2794 /* do not clear the local node reference, if there is a
2795 * process holding this, let it drop the ref itself */
2796 if (bit != dlm->node_num) {
2797 mlog(0, "%s:%.*s: node %u had a ref to this "
2798 "migrating lockres, clearing\n", dlm->name,
2799 res->lockname.len, res->lockname.name, bit);
2800 dlm_lockres_clear_refmap_bit(bit, res);
2801 }
2802 bit++;
2803 }
2517} 2804}
2518 2805
2519/* for now this is not too intelligent. we will 2806/* for now this is not too intelligent. we will
@@ -2601,6 +2888,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2601 mlog(0, "migrate request (node %u) returned %d!\n", 2888 mlog(0, "migrate request (node %u) returned %d!\n",
2602 nodenum, status); 2889 nodenum, status);
2603 ret = status; 2890 ret = status;
2891 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2892 /* during the migration request we short-circuited
2893 * the mastery of the lockres. make sure we have
2894 * a mastery ref for nodenum */
2895 mlog(0, "%s:%.*s: need ref for node %u\n",
2896 dlm->name, res->lockname.len, res->lockname.name,
2897 nodenum);
2898 spin_lock(&res->spinlock);
2899 dlm_lockres_set_refmap_bit(nodenum, res);
2900 spin_unlock(&res->spinlock);
2604 } 2901 }
2605 } 2902 }
2606 2903
@@ -2745,7 +3042,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2745 /* remove it from the list so that only one 3042 /* remove it from the list so that only one
2746 * mle will be found */ 3043 * mle will be found */
2747 list_del_init(&tmp->list); 3044 list_del_init(&tmp->list);
2748 __dlm_mle_detach_hb_events(dlm, mle); 3045 /* this was obviously WRONG. mle is uninited here. should be tmp. */
3046 __dlm_mle_detach_hb_events(dlm, tmp);
3047 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3048 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3049 "telling master to get ref for cleared out mle "
3050 "during migration\n", dlm->name, namelen, name,
3051 master, new_master);
2749 } 3052 }
2750 spin_unlock(&tmp->spinlock); 3053 spin_unlock(&tmp->spinlock);
2751 } 3054 }
@@ -2753,6 +3056,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2753 /* now add a migration mle to the tail of the list */ 3056 /* now add a migration mle to the tail of the list */
2754 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); 3057 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2755 mle->new_master = new_master; 3058 mle->new_master = new_master;
3059 /* the new master will be sending an assert master for this.
3060 * at that point we will get the refmap reference */
2756 mle->master = master; 3061 mle->master = master;
2757 /* do this for consistency with other mle types */ 3062 /* do this for consistency with other mle types */
2758 set_bit(new_master, mle->maybe_map); 3063 set_bit(new_master, mle->maybe_map);
@@ -2902,6 +3207,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2902 clear_bit(dlm->node_num, iter.node_map); 3207 clear_bit(dlm->node_num, iter.node_map);
2903 spin_unlock(&dlm->spinlock); 3208 spin_unlock(&dlm->spinlock);
2904 3209
3210 /* ownership of the lockres is changing. account for the
3211 * mastery reference here since old_master will briefly have
3212 * a reference after the migration completes */
3213 spin_lock(&res->spinlock);
3214 dlm_lockres_set_refmap_bit(old_master, res);
3215 spin_unlock(&res->spinlock);
3216
2905 mlog(0, "now time to do a migrate request to other nodes\n"); 3217 mlog(0, "now time to do a migrate request to other nodes\n");
2906 ret = dlm_do_migrate_request(dlm, res, old_master, 3218 ret = dlm_do_migrate_request(dlm, res, old_master,
2907 dlm->node_num, &iter); 3219 dlm->node_num, &iter);
@@ -2914,8 +3226,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2914 res->lockname.len, res->lockname.name); 3226 res->lockname.len, res->lockname.name);
2915 /* this call now finishes out the nodemap 3227 /* this call now finishes out the nodemap
2916 * even if one or more nodes die */ 3228 * even if one or more nodes die */
2917 ret = dlm_do_assert_master(dlm, res->lockname.name, 3229 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2918 res->lockname.len, iter.node_map,
2919 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3230 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2920 if (ret < 0) { 3231 if (ret < 0) {
2921 /* no longer need to retry. all living nodes contacted. */ 3232 /* no longer need to retry. all living nodes contacted. */
@@ -2927,8 +3238,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2927 set_bit(old_master, iter.node_map); 3238 set_bit(old_master, iter.node_map);
2928 mlog(0, "doing assert master of %.*s back to %u\n", 3239 mlog(0, "doing assert master of %.*s back to %u\n",
2929 res->lockname.len, res->lockname.name, old_master); 3240 res->lockname.len, res->lockname.name, old_master);
2930 ret = dlm_do_assert_master(dlm, res->lockname.name, 3241 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2931 res->lockname.len, iter.node_map,
2932 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3242 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2933 if (ret < 0) { 3243 if (ret < 0) {
2934 mlog(0, "assert master to original master failed " 3244 mlog(0, "assert master to original master failed "
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e9e2ed..d011a2a22742 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -1129,6 +1129,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
1129 if (total_locks == mres_total_locks) 1129 if (total_locks == mres_total_locks)
1130 mres->flags |= DLM_MRES_ALL_DONE; 1130 mres->flags |= DLM_MRES_ALL_DONE;
1131 1131
1132 mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
1133 dlm->name, res->lockname.len, res->lockname.name,
1134 orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
1135 send_to);
1136
1132 /* send it */ 1137 /* send it */
1133 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, 1138 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
1134 sz, send_to, &status); 1139 sz, send_to, &status);
@@ -1213,6 +1218,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1213 return 0; 1218 return 0;
1214} 1219}
1215 1220
1221static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
1222 struct dlm_migratable_lockres *mres)
1223{
1224 struct dlm_lock dummy;
1225 memset(&dummy, 0, sizeof(dummy));
1226 dummy.ml.cookie = 0;
1227 dummy.ml.type = LKM_IVMODE;
1228 dummy.ml.convert_type = LKM_IVMODE;
1229 dummy.ml.highest_blocked = LKM_IVMODE;
1230 dummy.lksb = NULL;
1231 dummy.ml.node = dlm->node_num;
1232 dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
1233}
1234
1235static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
1236 struct dlm_migratable_lock *ml,
1237 u8 *nodenum)
1238{
1239 if (unlikely(ml->cookie == 0 &&
1240 ml->type == LKM_IVMODE &&
1241 ml->convert_type == LKM_IVMODE &&
1242 ml->highest_blocked == LKM_IVMODE &&
1243 ml->list == DLM_BLOCKED_LIST)) {
1244 *nodenum = ml->node;
1245 return 1;
1246 }
1247 return 0;
1248}
1216 1249
1217int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1250int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1218 struct dlm_migratable_lockres *mres, 1251 struct dlm_migratable_lockres *mres,
@@ -1260,6 +1293,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1260 goto error; 1293 goto error;
1261 } 1294 }
1262 } 1295 }
1296 if (total_locks == 0) {
1297 /* send a dummy lock to indicate a mastery reference only */
1298 mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
1299 dlm->name, res->lockname.len, res->lockname.name,
1300 send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
1301 "migration");
1302 dlm_add_dummy_lock(dlm, mres);
1303 }
1263 /* flush any remaining locks */ 1304 /* flush any remaining locks */
1264 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1305 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1265 if (ret < 0) 1306 if (ret < 0)
@@ -1386,13 +1427,16 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1386 /* add an extra ref for just-allocated lockres 1427 /* add an extra ref for just-allocated lockres
1387 * otherwise the lockres will be purged immediately */ 1428 * otherwise the lockres will be purged immediately */
1388 dlm_lockres_get(res); 1429 dlm_lockres_get(res);
1389
1390 } 1430 }
1391 1431
1392 /* at this point we have allocated everything we need, 1432 /* at this point we have allocated everything we need,
1393 * and we have a hashed lockres with an extra ref and 1433 * and we have a hashed lockres with an extra ref and
1394 * the proper res->state flags. */ 1434 * the proper res->state flags. */
1395 ret = 0; 1435 ret = 0;
1436 spin_lock(&res->spinlock);
1437 /* drop this either when master requery finds a different master
1438 * or when a lock is added by the recovery worker */
1439 dlm_lockres_grab_inflight_ref(dlm, res);
1396 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1440 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
1397 /* migration cannot have an unknown master */ 1441 /* migration cannot have an unknown master */
1398 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1442 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1444,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1400 "unknown owner.. will need to requery: " 1444 "unknown owner.. will need to requery: "
1401 "%.*s\n", mres->lockname_len, mres->lockname); 1445 "%.*s\n", mres->lockname_len, mres->lockname);
1402 } else { 1446 } else {
1403 spin_lock(&res->spinlock); 1447 /* take a reference now to pin the lockres, drop it
1448 * when locks are added in the worker */
1404 dlm_change_lockres_owner(dlm, res, dlm->node_num); 1449 dlm_change_lockres_owner(dlm, res, dlm->node_num);
1405 spin_unlock(&res->spinlock);
1406 } 1450 }
1451 spin_unlock(&res->spinlock);
1407 1452
1408 /* queue up work for dlm_mig_lockres_worker */ 1453 /* queue up work for dlm_mig_lockres_worker */
1409 dlm_grab(dlm); /* get an extra ref for the work item */ 1454 dlm_grab(dlm); /* get an extra ref for the work item */
@@ -1459,6 +1504,9 @@ again:
1459 "this node will take it.\n", 1504 "this node will take it.\n",
1460 res->lockname.len, res->lockname.name); 1505 res->lockname.len, res->lockname.name);
1461 } else { 1506 } else {
1507 spin_lock(&res->spinlock);
1508 dlm_lockres_drop_inflight_ref(dlm, res);
1509 spin_unlock(&res->spinlock);
1462 mlog(0, "master needs to respond to sender " 1510 mlog(0, "master needs to respond to sender "
1463 "that node %u still owns %.*s\n", 1511 "that node %u still owns %.*s\n",
1464 real_master, res->lockname.len, 1512 real_master, res->lockname.len,
@@ -1666,10 +1714,25 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1666 int i, bad; 1714 int i, bad;
1667 struct list_head *iter; 1715 struct list_head *iter;
1668 struct dlm_lock *lock = NULL; 1716 struct dlm_lock *lock = NULL;
1717 u8 from = O2NM_MAX_NODES;
1718 unsigned int added = 0;
1669 1719
1670 mlog(0, "running %d locks for this lockres\n", mres->num_locks); 1720 mlog(0, "running %d locks for this lockres\n", mres->num_locks);
1671 for (i=0; i<mres->num_locks; i++) { 1721 for (i=0; i<mres->num_locks; i++) {
1672 ml = &(mres->ml[i]); 1722 ml = &(mres->ml[i]);
1723
1724 if (dlm_is_dummy_lock(dlm, ml, &from)) {
1725 /* placeholder, just need to set the refmap bit */
1726 BUG_ON(mres->num_locks != 1);
1727 mlog(0, "%s:%.*s: dummy lock for %u\n",
1728 dlm->name, mres->lockname_len, mres->lockname,
1729 from);
1730 spin_lock(&res->spinlock);
1731 dlm_lockres_set_refmap_bit(from, res);
1732 spin_unlock(&res->spinlock);
1733 added++;
1734 break;
1735 }
1673 BUG_ON(ml->highest_blocked != LKM_IVMODE); 1736 BUG_ON(ml->highest_blocked != LKM_IVMODE);
1674 newlock = NULL; 1737 newlock = NULL;
1675 lksb = NULL; 1738 lksb = NULL;
@@ -1711,6 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1711 /* do not alter lock refcount. switching lists. */ 1774 /* do not alter lock refcount. switching lists. */
1712 list_move_tail(&lock->list, queue); 1775 list_move_tail(&lock->list, queue);
1713 spin_unlock(&res->spinlock); 1776 spin_unlock(&res->spinlock);
1777 added++;
1714 1778
1715 mlog(0, "just reordered a local lock!\n"); 1779 mlog(0, "just reordered a local lock!\n");
1716 continue; 1780 continue;
@@ -1817,12 +1881,24 @@ skip_lvb:
1817 if (!bad) { 1881 if (!bad) {
1818 dlm_lock_get(newlock); 1882 dlm_lock_get(newlock);
1819 list_add_tail(&newlock->list, queue); 1883 list_add_tail(&newlock->list, queue);
1884 mlog(0, "%s:%.*s: added lock for node %u, "
1885 "setting refmap bit\n", dlm->name,
1886 res->lockname.len, res->lockname.name, ml->node);
1887 dlm_lockres_set_refmap_bit(ml->node, res);
1888 added++;
1820 } 1889 }
1821 spin_unlock(&res->spinlock); 1890 spin_unlock(&res->spinlock);
1822 } 1891 }
1823 mlog(0, "done running all the locks\n"); 1892 mlog(0, "done running all the locks\n");
1824 1893
1825leave: 1894leave:
1895 /* balance the ref taken when the work was queued */
1896 if (added > 0) {
1897 spin_lock(&res->spinlock);
1898 dlm_lockres_drop_inflight_ref(dlm, res);
1899 spin_unlock(&res->spinlock);
1900 }
1901
1826 if (ret < 0) { 1902 if (ret < 0) {
1827 mlog_errno(ret); 1903 mlog_errno(ret);
1828 if (newlock) 1904 if (newlock)
@@ -1935,9 +2011,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1935 if (res->owner == dead_node) { 2011 if (res->owner == dead_node) {
1936 list_del_init(&res->recovering); 2012 list_del_init(&res->recovering);
1937 spin_lock(&res->spinlock); 2013 spin_lock(&res->spinlock);
2014 /* new_master has our reference from
2015 * the lock state sent during recovery */
1938 dlm_change_lockres_owner(dlm, res, new_master); 2016 dlm_change_lockres_owner(dlm, res, new_master);
1939 res->state &= ~DLM_LOCK_RES_RECOVERING; 2017 res->state &= ~DLM_LOCK_RES_RECOVERING;
1940 if (!__dlm_lockres_unused(res)) 2018 if (__dlm_lockres_has_locks(res))
1941 __dlm_dirty_lockres(dlm, res); 2019 __dlm_dirty_lockres(dlm, res);
1942 spin_unlock(&res->spinlock); 2020 spin_unlock(&res->spinlock);
1943 wake_up(&res->wq); 2021 wake_up(&res->wq);
@@ -1977,9 +2055,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1977 dlm_lockres_put(res); 2055 dlm_lockres_put(res);
1978 } 2056 }
1979 spin_lock(&res->spinlock); 2057 spin_lock(&res->spinlock);
2058 /* new_master has our reference from
2059 * the lock state sent during recovery */
1980 dlm_change_lockres_owner(dlm, res, new_master); 2060 dlm_change_lockres_owner(dlm, res, new_master);
1981 res->state &= ~DLM_LOCK_RES_RECOVERING; 2061 res->state &= ~DLM_LOCK_RES_RECOVERING;
1982 if (!__dlm_lockres_unused(res)) 2062 if (__dlm_lockres_has_locks(res))
1983 __dlm_dirty_lockres(dlm, res); 2063 __dlm_dirty_lockres(dlm, res);
1984 spin_unlock(&res->spinlock); 2064 spin_unlock(&res->spinlock);
1985 wake_up(&res->wq); 2065 wake_up(&res->wq);
@@ -2048,6 +2128,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2048{ 2128{
2049 struct list_head *iter, *tmpiter; 2129 struct list_head *iter, *tmpiter;
2050 struct dlm_lock *lock; 2130 struct dlm_lock *lock;
2131 unsigned int freed = 0;
2051 2132
2052 /* this node is the lockres master: 2133 /* this node is the lockres master:
2053 * 1) remove any stale locks for the dead node 2134 * 1) remove any stale locks for the dead node
@@ -2062,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2062 if (lock->ml.node == dead_node) { 2143 if (lock->ml.node == dead_node) {
2063 list_del_init(&lock->list); 2144 list_del_init(&lock->list);
2064 dlm_lock_put(lock); 2145 dlm_lock_put(lock);
2146 freed++;
2065 } 2147 }
2066 } 2148 }
2067 list_for_each_safe(iter, tmpiter, &res->converting) { 2149 list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2151,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2069 if (lock->ml.node == dead_node) { 2151 if (lock->ml.node == dead_node) {
2070 list_del_init(&lock->list); 2152 list_del_init(&lock->list);
2071 dlm_lock_put(lock); 2153 dlm_lock_put(lock);
2154 freed++;
2072 } 2155 }
2073 } 2156 }
2074 list_for_each_safe(iter, tmpiter, &res->blocked) { 2157 list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2159,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2076 if (lock->ml.node == dead_node) { 2159 if (lock->ml.node == dead_node) {
2077 list_del_init(&lock->list); 2160 list_del_init(&lock->list);
2078 dlm_lock_put(lock); 2161 dlm_lock_put(lock);
2162 freed++;
2079 } 2163 }
2080 } 2164 }
2081 2165
2166 if (freed) {
2167 mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
2168 "dropping ref from lockres\n", dlm->name,
2169 res->lockname.len, res->lockname.name, freed, dead_node);
2170 BUG_ON(!test_bit(dead_node, res->refmap));
2171 dlm_lockres_clear_refmap_bit(dead_node, res);
2172 } else if (test_bit(dead_node, res->refmap)) {
2173 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2174 "no locks and had not purged before dying\n", dlm->name,
2175 res->lockname.len, res->lockname.name, dead_node);
2176 dlm_lockres_clear_refmap_bit(dead_node, res);
2177 }
2178
2082 /* do not kick thread yet */ 2179 /* do not kick thread yet */
2083 __dlm_dirty_lockres(dlm, res); 2180 __dlm_dirty_lockres(dlm, res);
2084} 2181}
@@ -2141,9 +2238,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2141 spin_lock(&res->spinlock); 2238 spin_lock(&res->spinlock);
2142 /* zero the lvb if necessary */ 2239 /* zero the lvb if necessary */
2143 dlm_revalidate_lvb(dlm, res, dead_node); 2240 dlm_revalidate_lvb(dlm, res, dead_node);
2144 if (res->owner == dead_node) 2241 if (res->owner == dead_node) {
2242 if (res->state & DLM_LOCK_RES_DROPPING_REF)
2243 mlog(0, "%s:%.*s: owned by "
2244 "dead node %u, this node was "
2245 "dropping its ref when it died. "
2246 "continue, dropping the flag.\n",
2247 dlm->name, res->lockname.len,
2248 res->lockname.name, dead_node);
2249
2250 /* the wake_up for this will happen when the
2251 * RECOVERING flag is dropped later */
2252 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
2253
2145 dlm_move_lockres_to_recovery_list(dlm, res); 2254 dlm_move_lockres_to_recovery_list(dlm, res);
2146 else if (res->owner == dlm->node_num) { 2255 } else if (res->owner == dlm->node_num) {
2147 dlm_free_dead_locks(dlm, res, dead_node); 2256 dlm_free_dead_locks(dlm, res, dead_node);
2148 __dlm_lockres_calc_usage(dlm, res); 2257 __dlm_lockres_calc_usage(dlm, res);
2149 } 2258 }
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 0c822f3ffb05..620eb824ce1d 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -54,9 +54,6 @@
54#include "cluster/masklog.h" 54#include "cluster/masklog.h"
55 55
56static int dlm_thread(void *data); 56static int dlm_thread(void *data);
57static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
58 struct dlm_lock_resource *lockres);
59
60static void dlm_flush_asts(struct dlm_ctxt *dlm); 57static void dlm_flush_asts(struct dlm_ctxt *dlm);
61 58
62#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 59#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@ repeat:
82 current->state = TASK_RUNNING; 79 current->state = TASK_RUNNING;
83} 80}
84 81
85 82int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
86int __dlm_lockres_unused(struct dlm_lock_resource *res)
87{ 83{
88 if (list_empty(&res->granted) && 84 if (list_empty(&res->granted) &&
89 list_empty(&res->converting) && 85 list_empty(&res->converting) &&
90 list_empty(&res->blocked) && 86 list_empty(&res->blocked))
91 list_empty(&res->dirty)) 87 return 0;
92 return 1; 88 return 1;
89}
90
91/* "unused": the lockres has no locks, is not on the dirty list,
92 * has no inflight locks (in the gap between mastery and acquiring
93 * the first lock), and has no bits in its refmap.
94 * truly ready to be freed. */
95int __dlm_lockres_unused(struct dlm_lock_resource *res)
96{
97 if (!__dlm_lockres_has_locks(res) &&
98 list_empty(&res->dirty)) {
99 /* try not to scan the bitmap unless the first two
100 * conditions are already true */
101 int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
102 if (bit >= O2NM_MAX_NODES) {
103 /* since the bit for dlm->node_num is not
104 * set, inflight_locks better be zero */
105 BUG_ON(res->inflight_locks != 0);
106 return 1;
107 }
108 }
93 return 0; 109 return 0;
94} 110}
95 111
@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
106 assert_spin_locked(&res->spinlock); 122 assert_spin_locked(&res->spinlock);
107 123
108 if (__dlm_lockres_unused(res)){ 124 if (__dlm_lockres_unused(res)){
109 /* For now, just keep any resource we master */
110 if (res->owner == dlm->node_num)
111 {
112 if (!list_empty(&res->purge)) {
113 mlog(0, "we master %s:%.*s, but it is on "
114 "the purge list. Removing\n",
115 dlm->name, res->lockname.len,
116 res->lockname.name);
117 list_del_init(&res->purge);
118 dlm->purge_count--;
119 }
120 return;
121 }
122
123 if (list_empty(&res->purge)) { 125 if (list_empty(&res->purge)) {
124 mlog(0, "putting lockres %.*s from purge list\n", 126 mlog(0, "putting lockres %.*s:%p onto purge list\n",
125 res->lockname.len, res->lockname.name); 127 res->lockname.len, res->lockname.name, res);
126 128
127 res->last_used = jiffies; 129 res->last_used = jiffies;
130 dlm_lockres_get(res);
128 list_add_tail(&res->purge, &dlm->purge_list); 131 list_add_tail(&res->purge, &dlm->purge_list);
129 dlm->purge_count++; 132 dlm->purge_count++;
130
131 /* if this node is not the owner, there is
132 * no way to keep track of who the owner could be.
133 * unhash it to avoid serious problems. */
134 if (res->owner != dlm->node_num) {
135 mlog(0, "%s:%.*s: doing immediate "
136 "purge of lockres owned by %u\n",
137 dlm->name, res->lockname.len,
138 res->lockname.name, res->owner);
139
140 dlm_purge_lockres_now(dlm, res);
141 }
142 } 133 }
143 } else if (!list_empty(&res->purge)) { 134 } else if (!list_empty(&res->purge)) {
144 mlog(0, "removing lockres %.*s from purge list, " 135 mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
145 "owner=%u\n", res->lockname.len, res->lockname.name, 136 res->lockname.len, res->lockname.name, res, res->owner);
146 res->owner);
147 137
148 list_del_init(&res->purge); 138 list_del_init(&res->purge);
139 dlm_lockres_put(res);
149 dlm->purge_count--; 140 dlm->purge_count--;
150 } 141 }
151} 142}
@@ -163,68 +154,60 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
163 spin_unlock(&dlm->spinlock); 154 spin_unlock(&dlm->spinlock);
164} 155}
165 156
166/* TODO: Eventual API: Called with the dlm spinlock held, may drop it 157int dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
167 * to do migration, but will re-acquire before exit. */
168void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
169{ 158{
170 int master; 159 int master;
171 int ret; 160 int ret = 0;
172
173 spin_lock(&lockres->spinlock);
174 master = lockres->owner == dlm->node_num;
175 spin_unlock(&lockres->spinlock);
176
177 mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
178 lockres->lockname.name, master);
179 161
180 /* Non master is the easy case -- no migration required, just 162 spin_lock(&res->spinlock);
181 * quit. */ 163 if (!__dlm_lockres_unused(res)) {
164 spin_unlock(&res->spinlock);
165 mlog(0, "%s:%.*s: tried to purge but not unused\n",
166 dlm->name, res->lockname.len, res->lockname.name);
167 return -ENOTEMPTY;
168 }
169 master = (res->owner == dlm->node_num);
182 if (!master) 170 if (!master)
183 goto finish; 171 res->state |= DLM_LOCK_RES_DROPPING_REF;
184 172 spin_unlock(&res->spinlock);
185 /* Wheee! Migrate lockres here! */
186 spin_unlock(&dlm->spinlock);
187again:
188 173
189 ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); 174 mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
190 if (ret == -ENOTEMPTY) { 175 res->lockname.name, master);
191 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
192 lockres->lockname.len, lockres->lockname.name);
193 176
194 BUG(); 177 if (!master) {
195 } else if (ret < 0) { 178 /* drop spinlock to do messaging, retake below */
196 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", 179 spin_unlock(&dlm->spinlock);
197 lockres->lockname.len, lockres->lockname.name); 180 /* clear our bit from the master's refmap, ignore errors */
198 msleep(100); 181 ret = dlm_drop_lockres_ref(dlm, res);
199 goto again; 182 if (ret < 0) {
183 mlog_errno(ret);
184 if (!dlm_is_host_down(ret))
185 BUG();
186 }
187 mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
188 dlm->name, res->lockname.len, res->lockname.name, ret);
189 spin_lock(&dlm->spinlock);
200 } 190 }
201 191
202 spin_lock(&dlm->spinlock); 192 if (!list_empty(&res->purge)) {
203 193 mlog(0, "removing lockres %.*s:%p from purgelist, "
204finish: 194 "master = %d\n", res->lockname.len, res->lockname.name,
205 if (!list_empty(&lockres->purge)) { 195 res, master);
206 list_del_init(&lockres->purge); 196 list_del_init(&res->purge);
197 dlm_lockres_put(res);
207 dlm->purge_count--; 198 dlm->purge_count--;
208 } 199 }
209 __dlm_unhash_lockres(lockres); 200 __dlm_unhash_lockres(res);
210}
211
212/* make an unused lockres go away immediately.
213 * as soon as the dlm spinlock is dropped, this lockres
214 * will not be found. kfree still happens on last put. */
215static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
216 struct dlm_lock_resource *lockres)
217{
218 assert_spin_locked(&dlm->spinlock);
219 assert_spin_locked(&lockres->spinlock);
220
221 BUG_ON(!__dlm_lockres_unused(lockres));
222 201
223 if (!list_empty(&lockres->purge)) { 202 /* lockres is not in the hash now. drop the flag and wake up
224 list_del_init(&lockres->purge); 203 * any processes waiting in dlm_get_lock_resource. */
225 dlm->purge_count--; 204 if (!master) {
205 spin_lock(&res->spinlock);
206 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
207 spin_unlock(&res->spinlock);
208 wake_up(&res->wq);
226 } 209 }
227 __dlm_unhash_lockres(lockres); 210 return 0;
228} 211}
229 212
230static void dlm_run_purge_list(struct dlm_ctxt *dlm, 213static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -268,13 +251,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
268 break; 251 break;
269 } 252 }
270 253
254 mlog(0, "removing lockres %.*s:%p from purgelist\n",
255 lockres->lockname.len, lockres->lockname.name, lockres);
271 list_del_init(&lockres->purge); 256 list_del_init(&lockres->purge);
257 dlm_lockres_put(lockres);
272 dlm->purge_count--; 258 dlm->purge_count--;
273 259
274 /* This may drop and reacquire the dlm spinlock if it 260 /* This may drop and reacquire the dlm spinlock if it
275 * has to do migration. */ 261 * has to do migration. */
276 mlog(0, "calling dlm_purge_lockres!\n"); 262 mlog(0, "calling dlm_purge_lockres!\n");
277 dlm_purge_lockres(dlm, lockres); 263 if (dlm_purge_lockres(dlm, lockres))
264 BUG();
278 mlog(0, "DONE calling dlm_purge_lockres!\n"); 265 mlog(0, "DONE calling dlm_purge_lockres!\n");
279 266
280 /* Avoid adding any scheduling latencies */ 267 /* Avoid adding any scheduling latencies */