aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c124
1 files changed, 92 insertions, 32 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 847dd3cc4cf5..78ac3a00eb54 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -792,7 +792,15 @@ redo_request:
792 mlog_errno(ret); 792 mlog_errno(ret);
793 if (mle->master != O2NM_MAX_NODES) { 793 if (mle->master != O2NM_MAX_NODES) {
794 /* found a master ! */ 794 /* found a master ! */
795 break; 795 if (mle->master <= nodenum)
796 break;
797 /* if our master request has not reached the master
798 * yet, keep going until it does. this is how the
799 * master will know that asserts are needed back to
800 * the lower nodes. */
801 mlog(0, "%s:%.*s: requests only up to %u but master "
802 "is %u, keep going\n", dlm->name, namelen,
803 lockid, nodenum, mle->master);
796 } 804 }
797 } 805 }
798 806
@@ -860,7 +868,19 @@ recheck:
860 /* check if another node has already become the owner */ 868 /* check if another node has already become the owner */
861 spin_lock(&res->spinlock); 869 spin_lock(&res->spinlock);
862 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { 870 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
871 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
872 res->lockname.len, res->lockname.name, res->owner);
863 spin_unlock(&res->spinlock); 873 spin_unlock(&res->spinlock);
874 /* this will cause the master to re-assert across
875 * the whole cluster, freeing up mles */
876 ret = dlm_do_master_request(mle, res->owner);
877 if (ret < 0) {
878 /* give recovery a chance to run */
879 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
880 msleep(500);
881 goto recheck;
882 }
883 ret = 0;
864 goto leave; 884 goto leave;
865 } 885 }
866 spin_unlock(&res->spinlock); 886 spin_unlock(&res->spinlock);
@@ -1244,13 +1264,14 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1244{ 1264{
1245 u8 response = DLM_MASTER_RESP_MAYBE; 1265 u8 response = DLM_MASTER_RESP_MAYBE;
1246 struct dlm_ctxt *dlm = data; 1266 struct dlm_ctxt *dlm = data;
1247 struct dlm_lock_resource *res; 1267 struct dlm_lock_resource *res = NULL;
1248 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; 1268 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1249 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; 1269 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1250 char *name; 1270 char *name;
1251 unsigned int namelen; 1271 unsigned int namelen;
1252 int found, ret; 1272 int found, ret;
1253 int set_maybe; 1273 int set_maybe;
1274 int dispatch_assert = 0;
1254 1275
1255 if (!dlm_grab(dlm)) 1276 if (!dlm_grab(dlm))
1256 return DLM_MASTER_RESP_NO; 1277 return DLM_MASTER_RESP_NO;
@@ -1287,7 +1308,6 @@ way_up_top:
1287 } 1308 }
1288 1309
1289 if (res->owner == dlm->node_num) { 1310 if (res->owner == dlm->node_num) {
1290 u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
1291 spin_unlock(&res->spinlock); 1311 spin_unlock(&res->spinlock);
1292 // mlog(0, "this node is the master\n"); 1312 // mlog(0, "this node is the master\n");
1293 response = DLM_MASTER_RESP_YES; 1313 response = DLM_MASTER_RESP_YES;
@@ -1300,16 +1320,7 @@ way_up_top:
1300 * caused all nodes up to this one to 1320 * caused all nodes up to this one to
1301 * create mles. this node now needs to 1321 * create mles. this node now needs to
1302 * go back and clean those up. */ 1322 * go back and clean those up. */
1303 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", 1323 dispatch_assert = 1;
1304 dlm->node_num, res->lockname.len, res->lockname.name);
1305 ret = dlm_dispatch_assert_master(dlm, res, 1,
1306 request->node_idx,
1307 flags);
1308 if (ret < 0) {
1309 mlog(ML_ERROR, "failed to dispatch assert "
1310 "master work\n");
1311 response = DLM_MASTER_RESP_ERROR;
1312 }
1313 goto send_response; 1324 goto send_response;
1314 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { 1325 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1315 spin_unlock(&res->spinlock); 1326 spin_unlock(&res->spinlock);
@@ -1357,9 +1368,13 @@ way_up_top:
1357 } 1368 }
1358 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) { 1369 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1359 set_maybe = 0; 1370 set_maybe = 0;
1360 if (tmpmle->master == dlm->node_num) 1371 if (tmpmle->master == dlm->node_num) {
1361 response = DLM_MASTER_RESP_YES; 1372 response = DLM_MASTER_RESP_YES;
1362 else 1373 /* this node will be the owner.
1374 * go back and clean the mles on any
1375 * other nodes */
1376 dispatch_assert = 1;
1377 } else
1363 response = DLM_MASTER_RESP_NO; 1378 response = DLM_MASTER_RESP_NO;
1364 } else { 1379 } else {
1365 // mlog(0, "this node is attempting to " 1380 // mlog(0, "this node is attempting to "
@@ -1398,8 +1413,8 @@ way_up_top:
1398 mle = (struct dlm_master_list_entry *) 1413 mle = (struct dlm_master_list_entry *)
1399 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 1414 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
1400 if (!mle) { 1415 if (!mle) {
1401 // bad bad bad... this sucks.
1402 response = DLM_MASTER_RESP_ERROR; 1416 response = DLM_MASTER_RESP_ERROR;
1417 mlog_errno(-ENOMEM);
1403 goto send_response; 1418 goto send_response;
1404 } 1419 }
1405 spin_lock(&dlm->spinlock); 1420 spin_lock(&dlm->spinlock);
@@ -1418,25 +1433,19 @@ way_up_top:
1418 // mlog(0, "mle was found\n"); 1433 // mlog(0, "mle was found\n");
1419 set_maybe = 1; 1434 set_maybe = 1;
1420 spin_lock(&tmpmle->spinlock); 1435 spin_lock(&tmpmle->spinlock);
1436 if (tmpmle->master == dlm->node_num) {
1437 mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1438 BUG();
1439 }
1421 if (tmpmle->type == DLM_MLE_BLOCK) 1440 if (tmpmle->type == DLM_MLE_BLOCK)
1422 response = DLM_MASTER_RESP_NO; 1441 response = DLM_MASTER_RESP_NO;
1423 else if (tmpmle->type == DLM_MLE_MIGRATION) { 1442 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1424 mlog(0, "migration mle was found (%u->%u)\n", 1443 mlog(0, "migration mle was found (%u->%u)\n",
1425 tmpmle->master, tmpmle->new_master); 1444 tmpmle->master, tmpmle->new_master);
1426 if (tmpmle->master == dlm->node_num) {
1427 mlog(ML_ERROR, "no lockres, but migration mle "
1428 "says that this node is master!\n");
1429 BUG();
1430 }
1431 /* real master can respond on its own */ 1445 /* real master can respond on its own */
1432 response = DLM_MASTER_RESP_NO; 1446 response = DLM_MASTER_RESP_NO;
1433 } else { 1447 } else
1434 if (tmpmle->master == dlm->node_num) { 1448 response = DLM_MASTER_RESP_MAYBE;
1435 response = DLM_MASTER_RESP_YES;
1436 set_maybe = 0;
1437 } else
1438 response = DLM_MASTER_RESP_MAYBE;
1439 }
1440 if (set_maybe) 1449 if (set_maybe)
1441 set_bit(request->node_idx, tmpmle->maybe_map); 1450 set_bit(request->node_idx, tmpmle->maybe_map);
1442 spin_unlock(&tmpmle->spinlock); 1451 spin_unlock(&tmpmle->spinlock);
@@ -1449,6 +1458,24 @@ way_up_top:
1449 dlm_put_mle(tmpmle); 1458 dlm_put_mle(tmpmle);
1450 } 1459 }
1451send_response: 1460send_response:
1461
1462 if (dispatch_assert) {
1463 if (response != DLM_MASTER_RESP_YES)
1464 mlog(ML_ERROR, "invalid response %d\n", response);
1465 if (!res) {
1466 mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1467 BUG();
1468 }
1469 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1470 dlm->node_num, res->lockname.len, res->lockname.name);
1471 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1472 DLM_ASSERT_MASTER_MLE_CLEANUP);
1473 if (ret < 0) {
1474 mlog(ML_ERROR, "failed to dispatch assert master work\n");
1475 response = DLM_MASTER_RESP_ERROR;
1476 }
1477 }
1478
1452 dlm_put(dlm); 1479 dlm_put(dlm);
1453 return response; 1480 return response;
1454} 1481}
@@ -1471,8 +1498,11 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1471 int to, tmpret; 1498 int to, tmpret;
1472 struct dlm_node_iter iter; 1499 struct dlm_node_iter iter;
1473 int ret = 0; 1500 int ret = 0;
1501 int reassert;
1474 1502
1475 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 1503 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1504again:
1505 reassert = 0;
1476 1506
1477 /* note that if this nodemap is empty, it returns 0 */ 1507 /* note that if this nodemap is empty, it returns 0 */
1478 dlm_node_iter_init(nodemap, &iter); 1508 dlm_node_iter_init(nodemap, &iter);
@@ -1504,9 +1534,17 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1504 "got %d.\n", namelen, lockname, to, r); 1534 "got %d.\n", namelen, lockname, to, r);
1505 dlm_dump_lock_resources(dlm); 1535 dlm_dump_lock_resources(dlm);
1506 BUG(); 1536 BUG();
1537 } else if (r == EAGAIN) {
1538 mlog(0, "%.*s: node %u create mles on other "
1539 "nodes and requests a re-assert\n",
1540 namelen, lockname, to);
1541 reassert = 1;
1507 } 1542 }
1508 } 1543 }
1509 1544
1545 if (reassert)
1546 goto again;
1547
1510 return ret; 1548 return ret;
1511} 1549}
1512 1550
@@ -1528,6 +1566,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1528 char *name; 1566 char *name;
1529 unsigned int namelen; 1567 unsigned int namelen;
1530 u32 flags; 1568 u32 flags;
1569 int master_request = 0;
1570 int ret = 0;
1531 1571
1532 if (!dlm_grab(dlm)) 1572 if (!dlm_grab(dlm))
1533 return 0; 1573 return 0;
@@ -1642,11 +1682,22 @@ ok:
1642 // mlog(0, "woo! got an assert_master from node %u!\n", 1682 // mlog(0, "woo! got an assert_master from node %u!\n",
1643 // assert->node_idx); 1683 // assert->node_idx);
1644 if (mle) { 1684 if (mle) {
1645 int extra_ref; 1685 int extra_ref = 0;
1686 int nn = -1;
1646 1687
1647 spin_lock(&mle->spinlock); 1688 spin_lock(&mle->spinlock);
1648 extra_ref = !!(mle->type == DLM_MLE_BLOCK 1689 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1649 || mle->type == DLM_MLE_MIGRATION); 1690 extra_ref = 1;
1691 else {
1692 /* MASTER mle: if any bits set in the response map
1693 * then the calling node needs to re-assert to clear
1694 * up nodes that this node contacted */
1695 while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1696 nn+1)) < O2NM_MAX_NODES) {
1697 if (nn != dlm->node_num && nn != assert->node_idx)
1698 master_request = 1;
1699 }
1700 }
1650 mle->master = assert->node_idx; 1701 mle->master = assert->node_idx;
1651 atomic_set(&mle->woken, 1); 1702 atomic_set(&mle->woken, 1);
1652 wake_up(&mle->wq); 1703 wake_up(&mle->wq);
@@ -1677,10 +1728,15 @@ ok:
1677 } 1728 }
1678 1729
1679done: 1730done:
1731 ret = 0;
1680 if (res) 1732 if (res)
1681 dlm_lockres_put(res); 1733 dlm_lockres_put(res);
1682 dlm_put(dlm); 1734 dlm_put(dlm);
1683 return 0; 1735 if (master_request) {
1736 mlog(0, "need to tell master to reassert\n");
1737 ret = EAGAIN; // positive. negative would shoot down the node.
1738 }
1739 return ret;
1684 1740
1685kill: 1741kill:
1686 /* kill the caller! */ 1742 /* kill the caller! */
@@ -1713,6 +1769,10 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1713 item->u.am.request_from = request_from; 1769 item->u.am.request_from = request_from;
1714 item->u.am.flags = flags; 1770 item->u.am.flags = flags;
1715 1771
1772 if (ignore_higher)
1773 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
1774 res->lockname.name);
1775
1716 spin_lock(&dlm->work_lock); 1776 spin_lock(&dlm->work_lock);
1717 list_add_tail(&item->list, &dlm->work_list); 1777 list_add_tail(&item->list, &dlm->work_list);
1718 spin_unlock(&dlm->work_lock); 1778 spin_unlock(&dlm->work_lock);