aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmlock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmlock.c')
-rw-r--r--fs/ocfs2/dlm/dlmlock.c68
1 files changed, 52 insertions, 16 deletions
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 55cda25ae11b..d6f89577e25f 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
201 struct dlm_lock *lock, int flags) 201 struct dlm_lock *lock, int flags)
202{ 202{
203 enum dlm_status status = DLM_DENIED; 203 enum dlm_status status = DLM_DENIED;
204 int lockres_changed = 1;
204 205
205 mlog_entry("type=%d\n", lock->ml.type); 206 mlog_entry("type=%d\n", lock->ml.type);
206 mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len, 207 mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
@@ -226,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
226 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 227 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
227 lock->lock_pending = 0; 228 lock->lock_pending = 0;
228 if (status != DLM_NORMAL) { 229 if (status != DLM_NORMAL) {
229 if (status != DLM_NOTQUEUED) 230 if (status == DLM_RECOVERING &&
231 dlm_is_recovery_lock(res->lockname.name,
232 res->lockname.len)) {
233 /* recovery lock was mastered by dead node.
234 * we need to have calc_usage shoot down this
235 * lockres and completely remaster it. */
236 mlog(0, "%s: recovery lock was owned by "
237 "dead node %u, remaster it now.\n",
238 dlm->name, res->owner);
239 } else if (status != DLM_NOTQUEUED) {
240 /*
241 * DO NOT call calc_usage, as this would unhash
242 * the remote lockres before we ever get to use
243 * it. treat as if we never made any change to
244 * the lockres.
245 */
246 lockres_changed = 0;
230 dlm_error(status); 247 dlm_error(status);
248 }
231 dlm_revert_pending_lock(res, lock); 249 dlm_revert_pending_lock(res, lock);
232 dlm_lock_put(lock); 250 dlm_lock_put(lock);
233 } else if (dlm_is_recovery_lock(res->lockname.name, 251 } else if (dlm_is_recovery_lock(res->lockname.name,
@@ -243,7 +261,8 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
243 } 261 }
244 spin_unlock(&res->spinlock); 262 spin_unlock(&res->spinlock);
245 263
246 dlm_lockres_calc_usage(dlm, res); 264 if (lockres_changed)
265 dlm_lockres_calc_usage(dlm, res);
247 266
248 wake_up(&res->wq); 267 wake_up(&res->wq);
249 return status; 268 return status;
@@ -280,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
280 if (tmpret >= 0) { 299 if (tmpret >= 0) {
281 // successfully sent and received 300 // successfully sent and received
282 ret = status; // this is already a dlm_status 301 ret = status; // this is already a dlm_status
302 if (ret == DLM_REJECTED) {
303 mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres "
304 "no longer owned by %u. that node is coming back "
305 "up currently.\n", dlm->name, create.namelen,
306 create.name, res->owner);
307 dlm_print_one_lock_resource(res);
308 BUG();
309 }
283 } else { 310 } else {
284 mlog_errno(tmpret); 311 mlog_errno(tmpret);
285 if (dlm_is_host_down(tmpret)) { 312 if (dlm_is_host_down(tmpret)) {
@@ -381,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
381 struct dlm_lock *lock; 408 struct dlm_lock *lock;
382 int kernel_allocated = 0; 409 int kernel_allocated = 0;
383 410
384 lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); 411 lock = kcalloc(1, sizeof(*lock), GFP_NOFS);
385 if (!lock) 412 if (!lock)
386 return NULL; 413 return NULL;
387 414
388 if (!lksb) { 415 if (!lksb) {
389 /* zero memory only if kernel-allocated */ 416 /* zero memory only if kernel-allocated */
390 lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); 417 lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS);
391 if (!lksb) { 418 if (!lksb) {
392 kfree(lock); 419 kfree(lock);
393 return NULL; 420 return NULL;
@@ -428,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
428 if (!dlm_grab(dlm)) 455 if (!dlm_grab(dlm))
429 return DLM_REJECTED; 456 return DLM_REJECTED;
430 457
431 mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
432 "Domain %s not fully joined!\n", dlm->name);
433
434 name = create->name; 458 name = create->name;
435 namelen = create->namelen; 459 namelen = create->namelen;
460 status = DLM_REJECTED;
461 if (!dlm_domain_fully_joined(dlm)) {
462 mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
463 "sending a create_lock message for lock %.*s!\n",
464 dlm->name, create->node_idx, namelen, name);
465 dlm_error(status);
466 goto leave;
467 }
436 468
437 status = DLM_IVBUFLEN; 469 status = DLM_IVBUFLEN;
438 if (namelen > DLM_LOCKID_NAME_MAX) { 470 if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -668,18 +700,22 @@ retry_lock:
668 msleep(100); 700 msleep(100);
669 /* no waiting for dlm_reco_thread */ 701 /* no waiting for dlm_reco_thread */
670 if (recovery) { 702 if (recovery) {
671 if (status == DLM_RECOVERING) { 703 if (status != DLM_RECOVERING)
672 mlog(0, "%s: got RECOVERING " 704 goto retry_lock;
673 "for $REOCVERY lock, master " 705
674 "was %u\n", dlm->name, 706 mlog(0, "%s: got RECOVERING "
675 res->owner); 707 "for $RECOVERY lock, master "
676 dlm_wait_for_node_death(dlm, res->owner, 708 "was %u\n", dlm->name,
677 DLM_NODE_DEATH_WAIT_MAX); 709 res->owner);
678 } 710 /* wait to see the node go down, then
711 * drop down and allow the lockres to
712 * get cleaned up. need to remaster. */
713 dlm_wait_for_node_death(dlm, res->owner,
714 DLM_NODE_DEATH_WAIT_MAX);
679 } else { 715 } else {
680 dlm_wait_for_recovery(dlm); 716 dlm_wait_for_recovery(dlm);
717 goto retry_lock;
681 } 718 }
682 goto retry_lock;
683 } 719 }
684 720
685 if (status != DLM_NORMAL) { 721 if (status != DLM_NORMAL) {