Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--  fs/ocfs2/dlm/dlmrecovery.c | 123
1 file changed, 116 insertions(+), 7 deletions(-)
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e9e2ed..d011a2a22742 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -1129,6 +1129,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
 	if (total_locks == mres_total_locks)
 		mres->flags |= DLM_MRES_ALL_DONE;
 
+	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+	     dlm->name, res->lockname.len, res->lockname.name,
+	     orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+	     send_to);
+
 	/* send it */
 	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
 				 sz, send_to, &status);
@@ -1213,6 +1218,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
 	return 0;
 }
 
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+			       struct dlm_migratable_lockres *mres)
+{
+	struct dlm_lock dummy;
+	memset(&dummy, 0, sizeof(dummy));
+	dummy.ml.cookie = 0;
+	dummy.ml.type = LKM_IVMODE;
+	dummy.ml.convert_type = LKM_IVMODE;
+	dummy.ml.highest_blocked = LKM_IVMODE;
+	dummy.lksb = NULL;
+	dummy.ml.node = dlm->node_num;
+	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+				    struct dlm_migratable_lock *ml,
+				    u8 *nodenum)
+{
+	if (unlikely(ml->cookie == 0 &&
+		     ml->type == LKM_IVMODE &&
+		     ml->convert_type == LKM_IVMODE &&
+		     ml->highest_blocked == LKM_IVMODE &&
+		     ml->list == DLM_BLOCKED_LIST)) {
+		*nodenum = ml->node;
+		return 1;
+	}
+	return 0;
+}
 
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 			 struct dlm_migratable_lockres *mres,
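
The two helpers above encode a "this node holds a reference but no locks" marker inside the ordinary migration payload: a field combination (cookie 0, every mode LKM_IVMODE, parked on the blocked list) that no real lock can produce. Below is a minimal userspace sketch of the same sentinel idea; the stub type, constant values, and helper names are illustrative stand-ins, not the kernel's definitions.

#include <stdio.h>
#include <string.h>
#include <stdint.h>

/* Simplified stand-ins for the kernel's types and constants. */
#define LKM_IVMODE       (-1)   /* "invalid mode": never a real lock level */
#define DLM_BLOCKED_LIST 2      /* list index reserved for the marker */

struct mig_lock_stub {          /* models struct dlm_migratable_lock */
	uint64_t cookie;
	int8_t   type, convert_type, highest_blocked;
	uint8_t  list;
	uint8_t  node;
};

/* Models dlm_add_dummy_lock(): a payload no real lock can produce. */
static void make_dummy(struct mig_lock_stub *ml, uint8_t self)
{
	memset(ml, 0, sizeof(*ml));
	ml->cookie = 0;
	ml->type = ml->convert_type = ml->highest_blocked = LKM_IVMODE;
	ml->list = DLM_BLOCKED_LIST;
	ml->node = self;          /* carries the referencing node's number */
}

/* Models dlm_is_dummy_lock(): all sentinel fields must match at once. */
static int is_dummy(const struct mig_lock_stub *ml, uint8_t *nodenum)
{
	if (ml->cookie == 0 &&
	    ml->type == LKM_IVMODE &&
	    ml->convert_type == LKM_IVMODE &&
	    ml->highest_blocked == LKM_IVMODE &&
	    ml->list == DLM_BLOCKED_LIST) {
		*nodenum = ml->node;
		return 1;
	}
	return 0;
}

int main(void)
{
	struct mig_lock_stub ml;
	uint8_t from;

	make_dummy(&ml, 3);
	if (is_dummy(&ml, &from))
		printf("dummy lock, set refmap bit for node %u\n",
		       (unsigned)from);
	return 0;
}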
@@ -1260,6 +1293,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 			goto error;
 		}
 	}
+	if (total_locks == 0) {
+		/* send a dummy lock to indicate a mastery reference only */
+		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+		     dlm->name, res->lockname.len, res->lockname.name,
+		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+		     "migration");
+		dlm_add_dummy_lock(dlm, mres);
+	}
 	/* flush any remaining locks */
 	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
 	if (ret < 0)
@@ -1386,13 +1427,16 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
 		/* add an extra ref for just-allocated lockres
 		 * otherwise the lockres will be purged immediately */
 		dlm_lockres_get(res);
-
 	}
 
 	/* at this point we have allocated everything we need,
 	 * and we have a hashed lockres with an extra ref and
 	 * the proper res->state flags. */
 	ret = 0;
+	spin_lock(&res->spinlock);
+	/* drop this either when master requery finds a different master
+	 * or when a lock is added by the recovery worker */
+	dlm_lockres_grab_inflight_ref(dlm, res);
 	if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
 		/* migration cannot have an unknown master */
 		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1444,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1400 "unknown owner.. will need to requery: " 1444 "unknown owner.. will need to requery: "
1401 "%.*s\n", mres->lockname_len, mres->lockname); 1445 "%.*s\n", mres->lockname_len, mres->lockname);
1402 } else { 1446 } else {
1403 spin_lock(&res->spinlock); 1447 /* take a reference now to pin the lockres, drop it
1448 * when locks are added in the worker */
1404 dlm_change_lockres_owner(dlm, res, dlm->node_num); 1449 dlm_change_lockres_owner(dlm, res, dlm->node_num);
1405 spin_unlock(&res->spinlock);
1406 } 1450 }
1451 spin_unlock(&res->spinlock);
1407 1452
1408 /* queue up work for dlm_mig_lockres_worker */ 1453 /* queue up work for dlm_mig_lockres_worker */
1409 dlm_grab(dlm); /* get an extra ref for the work item */ 1454 dlm_grab(dlm); /* get an extra ref for the work item */
@@ -1459,6 +1504,9 @@ again:
 			"this node will take it.\n",
 			res->lockname.len, res->lockname.name);
 	} else {
+		spin_lock(&res->spinlock);
+		dlm_lockres_drop_inflight_ref(dlm, res);
+		spin_unlock(&res->spinlock);
 		mlog(0, "master needs to respond to sender "
 		     "that node %u still owns %.*s\n",
 		     real_master, res->lockname.len,
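
Taken with the handler hunk above, the pattern is: dlm_mig_lockres_handler() grabs one inflight reference on every migrated lockres, and exactly one later path drops it — either here, when the master requery discovers a different owner, or in dlm_process_recovery_data()'s leave: path (further down) once locks or the dummy marker have been added. A toy model of that pairing, with illustrative names only:

#include <assert.h>
#include <stdio.h>

/* Toy model of the grab/drop pairing added by this patch: every
 * lockres received in dlm_mig_lockres_handler() gets one "inflight"
 * reference, and exactly one later path releases it.  Names below
 * (inflight, recv_lockres, requery_other_master, worker_done) are
 * illustrative, not the kernel's. */
struct lockres_stub { int inflight; };

static void recv_lockres(struct lockres_stub *res)
{
	res->inflight++;              /* dlm_lockres_grab_inflight_ref() */
}

static void requery_other_master(struct lockres_stub *res)
{
	res->inflight--;              /* drop: someone else still masters it */
}

static void worker_done(struct lockres_stub *res, unsigned int added)
{
	if (added > 0)
		res->inflight--;      /* drop: locks (or dummy) now pin it */
}

int main(void)
{
	struct lockres_stub a = {0}, b = {0};

	recv_lockres(&a); worker_done(&a, 1);        /* normal recovery */
	recv_lockres(&b); requery_other_master(&b);  /* requery path */
	assert(a.inflight == 0 && b.inflight == 0);  /* balanced either way */
	puts("inflight refs balanced");
	return 0;
}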
@@ -1666,10 +1714,25 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 	int i, bad;
 	struct list_head *iter;
 	struct dlm_lock *lock = NULL;
+	u8 from = O2NM_MAX_NODES;
+	unsigned int added = 0;
 
 	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
 	for (i=0; i<mres->num_locks; i++) {
 		ml = &(mres->ml[i]);
+
+		if (dlm_is_dummy_lock(dlm, ml, &from)) {
+			/* placeholder, just need to set the refmap bit */
+			BUG_ON(mres->num_locks != 1);
+			mlog(0, "%s:%.*s: dummy lock for %u\n",
+			     dlm->name, mres->lockname_len, mres->lockname,
+			     from);
+			spin_lock(&res->spinlock);
+			dlm_lockres_set_refmap_bit(from, res);
+			spin_unlock(&res->spinlock);
+			added++;
+			break;
+		}
 		BUG_ON(ml->highest_blocked != LKM_IVMODE);
 		newlock = NULL;
 		lksb = NULL;
@@ -1711,6 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 			/* do not alter lock refcount.  switching lists. */
 			list_move_tail(&lock->list, queue);
 			spin_unlock(&res->spinlock);
+			added++;
 
 			mlog(0, "just reordered a local lock!\n");
 			continue;
@@ -1817,12 +1881,24 @@ skip_lvb:
 		if (!bad) {
 			dlm_lock_get(newlock);
 			list_add_tail(&newlock->list, queue);
+			mlog(0, "%s:%.*s: added lock for node %u, "
+			     "setting refmap bit\n", dlm->name,
+			     res->lockname.len, res->lockname.name, ml->node);
+			dlm_lockres_set_refmap_bit(ml->node, res);
+			added++;
 		}
 		spin_unlock(&res->spinlock);
 	}
 	mlog(0, "done running all the locks\n");
 
 leave:
+	/* balance the ref taken when the work was queued */
+	if (added > 0) {
+		spin_lock(&res->spinlock);
+		dlm_lockres_drop_inflight_ref(dlm, res);
+		spin_unlock(&res->spinlock);
+	}
+
 	if (ret < 0) {
 		mlog_errno(ret);
 		if (newlock)
@@ -1935,9 +2011,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
 		if (res->owner == dead_node) {
 			list_del_init(&res->recovering);
 			spin_lock(&res->spinlock);
+			/* new_master has our reference from
+			 * the lock state sent during recovery */
 			dlm_change_lockres_owner(dlm, res, new_master);
 			res->state &= ~DLM_LOCK_RES_RECOVERING;
-			if (!__dlm_lockres_unused(res))
+			if (__dlm_lockres_has_locks(res))
 				__dlm_dirty_lockres(dlm, res);
 			spin_unlock(&res->spinlock);
 			wake_up(&res->wq);
@@ -1977,9 +2055,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
 				dlm_lockres_put(res);
 			}
 			spin_lock(&res->spinlock);
+			/* new_master has our reference from
+			 * the lock state sent during recovery */
 			dlm_change_lockres_owner(dlm, res, new_master);
 			res->state &= ~DLM_LOCK_RES_RECOVERING;
-			if (!__dlm_lockres_unused(res))
+			if (__dlm_lockres_has_locks(res))
 				__dlm_dirty_lockres(dlm, res);
 			spin_unlock(&res->spinlock);
 			wake_up(&res->wq);
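
Both hunks also swap the dirty test: !__dlm_lockres_unused(res) becomes __dlm_lockres_has_locks(res). Presumably, with the new refmap a lockres can count as "in use" solely because a remote node holds a reference, so after recovery the thread only needs kicking when actual locks are queued. A toy predicate sketch under that assumption (the struct and fields are stand-ins, not kernel types):

#include <stdio.h>

/* Toy predicate mirroring what __dlm_lockres_has_locks() is assumed
 * to check: are any of the three lock queues non-empty?  The int
 * flags stand in for list_empty() tests on granted/converting/blocked. */
struct queues_stub { int granted, converting, blocked; };

static int has_locks(const struct queues_stub *q)
{
	return q->granted || q->converting || q->blocked;
}

int main(void)
{
	/* a lockres pinned only by a remote refmap bit: no queued locks,
	 * so recovery no longer marks it dirty just for being "in use" */
	struct queues_stub ref_only  = {0, 0, 0};
	struct queues_stub with_lock = {1, 0, 0};

	printf("ref-only: dirty=%d, with lock: dirty=%d\n",
	       has_locks(&ref_only), has_locks(&with_lock));
	return 0;
}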
@@ -2048,6 +2128,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 {
 	struct list_head *iter, *tmpiter;
 	struct dlm_lock *lock;
+	unsigned int freed = 0;
 
 	/* this node is the lockres master:
 	 * 1) remove any stale locks for the dead node
@@ -2062,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2151,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2159,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 
+	if (freed) {
+		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+		     "dropping ref from lockres\n", dlm->name,
+		     res->lockname.len, res->lockname.name, freed, dead_node);
+		BUG_ON(!test_bit(dead_node, res->refmap));
+		dlm_lockres_clear_refmap_bit(dead_node, res);
+	} else if (test_bit(dead_node, res->refmap)) {
+		mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+		     "no locks and had not purged before dying\n", dlm->name,
+		     res->lockname.len, res->lockname.name, dead_node);
+		dlm_lockres_clear_refmap_bit(dead_node, res);
+	}
+
 	/* do not kick thread yet */
 	__dlm_dirty_lockres(dlm, res);
 }
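
The new tail of dlm_free_dead_locks() guarantees the dead node's refmap bit is cleared on both paths: if locks were freed, the bit must already have been set (hence the BUG_ON), while a leftover bit with no locks means the node died before purging its reference. A compact userspace model of that cleanup, using hand-rolled bit helpers in place of the kernel's test_bit()/clear_bit():

#include <stdio.h>

#define NODE_MAX 255
#define BITS_PER_WORD (8 * sizeof(unsigned long))

/* Models res->refmap: one bit per node that holds a reference. */
static unsigned long refmap[(NODE_MAX + 1) / BITS_PER_WORD];

static int test_bit_stub(unsigned int n)
{
	return (refmap[n / BITS_PER_WORD] >> (n % BITS_PER_WORD)) & 1UL;
}

static void clear_bit_stub(unsigned int n)
{
	refmap[n / BITS_PER_WORD] &= ~(1UL << (n % BITS_PER_WORD));
}

/* Models the cleanup at the end of dlm_free_dead_locks(). */
static void free_dead_locks(unsigned int dead_node, unsigned int freed)
{
	if (freed) {
		/* freed locks imply the dead node must have held a ref */
		if (!test_bit_stub(dead_node))
			puts("BUG: freed locks but no refmap bit");
		clear_bit_stub(dead_node);
	} else if (test_bit_stub(dead_node)) {
		/* ref with no locks: node died before purging */
		clear_bit_stub(dead_node);
	}
}

int main(void)
{
	refmap[0] = 0x5UL;       /* nodes 0 and 2 hold references */
	free_dead_locks(2, 3);   /* node 2 died holding 3 locks */
	free_dead_locks(0, 0);   /* node 0 died with a stale ref only */
	printf("refmap word 0 after cleanup: 0x%lx\n", refmap[0]);
	return 0;
}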
@@ -2141,9 +2238,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
 			spin_lock(&res->spinlock);
 			/* zero the lvb if necessary */
 			dlm_revalidate_lvb(dlm, res, dead_node);
-			if (res->owner == dead_node)
+			if (res->owner == dead_node) {
+				if (res->state & DLM_LOCK_RES_DROPPING_REF)
+					mlog(0, "%s:%.*s: owned by "
+					     "dead node %u, this node was "
+					     "dropping its ref when it died. "
+					     "continue, dropping the flag.\n",
+					     dlm->name, res->lockname.len,
+					     res->lockname.name, dead_node);
+
+				/* the wake_up for this will happen when the
+				 * RECOVERING flag is dropped later */
+				res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
 				dlm_move_lockres_to_recovery_list(dlm, res);
-			else if (res->owner == dlm->node_num) {
+			} else if (res->owner == dlm->node_num) {
 				dlm_free_dead_locks(dlm, res, dead_node);
 				__dlm_lockres_calc_usage(dlm, res);
 			}
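
Finally, the cleanup hunk handles an owner that dies midway through dropping its reference: the DROPPING_REF flag is simply cleared and the lockres is recovered like any other. A small state-transition sketch of that decision; the flag values and the RECOVERING transition are assumptions based on the surrounding code, not the kernel's definitions:

#include <stdio.h>

/* Toy model of the DLM_LOCK_RES_DROPPING_REF handling in
 * dlm_do_local_recovery_cleanup(): if the owner died while this node
 * was dropping its ref, abandon the drop and recover the lockres
 * instead.  Flag values are illustrative. */
#define RES_DROPPING_REF 0x01
#define RES_RECOVERING   0x02

static int cleanup_for_dead_owner(int state)
{
	if (state & RES_DROPPING_REF) {
		puts("owner died mid-drop: clearing DROPPING_REF");
		state &= ~RES_DROPPING_REF;
	}
	/* moved to the recovery list; RECOVERING cleared (and waiters
	 * woken) only once recovery finishes */
	return state | RES_RECOVERING;
}

int main(void)
{
	int state = cleanup_for_dead_owner(RES_DROPPING_REF);
	printf("state after cleanup: 0x%x\n", state);
	return 0;
}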