Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--	fs/ocfs2/dlm/dlmmaster.c	227
1 file changed, 195 insertions(+), 32 deletions(-)
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 847dd3cc4cf5..940be4c13b1f 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
 				      struct dlm_lock_resource *res,
 				      u8 target);
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res);
 
 
 int dlm_is_host_down(int errno)
@@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
 	struct dlm_node_iter iter;
 	unsigned int namelen;
 	int tries = 0;
+	int bit, wait_on_recovery = 0;
 
 	BUG_ON(!lockid);
 
@@ -762,6 +765,18 @@ lookup:
 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 		set_bit(dlm->node_num, mle->maybe_map);
 		list_add(&mle->list, &dlm->master_list);
+
+		/* still holding the dlm spinlock, check the recovery map
+		 * to see if there are any nodes that still need to be
+		 * considered. these will not appear in the mle nodemap
+		 * but they might own this lockres. wait on them. */
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		}
 	}
 
 	/* at this point there is either a DLM_MLE_BLOCK or a
@@ -779,6 +794,39 @@ lookup:
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
+	while (wait_on_recovery) {
+		/* any cluster changes that occurred after dropping the
+		 * dlm spinlock would be detectable be a change on the mle,
+		 * so we only need to clear out the recovery map once. */
+		if (dlm_is_recovery_lock(lockid, namelen)) {
+			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
+			     "must master $RECOVERY lock now\n", dlm->name);
+			if (!dlm_pre_master_reco_lockres(dlm, res))
+				wait_on_recovery = 0;
+			else {
+				mlog(0, "%s: waiting 500ms for heartbeat state "
+				     "change\n", dlm->name);
+				msleep(500);
+			}
+			continue;
+		}
+
+		dlm_kick_recovery_thread(dlm);
+		msleep(100);
+		dlm_wait_for_recovery(dlm);
+
+		spin_lock(&dlm->spinlock);
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		} else
+			wait_on_recovery = 0;
+		spin_unlock(&dlm->spinlock);
+	}
+
 	/* must wait for lock to be mastered elsewhere */
 	if (blocked)
 		goto wait;
@@ -792,7 +840,15 @@ redo_request:
 			mlog_errno(ret);
 		if (mle->master != O2NM_MAX_NODES) {
 			/* found a master ! */
-			break;
+			if (mle->master <= nodenum)
+				break;
+			/* if our master request has not reached the master
+			 * yet, keep going until it does. this is how the
+			 * master will know that asserts are needed back to
+			 * the lower nodes. */
+			mlog(0, "%s:%.*s: requests only up to %u but master "
+			     "is %u, keep going\n", dlm->name, namelen,
+			     lockid, nodenum, mle->master);
 		}
 	}
 
@@ -860,7 +916,19 @@ recheck:
 	/* check if another node has already become the owner */
 	spin_lock(&res->spinlock);
 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
+		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
+		     res->lockname.len, res->lockname.name, res->owner);
 		spin_unlock(&res->spinlock);
+		/* this will cause the master to re-assert across
+		 * the whole cluster, freeing up mles */
+		ret = dlm_do_master_request(mle, res->owner);
+		if (ret < 0) {
+			/* give recovery a chance to run */
+			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
+			msleep(500);
+			goto recheck;
+		}
+		ret = 0;
 		goto leave;
 	}
 	spin_unlock(&res->spinlock);
@@ -1244,13 +1312,14 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
 {
 	u8 response = DLM_MASTER_RESP_MAYBE;
 	struct dlm_ctxt *dlm = data;
-	struct dlm_lock_resource *res;
+	struct dlm_lock_resource *res = NULL;
 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 	char *name;
 	unsigned int namelen;
 	int found, ret;
 	int set_maybe;
+	int dispatch_assert = 0;
 
 	if (!dlm_grab(dlm))
 		return DLM_MASTER_RESP_NO;
@@ -1287,7 +1356,6 @@ way_up_top:
 		}
 
 		if (res->owner == dlm->node_num) {
-			u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
 			spin_unlock(&res->spinlock);
 			// mlog(0, "this node is the master\n");
 			response = DLM_MASTER_RESP_YES;
@@ -1300,16 +1368,7 @@ way_up_top:
 			 * caused all nodes up to this one to
 			 * create mles. this node now needs to
 			 * go back and clean those up. */
-			mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
-			     dlm->node_num, res->lockname.len, res->lockname.name);
-			ret = dlm_dispatch_assert_master(dlm, res, 1,
-							 request->node_idx,
-							 flags);
-			if (ret < 0) {
-				mlog(ML_ERROR, "failed to dispatch assert "
-				     "master work\n");
-				response = DLM_MASTER_RESP_ERROR;
-			}
+			dispatch_assert = 1;
 			goto send_response;
 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			spin_unlock(&res->spinlock);
@@ -1357,9 +1416,13 @@ way_up_top:
 			}
 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			set_maybe = 0;
-			if (tmpmle->master == dlm->node_num)
+			if (tmpmle->master == dlm->node_num) {
 				response = DLM_MASTER_RESP_YES;
-			else
+				/* this node will be the owner.
+				 * go back and clean the mles on any
+				 * other nodes */
+				dispatch_assert = 1;
+			} else
 				response = DLM_MASTER_RESP_NO;
 		} else {
 			// mlog(0, "this node is attempting to "
@@ -1398,8 +1461,8 @@ way_up_top:
 		mle = (struct dlm_master_list_entry *)
 			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
 		if (!mle) {
-			// bad bad bad... this sucks.
 			response = DLM_MASTER_RESP_ERROR;
+			mlog_errno(-ENOMEM);
 			goto send_response;
 		}
 		spin_lock(&dlm->spinlock);
@@ -1418,25 +1481,19 @@ way_up_top:
 		// mlog(0, "mle was found\n");
 		set_maybe = 1;
 		spin_lock(&tmpmle->spinlock);
+		if (tmpmle->master == dlm->node_num) {
+			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
+			BUG();
+		}
 		if (tmpmle->type == DLM_MLE_BLOCK)
 			response = DLM_MASTER_RESP_NO;
 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
 			mlog(0, "migration mle was found (%u->%u)\n",
 			     tmpmle->master, tmpmle->new_master);
-			if (tmpmle->master == dlm->node_num) {
-				mlog(ML_ERROR, "no lockres, but migration mle "
-				     "says that this node is master!\n");
-				BUG();
-			}
 			/* real master can respond on its own */
 			response = DLM_MASTER_RESP_NO;
-		} else {
-			if (tmpmle->master == dlm->node_num) {
-				response = DLM_MASTER_RESP_YES;
-				set_maybe = 0;
-			} else
-				response = DLM_MASTER_RESP_MAYBE;
-		}
+		} else
+			response = DLM_MASTER_RESP_MAYBE;
 		if (set_maybe)
 			set_bit(request->node_idx, tmpmle->maybe_map);
 		spin_unlock(&tmpmle->spinlock);
@@ -1449,6 +1506,24 @@ way_up_top:
 		dlm_put_mle(tmpmle);
 	}
 send_response:
+
+	if (dispatch_assert) {
+		if (response != DLM_MASTER_RESP_YES)
+			mlog(ML_ERROR, "invalid response %d\n", response);
+		if (!res) {
+			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
+			BUG();
+		}
+		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
+		     dlm->node_num, res->lockname.len, res->lockname.name);
+		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
+						 DLM_ASSERT_MASTER_MLE_CLEANUP);
+		if (ret < 0) {
+			mlog(ML_ERROR, "failed to dispatch assert master work\n");
+			response = DLM_MASTER_RESP_ERROR;
+		}
+	}
+
 	dlm_put(dlm);
 	return response;
 }
@@ -1471,8 +1546,11 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
 	int to, tmpret;
 	struct dlm_node_iter iter;
 	int ret = 0;
+	int reassert;
 
 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+again:
+	reassert = 0;
 
 	/* note that if this nodemap is empty, it returns 0 */
 	dlm_node_iter_init(nodemap, &iter);
@@ -1504,9 +1582,17 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
 			     "got %d.\n", namelen, lockname, to, r);
 			dlm_dump_lock_resources(dlm);
 			BUG();
+		} else if (r == EAGAIN) {
+			mlog(0, "%.*s: node %u create mles on other "
+			     "nodes and requests a re-assert\n",
+			     namelen, lockname, to);
+			reassert = 1;
 		}
 	}
 
+	if (reassert)
+		goto again;
+
 	return ret;
 }
 
@@ -1528,6 +1614,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
 	char *name;
 	unsigned int namelen;
 	u32 flags;
+	int master_request = 0;
+	int ret = 0;
 
 	if (!dlm_grab(dlm))
 		return 0;
@@ -1642,11 +1730,22 @@ ok:
 	// mlog(0, "woo!  got an assert_master from node %u!\n",
 	// 	     assert->node_idx);
 	if (mle) {
-		int extra_ref;
+		int extra_ref = 0;
+		int nn = -1;
 
 		spin_lock(&mle->spinlock);
-		extra_ref = !!(mle->type == DLM_MLE_BLOCK
-			       || mle->type == DLM_MLE_MIGRATION);
+		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
+			extra_ref = 1;
+		else {
+			/* MASTER mle: if any bits set in the response map
+			 * then the calling node needs to re-assert to clear
+			 * up nodes that this node contacted */
+			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
+						    nn+1)) < O2NM_MAX_NODES) {
+				if (nn != dlm->node_num && nn != assert->node_idx)
+					master_request = 1;
+			}
+		}
 		mle->master = assert->node_idx;
 		atomic_set(&mle->woken, 1);
 		wake_up(&mle->wq);
@@ -1677,10 +1776,15 @@ ok:
 	}
 
 done:
+	ret = 0;
 	if (res)
 		dlm_lockres_put(res);
 	dlm_put(dlm);
-	return 0;
+	if (master_request) {
+		mlog(0, "need to tell master to reassert\n");
+		ret = EAGAIN;  // positive. negative would shoot down the node.
+	}
+	return ret;
 
 kill:
 	/* kill the caller! */
@@ -1713,6 +1817,10 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 	item->u.am.request_from = request_from;
 	item->u.am.flags = flags;
 
+	if (ignore_higher)
+		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
+		     res->lockname.name);
+
 	spin_lock(&dlm->work_lock);
 	list_add_tail(&item->list, &dlm->work_list);
 	spin_unlock(&dlm->work_lock);
@@ -1775,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
 	mlog(0, "finished with dlm_assert_master_worker\n");
 }
 
+/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
+ * We cannot wait for node recovery to complete to begin mastering this
+ * lockres because this lockres is used to kick off recovery! ;-)
+ * So, do a pre-check on all living nodes to see if any of those nodes
+ * think that $RECOVERY is currently mastered by a dead node.  If so,
+ * we wait a short time to allow that node to get notified by its own
+ * heartbeat stack, then check again.  All $RECOVERY lock resources
+ * mastered by dead nodes are purged when the hearbeat callback is
+ * fired, so we can know for sure that it is safe to continue once
+ * the node returns a live node or no node.  */
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res)
+{
+	struct dlm_node_iter iter;
+	int nodenum;
+	int ret = 0;
+	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+	spin_lock(&dlm->spinlock);
+	dlm_node_iter_init(dlm->domain_map, &iter);
+	spin_unlock(&dlm->spinlock);
+
+	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+		/* do not send to self */
+		if (nodenum == dlm->node_num)
+			continue;
+		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
+		if (ret < 0) {
+			mlog_errno(ret);
+			if (!dlm_is_host_down(ret))
+				BUG();
+			/* host is down, so answer for that node would be
+			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+		}
+
+		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* check to see if this master is in the recovery map */
+			spin_lock(&dlm->spinlock);
+			if (test_bit(master, dlm->recovery_map)) {
+				mlog(ML_NOTICE, "%s: node %u has not seen "
+				     "node %u go down yet, and thinks the "
+				     "dead node is mastering the recovery "
+				     "lock.  must wait.\n", dlm->name,
+				     nodenum, master);
+				ret = -EAGAIN;
+			}
+			spin_unlock(&dlm->spinlock);
+			mlog(0, "%s: reco lock master is %u\n", dlm->name,
+			     master);
+			break;
+		}
+	}
+	return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES