aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c394
1 files changed, 352 insertions, 42 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0ad872055cb3..4645ec2e0fc3 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -99,9 +99,9 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
99 int idx); 99 int idx);
100 100
101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); 101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 102static int dlm_do_assert_master(struct dlm_ctxt *dlm,
103 unsigned int namelen, void *nodemap, 103 struct dlm_lock_resource *res,
104 u32 flags); 104 void *nodemap, u32 flags);
105 105
106static inline int dlm_mle_equal(struct dlm_ctxt *dlm, 106static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 struct dlm_master_list_entry *mle, 107 struct dlm_master_list_entry *mle,
@@ -237,7 +237,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
237 struct dlm_master_list_entry **mle, 237 struct dlm_master_list_entry **mle,
238 char *name, unsigned int namelen); 238 char *name, unsigned int namelen);
239 239
240static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); 240static int dlm_do_master_request(struct dlm_lock_resource *res,
241 struct dlm_master_list_entry *mle, int to);
241 242
242 243
243static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, 244static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
@@ -687,6 +688,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
687 INIT_LIST_HEAD(&res->purge); 688 INIT_LIST_HEAD(&res->purge);
688 atomic_set(&res->asts_reserved, 0); 689 atomic_set(&res->asts_reserved, 0);
689 res->migration_pending = 0; 690 res->migration_pending = 0;
691 res->inflight_locks = 0;
690 692
691 kref_init(&res->refs); 693 kref_init(&res->refs);
692 694
@@ -700,6 +702,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
700 res->last_used = 0; 702 res->last_used = 0;
701 703
702 memset(res->lvb, 0, DLM_LVB_LEN); 704 memset(res->lvb, 0, DLM_LVB_LEN);
705 memset(res->refmap, 0, sizeof(res->refmap));
703} 706}
704 707
705struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, 708struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
@@ -722,6 +725,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
722 return res; 725 return res;
723} 726}
724 727
728void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
729 struct dlm_lock_resource *res,
730 int new_lockres,
731 const char *file,
732 int line)
733{
734 if (!new_lockres)
735 assert_spin_locked(&res->spinlock);
736
737 if (!test_bit(dlm->node_num, res->refmap)) {
738 BUG_ON(res->inflight_locks != 0);
739 dlm_lockres_set_refmap_bit(dlm->node_num, res);
740 }
741 res->inflight_locks++;
742 mlog(0, "%s:%.*s: inflight++: now %u\n",
743 dlm->name, res->lockname.len, res->lockname.name,
744 res->inflight_locks);
745}
746
747void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
748 struct dlm_lock_resource *res,
749 const char *file,
750 int line)
751{
752 assert_spin_locked(&res->spinlock);
753
754 BUG_ON(res->inflight_locks == 0);
755 res->inflight_locks--;
756 mlog(0, "%s:%.*s: inflight--: now %u\n",
757 dlm->name, res->lockname.len, res->lockname.name,
758 res->inflight_locks);
759 if (res->inflight_locks == 0)
760 dlm_lockres_clear_refmap_bit(dlm->node_num, res);
761 wake_up(&res->wq);
762}
763
725/* 764/*
726 * lookup a lock resource by name. 765 * lookup a lock resource by name.
727 * may already exist in the hashtable. 766 * may already exist in the hashtable.
@@ -752,6 +791,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
752 unsigned int hash; 791 unsigned int hash;
753 int tries = 0; 792 int tries = 0;
754 int bit, wait_on_recovery = 0; 793 int bit, wait_on_recovery = 0;
794 int drop_inflight_if_nonlocal = 0;
755 795
756 BUG_ON(!lockid); 796 BUG_ON(!lockid);
757 797
@@ -761,9 +801,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
761 801
762lookup: 802lookup:
763 spin_lock(&dlm->spinlock); 803 spin_lock(&dlm->spinlock);
764 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); 804 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765 if (tmpres) { 805 if (tmpres) {
806 int dropping_ref = 0;
807
808 spin_lock(&tmpres->spinlock);
809 if (tmpres->owner == dlm->node_num) {
810 BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
811 dlm_lockres_grab_inflight_ref(dlm, tmpres);
812 } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
813 dropping_ref = 1;
814 spin_unlock(&tmpres->spinlock);
766 spin_unlock(&dlm->spinlock); 815 spin_unlock(&dlm->spinlock);
816
817 /* wait until done messaging the master, drop our ref to allow
818 * the lockres to be purged, start over. */
819 if (dropping_ref) {
820 spin_lock(&tmpres->spinlock);
821 __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
822 spin_unlock(&tmpres->spinlock);
823 dlm_lockres_put(tmpres);
824 tmpres = NULL;
825 goto lookup;
826 }
827
767 mlog(0, "found in hash!\n"); 828 mlog(0, "found in hash!\n");
768 if (res) 829 if (res)
769 dlm_lockres_put(res); 830 dlm_lockres_put(res);
@@ -793,6 +854,7 @@ lookup:
793 spin_lock(&res->spinlock); 854 spin_lock(&res->spinlock);
794 dlm_change_lockres_owner(dlm, res, dlm->node_num); 855 dlm_change_lockres_owner(dlm, res, dlm->node_num);
795 __dlm_insert_lockres(dlm, res); 856 __dlm_insert_lockres(dlm, res);
857 dlm_lockres_grab_inflight_ref(dlm, res);
796 spin_unlock(&res->spinlock); 858 spin_unlock(&res->spinlock);
797 spin_unlock(&dlm->spinlock); 859 spin_unlock(&dlm->spinlock);
798 /* lockres still marked IN_PROGRESS */ 860 /* lockres still marked IN_PROGRESS */
@@ -805,29 +867,40 @@ lookup:
805 /* if we found a block, wait for lock to be mastered by another node */ 867 /* if we found a block, wait for lock to be mastered by another node */
806 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); 868 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
807 if (blocked) { 869 if (blocked) {
870 int mig;
808 if (mle->type == DLM_MLE_MASTER) { 871 if (mle->type == DLM_MLE_MASTER) {
809 mlog(ML_ERROR, "master entry for nonexistent lock!\n"); 872 mlog(ML_ERROR, "master entry for nonexistent lock!\n");
810 BUG(); 873 BUG();
811 } else if (mle->type == DLM_MLE_MIGRATION) { 874 }
812 /* migration is in progress! */ 875 mig = (mle->type == DLM_MLE_MIGRATION);
813 /* the good news is that we now know the 876 /* if there is a migration in progress, let the migration
814 * "current" master (mle->master). */ 877 * finish before continuing. we can wait for the absence
815 878 * of the MIGRATION mle: either the migrate finished or
879 * one of the nodes died and the mle was cleaned up.
880 * if there is a BLOCK here, but it already has a master
881 * set, we are too late. the master does not have a ref
882 * for us in the refmap. detach the mle and drop it.
883 * either way, go back to the top and start over. */
884 if (mig || mle->master != O2NM_MAX_NODES) {
885 BUG_ON(mig && mle->master == dlm->node_num);
886 /* we arrived too late. the master does not
887 * have a ref for us. retry. */
888 mlog(0, "%s:%.*s: late on %s\n",
889 dlm->name, namelen, lockid,
890 mig ? "MIGRATION" : "BLOCK");
816 spin_unlock(&dlm->master_lock); 891 spin_unlock(&dlm->master_lock);
817 assert_spin_locked(&dlm->spinlock);
818
819 /* set the lockres owner and hash it */
820 spin_lock(&res->spinlock);
821 dlm_set_lockres_owner(dlm, res, mle->master);
822 __dlm_insert_lockres(dlm, res);
823 spin_unlock(&res->spinlock);
824 spin_unlock(&dlm->spinlock); 892 spin_unlock(&dlm->spinlock);
825 893
826 /* master is known, detach */ 894 /* master is known, detach */
827 dlm_mle_detach_hb_events(dlm, mle); 895 if (!mig)
896 dlm_mle_detach_hb_events(dlm, mle);
828 dlm_put_mle(mle); 897 dlm_put_mle(mle);
829 mle = NULL; 898 mle = NULL;
830 goto wake_waiters; 899 /* this is lame, but we cant wait on either
900 * the mle or lockres waitqueue here */
901 if (mig)
902 msleep(100);
903 goto lookup;
831 } 904 }
832 } else { 905 } else {
833 /* go ahead and try to master lock on this node */ 906 /* go ahead and try to master lock on this node */
@@ -858,6 +931,13 @@ lookup:
858 931
859 /* finally add the lockres to its hash bucket */ 932 /* finally add the lockres to its hash bucket */
860 __dlm_insert_lockres(dlm, res); 933 __dlm_insert_lockres(dlm, res);
934 /* since this lockres is new it doesnt not require the spinlock */
935 dlm_lockres_grab_inflight_ref_new(dlm, res);
936
937 /* if this node does not become the master make sure to drop
938 * this inflight reference below */
939 drop_inflight_if_nonlocal = 1;
940
861 /* get an extra ref on the mle in case this is a BLOCK 941 /* get an extra ref on the mle in case this is a BLOCK
862 * if so, the creator of the BLOCK may try to put the last 942 * if so, the creator of the BLOCK may try to put the last
863 * ref at this time in the assert master handler, so we 943 * ref at this time in the assert master handler, so we
@@ -910,7 +990,7 @@ redo_request:
910 ret = -EINVAL; 990 ret = -EINVAL;
911 dlm_node_iter_init(mle->vote_map, &iter); 991 dlm_node_iter_init(mle->vote_map, &iter);
912 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 992 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
913 ret = dlm_do_master_request(mle, nodenum); 993 ret = dlm_do_master_request(res, mle, nodenum);
914 if (ret < 0) 994 if (ret < 0)
915 mlog_errno(ret); 995 mlog_errno(ret);
916 if (mle->master != O2NM_MAX_NODES) { 996 if (mle->master != O2NM_MAX_NODES) {
@@ -960,6 +1040,8 @@ wait:
960 1040
961wake_waiters: 1041wake_waiters:
962 spin_lock(&res->spinlock); 1042 spin_lock(&res->spinlock);
1043 if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
1044 dlm_lockres_drop_inflight_ref(dlm, res);
963 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1045 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
964 spin_unlock(&res->spinlock); 1046 spin_unlock(&res->spinlock);
965 wake_up(&res->wq); 1047 wake_up(&res->wq);
@@ -998,7 +1080,7 @@ recheck:
998 /* this will cause the master to re-assert across 1080 /* this will cause the master to re-assert across
999 * the whole cluster, freeing up mles */ 1081 * the whole cluster, freeing up mles */
1000 if (res->owner != dlm->node_num) { 1082 if (res->owner != dlm->node_num) {
1001 ret = dlm_do_master_request(mle, res->owner); 1083 ret = dlm_do_master_request(res, mle, res->owner);
1002 if (ret < 0) { 1084 if (ret < 0) {
1003 /* give recovery a chance to run */ 1085 /* give recovery a chance to run */
1004 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 1086 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
@@ -1062,6 +1144,8 @@ recheck:
1062 * now tell other nodes that I am 1144 * now tell other nodes that I am
1063 * mastering this. */ 1145 * mastering this. */
1064 mle->master = dlm->node_num; 1146 mle->master = dlm->node_num;
1147 /* ref was grabbed in get_lock_resource
1148 * will be dropped in dlmlock_master */
1065 assert = 1; 1149 assert = 1;
1066 sleep = 0; 1150 sleep = 0;
1067 } 1151 }
@@ -1087,7 +1171,8 @@ recheck:
1087 (atomic_read(&mle->woken) == 1), 1171 (atomic_read(&mle->woken) == 1),
1088 timeo); 1172 timeo);
1089 if (res->owner == O2NM_MAX_NODES) { 1173 if (res->owner == O2NM_MAX_NODES) {
1090 mlog(0, "waiting again\n"); 1174 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1175 res->lockname.len, res->lockname.name);
1091 goto recheck; 1176 goto recheck;
1092 } 1177 }
1093 mlog(0, "done waiting, master is %u\n", res->owner); 1178 mlog(0, "done waiting, master is %u\n", res->owner);
@@ -1100,8 +1185,7 @@ recheck:
1100 m = dlm->node_num; 1185 m = dlm->node_num;
1101 mlog(0, "about to master %.*s here, this=%u\n", 1186 mlog(0, "about to master %.*s here, this=%u\n",
1102 res->lockname.len, res->lockname.name, m); 1187 res->lockname.len, res->lockname.name, m);
1103 ret = dlm_do_assert_master(dlm, res->lockname.name, 1188 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1104 res->lockname.len, mle->vote_map, 0);
1105 if (ret) { 1189 if (ret) {
1106 /* This is a failure in the network path, 1190 /* This is a failure in the network path,
1107 * not in the response to the assert_master 1191 * not in the response to the assert_master
@@ -1117,6 +1201,8 @@ recheck:
1117 1201
1118 /* set the lockres owner */ 1202 /* set the lockres owner */
1119 spin_lock(&res->spinlock); 1203 spin_lock(&res->spinlock);
1204 /* mastery reference obtained either during
1205 * assert_master_handler or in get_lock_resource */
1120 dlm_change_lockres_owner(dlm, res, m); 1206 dlm_change_lockres_owner(dlm, res, m);
1121 spin_unlock(&res->spinlock); 1207 spin_unlock(&res->spinlock);
1122 1208
@@ -1283,7 +1369,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1283 * 1369 *
1284 */ 1370 */
1285 1371
1286static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) 1372static int dlm_do_master_request(struct dlm_lock_resource *res,
1373 struct dlm_master_list_entry *mle, int to)
1287{ 1374{
1288 struct dlm_ctxt *dlm = mle->dlm; 1375 struct dlm_ctxt *dlm = mle->dlm;
1289 struct dlm_master_request request; 1376 struct dlm_master_request request;
@@ -1339,6 +1426,9 @@ again:
1339 case DLM_MASTER_RESP_YES: 1426 case DLM_MASTER_RESP_YES:
1340 set_bit(to, mle->response_map); 1427 set_bit(to, mle->response_map);
1341 mlog(0, "node %u is the master, response=YES\n", to); 1428 mlog(0, "node %u is the master, response=YES\n", to);
1429 mlog(0, "%s:%.*s: master node %u now knows I have a "
1430 "reference\n", dlm->name, res->lockname.len,
1431 res->lockname.name, to);
1342 mle->master = to; 1432 mle->master = to;
1343 break; 1433 break;
1344 case DLM_MASTER_RESP_NO: 1434 case DLM_MASTER_RESP_NO:
@@ -1428,8 +1518,10 @@ way_up_top:
1428 } 1518 }
1429 1519
1430 if (res->owner == dlm->node_num) { 1520 if (res->owner == dlm->node_num) {
1521 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1522 dlm->name, namelen, name, request->node_idx);
1523 dlm_lockres_set_refmap_bit(request->node_idx, res);
1431 spin_unlock(&res->spinlock); 1524 spin_unlock(&res->spinlock);
1432 // mlog(0, "this node is the master\n");
1433 response = DLM_MASTER_RESP_YES; 1525 response = DLM_MASTER_RESP_YES;
1434 if (mle) 1526 if (mle)
1435 kmem_cache_free(dlm_mle_cache, mle); 1527 kmem_cache_free(dlm_mle_cache, mle);
@@ -1477,7 +1569,6 @@ way_up_top:
1477 mlog(0, "node %u is master, but trying to migrate to " 1569 mlog(0, "node %u is master, but trying to migrate to "
1478 "node %u.\n", tmpmle->master, tmpmle->new_master); 1570 "node %u.\n", tmpmle->master, tmpmle->new_master);
1479 if (tmpmle->master == dlm->node_num) { 1571 if (tmpmle->master == dlm->node_num) {
1480 response = DLM_MASTER_RESP_YES;
1481 mlog(ML_ERROR, "no owner on lockres, but this " 1572 mlog(ML_ERROR, "no owner on lockres, but this "
1482 "node is trying to migrate it to %u?!\n", 1573 "node is trying to migrate it to %u?!\n",
1483 tmpmle->new_master); 1574 tmpmle->new_master);
@@ -1494,6 +1585,10 @@ way_up_top:
1494 * go back and clean the mles on any 1585 * go back and clean the mles on any
1495 * other nodes */ 1586 * other nodes */
1496 dispatch_assert = 1; 1587 dispatch_assert = 1;
1588 dlm_lockres_set_refmap_bit(request->node_idx, res);
1589 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1590 dlm->name, namelen, name,
1591 request->node_idx);
1497 } else 1592 } else
1498 response = DLM_MASTER_RESP_NO; 1593 response = DLM_MASTER_RESP_NO;
1499 } else { 1594 } else {
@@ -1607,15 +1702,17 @@ send_response:
1607 * can periodically run all locks owned by this node 1702 * can periodically run all locks owned by this node
1608 * and re-assert across the cluster... 1703 * and re-assert across the cluster...
1609 */ 1704 */
1610static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 1705int dlm_do_assert_master(struct dlm_ctxt *dlm,
1611 unsigned int namelen, void *nodemap, 1706 struct dlm_lock_resource *res,
1612 u32 flags) 1707 void *nodemap, u32 flags)
1613{ 1708{
1614 struct dlm_assert_master assert; 1709 struct dlm_assert_master assert;
1615 int to, tmpret; 1710 int to, tmpret;
1616 struct dlm_node_iter iter; 1711 struct dlm_node_iter iter;
1617 int ret = 0; 1712 int ret = 0;
1618 int reassert; 1713 int reassert;
1714 const char *lockname = res->lockname.name;
1715 unsigned int namelen = res->lockname.len;
1619 1716
1620 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 1717 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1621again: 1718again:
@@ -1647,6 +1744,7 @@ again:
1647 mlog(0, "link to %d went down!\n", to); 1744 mlog(0, "link to %d went down!\n", to);
1648 /* any nonzero status return will do */ 1745 /* any nonzero status return will do */
1649 ret = tmpret; 1746 ret = tmpret;
1747 r = 0;
1650 } else if (r < 0) { 1748 } else if (r < 0) {
1651 /* ok, something horribly messed. kill thyself. */ 1749 /* ok, something horribly messed. kill thyself. */
1652 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1750 mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1661,12 +1759,29 @@ again:
1661 spin_unlock(&dlm->master_lock); 1759 spin_unlock(&dlm->master_lock);
1662 spin_unlock(&dlm->spinlock); 1760 spin_unlock(&dlm->spinlock);
1663 BUG(); 1761 BUG();
1664 } else if (r == EAGAIN) { 1762 }
1763
1764 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1765 !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1766 mlog(ML_ERROR, "%.*s: very strange, "
1767 "master MLE but no lockres on %u\n",
1768 namelen, lockname, to);
1769 }
1770
1771 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1665 mlog(0, "%.*s: node %u create mles on other " 1772 mlog(0, "%.*s: node %u create mles on other "
1666 "nodes and requests a re-assert\n", 1773 "nodes and requests a re-assert\n",
1667 namelen, lockname, to); 1774 namelen, lockname, to);
1668 reassert = 1; 1775 reassert = 1;
1669 } 1776 }
1777 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1778 mlog(0, "%.*s: node %u has a reference to this "
1779 "lockres, set the bit in the refmap\n",
1780 namelen, lockname, to);
1781 spin_lock(&res->spinlock);
1782 dlm_lockres_set_refmap_bit(to, res);
1783 spin_unlock(&res->spinlock);
1784 }
1670 } 1785 }
1671 1786
1672 if (reassert) 1787 if (reassert)
@@ -1693,7 +1808,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1693 char *name; 1808 char *name;
1694 unsigned int namelen, hash; 1809 unsigned int namelen, hash;
1695 u32 flags; 1810 u32 flags;
1696 int master_request = 0; 1811 int master_request = 0, have_lockres_ref = 0;
1697 int ret = 0; 1812 int ret = 0;
1698 1813
1699 if (!dlm_grab(dlm)) 1814 if (!dlm_grab(dlm))
@@ -1864,6 +1979,7 @@ ok:
1864 dlm_change_lockres_owner(dlm, res, mle->master); 1979 dlm_change_lockres_owner(dlm, res, mle->master);
1865 } 1980 }
1866 spin_unlock(&res->spinlock); 1981 spin_unlock(&res->spinlock);
1982 have_lockres_ref = 1;
1867 } 1983 }
1868 1984
1869 /* master is known, detach if not already detached. 1985 /* master is known, detach if not already detached.
@@ -1918,7 +2034,19 @@ done:
1918 dlm_put(dlm); 2034 dlm_put(dlm);
1919 if (master_request) { 2035 if (master_request) {
1920 mlog(0, "need to tell master to reassert\n"); 2036 mlog(0, "need to tell master to reassert\n");
1921 ret = EAGAIN; // positive. negative would shoot down the node. 2037 /* positive. negative would shoot down the node. */
2038 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2039 if (!have_lockres_ref) {
2040 mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2041 "mle present here for %s:%.*s, but no lockres!\n",
2042 assert->node_idx, dlm->name, namelen, name);
2043 }
2044 }
2045 if (have_lockres_ref) {
2046 /* let the master know we have a reference to the lockres */
2047 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2048 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2049 dlm->name, namelen, name, assert->node_idx);
1922 } 2050 }
1923 return ret; 2051 return ret;
1924 2052
@@ -2023,9 +2151,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2023 * even if one or more nodes die */ 2151 * even if one or more nodes die */
2024 mlog(0, "worker about to master %.*s here, this=%u\n", 2152 mlog(0, "worker about to master %.*s here, this=%u\n",
2025 res->lockname.len, res->lockname.name, dlm->node_num); 2153 res->lockname.len, res->lockname.name, dlm->node_num);
2026 ret = dlm_do_assert_master(dlm, res->lockname.name, 2154 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2027 res->lockname.len,
2028 nodemap, flags);
2029 if (ret < 0) { 2155 if (ret < 0) {
2030 /* no need to restart, we are done */ 2156 /* no need to restart, we are done */
2031 if (!dlm_is_host_down(ret)) 2157 if (!dlm_is_host_down(ret))
@@ -2097,6 +2223,104 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2097 return ret; 2223 return ret;
2098} 2224}
2099 2225
2226/*
2227 * DLM_DEREF_LOCKRES_MSG
2228 */
2229
2230int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2231{
2232 struct dlm_deref_lockres deref;
2233 int ret = 0, r;
2234 const char *lockname;
2235 unsigned int namelen;
2236
2237 lockname = res->lockname.name;
2238 namelen = res->lockname.len;
2239 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2240
2241 mlog(0, "%s:%.*s: sending deref to %d\n",
2242 dlm->name, namelen, lockname, res->owner);
2243 memset(&deref, 0, sizeof(deref));
2244 deref.node_idx = dlm->node_num;
2245 deref.namelen = namelen;
2246 memcpy(deref.name, lockname, namelen);
2247
2248 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2249 &deref, sizeof(deref), res->owner, &r);
2250 if (ret < 0)
2251 mlog_errno(ret);
2252 else if (r < 0) {
2253 /* BAD. other node says I did not have a ref. */
2254 mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2255 "(master=%u) got %d.\n", dlm->name, namelen,
2256 lockname, res->owner, r);
2257 dlm_print_one_lock_resource(res);
2258 BUG();
2259 }
2260 return ret;
2261}
2262
2263int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
2264{
2265 struct dlm_ctxt *dlm = data;
2266 struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2267 struct dlm_lock_resource *res = NULL;
2268 char *name;
2269 unsigned int namelen;
2270 int ret = -EINVAL;
2271 u8 node;
2272 unsigned int hash;
2273
2274 if (!dlm_grab(dlm))
2275 return 0;
2276
2277 name = deref->name;
2278 namelen = deref->namelen;
2279 node = deref->node_idx;
2280
2281 if (namelen > DLM_LOCKID_NAME_MAX) {
2282 mlog(ML_ERROR, "Invalid name length!");
2283 goto done;
2284 }
2285 if (deref->node_idx >= O2NM_MAX_NODES) {
2286 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2287 goto done;
2288 }
2289
2290 hash = dlm_lockid_hash(name, namelen);
2291
2292 spin_lock(&dlm->spinlock);
2293 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2294 if (!res) {
2295 spin_unlock(&dlm->spinlock);
2296 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2297 dlm->name, namelen, name);
2298 goto done;
2299 }
2300 spin_unlock(&dlm->spinlock);
2301
2302 spin_lock(&res->spinlock);
2303 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2304 if (test_bit(node, res->refmap)) {
2305 ret = 0;
2306 dlm_lockres_clear_refmap_bit(node, res);
2307 } else {
2308 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2309 "but it is already dropped!\n", dlm->name, namelen,
2310 name, node);
2311 __dlm_print_one_lock_resource(res);
2312 }
2313 spin_unlock(&res->spinlock);
2314
2315 if (!ret)
2316 dlm_lockres_calc_usage(dlm, res);
2317done:
2318 if (res)
2319 dlm_lockres_put(res);
2320 dlm_put(dlm);
2321 return ret;
2322}
2323
2100 2324
2101/* 2325/*
2102 * DLM_MIGRATE_LOCKRES 2326 * DLM_MIGRATE_LOCKRES
@@ -2376,6 +2600,53 @@ leave:
2376 return ret; 2600 return ret;
2377} 2601}
2378 2602
2603#define DLM_MIGRATION_RETRY_MS 100
2604
2605/* Should be called only after beginning the domain leave process.
2606 * There should not be any remaining locks on nonlocal lock resources,
2607 * and there should be no local locks left on locally mastered resources.
2608 *
2609 * Called with the dlm spinlock held, may drop it to do migration, but
2610 * will re-acquire before exit.
2611 *
2612 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
2613int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2614{
2615 int ret;
2616 int lock_dropped = 0;
2617
2618 if (res->owner != dlm->node_num) {
2619 if (!__dlm_lockres_unused(res)) {
2620 mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2621 "trying to free this but locks remain\n",
2622 dlm->name, res->lockname.len, res->lockname.name);
2623 }
2624 goto leave;
2625 }
2626
2627 /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2628 spin_unlock(&dlm->spinlock);
2629 lock_dropped = 1;
2630 while (1) {
2631 ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2632 if (ret >= 0)
2633 break;
2634 if (ret == -ENOTEMPTY) {
2635 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2636 res->lockname.len, res->lockname.name);
2637 BUG();
2638 }
2639
2640 mlog(0, "lockres %.*s: migrate failed, "
2641 "retrying\n", res->lockname.len,
2642 res->lockname.name);
2643 msleep(DLM_MIGRATION_RETRY_MS);
2644 }
2645 spin_lock(&dlm->spinlock);
2646leave:
2647 return lock_dropped;
2648}
2649
2379int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) 2650int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2380{ 2651{
2381 int ret; 2652 int ret;
@@ -2490,7 +2761,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2490{ 2761{
2491 struct list_head *iter, *iter2; 2762 struct list_head *iter, *iter2;
2492 struct list_head *queue = &res->granted; 2763 struct list_head *queue = &res->granted;
2493 int i; 2764 int i, bit;
2494 struct dlm_lock *lock; 2765 struct dlm_lock *lock;
2495 2766
2496 assert_spin_locked(&res->spinlock); 2767 assert_spin_locked(&res->spinlock);
@@ -2508,12 +2779,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2508 BUG_ON(!list_empty(&lock->bast_list)); 2779 BUG_ON(!list_empty(&lock->bast_list));
2509 BUG_ON(lock->ast_pending); 2780 BUG_ON(lock->ast_pending);
2510 BUG_ON(lock->bast_pending); 2781 BUG_ON(lock->bast_pending);
2782 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2511 list_del_init(&lock->list); 2783 list_del_init(&lock->list);
2512 dlm_lock_put(lock); 2784 dlm_lock_put(lock);
2513 } 2785 }
2514 } 2786 }
2515 queue++; 2787 queue++;
2516 } 2788 }
2789 bit = 0;
2790 while (1) {
2791 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2792 if (bit >= O2NM_MAX_NODES)
2793 break;
2794 /* do not clear the local node reference, if there is a
2795 * process holding this, let it drop the ref itself */
2796 if (bit != dlm->node_num) {
2797 mlog(0, "%s:%.*s: node %u had a ref to this "
2798 "migrating lockres, clearing\n", dlm->name,
2799 res->lockname.len, res->lockname.name, bit);
2800 dlm_lockres_clear_refmap_bit(bit, res);
2801 }
2802 bit++;
2803 }
2517} 2804}
2518 2805
2519/* for now this is not too intelligent. we will 2806/* for now this is not too intelligent. we will
@@ -2601,6 +2888,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2601 mlog(0, "migrate request (node %u) returned %d!\n", 2888 mlog(0, "migrate request (node %u) returned %d!\n",
2602 nodenum, status); 2889 nodenum, status);
2603 ret = status; 2890 ret = status;
2891 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2892 /* during the migration request we short-circuited
2893 * the mastery of the lockres. make sure we have
2894 * a mastery ref for nodenum */
2895 mlog(0, "%s:%.*s: need ref for node %u\n",
2896 dlm->name, res->lockname.len, res->lockname.name,
2897 nodenum);
2898 spin_lock(&res->spinlock);
2899 dlm_lockres_set_refmap_bit(nodenum, res);
2900 spin_unlock(&res->spinlock);
2604 } 2901 }
2605 } 2902 }
2606 2903
@@ -2745,7 +3042,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2745 /* remove it from the list so that only one 3042 /* remove it from the list so that only one
2746 * mle will be found */ 3043 * mle will be found */
2747 list_del_init(&tmp->list); 3044 list_del_init(&tmp->list);
2748 __dlm_mle_detach_hb_events(dlm, mle); 3045 /* this was obviously WRONG. mle is uninited here. should be tmp. */
3046 __dlm_mle_detach_hb_events(dlm, tmp);
3047 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3048 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3049 "telling master to get ref for cleared out mle "
3050 "during migration\n", dlm->name, namelen, name,
3051 master, new_master);
2749 } 3052 }
2750 spin_unlock(&tmp->spinlock); 3053 spin_unlock(&tmp->spinlock);
2751 } 3054 }
@@ -2753,6 +3056,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2753 /* now add a migration mle to the tail of the list */ 3056 /* now add a migration mle to the tail of the list */
2754 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); 3057 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2755 mle->new_master = new_master; 3058 mle->new_master = new_master;
3059 /* the new master will be sending an assert master for this.
3060 * at that point we will get the refmap reference */
2756 mle->master = master; 3061 mle->master = master;
2757 /* do this for consistency with other mle types */ 3062 /* do this for consistency with other mle types */
2758 set_bit(new_master, mle->maybe_map); 3063 set_bit(new_master, mle->maybe_map);
@@ -2902,6 +3207,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2902 clear_bit(dlm->node_num, iter.node_map); 3207 clear_bit(dlm->node_num, iter.node_map);
2903 spin_unlock(&dlm->spinlock); 3208 spin_unlock(&dlm->spinlock);
2904 3209
3210 /* ownership of the lockres is changing. account for the
3211 * mastery reference here since old_master will briefly have
3212 * a reference after the migration completes */
3213 spin_lock(&res->spinlock);
3214 dlm_lockres_set_refmap_bit(old_master, res);
3215 spin_unlock(&res->spinlock);
3216
2905 mlog(0, "now time to do a migrate request to other nodes\n"); 3217 mlog(0, "now time to do a migrate request to other nodes\n");
2906 ret = dlm_do_migrate_request(dlm, res, old_master, 3218 ret = dlm_do_migrate_request(dlm, res, old_master,
2907 dlm->node_num, &iter); 3219 dlm->node_num, &iter);
@@ -2914,8 +3226,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2914 res->lockname.len, res->lockname.name); 3226 res->lockname.len, res->lockname.name);
2915 /* this call now finishes out the nodemap 3227 /* this call now finishes out the nodemap
2916 * even if one or more nodes die */ 3228 * even if one or more nodes die */
2917 ret = dlm_do_assert_master(dlm, res->lockname.name, 3229 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2918 res->lockname.len, iter.node_map,
2919 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3230 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2920 if (ret < 0) { 3231 if (ret < 0) {
2921 /* no longer need to retry. all living nodes contacted. */ 3232 /* no longer need to retry. all living nodes contacted. */
@@ -2927,8 +3238,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2927 set_bit(old_master, iter.node_map); 3238 set_bit(old_master, iter.node_map);
2928 mlog(0, "doing assert master of %.*s back to %u\n", 3239 mlog(0, "doing assert master of %.*s back to %u\n",
2929 res->lockname.len, res->lockname.name, old_master); 3240 res->lockname.len, res->lockname.name, old_master);
2930 ret = dlm_do_assert_master(dlm, res->lockname.name, 3241 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2931 res->lockname.len, iter.node_map,
2932 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3242 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2933 if (ret < 0) { 3243 if (ret < 0) {
2934 mlog(0, "assert master to original master failed " 3244 mlog(0, "assert master to original master failed "