aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
authorJames Bottomley <jejb@mulgrave.il.steeleye.com>2006-06-28 14:06:39 -0400
committerJames Bottomley <jejb@mulgrave.il.steeleye.com>2006-06-28 14:06:39 -0400
commitf28e71617ddaf2483e3e5c5237103484a303743f (patch)
tree67627d2d8ddbf6a4449371e9261d796c013b1fa1 /fs/ocfs2/dlm
parentdc6a78f1af10d28fb8c395034ae1e099b85c05b0 (diff)
parenta39727f212426b9d5f9267b3318a2afaf9922d3b (diff)
Merge ../linux-2.6/
Conflicts: drivers/scsi/aacraid/comminit.c Fixed up by removing the now renamed CONFIG_IOMMU option from aacraid Signed-off-by: James Bottomley <James.Bottomley@SteelEye.com>
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmast.c15
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h63
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c33
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c6
-rw-r--r--fs/ocfs2/dlm/dlmdebug.h30
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c103
-rw-r--r--fs/ocfs2/dlm/dlmfs.c6
-rw-r--r--fs/ocfs2/dlm/dlmlock.c73
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c448
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c593
-rw-r--r--fs/ocfs2/dlm/dlmthread.c74
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c13
-rw-r--r--fs/ocfs2/dlm/userdlm.c2
13 files changed, 1060 insertions, 399 deletions
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 355593dd8ef8..42775e2bbe2c 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -197,12 +197,14 @@ static void dlm_update_lvb(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
197 lock->ml.node == dlm->node_num ? "master" : 197 lock->ml.node == dlm->node_num ? "master" :
198 "remote"); 198 "remote");
199 memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN); 199 memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
200 } else if (lksb->flags & DLM_LKSB_PUT_LVB) {
201 mlog(0, "setting lvb from lockres for %s node\n",
202 lock->ml.node == dlm->node_num ? "master" :
203 "remote");
204 memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
205 } 200 }
201 /* Do nothing for lvb put requests - they should be done in
202 * place when the lock is downconverted - otherwise we risk
203 * racing gets and puts which could result in old lvb data
204 * being propagated. We leave the put flag set and clear it
205 * here. In the future we might want to clear it at the time
206 * the put is actually done.
207 */
206 spin_unlock(&res->spinlock); 208 spin_unlock(&res->spinlock);
207 } 209 }
208 210
@@ -381,8 +383,7 @@ do_ast:
381 ret = DLM_NORMAL; 383 ret = DLM_NORMAL;
382 if (past->type == DLM_AST) { 384 if (past->type == DLM_AST) {
383 /* do not alter lock refcount. switching lists. */ 385 /* do not alter lock refcount. switching lists. */
384 list_del_init(&lock->list); 386 list_move_tail(&lock->list, &res->granted);
385 list_add_tail(&lock->list, &res->granted);
386 mlog(0, "ast: adding to granted list... type=%d, " 387 mlog(0, "ast: adding to granted list... type=%d, "
387 "convert_type=%d\n", lock->ml.type, lock->ml.convert_type); 388 "convert_type=%d\n", lock->ml.type, lock->ml.convert_type);
388 if (lock->ml.convert_type != LKM_IVMODE) { 389 if (lock->ml.convert_type != LKM_IVMODE) {
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 88cc43df18f1..9bdc9cf65991 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -37,7 +37,17 @@
37#define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes 37#define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes
38#define DLM_THREAD_MS 200 // flush at least every 200 ms 38#define DLM_THREAD_MS 200 // flush at least every 200 ms
39 39
40#define DLM_HASH_BUCKETS (PAGE_SIZE / sizeof(struct hlist_head)) 40#define DLM_HASH_SIZE_DEFAULT (1 << 14)
41#if DLM_HASH_SIZE_DEFAULT < PAGE_SIZE
42# define DLM_HASH_PAGES 1
43#else
44# define DLM_HASH_PAGES (DLM_HASH_SIZE_DEFAULT / PAGE_SIZE)
45#endif
46#define DLM_BUCKETS_PER_PAGE (PAGE_SIZE / sizeof(struct hlist_head))
47#define DLM_HASH_BUCKETS (DLM_HASH_PAGES * DLM_BUCKETS_PER_PAGE)
48
49/* Intended to make it easier for us to switch out hash functions */
50#define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l)
41 51
42enum dlm_ast_type { 52enum dlm_ast_type {
43 DLM_AST = 0, 53 DLM_AST = 0,
@@ -61,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len)
61 return 0; 71 return 0;
62} 72}
63 73
64#define DLM_RECO_STATE_ACTIVE 0x0001 74#define DLM_RECO_STATE_ACTIVE 0x0001
75#define DLM_RECO_STATE_FINALIZE 0x0002
65 76
66struct dlm_recovery_ctxt 77struct dlm_recovery_ctxt
67{ 78{
@@ -85,7 +96,7 @@ enum dlm_ctxt_state {
85struct dlm_ctxt 96struct dlm_ctxt
86{ 97{
87 struct list_head list; 98 struct list_head list;
88 struct hlist_head *lockres_hash; 99 struct hlist_head **lockres_hash;
89 struct list_head dirty_list; 100 struct list_head dirty_list;
90 struct list_head purge_list; 101 struct list_head purge_list;
91 struct list_head pending_asts; 102 struct list_head pending_asts;
@@ -120,6 +131,7 @@ struct dlm_ctxt
120 struct o2hb_callback_func dlm_hb_down; 131 struct o2hb_callback_func dlm_hb_down;
121 struct task_struct *dlm_thread_task; 132 struct task_struct *dlm_thread_task;
122 struct task_struct *dlm_reco_thread_task; 133 struct task_struct *dlm_reco_thread_task;
134 struct workqueue_struct *dlm_worker;
123 wait_queue_head_t dlm_thread_wq; 135 wait_queue_head_t dlm_thread_wq;
124 wait_queue_head_t dlm_reco_thread_wq; 136 wait_queue_head_t dlm_reco_thread_wq;
125 wait_queue_head_t ast_wq; 137 wait_queue_head_t ast_wq;
@@ -132,6 +144,11 @@ struct dlm_ctxt
132 struct list_head dlm_eviction_callbacks; 144 struct list_head dlm_eviction_callbacks;
133}; 145};
134 146
147static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i)
148{
149 return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE);
150}
151
135/* these keventd work queue items are for less-frequently 152/* these keventd work queue items are for less-frequently
136 * called functions that cannot be directly called from the 153 * called functions that cannot be directly called from the
137 * net message handlers for some reason, usually because 154 * net message handlers for some reason, usually because
@@ -216,20 +233,29 @@ struct dlm_lock_resource
216 /* WARNING: Please see the comment in dlm_init_lockres before 233 /* WARNING: Please see the comment in dlm_init_lockres before
217 * adding fields here. */ 234 * adding fields here. */
218 struct hlist_node hash_node; 235 struct hlist_node hash_node;
236 struct qstr lockname;
219 struct kref refs; 237 struct kref refs;
220 238
221 /* please keep these next 3 in this order 239 /*
222 * some funcs want to iterate over all lists */ 240 * Please keep granted, converting, and blocked in this order,
241 * as some funcs want to iterate over all lists.
242 *
243 * All four lists are protected by the hash's reference.
244 */
223 struct list_head granted; 245 struct list_head granted;
224 struct list_head converting; 246 struct list_head converting;
225 struct list_head blocked; 247 struct list_head blocked;
248 struct list_head purge;
226 249
250 /*
251 * These two lists require you to hold an additional reference
252 * while they are on the list.
253 */
227 struct list_head dirty; 254 struct list_head dirty;
228 struct list_head recovering; // dlm_recovery_ctxt.resources list 255 struct list_head recovering; // dlm_recovery_ctxt.resources list
229 256
230 /* unused lock resources have their last_used stamped and are 257 /* unused lock resources have their last_used stamped and are
231 * put on a list for the dlm thread to run. */ 258 * put on a list for the dlm thread to run. */
232 struct list_head purge;
233 unsigned long last_used; 259 unsigned long last_used;
234 260
235 unsigned migration_pending:1; 261 unsigned migration_pending:1;
@@ -238,7 +264,6 @@ struct dlm_lock_resource
238 wait_queue_head_t wq; 264 wait_queue_head_t wq;
239 u8 owner; //node which owns the lock resource, or unknown 265 u8 owner; //node which owns the lock resource, or unknown
240 u16 state; 266 u16 state;
241 struct qstr lockname;
242 char lvb[DLM_LVB_LEN]; 267 char lvb[DLM_LVB_LEN];
243}; 268};
244 269
@@ -300,6 +325,15 @@ enum dlm_lockres_list {
300 DLM_BLOCKED_LIST 325 DLM_BLOCKED_LIST
301}; 326};
302 327
328static inline int dlm_lvb_is_empty(char *lvb)
329{
330 int i;
331 for (i=0; i<DLM_LVB_LEN; i++)
332 if (lvb[i])
333 return 0;
334 return 1;
335}
336
303static inline struct list_head * 337static inline struct list_head *
304dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx) 338dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
305{ 339{
@@ -609,7 +643,8 @@ struct dlm_finalize_reco
609{ 643{
610 u8 node_idx; 644 u8 node_idx;
611 u8 dead_node; 645 u8 dead_node;
612 __be16 pad1; 646 u8 flags;
647 u8 pad1;
613 __be32 pad2; 648 __be32 pad2;
614}; 649};
615 650
@@ -676,6 +711,7 @@ void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
676void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); 711void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
677int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); 712int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
678int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); 713int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
714int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout);
679 715
680void dlm_put(struct dlm_ctxt *dlm); 716void dlm_put(struct dlm_ctxt *dlm);
681struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); 717struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
@@ -687,14 +723,20 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
687 struct dlm_lock_resource *res); 723 struct dlm_lock_resource *res);
688void dlm_purge_lockres(struct dlm_ctxt *dlm, 724void dlm_purge_lockres(struct dlm_ctxt *dlm,
689 struct dlm_lock_resource *lockres); 725 struct dlm_lock_resource *lockres);
690void dlm_lockres_get(struct dlm_lock_resource *res); 726static inline void dlm_lockres_get(struct dlm_lock_resource *res)
727{
728 /* This is called on every lookup, so it might be worth
729 * inlining. */
730 kref_get(&res->refs);
731}
691void dlm_lockres_put(struct dlm_lock_resource *res); 732void dlm_lockres_put(struct dlm_lock_resource *res);
692void __dlm_unhash_lockres(struct dlm_lock_resource *res); 733void __dlm_unhash_lockres(struct dlm_lock_resource *res);
693void __dlm_insert_lockres(struct dlm_ctxt *dlm, 734void __dlm_insert_lockres(struct dlm_ctxt *dlm,
694 struct dlm_lock_resource *res); 735 struct dlm_lock_resource *res);
695struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 736struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
696 const char *name, 737 const char *name,
697 unsigned int len); 738 unsigned int len,
739 unsigned int hash);
698struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 740struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
699 const char *name, 741 const char *name,
700 unsigned int len); 742 unsigned int len);
@@ -819,6 +861,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm,
819 u8 dead_node); 861 u8 dead_node);
820int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); 862int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
821 863
864int __dlm_lockres_unused(struct dlm_lock_resource *res);
822 865
823static inline const char * dlm_lock_mode_name(int mode) 866static inline const char * dlm_lock_mode_name(int mode)
824{ 867{
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index 8285228d9e37..c764dc8e40a2 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -214,6 +214,9 @@ grant:
214 if (lock->ml.node == dlm->node_num) 214 if (lock->ml.node == dlm->node_num)
215 mlog(0, "doing in-place convert for nonlocal lock\n"); 215 mlog(0, "doing in-place convert for nonlocal lock\n");
216 lock->ml.type = type; 216 lock->ml.type = type;
217 if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
218 memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
219
217 status = DLM_NORMAL; 220 status = DLM_NORMAL;
218 *call_ast = 1; 221 *call_ast = 1;
219 goto unlock_exit; 222 goto unlock_exit;
@@ -231,8 +234,7 @@ switch_queues:
231 234
232 lock->ml.convert_type = type; 235 lock->ml.convert_type = type;
233 /* do not alter lock refcount. switching lists. */ 236 /* do not alter lock refcount. switching lists. */
234 list_del_init(&lock->list); 237 list_move_tail(&lock->list, &res->converting);
235 list_add_tail(&lock->list, &res->converting);
236 238
237unlock_exit: 239unlock_exit:
238 spin_unlock(&lock->spinlock); 240 spin_unlock(&lock->spinlock);
@@ -248,8 +250,7 @@ void dlm_revert_pending_convert(struct dlm_lock_resource *res,
248 struct dlm_lock *lock) 250 struct dlm_lock *lock)
249{ 251{
250 /* do not alter lock refcount. switching lists. */ 252 /* do not alter lock refcount. switching lists. */
251 list_del_init(&lock->list); 253 list_move_tail(&lock->list, &res->granted);
252 list_add_tail(&lock->list, &res->granted);
253 lock->ml.convert_type = LKM_IVMODE; 254 lock->ml.convert_type = LKM_IVMODE;
254 lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB); 255 lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
255} 256}
@@ -294,8 +295,7 @@ enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
294 res->state |= DLM_LOCK_RES_IN_PROGRESS; 295 res->state |= DLM_LOCK_RES_IN_PROGRESS;
295 /* move lock to local convert queue */ 296 /* move lock to local convert queue */
296 /* do not alter lock refcount. switching lists. */ 297 /* do not alter lock refcount. switching lists. */
297 list_del_init(&lock->list); 298 list_move_tail(&lock->list, &res->converting);
298 list_add_tail(&lock->list, &res->converting);
299 lock->convert_pending = 1; 299 lock->convert_pending = 1;
300 lock->ml.convert_type = type; 300 lock->ml.convert_type = type;
301 301
@@ -464,6 +464,12 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
464 } 464 }
465 465
466 spin_lock(&res->spinlock); 466 spin_lock(&res->spinlock);
467 status = __dlm_lockres_state_to_status(res);
468 if (status != DLM_NORMAL) {
469 spin_unlock(&res->spinlock);
470 dlm_error(status);
471 goto leave;
472 }
467 list_for_each(iter, &res->granted) { 473 list_for_each(iter, &res->granted) {
468 lock = list_entry(iter, struct dlm_lock, list); 474 lock = list_entry(iter, struct dlm_lock, list);
469 if (lock->ml.cookie == cnv->cookie && 475 if (lock->ml.cookie == cnv->cookie &&
@@ -473,6 +479,21 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
473 } 479 }
474 lock = NULL; 480 lock = NULL;
475 } 481 }
482 if (!lock) {
483 __dlm_print_one_lock_resource(res);
484 list_for_each(iter, &res->granted) {
485 lock = list_entry(iter, struct dlm_lock, list);
486 if (lock->ml.node == cnv->node_idx) {
487 mlog(ML_ERROR, "There is something here "
488 "for node %u, lock->ml.cookie=%llu, "
489 "cnv->cookie=%llu\n", cnv->node_idx,
490 (unsigned long long)lock->ml.cookie,
491 (unsigned long long)cnv->cookie);
492 break;
493 }
494 }
495 lock = NULL;
496 }
476 spin_unlock(&res->spinlock); 497 spin_unlock(&res->spinlock);
477 if (!lock) { 498 if (!lock) {
478 status = DLM_IVLOCKID; 499 status = DLM_IVLOCKID;
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index c7eae5d3324e..3f6c8d88f7af 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -37,10 +37,8 @@
37 37
38#include "dlmapi.h" 38#include "dlmapi.h"
39#include "dlmcommon.h" 39#include "dlmcommon.h"
40#include "dlmdebug.h"
41 40
42#include "dlmdomain.h" 41#include "dlmdomain.h"
43#include "dlmdebug.h"
44 42
45#define MLOG_MASK_PREFIX ML_DLM 43#define MLOG_MASK_PREFIX ML_DLM
46#include "cluster/masklog.h" 44#include "cluster/masklog.h"
@@ -120,6 +118,7 @@ void dlm_print_one_lock(struct dlm_lock *lockid)
120} 118}
121EXPORT_SYMBOL_GPL(dlm_print_one_lock); 119EXPORT_SYMBOL_GPL(dlm_print_one_lock);
122 120
121#if 0
123void dlm_dump_lock_resources(struct dlm_ctxt *dlm) 122void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
124{ 123{
125 struct dlm_lock_resource *res; 124 struct dlm_lock_resource *res;
@@ -136,12 +135,13 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
136 135
137 spin_lock(&dlm->spinlock); 136 spin_lock(&dlm->spinlock);
138 for (i=0; i<DLM_HASH_BUCKETS; i++) { 137 for (i=0; i<DLM_HASH_BUCKETS; i++) {
139 bucket = &(dlm->lockres_hash[i]); 138 bucket = dlm_lockres_hash(dlm, i);
140 hlist_for_each_entry(res, iter, bucket, hash_node) 139 hlist_for_each_entry(res, iter, bucket, hash_node)
141 dlm_print_one_lock_resource(res); 140 dlm_print_one_lock_resource(res);
142 } 141 }
143 spin_unlock(&dlm->spinlock); 142 spin_unlock(&dlm->spinlock);
144} 143}
144#endif /* 0 */
145 145
146static const char *dlm_errnames[] = { 146static const char *dlm_errnames[] = {
147 [DLM_NORMAL] = "DLM_NORMAL", 147 [DLM_NORMAL] = "DLM_NORMAL",
diff --git a/fs/ocfs2/dlm/dlmdebug.h b/fs/ocfs2/dlm/dlmdebug.h
deleted file mode 100644
index 6858510c3ccd..000000000000
--- a/fs/ocfs2/dlm/dlmdebug.h
+++ /dev/null
@@ -1,30 +0,0 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmdebug.h
5 *
6 * Copyright (C) 2004 Oracle. All rights reserved.
7 *
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public
19 * License along with this program; if not, write to the
20 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21 * Boston, MA 021110-1307, USA.
22 *
23 */
24
25#ifndef DLMDEBUG_H
26#define DLMDEBUG_H
27
28void dlm_dump_lock_resources(struct dlm_ctxt *dlm);
29
30#endif
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 8f3a9e3106fd..b8c23f7ba67e 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -41,7 +41,6 @@
41#include "dlmapi.h" 41#include "dlmapi.h"
42#include "dlmcommon.h" 42#include "dlmcommon.h"
43 43
44#include "dlmdebug.h"
45#include "dlmdomain.h" 44#include "dlmdomain.h"
46 45
47#include "dlmver.h" 46#include "dlmver.h"
@@ -49,6 +48,33 @@
49#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
50#include "cluster/masklog.h" 49#include "cluster/masklog.h"
51 50
51static void dlm_free_pagevec(void **vec, int pages)
52{
53 while (pages--)
54 free_page((unsigned long)vec[pages]);
55 kfree(vec);
56}
57
58static void **dlm_alloc_pagevec(int pages)
59{
60 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
61 int i;
62
63 if (!vec)
64 return NULL;
65
66 for (i = 0; i < pages; i++)
67 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
68 goto out_free;
69
70 mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
71 pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE);
72 return vec;
73out_free:
74 dlm_free_pagevec(vec, i);
75 return NULL;
76}
77
52/* 78/*
53 * 79 *
54 * spinlock lock ordering: if multiple locks are needed, obey this ordering: 80 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
@@ -62,7 +88,7 @@
62 * 88 *
63 */ 89 */
64 90
65spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED; 91DEFINE_SPINLOCK(dlm_domain_lock);
66LIST_HEAD(dlm_domains); 92LIST_HEAD(dlm_domains);
67static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 93static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
68 94
@@ -90,8 +116,7 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
90 assert_spin_locked(&dlm->spinlock); 116 assert_spin_locked(&dlm->spinlock);
91 117
92 q = &res->lockname; 118 q = &res->lockname;
93 q->hash = full_name_hash(q->name, q->len); 119 bucket = dlm_lockres_hash(dlm, q->hash);
94 bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
95 120
96 /* get a reference for our hashtable */ 121 /* get a reference for our hashtable */
97 dlm_lockres_get(res); 122 dlm_lockres_get(res);
@@ -100,34 +125,32 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
100} 125}
101 126
102struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 127struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
103 const char *name, 128 const char *name,
104 unsigned int len) 129 unsigned int len,
130 unsigned int hash)
105{ 131{
106 unsigned int hash;
107 struct hlist_node *iter;
108 struct dlm_lock_resource *tmpres=NULL;
109 struct hlist_head *bucket; 132 struct hlist_head *bucket;
133 struct hlist_node *list;
110 134
111 mlog_entry("%.*s\n", len, name); 135 mlog_entry("%.*s\n", len, name);
112 136
113 assert_spin_locked(&dlm->spinlock); 137 assert_spin_locked(&dlm->spinlock);
114 138
115 hash = full_name_hash(name, len); 139 bucket = dlm_lockres_hash(dlm, hash);
116
117 bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]);
118
119 /* check for pre-existing lock */
120 hlist_for_each(iter, bucket) {
121 tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node);
122 if (tmpres->lockname.len == len &&
123 memcmp(tmpres->lockname.name, name, len) == 0) {
124 dlm_lockres_get(tmpres);
125 break;
126 }
127 140
128 tmpres = NULL; 141 hlist_for_each(list, bucket) {
142 struct dlm_lock_resource *res = hlist_entry(list,
143 struct dlm_lock_resource, hash_node);
144 if (res->lockname.name[0] != name[0])
145 continue;
146 if (unlikely(res->lockname.len != len))
147 continue;
148 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
149 continue;
150 dlm_lockres_get(res);
151 return res;
129 } 152 }
130 return tmpres; 153 return NULL;
131} 154}
132 155
133struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 156struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
@@ -135,9 +158,10 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
135 unsigned int len) 158 unsigned int len)
136{ 159{
137 struct dlm_lock_resource *res; 160 struct dlm_lock_resource *res;
161 unsigned int hash = dlm_lockid_hash(name, len);
138 162
139 spin_lock(&dlm->spinlock); 163 spin_lock(&dlm->spinlock);
140 res = __dlm_lookup_lockres(dlm, name, len); 164 res = __dlm_lookup_lockres(dlm, name, len, hash);
141 spin_unlock(&dlm->spinlock); 165 spin_unlock(&dlm->spinlock);
142 return res; 166 return res;
143} 167}
@@ -194,7 +218,7 @@ static int dlm_wait_on_domain_helper(const char *domain)
194static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) 218static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
195{ 219{
196 if (dlm->lockres_hash) 220 if (dlm->lockres_hash)
197 free_page((unsigned long) dlm->lockres_hash); 221 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
198 222
199 if (dlm->name) 223 if (dlm->name)
200 kfree(dlm->name); 224 kfree(dlm->name);
@@ -278,11 +302,21 @@ int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
278 return ret; 302 return ret;
279} 303}
280 304
305static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
306{
307 if (dlm->dlm_worker) {
308 flush_workqueue(dlm->dlm_worker);
309 destroy_workqueue(dlm->dlm_worker);
310 dlm->dlm_worker = NULL;
311 }
312}
313
281static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) 314static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
282{ 315{
283 dlm_unregister_domain_handlers(dlm); 316 dlm_unregister_domain_handlers(dlm);
284 dlm_complete_thread(dlm); 317 dlm_complete_thread(dlm);
285 dlm_complete_recovery_thread(dlm); 318 dlm_complete_recovery_thread(dlm);
319 dlm_destroy_dlm_worker(dlm);
286 320
287 /* We've left the domain. Now we can take ourselves out of the 321 /* We've left the domain. Now we can take ourselves out of the
288 * list and allow the kref stuff to help us free the 322 * list and allow the kref stuff to help us free the
@@ -304,8 +338,8 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
304restart: 338restart:
305 spin_lock(&dlm->spinlock); 339 spin_lock(&dlm->spinlock);
306 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 340 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
307 while (!hlist_empty(&dlm->lockres_hash[i])) { 341 while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
308 res = hlist_entry(dlm->lockres_hash[i].first, 342 res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
309 struct dlm_lock_resource, hash_node); 343 struct dlm_lock_resource, hash_node);
310 /* need reference when manually grabbing lockres */ 344 /* need reference when manually grabbing lockres */
311 dlm_lockres_get(res); 345 dlm_lockres_get(res);
@@ -1126,6 +1160,13 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1126 goto bail; 1160 goto bail;
1127 } 1161 }
1128 1162
1163 dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1164 if (!dlm->dlm_worker) {
1165 status = -ENOMEM;
1166 mlog_errno(status);
1167 goto bail;
1168 }
1169
1129 do { 1170 do {
1130 unsigned int backoff; 1171 unsigned int backoff;
1131 status = dlm_try_to_join_domain(dlm); 1172 status = dlm_try_to_join_domain(dlm);
@@ -1166,6 +1207,7 @@ bail:
1166 dlm_unregister_domain_handlers(dlm); 1207 dlm_unregister_domain_handlers(dlm);
1167 dlm_complete_thread(dlm); 1208 dlm_complete_thread(dlm);
1168 dlm_complete_recovery_thread(dlm); 1209 dlm_complete_recovery_thread(dlm);
1210 dlm_destroy_dlm_worker(dlm);
1169 } 1211 }
1170 1212
1171 return status; 1213 return status;
@@ -1191,7 +1233,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1191 goto leave; 1233 goto leave;
1192 } 1234 }
1193 1235
1194 dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL); 1236 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1195 if (!dlm->lockres_hash) { 1237 if (!dlm->lockres_hash) {
1196 mlog_errno(-ENOMEM); 1238 mlog_errno(-ENOMEM);
1197 kfree(dlm->name); 1239 kfree(dlm->name);
@@ -1200,8 +1242,8 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1200 goto leave; 1242 goto leave;
1201 } 1243 }
1202 1244
1203 for (i=0; i<DLM_HASH_BUCKETS; i++) 1245 for (i = 0; i < DLM_HASH_BUCKETS; i++)
1204 INIT_HLIST_HEAD(&dlm->lockres_hash[i]); 1246 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1205 1247
1206 strcpy(dlm->name, domain); 1248 strcpy(dlm->name, domain);
1207 dlm->key = key; 1249 dlm->key = key;
@@ -1231,6 +1273,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1231 1273
1232 dlm->dlm_thread_task = NULL; 1274 dlm->dlm_thread_task = NULL;
1233 dlm->dlm_reco_thread_task = NULL; 1275 dlm->dlm_reco_thread_task = NULL;
1276 dlm->dlm_worker = NULL;
1234 init_waitqueue_head(&dlm->dlm_thread_wq); 1277 init_waitqueue_head(&dlm->dlm_thread_wq);
1235 init_waitqueue_head(&dlm->dlm_reco_thread_wq); 1278 init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1236 init_waitqueue_head(&dlm->reco.event); 1279 init_waitqueue_head(&dlm->reco.event);
diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
index 7273d9fa6bab..033ad1701232 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlm/dlmfs.c
@@ -116,7 +116,7 @@ static int dlmfs_file_open(struct inode *inode,
116 * doesn't make sense for LVB writes. */ 116 * doesn't make sense for LVB writes. */
117 file->f_flags &= ~O_APPEND; 117 file->f_flags &= ~O_APPEND;
118 118
119 fp = kmalloc(sizeof(*fp), GFP_KERNEL); 119 fp = kmalloc(sizeof(*fp), GFP_NOFS);
120 if (!fp) { 120 if (!fp) {
121 status = -ENOMEM; 121 status = -ENOMEM;
122 goto bail; 122 goto bail;
@@ -196,7 +196,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
196 else 196 else
197 readlen = count - *ppos; 197 readlen = count - *ppos;
198 198
199 lvb_buf = kmalloc(readlen, GFP_KERNEL); 199 lvb_buf = kmalloc(readlen, GFP_NOFS);
200 if (!lvb_buf) 200 if (!lvb_buf)
201 return -ENOMEM; 201 return -ENOMEM;
202 202
@@ -240,7 +240,7 @@ static ssize_t dlmfs_file_write(struct file *filp,
240 else 240 else
241 writelen = count - *ppos; 241 writelen = count - *ppos;
242 242
243 lvb_buf = kmalloc(writelen, GFP_KERNEL); 243 lvb_buf = kmalloc(writelen, GFP_NOFS);
244 if (!lvb_buf) 244 if (!lvb_buf)
245 return -ENOMEM; 245 return -ENOMEM;
246 246
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 6fea28318d6d..5ca57ec650c7 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -53,7 +53,7 @@
53#define MLOG_MASK_PREFIX ML_DLM 53#define MLOG_MASK_PREFIX ML_DLM
54#include "cluster/masklog.h" 54#include "cluster/masklog.h"
55 55
56static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED; 56static DEFINE_SPINLOCK(dlm_cookie_lock);
57static u64 dlm_next_cookie = 1; 57static u64 dlm_next_cookie = 1;
58 58
59static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, 59static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
@@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
201 struct dlm_lock *lock, int flags) 201 struct dlm_lock *lock, int flags)
202{ 202{
203 enum dlm_status status = DLM_DENIED; 203 enum dlm_status status = DLM_DENIED;
204 int lockres_changed = 1;
204 205
205 mlog_entry("type=%d\n", lock->ml.type); 206 mlog_entry("type=%d\n", lock->ml.type);
206 mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len, 207 mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
@@ -226,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
226 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 227 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
227 lock->lock_pending = 0; 228 lock->lock_pending = 0;
228 if (status != DLM_NORMAL) { 229 if (status != DLM_NORMAL) {
229 if (status != DLM_NOTQUEUED) 230 if (status == DLM_RECOVERING &&
231 dlm_is_recovery_lock(res->lockname.name,
232 res->lockname.len)) {
233 /* recovery lock was mastered by dead node.
234 * we need to have calc_usage shoot down this
235 * lockres and completely remaster it. */
236 mlog(0, "%s: recovery lock was owned by "
237 "dead node %u, remaster it now.\n",
238 dlm->name, res->owner);
239 } else if (status != DLM_NOTQUEUED) {
240 /*
241 * DO NOT call calc_usage, as this would unhash
242 * the remote lockres before we ever get to use
243 * it. treat as if we never made any change to
244 * the lockres.
245 */
246 lockres_changed = 0;
230 dlm_error(status); 247 dlm_error(status);
248 }
231 dlm_revert_pending_lock(res, lock); 249 dlm_revert_pending_lock(res, lock);
232 dlm_lock_put(lock); 250 dlm_lock_put(lock);
233 } else if (dlm_is_recovery_lock(res->lockname.name, 251 } else if (dlm_is_recovery_lock(res->lockname.name,
@@ -239,12 +257,12 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
239 mlog(0, "%s: $RECOVERY lock for this node (%u) is " 257 mlog(0, "%s: $RECOVERY lock for this node (%u) is "
240 "mastered by %u; got lock, manually granting (no ast)\n", 258 "mastered by %u; got lock, manually granting (no ast)\n",
241 dlm->name, dlm->node_num, res->owner); 259 dlm->name, dlm->node_num, res->owner);
242 list_del_init(&lock->list); 260 list_move_tail(&lock->list, &res->granted);
243 list_add_tail(&lock->list, &res->granted);
244 } 261 }
245 spin_unlock(&res->spinlock); 262 spin_unlock(&res->spinlock);
246 263
247 dlm_lockres_calc_usage(dlm, res); 264 if (lockres_changed)
265 dlm_lockres_calc_usage(dlm, res);
248 266
249 wake_up(&res->wq); 267 wake_up(&res->wq);
250 return status; 268 return status;
@@ -281,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
281 if (tmpret >= 0) { 299 if (tmpret >= 0) {
282 // successfully sent and received 300 // successfully sent and received
283 ret = status; // this is already a dlm_status 301 ret = status; // this is already a dlm_status
302 if (ret == DLM_REJECTED) {
303 mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres "
304 "no longer owned by %u. that node is coming back "
305 "up currently.\n", dlm->name, create.namelen,
306 create.name, res->owner);
307 dlm_print_one_lock_resource(res);
308 BUG();
309 }
284 } else { 310 } else {
285 mlog_errno(tmpret); 311 mlog_errno(tmpret);
286 if (dlm_is_host_down(tmpret)) { 312 if (dlm_is_host_down(tmpret)) {
@@ -382,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
382 struct dlm_lock *lock; 408 struct dlm_lock *lock;
383 int kernel_allocated = 0; 409 int kernel_allocated = 0;
384 410
385 lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); 411 lock = kcalloc(1, sizeof(*lock), GFP_NOFS);
386 if (!lock) 412 if (!lock)
387 return NULL; 413 return NULL;
388 414
389 if (!lksb) { 415 if (!lksb) {
390 /* zero memory only if kernel-allocated */ 416 /* zero memory only if kernel-allocated */
391 lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); 417 lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS);
392 if (!lksb) { 418 if (!lksb) {
393 kfree(lock); 419 kfree(lock);
394 return NULL; 420 return NULL;
@@ -429,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
429 if (!dlm_grab(dlm)) 455 if (!dlm_grab(dlm))
430 return DLM_REJECTED; 456 return DLM_REJECTED;
431 457
432 mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
433 "Domain %s not fully joined!\n", dlm->name);
434
435 name = create->name; 458 name = create->name;
436 namelen = create->namelen; 459 namelen = create->namelen;
460 status = DLM_REJECTED;
461 if (!dlm_domain_fully_joined(dlm)) {
462 mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
463 "sending a create_lock message for lock %.*s!\n",
464 dlm->name, create->node_idx, namelen, name);
465 dlm_error(status);
466 goto leave;
467 }
437 468
438 status = DLM_IVBUFLEN; 469 status = DLM_IVBUFLEN;
439 if (namelen > DLM_LOCKID_NAME_MAX) { 470 if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -669,18 +700,22 @@ retry_lock:
669 msleep(100); 700 msleep(100);
670 /* no waiting for dlm_reco_thread */ 701 /* no waiting for dlm_reco_thread */
671 if (recovery) { 702 if (recovery) {
672 if (status == DLM_RECOVERING) { 703 if (status != DLM_RECOVERING)
673 mlog(0, "%s: got RECOVERING " 704 goto retry_lock;
674 "for $REOCVERY lock, master " 705
675 "was %u\n", dlm->name, 706 mlog(0, "%s: got RECOVERING "
676 res->owner); 707 "for $RECOVERY lock, master "
677 dlm_wait_for_node_death(dlm, res->owner, 708 "was %u\n", dlm->name,
678 DLM_NODE_DEATH_WAIT_MAX); 709 res->owner);
679 } 710 /* wait to see the node go down, then
711 * drop down and allow the lockres to
712 * get cleaned up. need to remaster. */
713 dlm_wait_for_node_death(dlm, res->owner,
714 DLM_NODE_DEATH_WAIT_MAX);
680 } else { 715 } else {
681 dlm_wait_for_recovery(dlm); 716 dlm_wait_for_recovery(dlm);
717 goto retry_lock;
682 } 718 }
683 goto retry_lock;
684 } 719 }
685 720
686 if (status != DLM_NORMAL) { 721 if (status != DLM_NORMAL) {
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 940be4c13b1f..1b8346dd0572 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -47,7 +47,6 @@
47 47
48#include "dlmapi.h" 48#include "dlmapi.h"
49#include "dlmcommon.h" 49#include "dlmcommon.h"
50#include "dlmdebug.h"
51#include "dlmdomain.h" 50#include "dlmdomain.h"
52 51
53#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER) 52#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
@@ -74,6 +73,7 @@ struct dlm_master_list_entry
74 wait_queue_head_t wq; 73 wait_queue_head_t wq;
75 atomic_t woken; 74 atomic_t woken;
76 struct kref mle_refs; 75 struct kref mle_refs;
76 int inuse;
77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -127,18 +127,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
127 return 1; 127 return 1;
128} 128}
129 129
130#if 0 130#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m)
131/* Code here is included but defined out as it aids debugging */ 131static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
132{
133 int i;
134 printk("%s=[ ", mapname);
135 for (i=0; i<O2NM_MAX_NODES; i++)
136 if (test_bit(i, map))
137 printk("%d ", i);
138 printk("]");
139}
132 140
133void dlm_print_one_mle(struct dlm_master_list_entry *mle) 141static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
134{ 142{
135 int i = 0, refs; 143 int refs;
136 char *type; 144 char *type;
137 char attached; 145 char attached;
138 u8 master; 146 u8 master;
139 unsigned int namelen; 147 unsigned int namelen;
140 const char *name; 148 const char *name;
141 struct kref *k; 149 struct kref *k;
150 unsigned long *maybe = mle->maybe_map,
151 *vote = mle->vote_map,
152 *resp = mle->response_map,
153 *node = mle->node_map;
142 154
143 k = &mle->mle_refs; 155 k = &mle->mle_refs;
144 if (mle->type == DLM_MLE_BLOCK) 156 if (mle->type == DLM_MLE_BLOCK)
@@ -159,18 +171,29 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
159 name = mle->u.res->lockname.name; 171 name = mle->u.res->lockname.name;
160 } 172 }
161 173
162 mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", 174 mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
163 i, type, refs, master, mle->new_master, attached, 175 namelen, name, type, refs, master, mle->new_master, attached,
164 namelen, namelen, name); 176 mle->inuse);
177 dlm_print_nodemap(maybe);
178 printk(", ");
179 dlm_print_nodemap(vote);
180 printk(", ");
181 dlm_print_nodemap(resp);
182 printk(", ");
183 dlm_print_nodemap(node);
184 printk(", ");
185 printk("\n");
165} 186}
166 187
188#if 0
189/* Code here is included but defined out as it aids debugging */
190
167static void dlm_dump_mles(struct dlm_ctxt *dlm) 191static void dlm_dump_mles(struct dlm_ctxt *dlm)
168{ 192{
169 struct dlm_master_list_entry *mle; 193 struct dlm_master_list_entry *mle;
170 struct list_head *iter; 194 struct list_head *iter;
171 195
172 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); 196 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
173 mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
174 spin_lock(&dlm->master_lock); 197 spin_lock(&dlm->master_lock);
175 list_for_each(iter, &dlm->master_list) { 198 list_for_each(iter, &dlm->master_list) {
176 mle = list_entry(iter, struct dlm_master_list_entry, list); 199 mle = list_entry(iter, struct dlm_master_list_entry, list);
@@ -314,6 +337,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
314 spin_unlock(&dlm->spinlock); 337 spin_unlock(&dlm->spinlock);
315} 338}
316 339
340static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
341{
342 struct dlm_ctxt *dlm;
343 dlm = mle->dlm;
344
345 assert_spin_locked(&dlm->spinlock);
346 assert_spin_locked(&dlm->master_lock);
347 mle->inuse++;
348 kref_get(&mle->mle_refs);
349}
350
351static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
352{
353 struct dlm_ctxt *dlm;
354 dlm = mle->dlm;
355
356 spin_lock(&dlm->spinlock);
357 spin_lock(&dlm->master_lock);
358 mle->inuse--;
359 __dlm_put_mle(mle);
360 spin_unlock(&dlm->master_lock);
361 spin_unlock(&dlm->spinlock);
362
363}
364
317/* remove from list and free */ 365/* remove from list and free */
318static void __dlm_put_mle(struct dlm_master_list_entry *mle) 366static void __dlm_put_mle(struct dlm_master_list_entry *mle)
319{ 367{
@@ -322,9 +370,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
322 370
323 assert_spin_locked(&dlm->spinlock); 371 assert_spin_locked(&dlm->spinlock);
324 assert_spin_locked(&dlm->master_lock); 372 assert_spin_locked(&dlm->master_lock);
325 BUG_ON(!atomic_read(&mle->mle_refs.refcount)); 373 if (!atomic_read(&mle->mle_refs.refcount)) {
326 374 /* this may or may not crash, but who cares.
327 kref_put(&mle->mle_refs, dlm_mle_release); 375 * it's a BUG. */
376 mlog(ML_ERROR, "bad mle: %p\n", mle);
377 dlm_print_one_mle(mle);
378 BUG();
379 } else
380 kref_put(&mle->mle_refs, dlm_mle_release);
328} 381}
329 382
330 383
@@ -367,6 +420,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
367 memset(mle->response_map, 0, sizeof(mle->response_map)); 420 memset(mle->response_map, 0, sizeof(mle->response_map));
368 mle->master = O2NM_MAX_NODES; 421 mle->master = O2NM_MAX_NODES;
369 mle->new_master = O2NM_MAX_NODES; 422 mle->new_master = O2NM_MAX_NODES;
423 mle->inuse = 0;
370 424
371 if (mle->type == DLM_MLE_MASTER) { 425 if (mle->type == DLM_MLE_MASTER) {
372 BUG_ON(!res); 426 BUG_ON(!res);
@@ -564,6 +618,28 @@ static void dlm_lockres_release(struct kref *kref)
564 mlog(0, "destroying lockres %.*s\n", res->lockname.len, 618 mlog(0, "destroying lockres %.*s\n", res->lockname.len,
565 res->lockname.name); 619 res->lockname.name);
566 620
621 if (!hlist_unhashed(&res->hash_node) ||
622 !list_empty(&res->granted) ||
623 !list_empty(&res->converting) ||
624 !list_empty(&res->blocked) ||
625 !list_empty(&res->dirty) ||
626 !list_empty(&res->recovering) ||
627 !list_empty(&res->purge)) {
628 mlog(ML_ERROR,
629 "Going to BUG for resource %.*s."
630 " We're on a list! [%c%c%c%c%c%c%c]\n",
631 res->lockname.len, res->lockname.name,
632 !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
633 !list_empty(&res->granted) ? 'G' : ' ',
634 !list_empty(&res->converting) ? 'C' : ' ',
635 !list_empty(&res->blocked) ? 'B' : ' ',
636 !list_empty(&res->dirty) ? 'D' : ' ',
637 !list_empty(&res->recovering) ? 'R' : ' ',
638 !list_empty(&res->purge) ? 'P' : ' ');
639
640 dlm_print_one_lock_resource(res);
641 }
642
567 /* By the time we're ready to blow this guy away, we shouldn't 643 /* By the time we're ready to blow this guy away, we shouldn't
568 * be on any lists. */ 644 * be on any lists. */
569 BUG_ON(!hlist_unhashed(&res->hash_node)); 645 BUG_ON(!hlist_unhashed(&res->hash_node));
@@ -579,11 +655,6 @@ static void dlm_lockres_release(struct kref *kref)
579 kfree(res); 655 kfree(res);
580} 656}
581 657
582void dlm_lockres_get(struct dlm_lock_resource *res)
583{
584 kref_get(&res->refs);
585}
586
587void dlm_lockres_put(struct dlm_lock_resource *res) 658void dlm_lockres_put(struct dlm_lock_resource *res)
588{ 659{
589 kref_put(&res->refs, dlm_lockres_release); 660 kref_put(&res->refs, dlm_lockres_release);
@@ -603,7 +674,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
603 memcpy(qname, name, namelen); 674 memcpy(qname, name, namelen);
604 675
605 res->lockname.len = namelen; 676 res->lockname.len = namelen;
606 res->lockname.hash = full_name_hash(name, namelen); 677 res->lockname.hash = dlm_lockid_hash(name, namelen);
607 678
608 init_waitqueue_head(&res->wq); 679 init_waitqueue_head(&res->wq);
609 spin_lock_init(&res->spinlock); 680 spin_lock_init(&res->spinlock);
@@ -637,11 +708,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
637{ 708{
638 struct dlm_lock_resource *res; 709 struct dlm_lock_resource *res;
639 710
640 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 711 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
641 if (!res) 712 if (!res)
642 return NULL; 713 return NULL;
643 714
644 res->lockname.name = kmalloc(namelen, GFP_KERNEL); 715 res->lockname.name = kmalloc(namelen, GFP_NOFS);
645 if (!res->lockname.name) { 716 if (!res->lockname.name) {
646 kfree(res); 717 kfree(res);
647 return NULL; 718 return NULL;
@@ -677,19 +748,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
677 int blocked = 0; 748 int blocked = 0;
678 int ret, nodenum; 749 int ret, nodenum;
679 struct dlm_node_iter iter; 750 struct dlm_node_iter iter;
680 unsigned int namelen; 751 unsigned int namelen, hash;
681 int tries = 0; 752 int tries = 0;
682 int bit, wait_on_recovery = 0; 753 int bit, wait_on_recovery = 0;
683 754
684 BUG_ON(!lockid); 755 BUG_ON(!lockid);
685 756
686 namelen = strlen(lockid); 757 namelen = strlen(lockid);
758 hash = dlm_lockid_hash(lockid, namelen);
687 759
688 mlog(0, "get lockres %s (len %d)\n", lockid, namelen); 760 mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
689 761
690lookup: 762lookup:
691 spin_lock(&dlm->spinlock); 763 spin_lock(&dlm->spinlock);
692 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); 764 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
693 if (tmpres) { 765 if (tmpres) {
694 spin_unlock(&dlm->spinlock); 766 spin_unlock(&dlm->spinlock);
695 mlog(0, "found in hash!\n"); 767 mlog(0, "found in hash!\n");
@@ -704,7 +776,7 @@ lookup:
704 mlog(0, "allocating a new resource\n"); 776 mlog(0, "allocating a new resource\n");
705 /* nothing found and we need to allocate one. */ 777 /* nothing found and we need to allocate one. */
706 alloc_mle = (struct dlm_master_list_entry *) 778 alloc_mle = (struct dlm_master_list_entry *)
707 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 779 kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
708 if (!alloc_mle) 780 if (!alloc_mle)
709 goto leave; 781 goto leave;
710 res = dlm_new_lockres(dlm, lockid, namelen); 782 res = dlm_new_lockres(dlm, lockid, namelen);
@@ -790,10 +862,11 @@ lookup:
790 * if so, the creator of the BLOCK may try to put the last 862 * if so, the creator of the BLOCK may try to put the last
791 * ref at this time in the assert master handler, so we 863 * ref at this time in the assert master handler, so we
792 * need an extra one to keep from a bad ptr deref. */ 864 * need an extra one to keep from a bad ptr deref. */
793 dlm_get_mle(mle); 865 dlm_get_mle_inuse(mle);
794 spin_unlock(&dlm->master_lock); 866 spin_unlock(&dlm->master_lock);
795 spin_unlock(&dlm->spinlock); 867 spin_unlock(&dlm->spinlock);
796 868
869redo_request:
797 while (wait_on_recovery) { 870 while (wait_on_recovery) {
798 /* any cluster changes that occurred after dropping the 871 /* any cluster changes that occurred after dropping the
799 * dlm spinlock would be detectable be a change on the mle, 872 * dlm spinlock would be detectable be a change on the mle,
@@ -812,7 +885,7 @@ lookup:
812 } 885 }
813 886
814 dlm_kick_recovery_thread(dlm); 887 dlm_kick_recovery_thread(dlm);
815 msleep(100); 888 msleep(1000);
816 dlm_wait_for_recovery(dlm); 889 dlm_wait_for_recovery(dlm);
817 890
818 spin_lock(&dlm->spinlock); 891 spin_lock(&dlm->spinlock);
@@ -825,13 +898,15 @@ lookup:
825 } else 898 } else
826 wait_on_recovery = 0; 899 wait_on_recovery = 0;
827 spin_unlock(&dlm->spinlock); 900 spin_unlock(&dlm->spinlock);
901
902 if (wait_on_recovery)
903 dlm_wait_for_node_recovery(dlm, bit, 10000);
828 } 904 }
829 905
830 /* must wait for lock to be mastered elsewhere */ 906 /* must wait for lock to be mastered elsewhere */
831 if (blocked) 907 if (blocked)
832 goto wait; 908 goto wait;
833 909
834redo_request:
835 ret = -EINVAL; 910 ret = -EINVAL;
836 dlm_node_iter_init(mle->vote_map, &iter); 911 dlm_node_iter_init(mle->vote_map, &iter);
837 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 912 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -856,6 +931,7 @@ wait:
856 /* keep going until the response map includes all nodes */ 931 /* keep going until the response map includes all nodes */
857 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); 932 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
858 if (ret < 0) { 933 if (ret < 0) {
934 wait_on_recovery = 1;
859 mlog(0, "%s:%.*s: node map changed, redo the " 935 mlog(0, "%s:%.*s: node map changed, redo the "
860 "master request now, blocked=%d\n", 936 "master request now, blocked=%d\n",
861 dlm->name, res->lockname.len, 937 dlm->name, res->lockname.len,
@@ -866,7 +942,7 @@ wait:
866 dlm->name, res->lockname.len, 942 dlm->name, res->lockname.len,
867 res->lockname.name, blocked); 943 res->lockname.name, blocked);
868 dlm_print_one_lock_resource(res); 944 dlm_print_one_lock_resource(res);
869 /* dlm_print_one_mle(mle); */ 945 dlm_print_one_mle(mle);
870 tries = 0; 946 tries = 0;
871 } 947 }
872 goto redo_request; 948 goto redo_request;
@@ -880,7 +956,7 @@ wait:
880 dlm_mle_detach_hb_events(dlm, mle); 956 dlm_mle_detach_hb_events(dlm, mle);
881 dlm_put_mle(mle); 957 dlm_put_mle(mle);
882 /* put the extra ref */ 958 /* put the extra ref */
883 dlm_put_mle(mle); 959 dlm_put_mle_inuse(mle);
884 960
885wake_waiters: 961wake_waiters:
886 spin_lock(&res->spinlock); 962 spin_lock(&res->spinlock);
@@ -921,12 +997,14 @@ recheck:
921 spin_unlock(&res->spinlock); 997 spin_unlock(&res->spinlock);
922 /* this will cause the master to re-assert across 998 /* this will cause the master to re-assert across
923 * the whole cluster, freeing up mles */ 999 * the whole cluster, freeing up mles */
924 ret = dlm_do_master_request(mle, res->owner); 1000 if (res->owner != dlm->node_num) {
925 if (ret < 0) { 1001 ret = dlm_do_master_request(mle, res->owner);
926 /* give recovery a chance to run */ 1002 if (ret < 0) {
927 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 1003 /* give recovery a chance to run */
928 msleep(500); 1004 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
929 goto recheck; 1005 msleep(500);
1006 goto recheck;
1007 }
930 } 1008 }
931 ret = 0; 1009 ret = 0;
932 goto leave; 1010 goto leave;
@@ -962,6 +1040,12 @@ recheck:
962 "rechecking now\n", dlm->name, res->lockname.len, 1040 "rechecking now\n", dlm->name, res->lockname.len,
963 res->lockname.name); 1041 res->lockname.name);
964 goto recheck; 1042 goto recheck;
1043 } else {
1044 if (!voting_done) {
1045 mlog(0, "map not changed and voting not done "
1046 "for %s:%.*s\n", dlm->name, res->lockname.len,
1047 res->lockname.name);
1048 }
965 } 1049 }
966 1050
967 if (m != O2NM_MAX_NODES) { 1051 if (m != O2NM_MAX_NODES) {
@@ -1129,18 +1213,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1129 set_bit(node, mle->vote_map); 1213 set_bit(node, mle->vote_map);
1130 } else { 1214 } else {
1131 mlog(ML_ERROR, "node down! %d\n", node); 1215 mlog(ML_ERROR, "node down! %d\n", node);
1132
1133 /* if the node wasn't involved in mastery skip it,
1134 * but clear it out from the maps so that it will
1135 * not affect mastery of this lockres */
1136 clear_bit(node, mle->response_map);
1137 clear_bit(node, mle->vote_map);
1138 if (!test_bit(node, mle->maybe_map))
1139 goto next;
1140
1141 /* if we're already blocked on lock mastery, and the
1142 * dead node wasn't the expected master, or there is
1143 * another node in the maybe_map, keep waiting */
1144 if (blocked) { 1216 if (blocked) {
1145 int lowest = find_next_bit(mle->maybe_map, 1217 int lowest = find_next_bit(mle->maybe_map,
1146 O2NM_MAX_NODES, 0); 1218 O2NM_MAX_NODES, 0);
@@ -1148,54 +1220,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1148 /* act like it was never there */ 1220 /* act like it was never there */
1149 clear_bit(node, mle->maybe_map); 1221 clear_bit(node, mle->maybe_map);
1150 1222
1151 if (node != lowest) 1223 if (node == lowest) {
1152 goto next; 1224 mlog(0, "expected master %u died"
1153 1225 " while this node was blocked "
1154 mlog(ML_ERROR, "expected master %u died while " 1226 "waiting on it!\n", node);
1155 "this node was blocked waiting on it!\n", 1227 lowest = find_next_bit(mle->maybe_map,
1156 node); 1228 O2NM_MAX_NODES,
1157 lowest = find_next_bit(mle->maybe_map, 1229 lowest+1);
1158 O2NM_MAX_NODES, 1230 if (lowest < O2NM_MAX_NODES) {
1159 lowest+1); 1231 mlog(0, "%s:%.*s:still "
1160 if (lowest < O2NM_MAX_NODES) { 1232 "blocked. waiting on %u "
1161 mlog(0, "still blocked. waiting " 1233 "now\n", dlm->name,
1162 "on %u now\n", lowest); 1234 res->lockname.len,
1163 goto next; 1235 res->lockname.name,
1236 lowest);
1237 } else {
1238 /* mle is an MLE_BLOCK, but
1239 * there is now nothing left to
1240 * block on. we need to return
1241 * all the way back out and try
1242 * again with an MLE_MASTER.
1243 * dlm_do_local_recovery_cleanup
1244 * has already run, so the mle
1245 * refcount is ok */
1246 mlog(0, "%s:%.*s: no "
1247 "longer blocking. try to "
1248 "master this here\n",
1249 dlm->name,
1250 res->lockname.len,
1251 res->lockname.name);
1252 mle->type = DLM_MLE_MASTER;
1253 mle->u.res = res;
1254 }
1164 } 1255 }
1165
1166 /* mle is an MLE_BLOCK, but there is now
1167 * nothing left to block on. we need to return
1168 * all the way back out and try again with
1169 * an MLE_MASTER. dlm_do_local_recovery_cleanup
1170 * has already run, so the mle refcount is ok */
1171 mlog(0, "no longer blocking. we can "
1172 "try to master this here\n");
1173 mle->type = DLM_MLE_MASTER;
1174 memset(mle->maybe_map, 0,
1175 sizeof(mle->maybe_map));
1176 memset(mle->response_map, 0,
1177 sizeof(mle->maybe_map));
1178 memcpy(mle->vote_map, mle->node_map,
1179 sizeof(mle->node_map));
1180 mle->u.res = res;
1181 set_bit(dlm->node_num, mle->maybe_map);
1182
1183 ret = -EAGAIN;
1184 goto next;
1185 } 1256 }
1186 1257
1187 clear_bit(node, mle->maybe_map); 1258 /* now blank out everything, as if we had never
1188 if (node > dlm->node_num) 1259 * contacted anyone */
1189 goto next; 1260 memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1190 1261 memset(mle->response_map, 0, sizeof(mle->response_map));
1191 mlog(0, "dead node in map!\n"); 1262 /* reset the vote_map to the current node_map */
1192 /* yuck. go back and re-contact all nodes 1263 memcpy(mle->vote_map, mle->node_map,
1193 * in the vote_map, removing this node. */ 1264 sizeof(mle->node_map));
1194 memset(mle->response_map, 0, 1265 /* put myself into the maybe map */
1195 sizeof(mle->response_map)); 1266 if (mle->type != DLM_MLE_BLOCK)
1267 set_bit(dlm->node_num, mle->maybe_map);
1196 } 1268 }
1197 ret = -EAGAIN; 1269 ret = -EAGAIN;
1198next:
1199 node = dlm_bitmap_diff_iter_next(&bdi, &sc); 1270 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1200 } 1271 }
1201 return ret; 1272 return ret;
@@ -1316,7 +1387,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1316 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; 1387 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1317 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; 1388 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1318 char *name; 1389 char *name;
1319 unsigned int namelen; 1390 unsigned int namelen, hash;
1320 int found, ret; 1391 int found, ret;
1321 int set_maybe; 1392 int set_maybe;
1322 int dispatch_assert = 0; 1393 int dispatch_assert = 0;
@@ -1331,6 +1402,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1331 1402
1332 name = request->name; 1403 name = request->name;
1333 namelen = request->namelen; 1404 namelen = request->namelen;
1405 hash = dlm_lockid_hash(name, namelen);
1334 1406
1335 if (namelen > DLM_LOCKID_NAME_MAX) { 1407 if (namelen > DLM_LOCKID_NAME_MAX) {
1336 response = DLM_IVBUFLEN; 1408 response = DLM_IVBUFLEN;
@@ -1339,7 +1411,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1339 1411
1340way_up_top: 1412way_up_top:
1341 spin_lock(&dlm->spinlock); 1413 spin_lock(&dlm->spinlock);
1342 res = __dlm_lookup_lockres(dlm, name, namelen); 1414 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1343 if (res) { 1415 if (res) {
1344 spin_unlock(&dlm->spinlock); 1416 spin_unlock(&dlm->spinlock);
1345 1417
@@ -1459,21 +1531,18 @@ way_up_top:
1459 spin_unlock(&dlm->spinlock); 1531 spin_unlock(&dlm->spinlock);
1460 1532
1461 mle = (struct dlm_master_list_entry *) 1533 mle = (struct dlm_master_list_entry *)
1462 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 1534 kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1463 if (!mle) { 1535 if (!mle) {
1464 response = DLM_MASTER_RESP_ERROR; 1536 response = DLM_MASTER_RESP_ERROR;
1465 mlog_errno(-ENOMEM); 1537 mlog_errno(-ENOMEM);
1466 goto send_response; 1538 goto send_response;
1467 } 1539 }
1468 spin_lock(&dlm->spinlock);
1469 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
1470 name, namelen);
1471 spin_unlock(&dlm->spinlock);
1472 goto way_up_top; 1540 goto way_up_top;
1473 } 1541 }
1474 1542
1475 // mlog(0, "this is second time thru, already allocated, " 1543 // mlog(0, "this is second time thru, already allocated, "
1476 // "add the block.\n"); 1544 // "add the block.\n");
1545 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1477 set_bit(request->node_idx, mle->maybe_map); 1546 set_bit(request->node_idx, mle->maybe_map);
1478 list_add(&mle->list, &dlm->master_list); 1547 list_add(&mle->list, &dlm->master_list);
1479 response = DLM_MASTER_RESP_NO; 1548 response = DLM_MASTER_RESP_NO;
@@ -1556,6 +1625,8 @@ again:
1556 dlm_node_iter_init(nodemap, &iter); 1625 dlm_node_iter_init(nodemap, &iter);
1557 while ((to = dlm_node_iter_next(&iter)) >= 0) { 1626 while ((to = dlm_node_iter_next(&iter)) >= 0) {
1558 int r = 0; 1627 int r = 0;
1628 struct dlm_master_list_entry *mle = NULL;
1629
1559 mlog(0, "sending assert master to %d (%.*s)\n", to, 1630 mlog(0, "sending assert master to %d (%.*s)\n", to,
1560 namelen, lockname); 1631 namelen, lockname);
1561 memset(&assert, 0, sizeof(assert)); 1632 memset(&assert, 0, sizeof(assert));
@@ -1567,20 +1638,28 @@ again:
1567 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, 1638 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1568 &assert, sizeof(assert), to, &r); 1639 &assert, sizeof(assert), to, &r);
1569 if (tmpret < 0) { 1640 if (tmpret < 0) {
1570 mlog(ML_ERROR, "assert_master returned %d!\n", tmpret); 1641 mlog(0, "assert_master returned %d!\n", tmpret);
1571 if (!dlm_is_host_down(tmpret)) { 1642 if (!dlm_is_host_down(tmpret)) {
1572 mlog(ML_ERROR, "unhandled error!\n"); 1643 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1573 BUG(); 1644 BUG();
1574 } 1645 }
1575 /* a node died. finish out the rest of the nodes. */ 1646 /* a node died. finish out the rest of the nodes. */
1576 mlog(ML_ERROR, "link to %d went down!\n", to); 1647 mlog(0, "link to %d went down!\n", to);
1577 /* any nonzero status return will do */ 1648 /* any nonzero status return will do */
1578 ret = tmpret; 1649 ret = tmpret;
1579 } else if (r < 0) { 1650 } else if (r < 0) {
1580 /* ok, something horribly messed. kill thyself. */ 1651 /* ok, something horribly messed. kill thyself. */
1581 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1652 mlog(ML_ERROR,"during assert master of %.*s to %u, "
1582 "got %d.\n", namelen, lockname, to, r); 1653 "got %d.\n", namelen, lockname, to, r);
1583 dlm_dump_lock_resources(dlm); 1654 spin_lock(&dlm->spinlock);
1655 spin_lock(&dlm->master_lock);
1656 if (dlm_find_mle(dlm, &mle, (char *)lockname,
1657 namelen)) {
1658 dlm_print_one_mle(mle);
1659 __dlm_put_mle(mle);
1660 }
1661 spin_unlock(&dlm->master_lock);
1662 spin_unlock(&dlm->spinlock);
1584 BUG(); 1663 BUG();
1585 } else if (r == EAGAIN) { 1664 } else if (r == EAGAIN) {
1586 mlog(0, "%.*s: node %u create mles on other " 1665 mlog(0, "%.*s: node %u create mles on other "
@@ -1612,7 +1691,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1612 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; 1691 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1613 struct dlm_lock_resource *res = NULL; 1692 struct dlm_lock_resource *res = NULL;
1614 char *name; 1693 char *name;
1615 unsigned int namelen; 1694 unsigned int namelen, hash;
1616 u32 flags; 1695 u32 flags;
1617 int master_request = 0; 1696 int master_request = 0;
1618 int ret = 0; 1697 int ret = 0;
@@ -1622,6 +1701,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1622 1701
1623 name = assert->name; 1702 name = assert->name;
1624 namelen = assert->namelen; 1703 namelen = assert->namelen;
1704 hash = dlm_lockid_hash(name, namelen);
1625 flags = be32_to_cpu(assert->flags); 1705 flags = be32_to_cpu(assert->flags);
1626 1706
1627 if (namelen > DLM_LOCKID_NAME_MAX) { 1707 if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -1646,7 +1726,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1646 if (bit >= O2NM_MAX_NODES) { 1726 if (bit >= O2NM_MAX_NODES) {
1647 /* not necessarily an error, though less likely. 1727 /* not necessarily an error, though less likely.
1648 * could be master just re-asserting. */ 1728 * could be master just re-asserting. */
1649 mlog(ML_ERROR, "no bits set in the maybe_map, but %u " 1729 mlog(0, "no bits set in the maybe_map, but %u "
1650 "is asserting! (%.*s)\n", assert->node_idx, 1730 "is asserting! (%.*s)\n", assert->node_idx,
1651 namelen, name); 1731 namelen, name);
1652 } else if (bit != assert->node_idx) { 1732 } else if (bit != assert->node_idx) {
@@ -1658,19 +1738,36 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1658 * number winning the mastery will respond 1738 * number winning the mastery will respond
1659 * YES to mastery requests, but this node 1739 * YES to mastery requests, but this node
1660 * had no way of knowing. let it pass. */ 1740 * had no way of knowing. let it pass. */
1661 mlog(ML_ERROR, "%u is the lowest node, " 1741 mlog(0, "%u is the lowest node, "
1662 "%u is asserting. (%.*s) %u must " 1742 "%u is asserting. (%.*s) %u must "
1663 "have begun after %u won.\n", bit, 1743 "have begun after %u won.\n", bit,
1664 assert->node_idx, namelen, name, bit, 1744 assert->node_idx, namelen, name, bit,
1665 assert->node_idx); 1745 assert->node_idx);
1666 } 1746 }
1667 } 1747 }
1748 if (mle->type == DLM_MLE_MIGRATION) {
1749 if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1750 mlog(0, "%s:%.*s: got cleanup assert"
1751 " from %u for migration\n",
1752 dlm->name, namelen, name,
1753 assert->node_idx);
1754 } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1755 mlog(0, "%s:%.*s: got unrelated assert"
1756 " from %u for migration, ignoring\n",
1757 dlm->name, namelen, name,
1758 assert->node_idx);
1759 __dlm_put_mle(mle);
1760 spin_unlock(&dlm->master_lock);
1761 spin_unlock(&dlm->spinlock);
1762 goto done;
1763 }
1764 }
1668 } 1765 }
1669 spin_unlock(&dlm->master_lock); 1766 spin_unlock(&dlm->master_lock);
1670 1767
1671 /* ok everything checks out with the MLE 1768 /* ok everything checks out with the MLE
1672 * now check to see if there is a lockres */ 1769 * now check to see if there is a lockres */
1673 res = __dlm_lookup_lockres(dlm, name, namelen); 1770 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1674 if (res) { 1771 if (res) {
1675 spin_lock(&res->spinlock); 1772 spin_lock(&res->spinlock);
1676 if (res->state & DLM_LOCK_RES_RECOVERING) { 1773 if (res->state & DLM_LOCK_RES_RECOVERING) {
@@ -1679,7 +1776,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1679 goto kill; 1776 goto kill;
1680 } 1777 }
1681 if (!mle) { 1778 if (!mle) {
1682 if (res->owner != assert->node_idx) { 1779 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1780 res->owner != assert->node_idx) {
1683 mlog(ML_ERROR, "assert_master from " 1781 mlog(ML_ERROR, "assert_master from "
1684 "%u, but current owner is " 1782 "%u, but current owner is "
1685 "%u! (%.*s)\n", 1783 "%u! (%.*s)\n",
@@ -1732,6 +1830,7 @@ ok:
1732 if (mle) { 1830 if (mle) {
1733 int extra_ref = 0; 1831 int extra_ref = 0;
1734 int nn = -1; 1832 int nn = -1;
1833 int rr, err = 0;
1735 1834
1736 spin_lock(&mle->spinlock); 1835 spin_lock(&mle->spinlock);
1737 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) 1836 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1751,27 +1850,64 @@ ok:
1751 wake_up(&mle->wq); 1850 wake_up(&mle->wq);
1752 spin_unlock(&mle->spinlock); 1851 spin_unlock(&mle->spinlock);
1753 1852
1754 if (mle->type == DLM_MLE_MIGRATION && res) { 1853 if (res) {
1755 mlog(0, "finishing off migration of lockres %.*s, "
1756 "from %u to %u\n",
1757 res->lockname.len, res->lockname.name,
1758 dlm->node_num, mle->new_master);
1759 spin_lock(&res->spinlock); 1854 spin_lock(&res->spinlock);
1760 res->state &= ~DLM_LOCK_RES_MIGRATING; 1855 if (mle->type == DLM_MLE_MIGRATION) {
1761 dlm_change_lockres_owner(dlm, res, mle->new_master); 1856 mlog(0, "finishing off migration of lockres %.*s, "
1762 BUG_ON(res->state & DLM_LOCK_RES_DIRTY); 1857 "from %u to %u\n",
1858 res->lockname.len, res->lockname.name,
1859 dlm->node_num, mle->new_master);
1860 res->state &= ~DLM_LOCK_RES_MIGRATING;
1861 dlm_change_lockres_owner(dlm, res, mle->new_master);
1862 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1863 } else {
1864 dlm_change_lockres_owner(dlm, res, mle->master);
1865 }
1763 spin_unlock(&res->spinlock); 1866 spin_unlock(&res->spinlock);
1764 } 1867 }
1765 /* master is known, detach if not already detached */ 1868
1766 dlm_mle_detach_hb_events(dlm, mle); 1869 /* master is known, detach if not already detached.
1767 dlm_put_mle(mle); 1870 * ensures that only one assert_master call will happen
1768 1871 * on this mle. */
1872 spin_lock(&dlm->spinlock);
1873 spin_lock(&dlm->master_lock);
1874
1875 rr = atomic_read(&mle->mle_refs.refcount);
1876 if (mle->inuse > 0) {
1877 if (extra_ref && rr < 3)
1878 err = 1;
1879 else if (!extra_ref && rr < 2)
1880 err = 1;
1881 } else {
1882 if (extra_ref && rr < 2)
1883 err = 1;
1884 else if (!extra_ref && rr < 1)
1885 err = 1;
1886 }
1887 if (err) {
1888 mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1889 "that will mess up this node, refs=%d, extra=%d, "
1890 "inuse=%d\n", dlm->name, namelen, name,
1891 assert->node_idx, rr, extra_ref, mle->inuse);
1892 dlm_print_one_mle(mle);
1893 }
1894 list_del_init(&mle->list);
1895 __dlm_mle_detach_hb_events(dlm, mle);
1896 __dlm_put_mle(mle);
1769 if (extra_ref) { 1897 if (extra_ref) {
1770 /* the assert master message now balances the extra 1898 /* the assert master message now balances the extra
1771 * ref given by the master / migration request message. 1899 * ref given by the master / migration request message.
1772 * if this is the last put, it will be removed 1900 * if this is the last put, it will be removed
1773 * from the list. */ 1901 * from the list. */
1774 dlm_put_mle(mle); 1902 __dlm_put_mle(mle);
1903 }
1904 spin_unlock(&dlm->master_lock);
1905 spin_unlock(&dlm->spinlock);
1906 } else if (res) {
1907 if (res->owner != assert->node_idx) {
1908 mlog(0, "assert_master from %u, but current "
1909 "owner is %u (%.*s), no mle\n", assert->node_idx,
1910 res->owner, namelen, name);
1775 } 1911 }
1776 } 1912 }
1777 1913
@@ -1788,12 +1924,12 @@ done:
1788 1924
1789kill: 1925kill:
1790 /* kill the caller! */ 1926 /* kill the caller! */
1927 mlog(ML_ERROR, "Bad message received from another node. Dumping state "
1928 "and killing the other node now! This node is OK and can continue.\n");
1929 __dlm_print_one_lock_resource(res);
1791 spin_unlock(&res->spinlock); 1930 spin_unlock(&res->spinlock);
1792 spin_unlock(&dlm->spinlock); 1931 spin_unlock(&dlm->spinlock);
1793 dlm_lockres_put(res); 1932 dlm_lockres_put(res);
1794 mlog(ML_ERROR, "Bad message received from another node. Dumping state "
1795 "and killing the other node now! This node is OK and can continue.\n");
1796 dlm_dump_lock_resources(dlm);
1797 dlm_put(dlm); 1933 dlm_put(dlm);
1798 return -EINVAL; 1934 return -EINVAL;
1799} 1935}
@@ -1803,7 +1939,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1803 int ignore_higher, u8 request_from, u32 flags) 1939 int ignore_higher, u8 request_from, u32 flags)
1804{ 1940{
1805 struct dlm_work_item *item; 1941 struct dlm_work_item *item;
1806 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 1942 item = kcalloc(1, sizeof(*item), GFP_NOFS);
1807 if (!item) 1943 if (!item)
1808 return -ENOMEM; 1944 return -ENOMEM;
1809 1945
@@ -1825,7 +1961,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1825 list_add_tail(&item->list, &dlm->work_list); 1961 list_add_tail(&item->list, &dlm->work_list);
1826 spin_unlock(&dlm->work_lock); 1962 spin_unlock(&dlm->work_lock);
1827 1963
1828 schedule_work(&dlm->dispatched_work); 1964 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1829 return 0; 1965 return 0;
1830} 1966}
1831 1967
@@ -1866,6 +2002,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1866 } 2002 }
1867 } 2003 }
1868 2004
2005 /*
2006 * If we're migrating this lock to someone else, we are no
2007 * longer allowed to assert out own mastery. OTOH, we need to
2008 * prevent migration from starting while we're still asserting
2009 * our dominance. The reserved ast delays migration.
2010 */
2011 spin_lock(&res->spinlock);
2012 if (res->state & DLM_LOCK_RES_MIGRATING) {
2013 mlog(0, "Someone asked us to assert mastery, but we're "
2014 "in the middle of migration. Skipping assert, "
2015 "the new master will handle that.\n");
2016 spin_unlock(&res->spinlock);
2017 goto put;
2018 } else
2019 __dlm_lockres_reserve_ast(res);
2020 spin_unlock(&res->spinlock);
2021
1869 /* this call now finishes out the nodemap 2022 /* this call now finishes out the nodemap
1870 * even if one or more nodes die */ 2023 * even if one or more nodes die */
1871 mlog(0, "worker about to master %.*s here, this=%u\n", 2024 mlog(0, "worker about to master %.*s here, this=%u\n",
@@ -1875,9 +2028,14 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1875 nodemap, flags); 2028 nodemap, flags);
1876 if (ret < 0) { 2029 if (ret < 0) {
1877 /* no need to restart, we are done */ 2030 /* no need to restart, we are done */
1878 mlog_errno(ret); 2031 if (!dlm_is_host_down(ret))
2032 mlog_errno(ret);
1879 } 2033 }
1880 2034
2035 /* Ok, we've asserted ourselves. Let's let migration start. */
2036 dlm_lockres_release_ast(dlm, res);
2037
2038put:
1881 dlm_lockres_put(res); 2039 dlm_lockres_put(res);
1882 2040
1883 mlog(0, "finished with dlm_assert_master_worker\n"); 2041 mlog(0, "finished with dlm_assert_master_worker\n");
@@ -1916,6 +2074,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
1916 BUG(); 2074 BUG();
1917 /* host is down, so answer for that node would be 2075 /* host is down, so answer for that node would be
1918 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ 2076 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
2077 ret = 0;
1919 } 2078 }
1920 2079
1921 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { 2080 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -2016,14 +2175,14 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2016 */ 2175 */
2017 2176
2018 ret = -ENOMEM; 2177 ret = -ENOMEM;
2019 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL); 2178 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2020 if (!mres) { 2179 if (!mres) {
2021 mlog_errno(ret); 2180 mlog_errno(ret);
2022 goto leave; 2181 goto leave;
2023 } 2182 }
2024 2183
2025 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, 2184 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2026 GFP_KERNEL); 2185 GFP_NOFS);
2027 if (!mle) { 2186 if (!mle) {
2028 mlog_errno(ret); 2187 mlog_errno(ret);
2029 goto leave; 2188 goto leave;
@@ -2117,7 +2276,7 @@ fail:
2117 * take both dlm->spinlock and dlm->master_lock */ 2276 * take both dlm->spinlock and dlm->master_lock */
2118 spin_lock(&dlm->spinlock); 2277 spin_lock(&dlm->spinlock);
2119 spin_lock(&dlm->master_lock); 2278 spin_lock(&dlm->master_lock);
2120 dlm_get_mle(mle); 2279 dlm_get_mle_inuse(mle);
2121 spin_unlock(&dlm->master_lock); 2280 spin_unlock(&dlm->master_lock);
2122 spin_unlock(&dlm->spinlock); 2281 spin_unlock(&dlm->spinlock);
2123 2282
@@ -2134,7 +2293,10 @@ fail:
2134 /* migration failed, detach and clean up mle */ 2293 /* migration failed, detach and clean up mle */
2135 dlm_mle_detach_hb_events(dlm, mle); 2294 dlm_mle_detach_hb_events(dlm, mle);
2136 dlm_put_mle(mle); 2295 dlm_put_mle(mle);
2137 dlm_put_mle(mle); 2296 dlm_put_mle_inuse(mle);
2297 spin_lock(&res->spinlock);
2298 res->state &= ~DLM_LOCK_RES_MIGRATING;
2299 spin_unlock(&res->spinlock);
2138 goto leave; 2300 goto leave;
2139 } 2301 }
2140 2302
@@ -2164,8 +2326,8 @@ fail:
2164 /* avoid hang during shutdown when migrating lockres 2326 /* avoid hang during shutdown when migrating lockres
2165 * to a node which also goes down */ 2327 * to a node which also goes down */
2166 if (dlm_is_node_dead(dlm, target)) { 2328 if (dlm_is_node_dead(dlm, target)) {
2167 mlog(0, "%s:%.*s: expected migration target %u " 2329 mlog(0, "%s:%.*s: expected migration "
2168 "is no longer up. restarting.\n", 2330 "target %u is no longer up, restarting\n",
2169 dlm->name, res->lockname.len, 2331 dlm->name, res->lockname.len,
2170 res->lockname.name, target); 2332 res->lockname.name, target);
2171 ret = -ERESTARTSYS; 2333 ret = -ERESTARTSYS;
@@ -2175,7 +2337,10 @@ fail:
2175 /* migration failed, detach and clean up mle */ 2337 /* migration failed, detach and clean up mle */
2176 dlm_mle_detach_hb_events(dlm, mle); 2338 dlm_mle_detach_hb_events(dlm, mle);
2177 dlm_put_mle(mle); 2339 dlm_put_mle(mle);
2178 dlm_put_mle(mle); 2340 dlm_put_mle_inuse(mle);
2341 spin_lock(&res->spinlock);
2342 res->state &= ~DLM_LOCK_RES_MIGRATING;
2343 spin_unlock(&res->spinlock);
2179 goto leave; 2344 goto leave;
2180 } 2345 }
2181 /* TODO: if node died: stop, clean up, return error */ 2346 /* TODO: if node died: stop, clean up, return error */
@@ -2191,7 +2356,7 @@ fail:
2191 2356
2192 /* master is known, detach if not already detached */ 2357 /* master is known, detach if not already detached */
2193 dlm_mle_detach_hb_events(dlm, mle); 2358 dlm_mle_detach_hb_events(dlm, mle);
2194 dlm_put_mle(mle); 2359 dlm_put_mle_inuse(mle);
2195 ret = 0; 2360 ret = 0;
2196 2361
2197 dlm_lockres_calc_usage(dlm, res); 2362 dlm_lockres_calc_usage(dlm, res);
@@ -2462,7 +2627,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2462 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; 2627 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2463 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; 2628 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2464 const char *name; 2629 const char *name;
2465 unsigned int namelen; 2630 unsigned int namelen, hash;
2466 int ret = 0; 2631 int ret = 0;
2467 2632
2468 if (!dlm_grab(dlm)) 2633 if (!dlm_grab(dlm))
@@ -2470,10 +2635,11 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2470 2635
2471 name = migrate->name; 2636 name = migrate->name;
2472 namelen = migrate->namelen; 2637 namelen = migrate->namelen;
2638 hash = dlm_lockid_hash(name, namelen);
2473 2639
2474 /* preallocate.. if this fails, abort */ 2640 /* preallocate.. if this fails, abort */
2475 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, 2641 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2476 GFP_KERNEL); 2642 GFP_NOFS);
2477 2643
2478 if (!mle) { 2644 if (!mle) {
2479 ret = -ENOMEM; 2645 ret = -ENOMEM;
@@ -2482,7 +2648,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2482 2648
2483 /* check for pre-existing lock */ 2649 /* check for pre-existing lock */
2484 spin_lock(&dlm->spinlock); 2650 spin_lock(&dlm->spinlock);
2485 res = __dlm_lookup_lockres(dlm, name, namelen); 2651 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
2486 spin_lock(&dlm->master_lock); 2652 spin_lock(&dlm->master_lock);
2487 2653
2488 if (res) { 2654 if (res) {
@@ -2580,6 +2746,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2580 /* remove it from the list so that only one 2746 /* remove it from the list so that only one
2581 * mle will be found */ 2747 * mle will be found */
2582 list_del_init(&tmp->list); 2748 list_del_init(&tmp->list);
2749 __dlm_mle_detach_hb_events(dlm, mle);
2583 } 2750 }
2584 spin_unlock(&tmp->spinlock); 2751 spin_unlock(&tmp->spinlock);
2585 } 2752 }
@@ -2601,6 +2768,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2601 struct list_head *iter, *iter2; 2768 struct list_head *iter, *iter2;
2602 struct dlm_master_list_entry *mle; 2769 struct dlm_master_list_entry *mle;
2603 struct dlm_lock_resource *res; 2770 struct dlm_lock_resource *res;
2771 unsigned int hash;
2604 2772
2605 mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); 2773 mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2606top: 2774top:
@@ -2640,7 +2808,7 @@ top:
2640 * may result in the mle being unlinked and 2808 * may result in the mle being unlinked and
2641 * freed, but there may still be a process 2809 * freed, but there may still be a process
2642 * waiting in the dlmlock path which is fine. */ 2810 * waiting in the dlmlock path which is fine. */
2643 mlog(ML_ERROR, "node %u was expected master\n", 2811 mlog(0, "node %u was expected master\n",
2644 dead_node); 2812 dead_node);
2645 atomic_set(&mle->woken, 1); 2813 atomic_set(&mle->woken, 1);
2646 spin_unlock(&mle->spinlock); 2814 spin_unlock(&mle->spinlock);
@@ -2673,19 +2841,21 @@ top:
2673 2841
2674 /* remove from the list early. NOTE: unlinking 2842 /* remove from the list early. NOTE: unlinking
2675 * list_head while in list_for_each_safe */ 2843 * list_head while in list_for_each_safe */
2844 __dlm_mle_detach_hb_events(dlm, mle);
2676 spin_lock(&mle->spinlock); 2845 spin_lock(&mle->spinlock);
2677 list_del_init(&mle->list); 2846 list_del_init(&mle->list);
2678 atomic_set(&mle->woken, 1); 2847 atomic_set(&mle->woken, 1);
2679 spin_unlock(&mle->spinlock); 2848 spin_unlock(&mle->spinlock);
2680 wake_up(&mle->wq); 2849 wake_up(&mle->wq);
2681 2850
2682 mlog(0, "node %u died during migration from " 2851 mlog(0, "%s: node %u died during migration from "
2683 "%u to %u!\n", dead_node, 2852 "%u to %u!\n", dlm->name, dead_node,
2684 mle->master, mle->new_master); 2853 mle->master, mle->new_master);
2685 /* if there is a lockres associated with this 2854 /* if there is a lockres associated with this
2686 * mle, find it and set its owner to UNKNOWN */ 2855 * mle, find it and set its owner to UNKNOWN */
2856 hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
2687 res = __dlm_lookup_lockres(dlm, mle->u.name.name, 2857 res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2688 mle->u.name.len); 2858 mle->u.name.len, hash);
2689 if (res) { 2859 if (res) {
2690 /* unfortunately if we hit this rare case, our 2860 /* unfortunately if we hit this rare case, our
2691 * lock ordering is messed. we need to drop 2861 * lock ordering is messed. we need to drop
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 805cbabac051..29b2845f370d 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -98,8 +98,8 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
98 98
99static u64 dlm_get_next_mig_cookie(void); 99static u64 dlm_get_next_mig_cookie(void);
100 100
101static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED; 101static DEFINE_SPINLOCK(dlm_reco_state_lock);
102static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED; 102static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
103static u64 dlm_mig_cookie = 1; 103static u64 dlm_mig_cookie = 1;
104 104
105static u64 dlm_get_next_mig_cookie(void) 105static u64 dlm_get_next_mig_cookie(void)
@@ -115,12 +115,37 @@ static u64 dlm_get_next_mig_cookie(void)
115 return c; 115 return c;
116} 116}
117 117
118static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
119 u8 dead_node)
120{
121 assert_spin_locked(&dlm->spinlock);
122 if (dlm->reco.dead_node != dead_node)
123 mlog(0, "%s: changing dead_node from %u to %u\n",
124 dlm->name, dlm->reco.dead_node, dead_node);
125 dlm->reco.dead_node = dead_node;
126}
127
128static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
129 u8 master)
130{
131 assert_spin_locked(&dlm->spinlock);
132 mlog(0, "%s: changing new_master from %u to %u\n",
133 dlm->name, dlm->reco.new_master, master);
134 dlm->reco.new_master = master;
135}
136
137static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
138{
139 assert_spin_locked(&dlm->spinlock);
140 clear_bit(dlm->reco.dead_node, dlm->recovery_map);
141 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
142 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
143}
144
118static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) 145static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
119{ 146{
120 spin_lock(&dlm->spinlock); 147 spin_lock(&dlm->spinlock);
121 clear_bit(dlm->reco.dead_node, dlm->recovery_map); 148 __dlm_reset_recovery(dlm);
122 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
123 dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
124 spin_unlock(&dlm->spinlock); 149 spin_unlock(&dlm->spinlock);
125} 150}
126 151
@@ -132,12 +157,21 @@ void dlm_dispatch_work(void *data)
132 struct list_head *iter, *iter2; 157 struct list_head *iter, *iter2;
133 struct dlm_work_item *item; 158 struct dlm_work_item *item;
134 dlm_workfunc_t *workfunc; 159 dlm_workfunc_t *workfunc;
160 int tot=0;
161
162 if (!dlm_joined(dlm))
163 return;
135 164
136 spin_lock(&dlm->work_lock); 165 spin_lock(&dlm->work_lock);
137 list_splice_init(&dlm->work_list, &tmp_list); 166 list_splice_init(&dlm->work_list, &tmp_list);
138 spin_unlock(&dlm->work_lock); 167 spin_unlock(&dlm->work_lock);
139 168
140 list_for_each_safe(iter, iter2, &tmp_list) { 169 list_for_each_safe(iter, iter2, &tmp_list) {
170 tot++;
171 }
172 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
173
174 list_for_each_safe(iter, iter2, &tmp_list) {
141 item = list_entry(iter, struct dlm_work_item, list); 175 item = list_entry(iter, struct dlm_work_item, list);
142 workfunc = item->func; 176 workfunc = item->func;
143 list_del_init(&item->list); 177 list_del_init(&item->list);
@@ -220,6 +254,52 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
220 * 254 *
221 */ 255 */
222 256
257static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
258{
259 struct dlm_reco_node_data *ndata;
260 struct dlm_lock_resource *res;
261
262 mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
263 dlm->name, dlm->dlm_reco_thread_task->pid,
264 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
265 dlm->reco.dead_node, dlm->reco.new_master);
266
267 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
268 char *st = "unknown";
269 switch (ndata->state) {
270 case DLM_RECO_NODE_DATA_INIT:
271 st = "init";
272 break;
273 case DLM_RECO_NODE_DATA_REQUESTING:
274 st = "requesting";
275 break;
276 case DLM_RECO_NODE_DATA_DEAD:
277 st = "dead";
278 break;
279 case DLM_RECO_NODE_DATA_RECEIVING:
280 st = "receiving";
281 break;
282 case DLM_RECO_NODE_DATA_REQUESTED:
283 st = "requested";
284 break;
285 case DLM_RECO_NODE_DATA_DONE:
286 st = "done";
287 break;
288 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
289 st = "finalize-sent";
290 break;
291 default:
292 st = "bad";
293 break;
294 }
295 mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
296 dlm->name, ndata->node_num, st);
297 }
298 list_for_each_entry(res, &dlm->reco.resources, recovering) {
299 mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
300 dlm->name, res->lockname.len, res->lockname.name);
301 }
302}
223 303
224#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) 304#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
225 305
@@ -267,11 +347,23 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
267{ 347{
268 int dead; 348 int dead;
269 spin_lock(&dlm->spinlock); 349 spin_lock(&dlm->spinlock);
270 dead = test_bit(node, dlm->domain_map); 350 dead = !test_bit(node, dlm->domain_map);
271 spin_unlock(&dlm->spinlock); 351 spin_unlock(&dlm->spinlock);
272 return dead; 352 return dead;
273} 353}
274 354
355/* returns true if node is no longer in the domain
356 * could be dead or just not joined */
357static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
358{
359 int recovered;
360 spin_lock(&dlm->spinlock);
361 recovered = !test_bit(node, dlm->recovery_map);
362 spin_unlock(&dlm->spinlock);
363 return recovered;
364}
365
366
275int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) 367int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
276{ 368{
277 if (timeout) { 369 if (timeout) {
@@ -290,6 +382,24 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
290 return 0; 382 return 0;
291} 383}
292 384
385int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
386{
387 if (timeout) {
388 mlog(0, "%s: waiting %dms for notification of "
389 "recovery of node %u\n", dlm->name, timeout, node);
390 wait_event_timeout(dlm->dlm_reco_thread_wq,
391 dlm_is_node_recovered(dlm, node),
392 msecs_to_jiffies(timeout));
393 } else {
394 mlog(0, "%s: waiting indefinitely for notification "
395 "of recovery of node %u\n", dlm->name, node);
396 wait_event(dlm->dlm_reco_thread_wq,
397 dlm_is_node_recovered(dlm, node));
398 }
399 /* for now, return 0 */
400 return 0;
401}
402
293/* callers of the top-level api calls (dlmlock/dlmunlock) should 403/* callers of the top-level api calls (dlmlock/dlmunlock) should
294 * block on the dlm->reco.event when recovery is in progress. 404 * block on the dlm->reco.event when recovery is in progress.
295 * the dlm recovery thread will set this state when it begins 405 * the dlm recovery thread will set this state when it begins
@@ -308,6 +418,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm)
308 418
309void dlm_wait_for_recovery(struct dlm_ctxt *dlm) 419void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
310{ 420{
421 if (dlm_in_recovery(dlm)) {
422 mlog(0, "%s: reco thread %d in recovery: "
423 "state=%d, master=%u, dead=%u\n",
424 dlm->name, dlm->dlm_reco_thread_task->pid,
425 dlm->reco.state, dlm->reco.new_master,
426 dlm->reco.dead_node);
427 }
311 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); 428 wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
312} 429}
313 430
@@ -341,7 +458,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
341 mlog(0, "new master %u died while recovering %u!\n", 458 mlog(0, "new master %u died while recovering %u!\n",
342 dlm->reco.new_master, dlm->reco.dead_node); 459 dlm->reco.new_master, dlm->reco.dead_node);
343 /* unset the new_master, leave dead_node */ 460 /* unset the new_master, leave dead_node */
344 dlm->reco.new_master = O2NM_INVALID_NODE_NUM; 461 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
345 } 462 }
346 463
347 /* select a target to recover */ 464 /* select a target to recover */
@@ -350,14 +467,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
350 467
351 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0); 468 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
352 if (bit >= O2NM_MAX_NODES || bit < 0) 469 if (bit >= O2NM_MAX_NODES || bit < 0)
353 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 470 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
354 else 471 else
355 dlm->reco.dead_node = bit; 472 dlm_set_reco_dead_node(dlm, bit);
356 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { 473 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
357 /* BUG? */ 474 /* BUG? */
358 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", 475 mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
359 dlm->reco.dead_node); 476 dlm->reco.dead_node);
360 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; 477 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
361 } 478 }
362 479
363 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 480 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
@@ -366,7 +483,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
366 /* return to main thread loop and sleep. */ 483 /* return to main thread loop and sleep. */
367 return 0; 484 return 0;
368 } 485 }
369 mlog(0, "recovery thread found node %u in the recovery map!\n", 486 mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
487 dlm->name, dlm->dlm_reco_thread_task->pid,
370 dlm->reco.dead_node); 488 dlm->reco.dead_node);
371 spin_unlock(&dlm->spinlock); 489 spin_unlock(&dlm->spinlock);
372 490
@@ -389,8 +507,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
389 } 507 }
390 mlog(0, "another node will master this recovery session.\n"); 508 mlog(0, "another node will master this recovery session.\n");
391 } 509 }
392 mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n", 510 mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
393 dlm->name, dlm->reco.new_master, 511 dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,
394 dlm->node_num, dlm->reco.dead_node); 512 dlm->node_num, dlm->reco.dead_node);
395 513
396 /* it is safe to start everything back up here 514 /* it is safe to start everything back up here
@@ -402,11 +520,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
402 return 0; 520 return 0;
403 521
404master_here: 522master_here:
405 mlog(0, "mastering recovery of %s:%u here(this=%u)!\n", 523 mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n",
524 dlm->dlm_reco_thread_task->pid,
406 dlm->name, dlm->reco.dead_node, dlm->node_num); 525 dlm->name, dlm->reco.dead_node, dlm->node_num);
407 526
408 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 527 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
409 if (status < 0) { 528 if (status < 0) {
529 /* we should never hit this anymore */
410 mlog(ML_ERROR, "error %d remastering locks for node %u, " 530 mlog(ML_ERROR, "error %d remastering locks for node %u, "
411 "retrying.\n", status, dlm->reco.dead_node); 531 "retrying.\n", status, dlm->reco.dead_node);
412 /* yield a bit to allow any final network messages 532 /* yield a bit to allow any final network messages
@@ -433,9 +553,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
433 int destroy = 0; 553 int destroy = 0;
434 int pass = 0; 554 int pass = 0;
435 555
436 status = dlm_init_recovery_area(dlm, dead_node); 556 do {
437 if (status < 0) 557 /* we have become recovery master. there is no escaping
438 goto leave; 558 * this, so just keep trying until we get it. */
559 status = dlm_init_recovery_area(dlm, dead_node);
560 if (status < 0) {
561 mlog(ML_ERROR, "%s: failed to alloc recovery area, "
562 "retrying\n", dlm->name);
563 msleep(1000);
564 }
565 } while (status != 0);
439 566
440 /* safe to access the node data list without a lock, since this 567 /* safe to access the node data list without a lock, since this
441 * process is the only one to change the list */ 568 * process is the only one to change the list */
@@ -452,16 +579,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
452 continue; 579 continue;
453 } 580 }
454 581
455 status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); 582 do {
456 if (status < 0) { 583 status = dlm_request_all_locks(dlm, ndata->node_num,
457 mlog_errno(status); 584 dead_node);
458 if (dlm_is_host_down(status)) 585 if (status < 0) {
459 ndata->state = DLM_RECO_NODE_DATA_DEAD; 586 mlog_errno(status);
460 else { 587 if (dlm_is_host_down(status)) {
461 destroy = 1; 588 /* node died, ignore it for recovery */
462 goto leave; 589 status = 0;
590 ndata->state = DLM_RECO_NODE_DATA_DEAD;
591 /* wait for the domain map to catch up
592 * with the network state. */
593 wait_event_timeout(dlm->dlm_reco_thread_wq,
594 dlm_is_node_dead(dlm,
595 ndata->node_num),
596 msecs_to_jiffies(1000));
597 mlog(0, "waited 1 sec for %u, "
598 "dead? %s\n", ndata->node_num,
599 dlm_is_node_dead(dlm, ndata->node_num) ?
600 "yes" : "no");
601 } else {
602 /* -ENOMEM on the other node */
603 mlog(0, "%s: node %u returned "
604 "%d during recovery, retrying "
605 "after a short wait\n",
606 dlm->name, ndata->node_num,
607 status);
608 msleep(100);
609 }
463 } 610 }
464 } 611 } while (status != 0);
465 612
466 switch (ndata->state) { 613 switch (ndata->state) {
467 case DLM_RECO_NODE_DATA_INIT: 614 case DLM_RECO_NODE_DATA_INIT:
@@ -473,10 +620,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
473 mlog(0, "node %u died after requesting " 620 mlog(0, "node %u died after requesting "
474 "recovery info for node %u\n", 621 "recovery info for node %u\n",
475 ndata->node_num, dead_node); 622 ndata->node_num, dead_node);
476 // start all over 623 /* fine. don't need this node's info.
477 destroy = 1; 624 * continue without it. */
478 status = -EAGAIN; 625 break;
479 goto leave;
480 case DLM_RECO_NODE_DATA_REQUESTING: 626 case DLM_RECO_NODE_DATA_REQUESTING:
481 ndata->state = DLM_RECO_NODE_DATA_REQUESTED; 627 ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
482 mlog(0, "now receiving recovery data from " 628 mlog(0, "now receiving recovery data from "
@@ -520,35 +666,26 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
520 BUG(); 666 BUG();
521 break; 667 break;
522 case DLM_RECO_NODE_DATA_DEAD: 668 case DLM_RECO_NODE_DATA_DEAD:
523 mlog(ML_NOTICE, "node %u died after " 669 mlog(0, "node %u died after "
524 "requesting recovery info for " 670 "requesting recovery info for "
525 "node %u\n", ndata->node_num, 671 "node %u\n", ndata->node_num,
526 dead_node); 672 dead_node);
527 spin_unlock(&dlm_reco_state_lock); 673 break;
528 // start all over
529 destroy = 1;
530 status = -EAGAIN;
531 /* instead of spinning like crazy here,
532 * wait for the domain map to catch up
533 * with the network state. otherwise this
534 * can be hit hundreds of times before
535 * the node is really seen as dead. */
536 wait_event_timeout(dlm->dlm_reco_thread_wq,
537 dlm_is_node_dead(dlm,
538 ndata->node_num),
539 msecs_to_jiffies(1000));
540 mlog(0, "waited 1 sec for %u, "
541 "dead? %s\n", ndata->node_num,
542 dlm_is_node_dead(dlm, ndata->node_num) ?
543 "yes" : "no");
544 goto leave;
545 case DLM_RECO_NODE_DATA_RECEIVING: 674 case DLM_RECO_NODE_DATA_RECEIVING:
546 case DLM_RECO_NODE_DATA_REQUESTED: 675 case DLM_RECO_NODE_DATA_REQUESTED:
676 mlog(0, "%s: node %u still in state %s\n",
677 dlm->name, ndata->node_num,
678 ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
679 "receiving" : "requested");
547 all_nodes_done = 0; 680 all_nodes_done = 0;
548 break; 681 break;
549 case DLM_RECO_NODE_DATA_DONE: 682 case DLM_RECO_NODE_DATA_DONE:
683 mlog(0, "%s: node %u state is done\n",
684 dlm->name, ndata->node_num);
550 break; 685 break;
551 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 686 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
687 mlog(0, "%s: node %u state is finalize\n",
688 dlm->name, ndata->node_num);
552 break; 689 break;
553 } 690 }
554 } 691 }
@@ -578,7 +715,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
578 jiffies, dlm->reco.dead_node, 715 jiffies, dlm->reco.dead_node,
579 dlm->node_num, dlm->reco.new_master); 716 dlm->node_num, dlm->reco.new_master);
580 destroy = 1; 717 destroy = 1;
581 status = ret; 718 status = 0;
582 /* rescan everything marked dirty along the way */ 719 /* rescan everything marked dirty along the way */
583 dlm_kick_thread(dlm, NULL); 720 dlm_kick_thread(dlm, NULL);
584 break; 721 break;
@@ -591,7 +728,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
591 728
592 } 729 }
593 730
594leave:
595 if (destroy) 731 if (destroy)
596 dlm_destroy_recovery_area(dlm, dead_node); 732 dlm_destroy_recovery_area(dlm, dead_node);
597 733
@@ -617,7 +753,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
617 } 753 }
618 BUG_ON(num == dead_node); 754 BUG_ON(num == dead_node);
619 755
620 ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL); 756 ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
621 if (!ndata) { 757 if (!ndata) {
622 dlm_destroy_recovery_area(dlm, dead_node); 758 dlm_destroy_recovery_area(dlm, dead_node);
623 return -ENOMEM; 759 return -ENOMEM;
@@ -691,16 +827,25 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
691 if (!dlm_grab(dlm)) 827 if (!dlm_grab(dlm))
692 return -EINVAL; 828 return -EINVAL;
693 829
830 if (lr->dead_node != dlm->reco.dead_node) {
831 mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
832 "dead_node is %u\n", dlm->name, lr->node_idx,
833 lr->dead_node, dlm->reco.dead_node);
834 dlm_print_reco_node_status(dlm);
835 /* this is a hack */
836 dlm_put(dlm);
837 return -ENOMEM;
838 }
694 BUG_ON(lr->dead_node != dlm->reco.dead_node); 839 BUG_ON(lr->dead_node != dlm->reco.dead_node);
695 840
696 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 841 item = kcalloc(1, sizeof(*item), GFP_NOFS);
697 if (!item) { 842 if (!item) {
698 dlm_put(dlm); 843 dlm_put(dlm);
699 return -ENOMEM; 844 return -ENOMEM;
700 } 845 }
701 846
702 /* this will get freed by dlm_request_all_locks_worker */ 847 /* this will get freed by dlm_request_all_locks_worker */
703 buf = (char *) __get_free_page(GFP_KERNEL); 848 buf = (char *) __get_free_page(GFP_NOFS);
704 if (!buf) { 849 if (!buf) {
705 kfree(item); 850 kfree(item);
706 dlm_put(dlm); 851 dlm_put(dlm);
@@ -715,7 +860,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
715 spin_lock(&dlm->work_lock); 860 spin_lock(&dlm->work_lock);
716 list_add_tail(&item->list, &dlm->work_list); 861 list_add_tail(&item->list, &dlm->work_list);
717 spin_unlock(&dlm->work_lock); 862 spin_unlock(&dlm->work_lock);
718 schedule_work(&dlm->dispatched_work); 863 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
719 864
720 dlm_put(dlm); 865 dlm_put(dlm);
721 return 0; 866 return 0;
@@ -730,32 +875,34 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
730 struct list_head *iter; 875 struct list_head *iter;
731 int ret; 876 int ret;
732 u8 dead_node, reco_master; 877 u8 dead_node, reco_master;
878 int skip_all_done = 0;
733 879
734 dlm = item->dlm; 880 dlm = item->dlm;
735 dead_node = item->u.ral.dead_node; 881 dead_node = item->u.ral.dead_node;
736 reco_master = item->u.ral.reco_master; 882 reco_master = item->u.ral.reco_master;
737 mres = (struct dlm_migratable_lockres *)data; 883 mres = (struct dlm_migratable_lockres *)data;
738 884
885 mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
886 dlm->name, dead_node, reco_master);
887
739 if (dead_node != dlm->reco.dead_node || 888 if (dead_node != dlm->reco.dead_node ||
740 reco_master != dlm->reco.new_master) { 889 reco_master != dlm->reco.new_master) {
741 /* show extra debug info if the recovery state is messed */ 890 /* worker could have been created before the recovery master
742 mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " 891 * died. if so, do not continue, but do not error. */
743 "request(dead=%u, master=%u)\n", 892 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
744 dlm->name, dlm->reco.dead_node, dlm->reco.new_master, 893 mlog(ML_NOTICE, "%s: will not send recovery state, "
745 dead_node, reco_master); 894 "recovery master %u died, thread=(dead=%u,mas=%u)"
746 mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " 895 " current=(dead=%u,mas=%u)\n", dlm->name,
747 "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", 896 reco_master, dead_node, reco_master,
748 dlm->name, mres->lockname_len, mres->lockname, mres->master, 897 dlm->reco.dead_node, dlm->reco.new_master);
749 mres->num_locks, mres->total_locks, mres->flags, 898 } else {
750 dlm_get_lock_cookie_node(mres->ml[0].cookie), 899 mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
751 dlm_get_lock_cookie_seq(mres->ml[0].cookie), 900 "master=%u), request(dead=%u, master=%u)\n",
752 mres->ml[0].list, mres->ml[0].flags, 901 dlm->name, dlm->reco.dead_node,
753 mres->ml[0].type, mres->ml[0].convert_type, 902 dlm->reco.new_master, dead_node, reco_master);
754 mres->ml[0].highest_blocked, mres->ml[0].node); 903 }
755 BUG(); 904 goto leave;
756 } 905 }
757 BUG_ON(dead_node != dlm->reco.dead_node);
758 BUG_ON(reco_master != dlm->reco.new_master);
759 906
760 /* lock resources should have already been moved to the 907 /* lock resources should have already been moved to the
761 * dlm->reco.resources list. now move items from that list 908 * dlm->reco.resources list. now move items from that list
@@ -766,12 +913,20 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
766 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); 913 dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
767 914
768 /* now we can begin blasting lockreses without the dlm lock */ 915 /* now we can begin blasting lockreses without the dlm lock */
916
917 /* any errors returned will be due to the new_master dying,
918 * the dlm_reco_thread should detect this */
769 list_for_each(iter, &resources) { 919 list_for_each(iter, &resources) {
770 res = list_entry (iter, struct dlm_lock_resource, recovering); 920 res = list_entry (iter, struct dlm_lock_resource, recovering);
771 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, 921 ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
772 DLM_MRES_RECOVERY); 922 DLM_MRES_RECOVERY);
773 if (ret < 0) 923 if (ret < 0) {
774 mlog_errno(ret); 924 mlog(ML_ERROR, "%s: node %u went down while sending "
925 "recovery state for dead node %u, ret=%d\n", dlm->name,
926 reco_master, dead_node, ret);
927 skip_all_done = 1;
928 break;
929 }
775 } 930 }
776 931
777 /* move the resources back to the list */ 932 /* move the resources back to the list */
@@ -779,10 +934,15 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
779 list_splice_init(&resources, &dlm->reco.resources); 934 list_splice_init(&resources, &dlm->reco.resources);
780 spin_unlock(&dlm->spinlock); 935 spin_unlock(&dlm->spinlock);
781 936
782 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); 937 if (!skip_all_done) {
783 if (ret < 0) 938 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
784 mlog_errno(ret); 939 if (ret < 0) {
785 940 mlog(ML_ERROR, "%s: node %u went down while sending "
941 "recovery all-done for dead node %u, ret=%d\n",
942 dlm->name, reco_master, dead_node, ret);
943 }
944 }
945leave:
786 free_page((unsigned long)data); 946 free_page((unsigned long)data);
787} 947}
788 948
@@ -801,8 +961,14 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
801 961
802 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, 962 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
803 sizeof(done_msg), send_to, &tmpret); 963 sizeof(done_msg), send_to, &tmpret);
804 /* negative status is ignored by the caller */ 964 if (ret < 0) {
805 if (ret >= 0) 965 if (!dlm_is_host_down(ret)) {
966 mlog_errno(ret);
967 mlog(ML_ERROR, "%s: unknown error sending data-done "
968 "to %u\n", dlm->name, send_to);
969 BUG();
970 }
971 } else
806 ret = tmpret; 972 ret = tmpret;
807 return ret; 973 return ret;
808} 974}
@@ -822,7 +988,11 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
822 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " 988 mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
823 "node_idx=%u, this node=%u\n", done->dead_node, 989 "node_idx=%u, this node=%u\n", done->dead_node,
824 dlm->reco.dead_node, done->node_idx, dlm->node_num); 990 dlm->reco.dead_node, done->node_idx, dlm->node_num);
825 BUG_ON(done->dead_node != dlm->reco.dead_node); 991
992 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
993 "Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
994 "node_idx=%u, this node=%u\n", done->dead_node,
995 dlm->reco.dead_node, done->node_idx, dlm->node_num);
826 996
827 spin_lock(&dlm_reco_state_lock); 997 spin_lock(&dlm_reco_state_lock);
828 list_for_each(iter, &dlm->reco.node_data) { 998 list_for_each(iter, &dlm->reco.node_data) {
@@ -905,13 +1075,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
905 mlog(0, "found lockres owned by dead node while " 1075 mlog(0, "found lockres owned by dead node while "
906 "doing recovery for node %u. sending it.\n", 1076 "doing recovery for node %u. sending it.\n",
907 dead_node); 1077 dead_node);
908 list_del_init(&res->recovering); 1078 list_move_tail(&res->recovering, list);
909 list_add_tail(&res->recovering, list);
910 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { 1079 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
911 mlog(0, "found UNKNOWN owner while doing recovery " 1080 mlog(0, "found UNKNOWN owner while doing recovery "
912 "for node %u. sending it.\n", dead_node); 1081 "for node %u. sending it.\n", dead_node);
913 list_del_init(&res->recovering); 1082 list_move_tail(&res->recovering, list);
914 list_add_tail(&res->recovering, list);
915 } 1083 }
916 } 1084 }
917 spin_unlock(&dlm->spinlock); 1085 spin_unlock(&dlm->spinlock);
@@ -1023,8 +1191,9 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1023 ml->type == LKM_PRMODE) { 1191 ml->type == LKM_PRMODE) {
1024 /* if it is already set, this had better be a PR 1192 /* if it is already set, this had better be a PR
1025 * and it has to match */ 1193 * and it has to match */
1026 if (mres->lvb[0] && (ml->type == LKM_EXMODE || 1194 if (!dlm_lvb_is_empty(mres->lvb) &&
1027 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { 1195 (ml->type == LKM_EXMODE ||
1196 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
1028 mlog(ML_ERROR, "mismatched lvbs!\n"); 1197 mlog(ML_ERROR, "mismatched lvbs!\n");
1029 __dlm_print_one_lock_resource(lock->lockres); 1198 __dlm_print_one_lock_resource(lock->lockres);
1030 BUG(); 1199 BUG();
@@ -1083,22 +1252,25 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1083 * we must send it immediately. */ 1252 * we must send it immediately. */
1084 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, 1253 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
1085 res, total_locks); 1254 res, total_locks);
1086 if (ret < 0) { 1255 if (ret < 0)
1087 // TODO 1256 goto error;
1088 mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
1089 "returned %d, TODO\n", ret);
1090 BUG();
1091 }
1092 } 1257 }
1093 } 1258 }
1094 /* flush any remaining locks */ 1259 /* flush any remaining locks */
1095 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1260 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1096 if (ret < 0) { 1261 if (ret < 0)
1097 // TODO 1262 goto error;
1098 mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, " 1263 return ret;
1099 "TODO\n", ret); 1264
1265error:
1266 mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
1267 dlm->name, ret);
1268 if (!dlm_is_host_down(ret))
1100 BUG(); 1269 BUG();
1101 } 1270 mlog(0, "%s: node %u went down while sending %s "
1271 "lockres %.*s\n", dlm->name, send_to,
1272 flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
1273 res->lockname.len, res->lockname.name);
1102 return ret; 1274 return ret;
1103} 1275}
1104 1276
@@ -1146,8 +1318,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1146 mlog(0, "all done flag. all lockres data received!\n"); 1318 mlog(0, "all done flag. all lockres data received!\n");
1147 1319
1148 ret = -ENOMEM; 1320 ret = -ENOMEM;
1149 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL); 1321 buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
1150 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 1322 item = kcalloc(1, sizeof(*item), GFP_NOFS);
1151 if (!buf || !item) 1323 if (!buf || !item)
1152 goto leave; 1324 goto leave;
1153 1325
@@ -1238,7 +1410,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1238 spin_lock(&dlm->work_lock); 1410 spin_lock(&dlm->work_lock);
1239 list_add_tail(&item->list, &dlm->work_list); 1411 list_add_tail(&item->list, &dlm->work_list);
1240 spin_unlock(&dlm->work_lock); 1412 spin_unlock(&dlm->work_lock);
1241 schedule_work(&dlm->dispatched_work); 1413 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1242 1414
1243leave: 1415leave:
1244 dlm_put(dlm); 1416 dlm_put(dlm);
@@ -1406,6 +1578,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
1406 struct dlm_ctxt *dlm = data; 1578 struct dlm_ctxt *dlm = data;
1407 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1579 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
1408 struct dlm_lock_resource *res = NULL; 1580 struct dlm_lock_resource *res = NULL;
1581 unsigned int hash;
1409 int master = DLM_LOCK_RES_OWNER_UNKNOWN; 1582 int master = DLM_LOCK_RES_OWNER_UNKNOWN;
1410 u32 flags = DLM_ASSERT_MASTER_REQUERY; 1583 u32 flags = DLM_ASSERT_MASTER_REQUERY;
1411 1584
@@ -1415,8 +1588,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
1415 return master; 1588 return master;
1416 } 1589 }
1417 1590
1591 hash = dlm_lockid_hash(req->name, req->namelen);
1592
1418 spin_lock(&dlm->spinlock); 1593 spin_lock(&dlm->spinlock);
1419 res = __dlm_lookup_lockres(dlm, req->name, req->namelen); 1594 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
1420 if (res) { 1595 if (res) {
1421 spin_lock(&res->spinlock); 1596 spin_lock(&res->spinlock);
1422 master = res->owner; 1597 master = res->owner;
@@ -1483,7 +1658,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1483 struct dlm_lock *newlock = NULL; 1658 struct dlm_lock *newlock = NULL;
1484 struct dlm_lockstatus *lksb = NULL; 1659 struct dlm_lockstatus *lksb = NULL;
1485 int ret = 0; 1660 int ret = 0;
1486 int i; 1661 int i, bad;
1487 struct list_head *iter; 1662 struct list_head *iter;
1488 struct dlm_lock *lock = NULL; 1663 struct dlm_lock *lock = NULL;
1489 1664
@@ -1529,8 +1704,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1529 1704
1530 /* move the lock to its proper place */ 1705 /* move the lock to its proper place */
1531 /* do not alter lock refcount. switching lists. */ 1706 /* do not alter lock refcount. switching lists. */
1532 list_del_init(&lock->list); 1707 list_move_tail(&lock->list, queue);
1533 list_add_tail(&lock->list, queue);
1534 spin_unlock(&res->spinlock); 1708 spin_unlock(&res->spinlock);
1535 1709
1536 mlog(0, "just reordered a local lock!\n"); 1710 mlog(0, "just reordered a local lock!\n");
@@ -1553,28 +1727,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1553 } 1727 }
1554 lksb->flags |= (ml->flags & 1728 lksb->flags |= (ml->flags &
1555 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); 1729 (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
1556 1730
1557 if (mres->lvb[0]) { 1731 if (ml->type == LKM_NLMODE)
1732 goto skip_lvb;
1733
1734 if (!dlm_lvb_is_empty(mres->lvb)) {
1558 if (lksb->flags & DLM_LKSB_PUT_LVB) { 1735 if (lksb->flags & DLM_LKSB_PUT_LVB) {
1559 /* other node was trying to update 1736 /* other node was trying to update
1560 * lvb when node died. recreate the 1737 * lvb when node died. recreate the
1561 * lksb with the updated lvb. */ 1738 * lksb with the updated lvb. */
1562 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); 1739 memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
1740 /* the lock resource lvb update must happen
1741 * NOW, before the spinlock is dropped.
1742 * we no longer wait for the AST to update
1743 * the lvb. */
1744 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1563 } else { 1745 } else {
1564 /* otherwise, the node is sending its 1746 /* otherwise, the node is sending its
1565 * most recent valid lvb info */ 1747 * most recent valid lvb info */
1566 BUG_ON(ml->type != LKM_EXMODE && 1748 BUG_ON(ml->type != LKM_EXMODE &&
1567 ml->type != LKM_PRMODE); 1749 ml->type != LKM_PRMODE);
1568 if (res->lvb[0] && (ml->type == LKM_EXMODE || 1750 if (!dlm_lvb_is_empty(res->lvb) &&
1569 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { 1751 (ml->type == LKM_EXMODE ||
1570 mlog(ML_ERROR, "received bad lvb!\n"); 1752 memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
1571 __dlm_print_one_lock_resource(res); 1753 int i;
1572 BUG(); 1754 mlog(ML_ERROR, "%s:%.*s: received bad "
1755 "lvb! type=%d\n", dlm->name,
1756 res->lockname.len,
1757 res->lockname.name, ml->type);
1758 printk("lockres lvb=[");
1759 for (i=0; i<DLM_LVB_LEN; i++)
1760 printk("%02x", res->lvb[i]);
1761 printk("]\nmigrated lvb=[");
1762 for (i=0; i<DLM_LVB_LEN; i++)
1763 printk("%02x", mres->lvb[i]);
1764 printk("]\n");
1765 dlm_print_one_lock_resource(res);
1766 BUG();
1573 } 1767 }
1574 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1768 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1575 } 1769 }
1576 } 1770 }
1577 1771skip_lvb:
1578 1772
1579 /* NOTE: 1773 /* NOTE:
1580 * wrt lock queue ordering and recovery: 1774 * wrt lock queue ordering and recovery:
@@ -1592,9 +1786,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1592 * relative to each other, but clearly *not* 1786 * relative to each other, but clearly *not*
1593 * preserved relative to locks from other nodes. 1787 * preserved relative to locks from other nodes.
1594 */ 1788 */
1789 bad = 0;
1595 spin_lock(&res->spinlock); 1790 spin_lock(&res->spinlock);
1596 dlm_lock_get(newlock); 1791 list_for_each_entry(lock, queue, list) {
1597 list_add_tail(&newlock->list, queue); 1792 if (lock->ml.cookie == ml->cookie) {
1793 u64 c = lock->ml.cookie;
1794 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
1795 "exists on this lockres!\n", dlm->name,
1796 res->lockname.len, res->lockname.name,
1797 dlm_get_lock_cookie_node(c),
1798 dlm_get_lock_cookie_seq(c));
1799
1800 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
1801 "node=%u, cookie=%u:%llu, queue=%d\n",
1802 ml->type, ml->convert_type, ml->node,
1803 dlm_get_lock_cookie_node(ml->cookie),
1804 dlm_get_lock_cookie_seq(ml->cookie),
1805 ml->list);
1806
1807 __dlm_print_one_lock_resource(res);
1808 bad = 1;
1809 break;
1810 }
1811 }
1812 if (!bad) {
1813 dlm_lock_get(newlock);
1814 list_add_tail(&newlock->list, queue);
1815 }
1598 spin_unlock(&res->spinlock); 1816 spin_unlock(&res->spinlock);
1599 } 1817 }
1600 mlog(0, "done running all the locks\n"); 1818 mlog(0, "done running all the locks\n");
@@ -1618,8 +1836,14 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
1618 struct dlm_lock *lock; 1836 struct dlm_lock *lock;
1619 1837
1620 res->state |= DLM_LOCK_RES_RECOVERING; 1838 res->state |= DLM_LOCK_RES_RECOVERING;
1621 if (!list_empty(&res->recovering)) 1839 if (!list_empty(&res->recovering)) {
1840 mlog(0,
1841 "Recovering res %s:%.*s, is already on recovery list!\n",
1842 dlm->name, res->lockname.len, res->lockname.name);
1622 list_del_init(&res->recovering); 1843 list_del_init(&res->recovering);
1844 }
1845 /* We need to hold a reference while on the recovery list */
1846 dlm_lockres_get(res);
1623 list_add_tail(&res->recovering, &dlm->reco.resources); 1847 list_add_tail(&res->recovering, &dlm->reco.resources);
1624 1848
1625 /* find any pending locks and put them back on proper list */ 1849 /* find any pending locks and put them back on proper list */
@@ -1708,9 +1932,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1708 spin_lock(&res->spinlock); 1932 spin_lock(&res->spinlock);
1709 dlm_change_lockres_owner(dlm, res, new_master); 1933 dlm_change_lockres_owner(dlm, res, new_master);
1710 res->state &= ~DLM_LOCK_RES_RECOVERING; 1934 res->state &= ~DLM_LOCK_RES_RECOVERING;
1711 __dlm_dirty_lockres(dlm, res); 1935 if (!__dlm_lockres_unused(res))
1936 __dlm_dirty_lockres(dlm, res);
1712 spin_unlock(&res->spinlock); 1937 spin_unlock(&res->spinlock);
1713 wake_up(&res->wq); 1938 wake_up(&res->wq);
1939 dlm_lockres_put(res);
1714 } 1940 }
1715 } 1941 }
1716 1942
@@ -1719,7 +1945,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1719 * the RECOVERING state and set the owner 1945 * the RECOVERING state and set the owner
1720 * if necessary */ 1946 * if necessary */
1721 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 1947 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
1722 bucket = &(dlm->lockres_hash[i]); 1948 bucket = dlm_lockres_hash(dlm, i);
1723 hlist_for_each_entry(res, hash_iter, bucket, hash_node) { 1949 hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
1724 if (res->state & DLM_LOCK_RES_RECOVERING) { 1950 if (res->state & DLM_LOCK_RES_RECOVERING) {
1725 if (res->owner == dead_node) { 1951 if (res->owner == dead_node) {
@@ -1743,11 +1969,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1743 dlm->name, res->lockname.len, 1969 dlm->name, res->lockname.len,
1744 res->lockname.name, res->owner); 1970 res->lockname.name, res->owner);
1745 list_del_init(&res->recovering); 1971 list_del_init(&res->recovering);
1972 dlm_lockres_put(res);
1746 } 1973 }
1747 spin_lock(&res->spinlock); 1974 spin_lock(&res->spinlock);
1748 dlm_change_lockres_owner(dlm, res, new_master); 1975 dlm_change_lockres_owner(dlm, res, new_master);
1749 res->state &= ~DLM_LOCK_RES_RECOVERING; 1976 res->state &= ~DLM_LOCK_RES_RECOVERING;
1750 __dlm_dirty_lockres(dlm, res); 1977 if (!__dlm_lockres_unused(res))
1978 __dlm_dirty_lockres(dlm, res);
1751 spin_unlock(&res->spinlock); 1979 spin_unlock(&res->spinlock);
1752 wake_up(&res->wq); 1980 wake_up(&res->wq);
1753 } 1981 }
@@ -1884,7 +2112,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1884 * need to be fired as a result. 2112 * need to be fired as a result.
1885 */ 2113 */
1886 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 2114 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
1887 bucket = &(dlm->lockres_hash[i]); 2115 bucket = dlm_lockres_hash(dlm, i);
1888 hlist_for_each_entry(res, iter, bucket, hash_node) { 2116 hlist_for_each_entry(res, iter, bucket, hash_node) {
1889 /* always prune any $RECOVERY entries for dead nodes, 2117 /* always prune any $RECOVERY entries for dead nodes,
1890 * otherwise hangs can occur during later recovery */ 2118 * otherwise hangs can occur during later recovery */
@@ -1924,6 +2152,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
1924{ 2152{
1925 assert_spin_locked(&dlm->spinlock); 2153 assert_spin_locked(&dlm->spinlock);
1926 2154
2155 if (dlm->reco.new_master == idx) {
2156 mlog(0, "%s: recovery master %d just died\n",
2157 dlm->name, idx);
2158 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2159 /* finalize1 was reached, so it is safe to clear
2160 * the new_master and dead_node. that recovery
2161 * is complete. */
2162 mlog(0, "%s: dead master %d had reached "
2163 "finalize1 state, clearing\n", dlm->name, idx);
2164 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2165 __dlm_reset_recovery(dlm);
2166 }
2167 }
2168
1927 /* check to see if the node is already considered dead */ 2169 /* check to see if the node is already considered dead */
1928 if (!test_bit(idx, dlm->live_nodes_map)) { 2170 if (!test_bit(idx, dlm->live_nodes_map)) {
1929 mlog(0, "for domain %s, node %d is already dead. " 2171 mlog(0, "for domain %s, node %d is already dead. "
@@ -2087,7 +2329,7 @@ again:
2087 2329
2088 /* set the new_master to this node */ 2330 /* set the new_master to this node */
2089 spin_lock(&dlm->spinlock); 2331 spin_lock(&dlm->spinlock);
2090 dlm->reco.new_master = dlm->node_num; 2332 dlm_set_reco_master(dlm, dlm->node_num);
2091 spin_unlock(&dlm->spinlock); 2333 spin_unlock(&dlm->spinlock);
2092 } 2334 }
2093 2335
@@ -2125,6 +2367,10 @@ again:
2125 mlog(0, "%s: reco master %u is ready to recover %u\n", 2367 mlog(0, "%s: reco master %u is ready to recover %u\n",
2126 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); 2368 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
2127 status = -EEXIST; 2369 status = -EEXIST;
2370 } else if (ret == DLM_RECOVERING) {
2371 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
2372 dlm->name, dlm->node_num);
2373 goto again;
2128 } else { 2374 } else {
2129 struct dlm_lock_resource *res; 2375 struct dlm_lock_resource *res;
2130 2376
@@ -2156,7 +2402,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2156 2402
2157 mlog_entry("%u\n", dead_node); 2403 mlog_entry("%u\n", dead_node);
2158 2404
2159 mlog(0, "dead node is %u\n", dead_node); 2405 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
2160 2406
2161 spin_lock(&dlm->spinlock); 2407 spin_lock(&dlm->spinlock);
2162 dlm_node_iter_init(dlm->domain_map, &iter); 2408 dlm_node_iter_init(dlm->domain_map, &iter);
@@ -2214,6 +2460,14 @@ retry:
2214 * another ENOMEM */ 2460 * another ENOMEM */
2215 msleep(100); 2461 msleep(100);
2216 goto retry; 2462 goto retry;
2463 } else if (ret == EAGAIN) {
2464 mlog(0, "%s: trying to start recovery of node "
2465 "%u, but node %u is waiting for last recovery "
2466 "to complete, backoff for a bit\n", dlm->name,
2467 dead_node, nodenum);
2468 /* TODO Look into replacing msleep with cond_resched() */
2469 msleep(100);
2470 goto retry;
2217 } 2471 }
2218 } 2472 }
2219 2473
@@ -2229,8 +2483,20 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2229 if (!dlm_grab(dlm)) 2483 if (!dlm_grab(dlm))
2230 return 0; 2484 return 0;
2231 2485
2232 mlog(0, "node %u wants to recover node %u\n", 2486 spin_lock(&dlm->spinlock);
2233 br->node_idx, br->dead_node); 2487 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2488 mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
2489 "but this node is in finalize state, waiting on finalize2\n",
2490 dlm->name, br->node_idx, br->dead_node,
2491 dlm->reco.dead_node, dlm->reco.new_master);
2492 spin_unlock(&dlm->spinlock);
2493 return EAGAIN;
2494 }
2495 spin_unlock(&dlm->spinlock);
2496
2497 mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
2498 dlm->name, br->node_idx, br->dead_node,
2499 dlm->reco.dead_node, dlm->reco.new_master);
2234 2500
2235 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); 2501 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
2236 2502
@@ -2252,8 +2518,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2252 "node %u changing it to %u\n", dlm->name, 2518 "node %u changing it to %u\n", dlm->name,
2253 dlm->reco.dead_node, br->node_idx, br->dead_node); 2519 dlm->reco.dead_node, br->node_idx, br->dead_node);
2254 } 2520 }
2255 dlm->reco.new_master = br->node_idx; 2521 dlm_set_reco_master(dlm, br->node_idx);
2256 dlm->reco.dead_node = br->dead_node; 2522 dlm_set_reco_dead_node(dlm, br->dead_node);
2257 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2523 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2258 mlog(0, "recovery master %u sees %u as dead, but this " 2524 mlog(0, "recovery master %u sees %u as dead, but this "
2259 "node has not yet. marking %u as dead\n", 2525 "node has not yet. marking %u as dead\n",
@@ -2272,10 +2538,16 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2272 spin_unlock(&dlm->spinlock); 2538 spin_unlock(&dlm->spinlock);
2273 2539
2274 dlm_kick_recovery_thread(dlm); 2540 dlm_kick_recovery_thread(dlm);
2541
2542 mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
2543 dlm->name, br->node_idx, br->dead_node,
2544 dlm->reco.dead_node, dlm->reco.new_master);
2545
2275 dlm_put(dlm); 2546 dlm_put(dlm);
2276 return 0; 2547 return 0;
2277} 2548}
2278 2549
2550#define DLM_FINALIZE_STAGE2 0x01
2279static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) 2551static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2280{ 2552{
2281 int ret = 0; 2553 int ret = 0;
@@ -2283,25 +2555,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2283 struct dlm_node_iter iter; 2555 struct dlm_node_iter iter;
2284 int nodenum; 2556 int nodenum;
2285 int status; 2557 int status;
2558 int stage = 1;
2286 2559
2287 mlog(0, "finishing recovery for node %s:%u\n", 2560 mlog(0, "finishing recovery for node %s:%u, "
2288 dlm->name, dlm->reco.dead_node); 2561 "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
2289 2562
2290 spin_lock(&dlm->spinlock); 2563 spin_lock(&dlm->spinlock);
2291 dlm_node_iter_init(dlm->domain_map, &iter); 2564 dlm_node_iter_init(dlm->domain_map, &iter);
2292 spin_unlock(&dlm->spinlock); 2565 spin_unlock(&dlm->spinlock);
2293 2566
2567stage2:
2294 memset(&fr, 0, sizeof(fr)); 2568 memset(&fr, 0, sizeof(fr));
2295 fr.node_idx = dlm->node_num; 2569 fr.node_idx = dlm->node_num;
2296 fr.dead_node = dlm->reco.dead_node; 2570 fr.dead_node = dlm->reco.dead_node;
2571 if (stage == 2)
2572 fr.flags |= DLM_FINALIZE_STAGE2;
2297 2573
2298 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 2574 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2299 if (nodenum == dlm->node_num) 2575 if (nodenum == dlm->node_num)
2300 continue; 2576 continue;
2301 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, 2577 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
2302 &fr, sizeof(fr), nodenum, &status); 2578 &fr, sizeof(fr), nodenum, &status);
2303 if (ret >= 0) { 2579 if (ret >= 0)
2304 ret = status; 2580 ret = status;
2581 if (ret < 0) {
2582 mlog_errno(ret);
2305 if (dlm_is_host_down(ret)) { 2583 if (dlm_is_host_down(ret)) {
2306 /* this has no effect on this recovery 2584 /* this has no effect on this recovery
2307 * session, so set the status to zero to 2585 * session, so set the status to zero to
@@ -2309,13 +2587,17 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2309 mlog(ML_ERROR, "node %u went down after this " 2587 mlog(ML_ERROR, "node %u went down after this "
2310 "node finished recovery.\n", nodenum); 2588 "node finished recovery.\n", nodenum);
2311 ret = 0; 2589 ret = 0;
2590 continue;
2312 } 2591 }
2313 }
2314 if (ret < 0) {
2315 mlog_errno(ret);
2316 break; 2592 break;
2317 } 2593 }
2318 } 2594 }
2595 if (stage == 1) {
2596 /* reset the node_iter back to the top and send finalize2 */
2597 iter.curnode = -1;
2598 stage = 2;
2599 goto stage2;
2600 }
2319 2601
2320 return ret; 2602 return ret;
2321} 2603}
@@ -2324,14 +2606,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2324{ 2606{
2325 struct dlm_ctxt *dlm = data; 2607 struct dlm_ctxt *dlm = data;
2326 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2608 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
2609 int stage = 1;
2327 2610
2328 /* ok to return 0, domain has gone away */ 2611 /* ok to return 0, domain has gone away */
2329 if (!dlm_grab(dlm)) 2612 if (!dlm_grab(dlm))
2330 return 0; 2613 return 0;
2331 2614
2332 mlog(0, "node %u finalizing recovery of node %u\n", 2615 if (fr->flags & DLM_FINALIZE_STAGE2)
2333 fr->node_idx, fr->dead_node); 2616 stage = 2;
2334 2617
2618 mlog(0, "%s: node %u finalizing recovery stage%d of "
2619 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
2620 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
2621
2335 spin_lock(&dlm->spinlock); 2622 spin_lock(&dlm->spinlock);
2336 2623
2337 if (dlm->reco.new_master != fr->node_idx) { 2624 if (dlm->reco.new_master != fr->node_idx) {
@@ -2347,13 +2634,41 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2347 BUG(); 2634 BUG();
2348 } 2635 }
2349 2636
2350 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); 2637 switch (stage) {
2351 2638 case 1:
2352 spin_unlock(&dlm->spinlock); 2639 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
2640 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2641 mlog(ML_ERROR, "%s: received finalize1 from "
2642 "new master %u for dead node %u, but "
2643 "this node has already received it!\n",
2644 dlm->name, fr->node_idx, fr->dead_node);
2645 dlm_print_reco_node_status(dlm);
2646 BUG();
2647 }
2648 dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
2649 spin_unlock(&dlm->spinlock);
2650 break;
2651 case 2:
2652 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
2653 mlog(ML_ERROR, "%s: received finalize2 from "
2654 "new master %u for dead node %u, but "
2655 "this node did not have finalize1!\n",
2656 dlm->name, fr->node_idx, fr->dead_node);
2657 dlm_print_reco_node_status(dlm);
2658 BUG();
2659 }
2660 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2661 spin_unlock(&dlm->spinlock);
2662 dlm_reset_recovery(dlm);
2663 dlm_kick_recovery_thread(dlm);
2664 break;
2665 default:
2666 BUG();
2667 }
2353 2668
2354 dlm_reset_recovery(dlm); 2669 mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
2670 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
2355 2671
2356 dlm_kick_recovery_thread(dlm);
2357 dlm_put(dlm); 2672 dlm_put(dlm);
2358 return 0; 2673 return 0;
2359} 2674}
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 5be9d14f12cb..0c822f3ffb05 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -39,6 +39,7 @@
39#include <linux/inet.h> 39#include <linux/inet.h>
40#include <linux/timer.h> 40#include <linux/timer.h>
41#include <linux/kthread.h> 41#include <linux/kthread.h>
42#include <linux/delay.h>
42 43
43 44
44#include "cluster/heartbeat.h" 45#include "cluster/heartbeat.h"
@@ -53,6 +54,8 @@
53#include "cluster/masklog.h" 54#include "cluster/masklog.h"
54 55
55static int dlm_thread(void *data); 56static int dlm_thread(void *data);
57static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
58 struct dlm_lock_resource *lockres);
56 59
57static void dlm_flush_asts(struct dlm_ctxt *dlm); 60static void dlm_flush_asts(struct dlm_ctxt *dlm);
58 61
@@ -80,7 +83,7 @@ repeat:
80} 83}
81 84
82 85
83static int __dlm_lockres_unused(struct dlm_lock_resource *res) 86int __dlm_lockres_unused(struct dlm_lock_resource *res)
84{ 87{
85 if (list_empty(&res->granted) && 88 if (list_empty(&res->granted) &&
86 list_empty(&res->converting) && 89 list_empty(&res->converting) &&
@@ -103,6 +106,20 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
103 assert_spin_locked(&res->spinlock); 106 assert_spin_locked(&res->spinlock);
104 107
105 if (__dlm_lockres_unused(res)){ 108 if (__dlm_lockres_unused(res)){
109 /* For now, just keep any resource we master */
110 if (res->owner == dlm->node_num)
111 {
112 if (!list_empty(&res->purge)) {
113 mlog(0, "we master %s:%.*s, but it is on "
114 "the purge list. Removing\n",
115 dlm->name, res->lockname.len,
116 res->lockname.name);
117 list_del_init(&res->purge);
118 dlm->purge_count--;
119 }
120 return;
121 }
122
106 if (list_empty(&res->purge)) { 123 if (list_empty(&res->purge)) {
107 mlog(0, "putting lockres %.*s from purge list\n", 124 mlog(0, "putting lockres %.*s from purge list\n",
108 res->lockname.len, res->lockname.name); 125 res->lockname.len, res->lockname.name);
@@ -110,10 +127,23 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
110 res->last_used = jiffies; 127 res->last_used = jiffies;
111 list_add_tail(&res->purge, &dlm->purge_list); 128 list_add_tail(&res->purge, &dlm->purge_list);
112 dlm->purge_count++; 129 dlm->purge_count++;
130
131 /* if this node is not the owner, there is
132 * no way to keep track of who the owner could be.
133 * unhash it to avoid serious problems. */
134 if (res->owner != dlm->node_num) {
135 mlog(0, "%s:%.*s: doing immediate "
136 "purge of lockres owned by %u\n",
137 dlm->name, res->lockname.len,
138 res->lockname.name, res->owner);
139
140 dlm_purge_lockres_now(dlm, res);
141 }
113 } 142 }
114 } else if (!list_empty(&res->purge)) { 143 } else if (!list_empty(&res->purge)) {
115 mlog(0, "removing lockres %.*s from purge list\n", 144 mlog(0, "removing lockres %.*s from purge list, "
116 res->lockname.len, res->lockname.name); 145 "owner=%u\n", res->lockname.len, res->lockname.name,
146 res->owner);
117 147
118 list_del_init(&res->purge); 148 list_del_init(&res->purge);
119 dlm->purge_count--; 149 dlm->purge_count--;
@@ -165,6 +195,7 @@ again:
165 } else if (ret < 0) { 195 } else if (ret < 0) {
166 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", 196 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
167 lockres->lockname.len, lockres->lockname.name); 197 lockres->lockname.len, lockres->lockname.name);
198 msleep(100);
168 goto again; 199 goto again;
169 } 200 }
170 201
@@ -178,6 +209,24 @@ finish:
178 __dlm_unhash_lockres(lockres); 209 __dlm_unhash_lockres(lockres);
179} 210}
180 211
212/* make an unused lockres go away immediately.
213 * as soon as the dlm spinlock is dropped, this lockres
214 * will not be found. kfree still happens on last put. */
215static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
216 struct dlm_lock_resource *lockres)
217{
218 assert_spin_locked(&dlm->spinlock);
219 assert_spin_locked(&lockres->spinlock);
220
221 BUG_ON(!__dlm_lockres_unused(lockres));
222
223 if (!list_empty(&lockres->purge)) {
224 list_del_init(&lockres->purge);
225 dlm->purge_count--;
226 }
227 __dlm_unhash_lockres(lockres);
228}
229
181static void dlm_run_purge_list(struct dlm_ctxt *dlm, 230static void dlm_run_purge_list(struct dlm_ctxt *dlm,
182 int purge_now) 231 int purge_now)
183{ 232{
@@ -318,8 +367,7 @@ converting:
318 367
319 target->ml.type = target->ml.convert_type; 368 target->ml.type = target->ml.convert_type;
320 target->ml.convert_type = LKM_IVMODE; 369 target->ml.convert_type = LKM_IVMODE;
321 list_del_init(&target->list); 370 list_move_tail(&target->list, &res->granted);
322 list_add_tail(&target->list, &res->granted);
323 371
324 BUG_ON(!target->lksb); 372 BUG_ON(!target->lksb);
325 target->lksb->status = DLM_NORMAL; 373 target->lksb->status = DLM_NORMAL;
@@ -380,8 +428,7 @@ blocked:
380 target->ml.type, target->ml.node); 428 target->ml.type, target->ml.node);
381 429
382 // target->ml.type is already correct 430 // target->ml.type is already correct
383 list_del_init(&target->list); 431 list_move_tail(&target->list, &res->granted);
384 list_add_tail(&target->list, &res->granted);
385 432
386 BUG_ON(!target->lksb); 433 BUG_ON(!target->lksb);
387 target->lksb->status = DLM_NORMAL; 434 target->lksb->status = DLM_NORMAL;
@@ -422,6 +469,8 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
422 /* don't shuffle secondary queues */ 469 /* don't shuffle secondary queues */
423 if ((res->owner == dlm->node_num) && 470 if ((res->owner == dlm->node_num) &&
424 !(res->state & DLM_LOCK_RES_DIRTY)) { 471 !(res->state & DLM_LOCK_RES_DIRTY)) {
472 /* ref for dirty_list */
473 dlm_lockres_get(res);
425 list_add_tail(&res->dirty, &dlm->dirty_list); 474 list_add_tail(&res->dirty, &dlm->dirty_list);
426 res->state |= DLM_LOCK_RES_DIRTY; 475 res->state |= DLM_LOCK_RES_DIRTY;
427 } 476 }
@@ -606,6 +655,8 @@ static int dlm_thread(void *data)
606 list_del_init(&res->dirty); 655 list_del_init(&res->dirty);
607 spin_unlock(&res->spinlock); 656 spin_unlock(&res->spinlock);
608 spin_unlock(&dlm->spinlock); 657 spin_unlock(&dlm->spinlock);
658 /* Drop dirty_list ref */
659 dlm_lockres_put(res);
609 660
610 /* lockres can be re-dirtied/re-added to the 661 /* lockres can be re-dirtied/re-added to the
611 * dirty_list in this gap, but that is ok */ 662 * dirty_list in this gap, but that is ok */
@@ -642,8 +693,9 @@ static int dlm_thread(void *data)
642 * spinlock and do NOT have the dlm lock. 693 * spinlock and do NOT have the dlm lock.
643 * safe to reserve/queue asts and run the lists. */ 694 * safe to reserve/queue asts and run the lists. */
644 695
645 mlog(0, "calling dlm_shuffle_lists with dlm=%p, " 696 mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
646 "res=%p\n", dlm, res); 697 "res=%.*s\n", dlm->name,
698 res->lockname.len, res->lockname.name);
647 699
648 /* called while holding lockres lock */ 700 /* called while holding lockres lock */
649 dlm_shuffle_lists(dlm, res); 701 dlm_shuffle_lists(dlm, res);
@@ -657,6 +709,8 @@ in_progress:
657 /* if the lock was in-progress, stick 709 /* if the lock was in-progress, stick
658 * it on the back of the list */ 710 * it on the back of the list */
659 if (delay) { 711 if (delay) {
712 /* ref for dirty_list */
713 dlm_lockres_get(res);
660 spin_lock(&res->spinlock); 714 spin_lock(&res->spinlock);
661 list_add_tail(&res->dirty, &dlm->dirty_list); 715 list_add_tail(&res->dirty, &dlm->dirty_list);
662 res->state |= DLM_LOCK_RES_DIRTY; 716 res->state |= DLM_LOCK_RES_DIRTY;
@@ -677,7 +731,7 @@ in_progress:
677 731
678 /* yield and continue right away if there is more work to do */ 732 /* yield and continue right away if there is more work to do */
679 if (!n) { 733 if (!n) {
680 yield(); 734 cond_resched();
681 continue; 735 continue;
682 } 736 }
683 737
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index 7b1a27542674..b0c3134f4f70 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -271,8 +271,7 @@ void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
271void dlm_commit_pending_cancel(struct dlm_lock_resource *res, 271void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
272 struct dlm_lock *lock) 272 struct dlm_lock *lock)
273{ 273{
274 list_del_init(&lock->list); 274 list_move_tail(&lock->list, &res->granted);
275 list_add_tail(&lock->list, &res->granted);
276 lock->ml.convert_type = LKM_IVMODE; 275 lock->ml.convert_type = LKM_IVMODE;
277} 276}
278 277
@@ -319,6 +318,16 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
319 318
320 mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); 319 mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
321 320
321 if (owner == dlm->node_num) {
322 /* ended up trying to contact ourself. this means
323 * that the lockres had been remote but became local
324 * via a migration. just retry it, now as local */
325 mlog(0, "%s:%.*s: this node became the master due to a "
326 "migration, re-evaluate now\n", dlm->name,
327 res->lockname.len, res->lockname.name);
328 return DLM_FORWARD;
329 }
330
322 memset(&unlock, 0, sizeof(unlock)); 331 memset(&unlock, 0, sizeof(unlock));
323 unlock.node_idx = dlm->node_num; 332 unlock.node_idx = dlm->node_num;
324 unlock.flags = cpu_to_be32(flags); 333 unlock.flags = cpu_to_be32(flags);
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c
index 74ca4e5f9765..e641b084b343 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlm/userdlm.c
@@ -672,7 +672,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name)
672 u32 dlm_key; 672 u32 dlm_key;
673 char *domain; 673 char *domain;
674 674
675 domain = kmalloc(name->len + 1, GFP_KERNEL); 675 domain = kmalloc(name->len + 1, GFP_NOFS);
676 if (!domain) { 676 if (!domain) {
677 mlog_errno(-ENOMEM); 677 mlog_errno(-ENOMEM);
678 return ERR_PTR(-ENOMEM); 678 return ERR_PTR(-ENOMEM);