aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmapi.h2
-rw-r--r--fs/ocfs2/dlm/dlmast.c2
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c2
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c2
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c2
-rw-r--r--fs/ocfs2/dlm/dlmlock.c2
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c38
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c147
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c8
9 files changed, 133 insertions, 72 deletions
diff --git a/fs/ocfs2/dlm/dlmapi.h b/fs/ocfs2/dlm/dlmapi.h
index b5786a787fab..3cfa114aa391 100644
--- a/fs/ocfs2/dlm/dlmapi.h
+++ b/fs/ocfs2/dlm/dlmapi.h
@@ -95,7 +95,7 @@ const char *dlm_errname(enum dlm_status err);
95 mlog(ML_ERROR, "dlm status = %s\n", dlm_errname((st))); \ 95 mlog(ML_ERROR, "dlm status = %s\n", dlm_errname((st))); \
96} while (0) 96} while (0)
97 97
98#define DLM_LKSB_UNUSED1 0x01 98#define DLM_LKSB_UNUSED1 0x01
99#define DLM_LKSB_PUT_LVB 0x02 99#define DLM_LKSB_PUT_LVB 0x02
100#define DLM_LKSB_GET_LVB 0x04 100#define DLM_LKSB_GET_LVB 0x04
101#define DLM_LKSB_UNUSED2 0x08 101#define DLM_LKSB_UNUSED2 0x08
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 01cf8cc3d286..dccc439fa087 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -123,7 +123,7 @@ static void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock)
123 dlm_lock_put(lock); 123 dlm_lock_put(lock);
124 /* free up the reserved bast that we are cancelling. 124 /* free up the reserved bast that we are cancelling.
125 * guaranteed that this will not be the last reserved 125 * guaranteed that this will not be the last reserved
126 * ast because *both* an ast and a bast were reserved 126 * ast because *both* an ast and a bast were reserved
127 * to get to this point. the res->spinlock will not be 127 * to get to this point. the res->spinlock will not be
128 * taken here */ 128 * taken here */
129 dlm_lockres_release_ast(dlm, res); 129 dlm_lockres_release_ast(dlm, res);
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index ca96bce50e18..f283bce776b4 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -396,7 +396,7 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
396 /* instead of logging the same network error over 396 /* instead of logging the same network error over
397 * and over, sleep here and wait for the heartbeat 397 * and over, sleep here and wait for the heartbeat
398 * to notice the node is dead. times out after 5s. */ 398 * to notice the node is dead. times out after 5s. */
399 dlm_wait_for_node_death(dlm, res->owner, 399 dlm_wait_for_node_death(dlm, res->owner,
400 DLM_NODE_DEATH_WAIT_MAX); 400 DLM_NODE_DEATH_WAIT_MAX);
401 ret = DLM_RECOVERING; 401 ret = DLM_RECOVERING;
402 mlog(0, "node %u died so returning DLM_RECOVERING " 402 mlog(0, "node %u died so returning DLM_RECOVERING "
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 42b0bad7a612..0cd24cf54396 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -102,7 +102,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
102 assert_spin_locked(&res->spinlock); 102 assert_spin_locked(&res->spinlock);
103 103
104 stringify_lockname(res->lockname.name, res->lockname.len, 104 stringify_lockname(res->lockname.name, res->lockname.len,
105 buf, sizeof(buf) - 1); 105 buf, sizeof(buf));
106 printk("lockres: %s, owner=%u, state=%u\n", 106 printk("lockres: %s, owner=%u, state=%u\n",
107 buf, res->owner, res->state); 107 buf, res->owner, res->state);
108 printk(" last used: %lu, refcnt: %u, on purge list: %s\n", 108 printk(" last used: %lu, refcnt: %u, on purge list: %s\n",
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 0334000676d3..988c9055fd4e 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -816,7 +816,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
816 } 816 }
817 817
818 /* Once the dlm ctxt is marked as leaving then we don't want 818 /* Once the dlm ctxt is marked as leaving then we don't want
819 * to be put in someone's domain map. 819 * to be put in someone's domain map.
820 * Also, explicitly disallow joining at certain troublesome 820 * Also, explicitly disallow joining at certain troublesome
821 * times (ie. during recovery). */ 821 * times (ie. during recovery). */
822 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) { 822 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 437698e9465f..733337772671 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -269,7 +269,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
269 } 269 }
270 dlm_revert_pending_lock(res, lock); 270 dlm_revert_pending_lock(res, lock);
271 dlm_lock_put(lock); 271 dlm_lock_put(lock);
272 } else if (dlm_is_recovery_lock(res->lockname.name, 272 } else if (dlm_is_recovery_lock(res->lockname.name,
273 res->lockname.len)) { 273 res->lockname.len)) {
274 /* special case for the $RECOVERY lock. 274 /* special case for the $RECOVERY lock.
275 * there will never be an AST delivered to put 275 * there will never be an AST delivered to put
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 03ccf9a7b1f4..a659606dcb95 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -366,7 +366,7 @@ void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
366 struct dlm_master_list_entry *mle; 366 struct dlm_master_list_entry *mle;
367 367
368 assert_spin_locked(&dlm->spinlock); 368 assert_spin_locked(&dlm->spinlock);
369 369
370 list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) { 370 list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
371 if (node_up) 371 if (node_up)
372 dlm_mle_node_up(dlm, mle, NULL, idx); 372 dlm_mle_node_up(dlm, mle, NULL, idx);
@@ -833,7 +833,7 @@ lookup:
833 __dlm_insert_mle(dlm, mle); 833 __dlm_insert_mle(dlm, mle);
834 834
835 /* still holding the dlm spinlock, check the recovery map 835 /* still holding the dlm spinlock, check the recovery map
836 * to see if there are any nodes that still need to be 836 * to see if there are any nodes that still need to be
837 * considered. these will not appear in the mle nodemap 837 * considered. these will not appear in the mle nodemap
838 * but they might own this lockres. wait on them. */ 838 * but they might own this lockres. wait on them. */
839 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); 839 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
@@ -883,7 +883,7 @@ redo_request:
883 msleep(500); 883 msleep(500);
884 } 884 }
885 continue; 885 continue;
886 } 886 }
887 887
888 dlm_kick_recovery_thread(dlm); 888 dlm_kick_recovery_thread(dlm);
889 msleep(1000); 889 msleep(1000);
@@ -939,8 +939,8 @@ wait:
939 res->lockname.name, blocked); 939 res->lockname.name, blocked);
940 if (++tries > 20) { 940 if (++tries > 20) {
941 mlog(ML_ERROR, "%s:%.*s: spinning on " 941 mlog(ML_ERROR, "%s:%.*s: spinning on "
942 "dlm_wait_for_lock_mastery, blocked=%d\n", 942 "dlm_wait_for_lock_mastery, blocked=%d\n",
943 dlm->name, res->lockname.len, 943 dlm->name, res->lockname.len,
944 res->lockname.name, blocked); 944 res->lockname.name, blocked);
945 dlm_print_one_lock_resource(res); 945 dlm_print_one_lock_resource(res);
946 dlm_print_one_mle(mle); 946 dlm_print_one_mle(mle);
@@ -1029,7 +1029,7 @@ recheck:
1029 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked); 1029 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1030 b = (mle->type == DLM_MLE_BLOCK); 1030 b = (mle->type == DLM_MLE_BLOCK);
1031 if ((*blocked && !b) || (!*blocked && b)) { 1031 if ((*blocked && !b) || (!*blocked && b)) {
1032 mlog(0, "%s:%.*s: status change: old=%d new=%d\n", 1032 mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1033 dlm->name, res->lockname.len, res->lockname.name, 1033 dlm->name, res->lockname.len, res->lockname.name,
1034 *blocked, b); 1034 *blocked, b);
1035 *blocked = b; 1035 *blocked = b;
@@ -1602,7 +1602,7 @@ send_response:
1602 } 1602 }
1603 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", 1603 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1604 dlm->node_num, res->lockname.len, res->lockname.name); 1604 dlm->node_num, res->lockname.len, res->lockname.name);
1605 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 1605 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1606 DLM_ASSERT_MASTER_MLE_CLEANUP); 1606 DLM_ASSERT_MASTER_MLE_CLEANUP);
1607 if (ret < 0) { 1607 if (ret < 0) {
1608 mlog(ML_ERROR, "failed to dispatch assert master work\n"); 1608 mlog(ML_ERROR, "failed to dispatch assert master work\n");
@@ -1701,7 +1701,7 @@ again:
1701 1701
1702 if (r & DLM_ASSERT_RESPONSE_REASSERT) { 1702 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1703 mlog(0, "%.*s: node %u create mles on other " 1703 mlog(0, "%.*s: node %u create mles on other "
1704 "nodes and requests a re-assert\n", 1704 "nodes and requests a re-assert\n",
1705 namelen, lockname, to); 1705 namelen, lockname, to);
1706 reassert = 1; 1706 reassert = 1;
1707 } 1707 }
@@ -1812,7 +1812,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1812 spin_unlock(&dlm->master_lock); 1812 spin_unlock(&dlm->master_lock);
1813 spin_unlock(&dlm->spinlock); 1813 spin_unlock(&dlm->spinlock);
1814 goto done; 1814 goto done;
1815 } 1815 }
1816 } 1816 }
1817 } 1817 }
1818 spin_unlock(&dlm->master_lock); 1818 spin_unlock(&dlm->master_lock);
@@ -1883,7 +1883,7 @@ ok:
1883 int extra_ref = 0; 1883 int extra_ref = 0;
1884 int nn = -1; 1884 int nn = -1;
1885 int rr, err = 0; 1885 int rr, err = 0;
1886 1886
1887 spin_lock(&mle->spinlock); 1887 spin_lock(&mle->spinlock);
1888 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) 1888 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1889 extra_ref = 1; 1889 extra_ref = 1;
@@ -1891,7 +1891,7 @@ ok:
1891 /* MASTER mle: if any bits set in the response map 1891 /* MASTER mle: if any bits set in the response map
1892 * then the calling node needs to re-assert to clear 1892 * then the calling node needs to re-assert to clear
1893 * up nodes that this node contacted */ 1893 * up nodes that this node contacted */
1894 while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, 1894 while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1895 nn+1)) < O2NM_MAX_NODES) { 1895 nn+1)) < O2NM_MAX_NODES) {
1896 if (nn != dlm->node_num && nn != assert->node_idx) 1896 if (nn != dlm->node_num && nn != assert->node_idx)
1897 master_request = 1; 1897 master_request = 1;
@@ -2002,7 +2002,7 @@ kill:
2002 __dlm_print_one_lock_resource(res); 2002 __dlm_print_one_lock_resource(res);
2003 spin_unlock(&res->spinlock); 2003 spin_unlock(&res->spinlock);
2004 spin_unlock(&dlm->spinlock); 2004 spin_unlock(&dlm->spinlock);
2005 *ret_data = (void *)res; 2005 *ret_data = (void *)res;
2006 dlm_put(dlm); 2006 dlm_put(dlm);
2007 return -EINVAL; 2007 return -EINVAL;
2008} 2008}
@@ -2040,10 +2040,10 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2040 item->u.am.request_from = request_from; 2040 item->u.am.request_from = request_from;
2041 item->u.am.flags = flags; 2041 item->u.am.flags = flags;
2042 2042
2043 if (ignore_higher) 2043 if (ignore_higher)
2044 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, 2044 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2045 res->lockname.name); 2045 res->lockname.name);
2046 2046
2047 spin_lock(&dlm->work_lock); 2047 spin_lock(&dlm->work_lock);
2048 list_add_tail(&item->list, &dlm->work_list); 2048 list_add_tail(&item->list, &dlm->work_list);
2049 spin_unlock(&dlm->work_lock); 2049 spin_unlock(&dlm->work_lock);
@@ -2133,7 +2133,7 @@ put:
2133 * think that $RECOVERY is currently mastered by a dead node. If so, 2133 * think that $RECOVERY is currently mastered by a dead node. If so,
2134 * we wait a short time to allow that node to get notified by its own 2134 * we wait a short time to allow that node to get notified by its own
2135 * heartbeat stack, then check again. All $RECOVERY lock resources 2135 * heartbeat stack, then check again. All $RECOVERY lock resources
2136 * mastered by dead nodes are purged when the hearbeat callback is 2136 * mastered by dead nodes are purged when the hearbeat callback is
2137 * fired, so we can know for sure that it is safe to continue once 2137 * fired, so we can know for sure that it is safe to continue once
2138 * the node returns a live node or no node. */ 2138 * the node returns a live node or no node. */
2139static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, 2139static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
@@ -2174,7 +2174,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2174 ret = -EAGAIN; 2174 ret = -EAGAIN;
2175 } 2175 }
2176 spin_unlock(&dlm->spinlock); 2176 spin_unlock(&dlm->spinlock);
2177 mlog(0, "%s: reco lock master is %u\n", dlm->name, 2177 mlog(0, "%s: reco lock master is %u\n", dlm->name,
2178 master); 2178 master);
2179 break; 2179 break;
2180 } 2180 }
@@ -2602,7 +2602,7 @@ fail:
2602 2602
2603 mlog(0, "%s:%.*s: timed out during migration\n", 2603 mlog(0, "%s:%.*s: timed out during migration\n",
2604 dlm->name, res->lockname.len, res->lockname.name); 2604 dlm->name, res->lockname.len, res->lockname.name);
2605 /* avoid hang during shutdown when migrating lockres 2605 /* avoid hang during shutdown when migrating lockres
2606 * to a node which also goes down */ 2606 * to a node which also goes down */
2607 if (dlm_is_node_dead(dlm, target)) { 2607 if (dlm_is_node_dead(dlm, target)) {
2608 mlog(0, "%s:%.*s: expected migration " 2608 mlog(0, "%s:%.*s: expected migration "
@@ -2738,7 +2738,7 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2738 can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING); 2738 can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2739 spin_unlock(&res->spinlock); 2739 spin_unlock(&res->spinlock);
2740 2740
2741 /* target has died, so make the caller break out of the 2741 /* target has died, so make the caller break out of the
2742 * wait_event, but caller must recheck the domain_map */ 2742 * wait_event, but caller must recheck the domain_map */
2743 spin_lock(&dlm->spinlock); 2743 spin_lock(&dlm->spinlock);
2744 if (!test_bit(mig_target, dlm->domain_map)) 2744 if (!test_bit(mig_target, dlm->domain_map))
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 2f9e4e19a4f2..344bcf90cbf4 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -1050,7 +1050,7 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
1050 if (lock->ml.node == dead_node) { 1050 if (lock->ml.node == dead_node) {
1051 mlog(0, "AHA! there was " 1051 mlog(0, "AHA! there was "
1052 "a $RECOVERY lock for dead " 1052 "a $RECOVERY lock for dead "
1053 "node %u (%s)!\n", 1053 "node %u (%s)!\n",
1054 dead_node, dlm->name); 1054 dead_node, dlm->name);
1055 list_del_init(&lock->list); 1055 list_del_init(&lock->list);
1056 dlm_lock_put(lock); 1056 dlm_lock_put(lock);
@@ -1164,6 +1164,39 @@ static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
1164 mres->master = master; 1164 mres->master = master;
1165} 1165}
1166 1166
1167static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock,
1168 struct dlm_migratable_lockres *mres,
1169 int queue)
1170{
1171 if (!lock->lksb)
1172 return;
1173
1174 /* Ignore lvb in all locks in the blocked list */
1175 if (queue == DLM_BLOCKED_LIST)
1176 return;
1177
1178 /* Only consider lvbs in locks with granted EX or PR lock levels */
1179 if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE)
1180 return;
1181
1182 if (dlm_lvb_is_empty(mres->lvb)) {
1183 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
1184 return;
1185 }
1186
1187 /* Ensure the lvb copied for migration matches in other valid locks */
1188 if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))
1189 return;
1190
1191 mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, "
1192 "node=%u\n",
1193 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
1194 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
1195 lock->lockres->lockname.len, lock->lockres->lockname.name,
1196 lock->ml.node);
1197 dlm_print_one_lock_resource(lock->lockres);
1198 BUG();
1199}
1167 1200
1168/* returns 1 if this lock fills the network structure, 1201/* returns 1 if this lock fills the network structure,
1169 * 0 otherwise */ 1202 * 0 otherwise */
@@ -1181,20 +1214,7 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1181 ml->list = queue; 1214 ml->list = queue;
1182 if (lock->lksb) { 1215 if (lock->lksb) {
1183 ml->flags = lock->lksb->flags; 1216 ml->flags = lock->lksb->flags;
1184 /* send our current lvb */ 1217 dlm_prepare_lvb_for_migration(lock, mres, queue);
1185 if (ml->type == LKM_EXMODE ||
1186 ml->type == LKM_PRMODE) {
1187 /* if it is already set, this had better be a PR
1188 * and it has to match */
1189 if (!dlm_lvb_is_empty(mres->lvb) &&
1190 (ml->type == LKM_EXMODE ||
1191 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
1192 mlog(ML_ERROR, "mismatched lvbs!\n");
1193 dlm_print_one_lock_resource(lock->lockres);
1194 BUG();
1195 }
1196 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
1197 }
1198 } 1218 }
1199 ml->node = lock->ml.node; 1219 ml->node = lock->ml.node;
1200 mres->num_locks++; 1220 mres->num_locks++;
@@ -1730,6 +1750,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1730 struct dlm_lock *lock = NULL; 1750 struct dlm_lock *lock = NULL;
1731 u8 from = O2NM_MAX_NODES; 1751 u8 from = O2NM_MAX_NODES;
1732 unsigned int added = 0; 1752 unsigned int added = 0;
1753 __be64 c;
1733 1754
1734 mlog(0, "running %d locks for this lockres\n", mres->num_locks); 1755 mlog(0, "running %d locks for this lockres\n", mres->num_locks);
1735 for (i=0; i<mres->num_locks; i++) { 1756 for (i=0; i<mres->num_locks; i++) {
@@ -1777,19 +1798,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1777 /* lock is always created locally first, and 1798 /* lock is always created locally first, and
1778 * destroyed locally last. it must be on the list */ 1799 * destroyed locally last. it must be on the list */
1779 if (!lock) { 1800 if (!lock) {
1780 __be64 c = ml->cookie; 1801 c = ml->cookie;
1781 mlog(ML_ERROR, "could not find local lock " 1802 mlog(ML_ERROR, "Could not find local lock "
1782 "with cookie %u:%llu!\n", 1803 "with cookie %u:%llu, node %u, "
1804 "list %u, flags 0x%x, type %d, "
1805 "conv %d, highest blocked %d\n",
1783 dlm_get_lock_cookie_node(be64_to_cpu(c)), 1806 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1784 dlm_get_lock_cookie_seq(be64_to_cpu(c))); 1807 dlm_get_lock_cookie_seq(be64_to_cpu(c)),
1808 ml->node, ml->list, ml->flags, ml->type,
1809 ml->convert_type, ml->highest_blocked);
1810 __dlm_print_one_lock_resource(res);
1811 BUG();
1812 }
1813
1814 if (lock->ml.node != ml->node) {
1815 c = lock->ml.cookie;
1816 mlog(ML_ERROR, "Mismatched node# in lock "
1817 "cookie %u:%llu, name %.*s, node %u\n",
1818 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1819 dlm_get_lock_cookie_seq(be64_to_cpu(c)),
1820 res->lockname.len, res->lockname.name,
1821 lock->ml.node);
1822 c = ml->cookie;
1823 mlog(ML_ERROR, "Migrate lock cookie %u:%llu, "
1824 "node %u, list %u, flags 0x%x, type %d, "
1825 "conv %d, highest blocked %d\n",
1826 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1827 dlm_get_lock_cookie_seq(be64_to_cpu(c)),
1828 ml->node, ml->list, ml->flags, ml->type,
1829 ml->convert_type, ml->highest_blocked);
1785 __dlm_print_one_lock_resource(res); 1830 __dlm_print_one_lock_resource(res);
1786 BUG(); 1831 BUG();
1787 } 1832 }
1788 BUG_ON(lock->ml.node != ml->node);
1789 1833
1790 if (tmpq != queue) { 1834 if (tmpq != queue) {
1791 mlog(0, "lock was on %u instead of %u for %.*s\n", 1835 c = ml->cookie;
1792 j, ml->list, res->lockname.len, res->lockname.name); 1836 mlog(0, "Lock cookie %u:%llu was on list %u "
1837 "instead of list %u for %.*s\n",
1838 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1839 dlm_get_lock_cookie_seq(be64_to_cpu(c)),
1840 j, ml->list, res->lockname.len,
1841 res->lockname.name);
1842 __dlm_print_one_lock_resource(res);
1793 spin_unlock(&res->spinlock); 1843 spin_unlock(&res->spinlock);
1794 continue; 1844 continue;
1795 } 1845 }
@@ -1839,7 +1889,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1839 * the lvb. */ 1889 * the lvb. */
1840 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); 1890 memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
1841 } else { 1891 } else {
1842 /* otherwise, the node is sending its 1892 /* otherwise, the node is sending its
1843 * most recent valid lvb info */ 1893 * most recent valid lvb info */
1844 BUG_ON(ml->type != LKM_EXMODE && 1894 BUG_ON(ml->type != LKM_EXMODE &&
1845 ml->type != LKM_PRMODE); 1895 ml->type != LKM_PRMODE);
@@ -1886,7 +1936,7 @@ skip_lvb:
1886 spin_lock(&res->spinlock); 1936 spin_lock(&res->spinlock);
1887 list_for_each_entry(lock, queue, list) { 1937 list_for_each_entry(lock, queue, list) {
1888 if (lock->ml.cookie == ml->cookie) { 1938 if (lock->ml.cookie == ml->cookie) {
1889 __be64 c = lock->ml.cookie; 1939 c = lock->ml.cookie;
1890 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " 1940 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
1891 "exists on this lockres!\n", dlm->name, 1941 "exists on this lockres!\n", dlm->name,
1892 res->lockname.len, res->lockname.name, 1942 res->lockname.len, res->lockname.name,
@@ -2114,7 +2164,7 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
2114 assert_spin_locked(&res->spinlock); 2164 assert_spin_locked(&res->spinlock);
2115 2165
2116 if (res->owner == dlm->node_num) 2166 if (res->owner == dlm->node_num)
2117 /* if this node owned the lockres, and if the dead node 2167 /* if this node owned the lockres, and if the dead node
2118 * had an EX when he died, blank out the lvb */ 2168 * had an EX when he died, blank out the lvb */
2119 search_node = dead_node; 2169 search_node = dead_node;
2120 else { 2170 else {
@@ -2152,7 +2202,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2152 2202
2153 /* this node is the lockres master: 2203 /* this node is the lockres master:
2154 * 1) remove any stale locks for the dead node 2204 * 1) remove any stale locks for the dead node
2155 * 2) if the dead node had an EX when he died, blank out the lvb 2205 * 2) if the dead node had an EX when he died, blank out the lvb
2156 */ 2206 */
2157 assert_spin_locked(&dlm->spinlock); 2207 assert_spin_locked(&dlm->spinlock);
2158 assert_spin_locked(&res->spinlock); 2208 assert_spin_locked(&res->spinlock);
@@ -2193,7 +2243,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2193 mlog(0, "%s:%.*s: freed %u locks for dead node %u, " 2243 mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
2194 "dropping ref from lockres\n", dlm->name, 2244 "dropping ref from lockres\n", dlm->name,
2195 res->lockname.len, res->lockname.name, freed, dead_node); 2245 res->lockname.len, res->lockname.name, freed, dead_node);
2196 BUG_ON(!test_bit(dead_node, res->refmap)); 2246 if(!test_bit(dead_node, res->refmap)) {
2247 mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, "
2248 "but ref was not set\n", dlm->name,
2249 res->lockname.len, res->lockname.name, freed, dead_node);
2250 __dlm_print_one_lock_resource(res);
2251 }
2197 dlm_lockres_clear_refmap_bit(dead_node, res); 2252 dlm_lockres_clear_refmap_bit(dead_node, res);
2198 } else if (test_bit(dead_node, res->refmap)) { 2253 } else if (test_bit(dead_node, res->refmap)) {
2199 mlog(0, "%s:%.*s: dead node %u had a ref, but had " 2254 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
@@ -2260,7 +2315,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2260 } 2315 }
2261 spin_unlock(&res->spinlock); 2316 spin_unlock(&res->spinlock);
2262 continue; 2317 continue;
2263 } 2318 }
2264 spin_lock(&res->spinlock); 2319 spin_lock(&res->spinlock);
2265 /* zero the lvb if necessary */ 2320 /* zero the lvb if necessary */
2266 dlm_revalidate_lvb(dlm, res, dead_node); 2321 dlm_revalidate_lvb(dlm, res, dead_node);
@@ -2411,7 +2466,7 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
2411 * this function on each node racing to become the recovery 2466 * this function on each node racing to become the recovery
2412 * master will not stop attempting this until either: 2467 * master will not stop attempting this until either:
2413 * a) this node gets the EX (and becomes the recovery master), 2468 * a) this node gets the EX (and becomes the recovery master),
2414 * or b) dlm->reco.new_master gets set to some nodenum 2469 * or b) dlm->reco.new_master gets set to some nodenum
2415 * != O2NM_INVALID_NODE_NUM (another node will do the reco). 2470 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
2416 * so each time a recovery master is needed, the entire cluster 2471 * so each time a recovery master is needed, the entire cluster
2417 * will sync at this point. if the new master dies, that will 2472 * will sync at this point. if the new master dies, that will
@@ -2424,7 +2479,7 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
2424 2479
2425 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", 2480 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
2426 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); 2481 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
2427again: 2482again:
2428 memset(&lksb, 0, sizeof(lksb)); 2483 memset(&lksb, 0, sizeof(lksb));
2429 2484
2430 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, 2485 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
@@ -2437,8 +2492,8 @@ again:
2437 if (ret == DLM_NORMAL) { 2492 if (ret == DLM_NORMAL) {
2438 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", 2493 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
2439 dlm->name, dlm->node_num); 2494 dlm->name, dlm->node_num);
2440 2495
2441 /* got the EX lock. check to see if another node 2496 /* got the EX lock. check to see if another node
2442 * just became the reco master */ 2497 * just became the reco master */
2443 if (dlm_reco_master_ready(dlm)) { 2498 if (dlm_reco_master_ready(dlm)) {
2444 mlog(0, "%s: got reco EX lock, but %u will " 2499 mlog(0, "%s: got reco EX lock, but %u will "
@@ -2451,12 +2506,12 @@ again:
2451 /* see if recovery was already finished elsewhere */ 2506 /* see if recovery was already finished elsewhere */
2452 spin_lock(&dlm->spinlock); 2507 spin_lock(&dlm->spinlock);
2453 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { 2508 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
2454 status = -EINVAL; 2509 status = -EINVAL;
2455 mlog(0, "%s: got reco EX lock, but " 2510 mlog(0, "%s: got reco EX lock, but "
2456 "node got recovered already\n", dlm->name); 2511 "node got recovered already\n", dlm->name);
2457 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2512 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2458 mlog(ML_ERROR, "%s: new master is %u " 2513 mlog(ML_ERROR, "%s: new master is %u "
2459 "but no dead node!\n", 2514 "but no dead node!\n",
2460 dlm->name, dlm->reco.new_master); 2515 dlm->name, dlm->reco.new_master);
2461 BUG(); 2516 BUG();
2462 } 2517 }
@@ -2468,7 +2523,7 @@ again:
2468 * set the master and send the messages to begin recovery */ 2523 * set the master and send the messages to begin recovery */
2469 if (!status) { 2524 if (!status) {
2470 mlog(0, "%s: dead=%u, this=%u, sending " 2525 mlog(0, "%s: dead=%u, this=%u, sending "
2471 "begin_reco now\n", dlm->name, 2526 "begin_reco now\n", dlm->name,
2472 dlm->reco.dead_node, dlm->node_num); 2527 dlm->reco.dead_node, dlm->node_num);
2473 status = dlm_send_begin_reco_message(dlm, 2528 status = dlm_send_begin_reco_message(dlm,
2474 dlm->reco.dead_node); 2529 dlm->reco.dead_node);
@@ -2501,7 +2556,7 @@ again:
2501 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", 2556 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
2502 dlm->name, dlm->node_num); 2557 dlm->name, dlm->node_num);
2503 /* another node is master. wait on 2558 /* another node is master. wait on
2504 * reco.new_master != O2NM_INVALID_NODE_NUM 2559 * reco.new_master != O2NM_INVALID_NODE_NUM
2505 * for at most one second */ 2560 * for at most one second */
2506 wait_event_timeout(dlm->dlm_reco_thread_wq, 2561 wait_event_timeout(dlm->dlm_reco_thread_wq,
2507 dlm_reco_master_ready(dlm), 2562 dlm_reco_master_ready(dlm),
@@ -2589,7 +2644,13 @@ retry:
2589 "begin reco msg (%d)\n", dlm->name, nodenum, ret); 2644 "begin reco msg (%d)\n", dlm->name, nodenum, ret);
2590 ret = 0; 2645 ret = 0;
2591 } 2646 }
2592 if (ret == -EAGAIN) { 2647
2648 /*
2649 * Prior to commit aad1b15310b9bcd59fa81ab8f2b1513b59553ea8,
2650 * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN.
2651 * We are handling both for compatibility reasons.
2652 */
2653 if (ret == -EAGAIN || ret == EAGAIN) {
2593 mlog(0, "%s: trying to start recovery of node " 2654 mlog(0, "%s: trying to start recovery of node "
2594 "%u, but node %u is waiting for last recovery " 2655 "%u, but node %u is waiting for last recovery "
2595 "to complete, backoff for a bit\n", dlm->name, 2656 "to complete, backoff for a bit\n", dlm->name,
@@ -2599,7 +2660,7 @@ retry:
2599 } 2660 }
2600 if (ret < 0) { 2661 if (ret < 0) {
2601 struct dlm_lock_resource *res; 2662 struct dlm_lock_resource *res;
2602 /* this is now a serious problem, possibly ENOMEM 2663 /* this is now a serious problem, possibly ENOMEM
2603 * in the network stack. must retry */ 2664 * in the network stack. must retry */
2604 mlog_errno(ret); 2665 mlog_errno(ret);
2605 mlog(ML_ERROR, "begin reco of dlm %s to node %u " 2666 mlog(ML_ERROR, "begin reco of dlm %s to node %u "
@@ -2612,7 +2673,7 @@ retry:
2612 } else { 2673 } else {
2613 mlog(ML_ERROR, "recovery lock not found\n"); 2674 mlog(ML_ERROR, "recovery lock not found\n");
2614 } 2675 }
2615 /* sleep for a bit in hopes that we can avoid 2676 /* sleep for a bit in hopes that we can avoid
2616 * another ENOMEM */ 2677 * another ENOMEM */
2617 msleep(100); 2678 msleep(100);
2618 goto retry; 2679 goto retry;
@@ -2664,7 +2725,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2664 } 2725 }
2665 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { 2726 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
2666 mlog(ML_NOTICE, "%s: dead_node previously set to %u, " 2727 mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
2667 "node %u changing it to %u\n", dlm->name, 2728 "node %u changing it to %u\n", dlm->name,
2668 dlm->reco.dead_node, br->node_idx, br->dead_node); 2729 dlm->reco.dead_node, br->node_idx, br->dead_node);
2669 } 2730 }
2670 dlm_set_reco_master(dlm, br->node_idx); 2731 dlm_set_reco_master(dlm, br->node_idx);
@@ -2730,8 +2791,8 @@ stage2:
2730 if (ret < 0) { 2791 if (ret < 0) {
2731 mlog_errno(ret); 2792 mlog_errno(ret);
2732 if (dlm_is_host_down(ret)) { 2793 if (dlm_is_host_down(ret)) {
2733 /* this has no effect on this recovery 2794 /* this has no effect on this recovery
2734 * session, so set the status to zero to 2795 * session, so set the status to zero to
2735 * finish out the last recovery */ 2796 * finish out the last recovery */
2736 mlog(ML_ERROR, "node %u went down after this " 2797 mlog(ML_ERROR, "node %u went down after this "
2737 "node finished recovery.\n", nodenum); 2798 "node finished recovery.\n", nodenum);
@@ -2768,7 +2829,7 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2768 mlog(0, "%s: node %u finalizing recovery stage%d of " 2829 mlog(0, "%s: node %u finalizing recovery stage%d of "
2769 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, 2830 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
2770 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); 2831 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
2771 2832
2772 spin_lock(&dlm->spinlock); 2833 spin_lock(&dlm->spinlock);
2773 2834
2774 if (dlm->reco.new_master != fr->node_idx) { 2835 if (dlm->reco.new_master != fr->node_idx) {
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index 00f53b2aea76..49e29ecd0201 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -190,8 +190,8 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
190 actions &= ~(DLM_UNLOCK_REMOVE_LOCK| 190 actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
191 DLM_UNLOCK_REGRANT_LOCK| 191 DLM_UNLOCK_REGRANT_LOCK|
192 DLM_UNLOCK_CLEAR_CONVERT_TYPE); 192 DLM_UNLOCK_CLEAR_CONVERT_TYPE);
193 } else if (status == DLM_RECOVERING || 193 } else if (status == DLM_RECOVERING ||
194 status == DLM_MIGRATING || 194 status == DLM_MIGRATING ||
195 status == DLM_FORWARD) { 195 status == DLM_FORWARD) {
196 /* must clear the actions because this unlock 196 /* must clear the actions because this unlock
197 * is about to be retried. cannot free or do 197 * is about to be retried. cannot free or do
@@ -661,14 +661,14 @@ retry:
661 if (call_ast) { 661 if (call_ast) {
662 mlog(0, "calling unlockast(%p, %d)\n", data, status); 662 mlog(0, "calling unlockast(%p, %d)\n", data, status);
663 if (is_master) { 663 if (is_master) {
664 /* it is possible that there is one last bast 664 /* it is possible that there is one last bast
665 * pending. make sure it is flushed, then 665 * pending. make sure it is flushed, then
666 * call the unlockast. 666 * call the unlockast.
667 * not an issue if this is a mastered remotely, 667 * not an issue if this is a mastered remotely,
668 * since this lock has been removed from the 668 * since this lock has been removed from the
669 * lockres queues and cannot be found. */ 669 * lockres queues and cannot be found. */
670 dlm_kick_thread(dlm, NULL); 670 dlm_kick_thread(dlm, NULL);
671 wait_event(dlm->ast_wq, 671 wait_event(dlm->ast_wq,
672 dlm_lock_basts_flushed(dlm, lock)); 672 dlm_lock_basts_flushed(dlm, lock));
673 } 673 }
674 (*unlockast)(data, status); 674 (*unlockast)(data, status);