aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmlock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmlock.c')
-rw-r--r--fs/ocfs2/dlm/dlmlock.c54
1 files changed, 26 insertions, 28 deletions
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 8d39e0fd66f7..975810b98492 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -183,10 +183,6 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
183 kick_thread = 1; 183 kick_thread = 1;
184 } 184 }
185 } 185 }
186 /* reduce the inflight count, this may result in the lockres
187 * being purged below during calc_usage */
188 if (lock->ml.node == dlm->node_num)
189 dlm_lockres_drop_inflight_ref(dlm, res);
190 186
191 spin_unlock(&res->spinlock); 187 spin_unlock(&res->spinlock);
192 wake_up(&res->wq); 188 wake_up(&res->wq);
@@ -231,10 +227,16 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
231 lock->ml.type, res->lockname.len, 227 lock->ml.type, res->lockname.len,
232 res->lockname.name, flags); 228 res->lockname.name, flags);
233 229
230 /*
231 * Wait if resource is getting recovered, remastered, etc.
232 * If the resource was remastered and new owner is self, then exit.
233 */
234 spin_lock(&res->spinlock); 234 spin_lock(&res->spinlock);
235
236 /* will exit this call with spinlock held */
237 __dlm_wait_on_lockres(res); 235 __dlm_wait_on_lockres(res);
236 if (res->owner == dlm->node_num) {
237 spin_unlock(&res->spinlock);
238 return DLM_RECOVERING;
239 }
238 res->state |= DLM_LOCK_RES_IN_PROGRESS; 240 res->state |= DLM_LOCK_RES_IN_PROGRESS;
239 241
240 /* add lock to local (secondary) queue */ 242 /* add lock to local (secondary) queue */
@@ -319,27 +321,23 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
319 tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, 321 tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
320 sizeof(create), res->owner, &status); 322 sizeof(create), res->owner, &status);
321 if (tmpret >= 0) { 323 if (tmpret >= 0) {
322 // successfully sent and received 324 ret = status;
323 ret = status; // this is already a dlm_status
324 if (ret == DLM_REJECTED) { 325 if (ret == DLM_REJECTED) {
325 mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " 326 mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
326 "no longer owned by %u. that node is coming back " 327 "owned by node %u. That node is coming back up "
327 "up currently.\n", dlm->name, create.namelen, 328 "currently.\n", dlm->name, create.namelen,
328 create.name, res->owner); 329 create.name, res->owner);
329 dlm_print_one_lock_resource(res); 330 dlm_print_one_lock_resource(res);
330 BUG(); 331 BUG();
331 } 332 }
332 } else { 333 } else {
333 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " 334 mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
334 "node %u\n", tmpret, DLM_CREATE_LOCK_MSG, dlm->key, 335 "node %u\n", dlm->name, create.namelen, create.name,
335 res->owner); 336 tmpret, res->owner);
336 if (dlm_is_host_down(tmpret)) { 337 if (dlm_is_host_down(tmpret))
337 ret = DLM_RECOVERING; 338 ret = DLM_RECOVERING;
338 mlog(0, "node %u died so returning DLM_RECOVERING " 339 else
339 "from lock message!\n", res->owner);
340 } else {
341 ret = dlm_err_to_dlm_status(tmpret); 340 ret = dlm_err_to_dlm_status(tmpret);
342 }
343 } 341 }
344 342
345 return ret; 343 return ret;
@@ -440,7 +438,7 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
440 /* zero memory only if kernel-allocated */ 438 /* zero memory only if kernel-allocated */
441 lksb = kzalloc(sizeof(*lksb), GFP_NOFS); 439 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
442 if (!lksb) { 440 if (!lksb) {
443 kfree(lock); 441 kmem_cache_free(dlm_lock_cache, lock);
444 return NULL; 442 return NULL;
445 } 443 }
446 kernel_allocated = 1; 444 kernel_allocated = 1;
@@ -718,18 +716,10 @@ retry_lock:
718 716
719 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 717 if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
720 status == DLM_FORWARD) { 718 status == DLM_FORWARD) {
721 mlog(0, "retrying lock with migration/"
722 "recovery/in progress\n");
723 msleep(100); 719 msleep(100);
724 /* no waiting for dlm_reco_thread */
725 if (recovery) { 720 if (recovery) {
726 if (status != DLM_RECOVERING) 721 if (status != DLM_RECOVERING)
727 goto retry_lock; 722 goto retry_lock;
728
729 mlog(0, "%s: got RECOVERING "
730 "for $RECOVERY lock, master "
731 "was %u\n", dlm->name,
732 res->owner);
733 /* wait to see the node go down, then 723 /* wait to see the node go down, then
734 * drop down and allow the lockres to 724 * drop down and allow the lockres to
735 * get cleaned up. need to remaster. */ 725 * get cleaned up. need to remaster. */
@@ -741,6 +731,14 @@ retry_lock:
741 } 731 }
742 } 732 }
743 733
734 /* Inflight taken in dlm_get_lock_resource() is dropped here */
735 spin_lock(&res->spinlock);
736 dlm_lockres_drop_inflight_ref(dlm, res);
737 spin_unlock(&res->spinlock);
738
739 dlm_lockres_calc_usage(dlm, res);
740 dlm_kick_thread(dlm, res);
741
744 if (status != DLM_NORMAL) { 742 if (status != DLM_NORMAL) {
745 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 743 lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
746 if (status != DLM_NOTQUEUED) 744 if (status != DLM_NOTQUEUED)