aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c448
1 files changed, 309 insertions, 139 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 940be4c13b1f..1b8346dd0572 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -47,7 +47,6 @@
47 47
48#include "dlmapi.h" 48#include "dlmapi.h"
49#include "dlmcommon.h" 49#include "dlmcommon.h"
50#include "dlmdebug.h"
51#include "dlmdomain.h" 50#include "dlmdomain.h"
52 51
53#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER) 52#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
@@ -74,6 +73,7 @@ struct dlm_master_list_entry
74 wait_queue_head_t wq; 73 wait_queue_head_t wq;
75 atomic_t woken; 74 atomic_t woken;
76 struct kref mle_refs; 75 struct kref mle_refs;
76 int inuse;
77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -127,18 +127,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
127 return 1; 127 return 1;
128} 128}
129 129
130#if 0 130#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m)
131/* Code here is included but defined out as it aids debugging */ 131static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
132{
133 int i;
134 printk("%s=[ ", mapname);
135 for (i=0; i<O2NM_MAX_NODES; i++)
136 if (test_bit(i, map))
137 printk("%d ", i);
138 printk("]");
139}
132 140
133void dlm_print_one_mle(struct dlm_master_list_entry *mle) 141static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
134{ 142{
135 int i = 0, refs; 143 int refs;
136 char *type; 144 char *type;
137 char attached; 145 char attached;
138 u8 master; 146 u8 master;
139 unsigned int namelen; 147 unsigned int namelen;
140 const char *name; 148 const char *name;
141 struct kref *k; 149 struct kref *k;
150 unsigned long *maybe = mle->maybe_map,
151 *vote = mle->vote_map,
152 *resp = mle->response_map,
153 *node = mle->node_map;
142 154
143 k = &mle->mle_refs; 155 k = &mle->mle_refs;
144 if (mle->type == DLM_MLE_BLOCK) 156 if (mle->type == DLM_MLE_BLOCK)
@@ -159,18 +171,29 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
159 name = mle->u.res->lockname.name; 171 name = mle->u.res->lockname.name;
160 } 172 }
161 173
162 mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", 174 mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
163 i, type, refs, master, mle->new_master, attached, 175 namelen, name, type, refs, master, mle->new_master, attached,
164 namelen, namelen, name); 176 mle->inuse);
177 dlm_print_nodemap(maybe);
178 printk(", ");
179 dlm_print_nodemap(vote);
180 printk(", ");
181 dlm_print_nodemap(resp);
182 printk(", ");
183 dlm_print_nodemap(node);
184 printk(", ");
185 printk("\n");
165} 186}
166 187
188#if 0
189/* Code here is included but defined out as it aids debugging */
190
167static void dlm_dump_mles(struct dlm_ctxt *dlm) 191static void dlm_dump_mles(struct dlm_ctxt *dlm)
168{ 192{
169 struct dlm_master_list_entry *mle; 193 struct dlm_master_list_entry *mle;
170 struct list_head *iter; 194 struct list_head *iter;
171 195
172 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); 196 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
173 mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
174 spin_lock(&dlm->master_lock); 197 spin_lock(&dlm->master_lock);
175 list_for_each(iter, &dlm->master_list) { 198 list_for_each(iter, &dlm->master_list) {
176 mle = list_entry(iter, struct dlm_master_list_entry, list); 199 mle = list_entry(iter, struct dlm_master_list_entry, list);
@@ -314,6 +337,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
314 spin_unlock(&dlm->spinlock); 337 spin_unlock(&dlm->spinlock);
315} 338}
316 339
340static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
341{
342 struct dlm_ctxt *dlm;
343 dlm = mle->dlm;
344
345 assert_spin_locked(&dlm->spinlock);
346 assert_spin_locked(&dlm->master_lock);
347 mle->inuse++;
348 kref_get(&mle->mle_refs);
349}
350
351static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
352{
353 struct dlm_ctxt *dlm;
354 dlm = mle->dlm;
355
356 spin_lock(&dlm->spinlock);
357 spin_lock(&dlm->master_lock);
358 mle->inuse--;
359 __dlm_put_mle(mle);
360 spin_unlock(&dlm->master_lock);
361 spin_unlock(&dlm->spinlock);
362
363}
364
317/* remove from list and free */ 365/* remove from list and free */
318static void __dlm_put_mle(struct dlm_master_list_entry *mle) 366static void __dlm_put_mle(struct dlm_master_list_entry *mle)
319{ 367{
@@ -322,9 +370,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
322 370
323 assert_spin_locked(&dlm->spinlock); 371 assert_spin_locked(&dlm->spinlock);
324 assert_spin_locked(&dlm->master_lock); 372 assert_spin_locked(&dlm->master_lock);
325 BUG_ON(!atomic_read(&mle->mle_refs.refcount)); 373 if (!atomic_read(&mle->mle_refs.refcount)) {
326 374 /* this may or may not crash, but who cares.
327 kref_put(&mle->mle_refs, dlm_mle_release); 375 * it's a BUG. */
376 mlog(ML_ERROR, "bad mle: %p\n", mle);
377 dlm_print_one_mle(mle);
378 BUG();
379 } else
380 kref_put(&mle->mle_refs, dlm_mle_release);
328} 381}
329 382
330 383
@@ -367,6 +420,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
367 memset(mle->response_map, 0, sizeof(mle->response_map)); 420 memset(mle->response_map, 0, sizeof(mle->response_map));
368 mle->master = O2NM_MAX_NODES; 421 mle->master = O2NM_MAX_NODES;
369 mle->new_master = O2NM_MAX_NODES; 422 mle->new_master = O2NM_MAX_NODES;
423 mle->inuse = 0;
370 424
371 if (mle->type == DLM_MLE_MASTER) { 425 if (mle->type == DLM_MLE_MASTER) {
372 BUG_ON(!res); 426 BUG_ON(!res);
@@ -564,6 +618,28 @@ static void dlm_lockres_release(struct kref *kref)
564 mlog(0, "destroying lockres %.*s\n", res->lockname.len, 618 mlog(0, "destroying lockres %.*s\n", res->lockname.len,
565 res->lockname.name); 619 res->lockname.name);
566 620
621 if (!hlist_unhashed(&res->hash_node) ||
622 !list_empty(&res->granted) ||
623 !list_empty(&res->converting) ||
624 !list_empty(&res->blocked) ||
625 !list_empty(&res->dirty) ||
626 !list_empty(&res->recovering) ||
627 !list_empty(&res->purge)) {
628 mlog(ML_ERROR,
629 "Going to BUG for resource %.*s."
630 " We're on a list! [%c%c%c%c%c%c%c]\n",
631 res->lockname.len, res->lockname.name,
632 !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
633 !list_empty(&res->granted) ? 'G' : ' ',
634 !list_empty(&res->converting) ? 'C' : ' ',
635 !list_empty(&res->blocked) ? 'B' : ' ',
636 !list_empty(&res->dirty) ? 'D' : ' ',
637 !list_empty(&res->recovering) ? 'R' : ' ',
638 !list_empty(&res->purge) ? 'P' : ' ');
639
640 dlm_print_one_lock_resource(res);
641 }
642
567 /* By the time we're ready to blow this guy away, we shouldn't 643 /* By the time we're ready to blow this guy away, we shouldn't
568 * be on any lists. */ 644 * be on any lists. */
569 BUG_ON(!hlist_unhashed(&res->hash_node)); 645 BUG_ON(!hlist_unhashed(&res->hash_node));
@@ -579,11 +655,6 @@ static void dlm_lockres_release(struct kref *kref)
579 kfree(res); 655 kfree(res);
580} 656}
581 657
582void dlm_lockres_get(struct dlm_lock_resource *res)
583{
584 kref_get(&res->refs);
585}
586
587void dlm_lockres_put(struct dlm_lock_resource *res) 658void dlm_lockres_put(struct dlm_lock_resource *res)
588{ 659{
589 kref_put(&res->refs, dlm_lockres_release); 660 kref_put(&res->refs, dlm_lockres_release);
@@ -603,7 +674,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
603 memcpy(qname, name, namelen); 674 memcpy(qname, name, namelen);
604 675
605 res->lockname.len = namelen; 676 res->lockname.len = namelen;
606 res->lockname.hash = full_name_hash(name, namelen); 677 res->lockname.hash = dlm_lockid_hash(name, namelen);
607 678
608 init_waitqueue_head(&res->wq); 679 init_waitqueue_head(&res->wq);
609 spin_lock_init(&res->spinlock); 680 spin_lock_init(&res->spinlock);
@@ -637,11 +708,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
637{ 708{
638 struct dlm_lock_resource *res; 709 struct dlm_lock_resource *res;
639 710
640 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 711 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
641 if (!res) 712 if (!res)
642 return NULL; 713 return NULL;
643 714
644 res->lockname.name = kmalloc(namelen, GFP_KERNEL); 715 res->lockname.name = kmalloc(namelen, GFP_NOFS);
645 if (!res->lockname.name) { 716 if (!res->lockname.name) {
646 kfree(res); 717 kfree(res);
647 return NULL; 718 return NULL;
@@ -677,19 +748,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
677 int blocked = 0; 748 int blocked = 0;
678 int ret, nodenum; 749 int ret, nodenum;
679 struct dlm_node_iter iter; 750 struct dlm_node_iter iter;
680 unsigned int namelen; 751 unsigned int namelen, hash;
681 int tries = 0; 752 int tries = 0;
682 int bit, wait_on_recovery = 0; 753 int bit, wait_on_recovery = 0;
683 754
684 BUG_ON(!lockid); 755 BUG_ON(!lockid);
685 756
686 namelen = strlen(lockid); 757 namelen = strlen(lockid);
758 hash = dlm_lockid_hash(lockid, namelen);
687 759
688 mlog(0, "get lockres %s (len %d)\n", lockid, namelen); 760 mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
689 761
690lookup: 762lookup:
691 spin_lock(&dlm->spinlock); 763 spin_lock(&dlm->spinlock);
692 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); 764 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
693 if (tmpres) { 765 if (tmpres) {
694 spin_unlock(&dlm->spinlock); 766 spin_unlock(&dlm->spinlock);
695 mlog(0, "found in hash!\n"); 767 mlog(0, "found in hash!\n");
@@ -704,7 +776,7 @@ lookup:
704 mlog(0, "allocating a new resource\n"); 776 mlog(0, "allocating a new resource\n");
705 /* nothing found and we need to allocate one. */ 777 /* nothing found and we need to allocate one. */
706 alloc_mle = (struct dlm_master_list_entry *) 778 alloc_mle = (struct dlm_master_list_entry *)
707 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 779 kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
708 if (!alloc_mle) 780 if (!alloc_mle)
709 goto leave; 781 goto leave;
710 res = dlm_new_lockres(dlm, lockid, namelen); 782 res = dlm_new_lockres(dlm, lockid, namelen);
@@ -790,10 +862,11 @@ lookup:
790 * if so, the creator of the BLOCK may try to put the last 862 * if so, the creator of the BLOCK may try to put the last
791 * ref at this time in the assert master handler, so we 863 * ref at this time in the assert master handler, so we
792 * need an extra one to keep from a bad ptr deref. */ 864 * need an extra one to keep from a bad ptr deref. */
793 dlm_get_mle(mle); 865 dlm_get_mle_inuse(mle);
794 spin_unlock(&dlm->master_lock); 866 spin_unlock(&dlm->master_lock);
795 spin_unlock(&dlm->spinlock); 867 spin_unlock(&dlm->spinlock);
796 868
869redo_request:
797 while (wait_on_recovery) { 870 while (wait_on_recovery) {
798 /* any cluster changes that occurred after dropping the 871 /* any cluster changes that occurred after dropping the
799 * dlm spinlock would be detectable be a change on the mle, 872 * dlm spinlock would be detectable be a change on the mle,
@@ -812,7 +885,7 @@ lookup:
812 } 885 }
813 886
814 dlm_kick_recovery_thread(dlm); 887 dlm_kick_recovery_thread(dlm);
815 msleep(100); 888 msleep(1000);
816 dlm_wait_for_recovery(dlm); 889 dlm_wait_for_recovery(dlm);
817 890
818 spin_lock(&dlm->spinlock); 891 spin_lock(&dlm->spinlock);
@@ -825,13 +898,15 @@ lookup:
825 } else 898 } else
826 wait_on_recovery = 0; 899 wait_on_recovery = 0;
827 spin_unlock(&dlm->spinlock); 900 spin_unlock(&dlm->spinlock);
901
902 if (wait_on_recovery)
903 dlm_wait_for_node_recovery(dlm, bit, 10000);
828 } 904 }
829 905
830 /* must wait for lock to be mastered elsewhere */ 906 /* must wait for lock to be mastered elsewhere */
831 if (blocked) 907 if (blocked)
832 goto wait; 908 goto wait;
833 909
834redo_request:
835 ret = -EINVAL; 910 ret = -EINVAL;
836 dlm_node_iter_init(mle->vote_map, &iter); 911 dlm_node_iter_init(mle->vote_map, &iter);
837 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 912 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -856,6 +931,7 @@ wait:
856 /* keep going until the response map includes all nodes */ 931 /* keep going until the response map includes all nodes */
857 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); 932 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
858 if (ret < 0) { 933 if (ret < 0) {
934 wait_on_recovery = 1;
859 mlog(0, "%s:%.*s: node map changed, redo the " 935 mlog(0, "%s:%.*s: node map changed, redo the "
860 "master request now, blocked=%d\n", 936 "master request now, blocked=%d\n",
861 dlm->name, res->lockname.len, 937 dlm->name, res->lockname.len,
@@ -866,7 +942,7 @@ wait:
866 dlm->name, res->lockname.len, 942 dlm->name, res->lockname.len,
867 res->lockname.name, blocked); 943 res->lockname.name, blocked);
868 dlm_print_one_lock_resource(res); 944 dlm_print_one_lock_resource(res);
869 /* dlm_print_one_mle(mle); */ 945 dlm_print_one_mle(mle);
870 tries = 0; 946 tries = 0;
871 } 947 }
872 goto redo_request; 948 goto redo_request;
@@ -880,7 +956,7 @@ wait:
880 dlm_mle_detach_hb_events(dlm, mle); 956 dlm_mle_detach_hb_events(dlm, mle);
881 dlm_put_mle(mle); 957 dlm_put_mle(mle);
882 /* put the extra ref */ 958 /* put the extra ref */
883 dlm_put_mle(mle); 959 dlm_put_mle_inuse(mle);
884 960
885wake_waiters: 961wake_waiters:
886 spin_lock(&res->spinlock); 962 spin_lock(&res->spinlock);
@@ -921,12 +997,14 @@ recheck:
921 spin_unlock(&res->spinlock); 997 spin_unlock(&res->spinlock);
922 /* this will cause the master to re-assert across 998 /* this will cause the master to re-assert across
923 * the whole cluster, freeing up mles */ 999 * the whole cluster, freeing up mles */
924 ret = dlm_do_master_request(mle, res->owner); 1000 if (res->owner != dlm->node_num) {
925 if (ret < 0) { 1001 ret = dlm_do_master_request(mle, res->owner);
926 /* give recovery a chance to run */ 1002 if (ret < 0) {
927 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 1003 /* give recovery a chance to run */
928 msleep(500); 1004 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
929 goto recheck; 1005 msleep(500);
1006 goto recheck;
1007 }
930 } 1008 }
931 ret = 0; 1009 ret = 0;
932 goto leave; 1010 goto leave;
@@ -962,6 +1040,12 @@ recheck:
962 "rechecking now\n", dlm->name, res->lockname.len, 1040 "rechecking now\n", dlm->name, res->lockname.len,
963 res->lockname.name); 1041 res->lockname.name);
964 goto recheck; 1042 goto recheck;
1043 } else {
1044 if (!voting_done) {
1045 mlog(0, "map not changed and voting not done "
1046 "for %s:%.*s\n", dlm->name, res->lockname.len,
1047 res->lockname.name);
1048 }
965 } 1049 }
966 1050
967 if (m != O2NM_MAX_NODES) { 1051 if (m != O2NM_MAX_NODES) {
@@ -1129,18 +1213,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1129 set_bit(node, mle->vote_map); 1213 set_bit(node, mle->vote_map);
1130 } else { 1214 } else {
1131 mlog(ML_ERROR, "node down! %d\n", node); 1215 mlog(ML_ERROR, "node down! %d\n", node);
1132
1133 /* if the node wasn't involved in mastery skip it,
1134 * but clear it out from the maps so that it will
1135 * not affect mastery of this lockres */
1136 clear_bit(node, mle->response_map);
1137 clear_bit(node, mle->vote_map);
1138 if (!test_bit(node, mle->maybe_map))
1139 goto next;
1140
1141 /* if we're already blocked on lock mastery, and the
1142 * dead node wasn't the expected master, or there is
1143 * another node in the maybe_map, keep waiting */
1144 if (blocked) { 1216 if (blocked) {
1145 int lowest = find_next_bit(mle->maybe_map, 1217 int lowest = find_next_bit(mle->maybe_map,
1146 O2NM_MAX_NODES, 0); 1218 O2NM_MAX_NODES, 0);
@@ -1148,54 +1220,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1148 /* act like it was never there */ 1220 /* act like it was never there */
1149 clear_bit(node, mle->maybe_map); 1221 clear_bit(node, mle->maybe_map);
1150 1222
1151 if (node != lowest) 1223 if (node == lowest) {
1152 goto next; 1224 mlog(0, "expected master %u died"
1153 1225 " while this node was blocked "
1154 mlog(ML_ERROR, "expected master %u died while " 1226 "waiting on it!\n", node);
1155 "this node was blocked waiting on it!\n", 1227 lowest = find_next_bit(mle->maybe_map,
1156 node); 1228 O2NM_MAX_NODES,
1157 lowest = find_next_bit(mle->maybe_map, 1229 lowest+1);
1158 O2NM_MAX_NODES, 1230 if (lowest < O2NM_MAX_NODES) {
1159 lowest+1); 1231 mlog(0, "%s:%.*s:still "
1160 if (lowest < O2NM_MAX_NODES) { 1232 "blocked. waiting on %u "
1161 mlog(0, "still blocked. waiting " 1233 "now\n", dlm->name,
1162 "on %u now\n", lowest); 1234 res->lockname.len,
1163 goto next; 1235 res->lockname.name,
1236 lowest);
1237 } else {
1238 /* mle is an MLE_BLOCK, but
1239 * there is now nothing left to
1240 * block on. we need to return
1241 * all the way back out and try
1242 * again with an MLE_MASTER.
1243 * dlm_do_local_recovery_cleanup
1244 * has already run, so the mle
1245 * refcount is ok */
1246 mlog(0, "%s:%.*s: no "
1247 "longer blocking. try to "
1248 "master this here\n",
1249 dlm->name,
1250 res->lockname.len,
1251 res->lockname.name);
1252 mle->type = DLM_MLE_MASTER;
1253 mle->u.res = res;
1254 }
1164 } 1255 }
1165
1166 /* mle is an MLE_BLOCK, but there is now
1167 * nothing left to block on. we need to return
1168 * all the way back out and try again with
1169 * an MLE_MASTER. dlm_do_local_recovery_cleanup
1170 * has already run, so the mle refcount is ok */
1171 mlog(0, "no longer blocking. we can "
1172 "try to master this here\n");
1173 mle->type = DLM_MLE_MASTER;
1174 memset(mle->maybe_map, 0,
1175 sizeof(mle->maybe_map));
1176 memset(mle->response_map, 0,
1177 sizeof(mle->maybe_map));
1178 memcpy(mle->vote_map, mle->node_map,
1179 sizeof(mle->node_map));
1180 mle->u.res = res;
1181 set_bit(dlm->node_num, mle->maybe_map);
1182
1183 ret = -EAGAIN;
1184 goto next;
1185 } 1256 }
1186 1257
1187 clear_bit(node, mle->maybe_map); 1258 /* now blank out everything, as if we had never
1188 if (node > dlm->node_num) 1259 * contacted anyone */
1189 goto next; 1260 memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1190 1261 memset(mle->response_map, 0, sizeof(mle->response_map));
1191 mlog(0, "dead node in map!\n"); 1262 /* reset the vote_map to the current node_map */
1192 /* yuck. go back and re-contact all nodes 1263 memcpy(mle->vote_map, mle->node_map,
1193 * in the vote_map, removing this node. */ 1264 sizeof(mle->node_map));
1194 memset(mle->response_map, 0, 1265 /* put myself into the maybe map */
1195 sizeof(mle->response_map)); 1266 if (mle->type != DLM_MLE_BLOCK)
1267 set_bit(dlm->node_num, mle->maybe_map);
1196 } 1268 }
1197 ret = -EAGAIN; 1269 ret = -EAGAIN;
1198next:
1199 node = dlm_bitmap_diff_iter_next(&bdi, &sc); 1270 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1200 } 1271 }
1201 return ret; 1272 return ret;
@@ -1316,7 +1387,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1316 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; 1387 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1317 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; 1388 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1318 char *name; 1389 char *name;
1319 unsigned int namelen; 1390 unsigned int namelen, hash;
1320 int found, ret; 1391 int found, ret;
1321 int set_maybe; 1392 int set_maybe;
1322 int dispatch_assert = 0; 1393 int dispatch_assert = 0;
@@ -1331,6 +1402,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1331 1402
1332 name = request->name; 1403 name = request->name;
1333 namelen = request->namelen; 1404 namelen = request->namelen;
1405 hash = dlm_lockid_hash(name, namelen);
1334 1406
1335 if (namelen > DLM_LOCKID_NAME_MAX) { 1407 if (namelen > DLM_LOCKID_NAME_MAX) {
1336 response = DLM_IVBUFLEN; 1408 response = DLM_IVBUFLEN;
@@ -1339,7 +1411,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1339 1411
1340way_up_top: 1412way_up_top:
1341 spin_lock(&dlm->spinlock); 1413 spin_lock(&dlm->spinlock);
1342 res = __dlm_lookup_lockres(dlm, name, namelen); 1414 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1343 if (res) { 1415 if (res) {
1344 spin_unlock(&dlm->spinlock); 1416 spin_unlock(&dlm->spinlock);
1345 1417
@@ -1459,21 +1531,18 @@ way_up_top:
1459 spin_unlock(&dlm->spinlock); 1531 spin_unlock(&dlm->spinlock);
1460 1532
1461 mle = (struct dlm_master_list_entry *) 1533 mle = (struct dlm_master_list_entry *)
1462 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); 1534 kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1463 if (!mle) { 1535 if (!mle) {
1464 response = DLM_MASTER_RESP_ERROR; 1536 response = DLM_MASTER_RESP_ERROR;
1465 mlog_errno(-ENOMEM); 1537 mlog_errno(-ENOMEM);
1466 goto send_response; 1538 goto send_response;
1467 } 1539 }
1468 spin_lock(&dlm->spinlock);
1469 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
1470 name, namelen);
1471 spin_unlock(&dlm->spinlock);
1472 goto way_up_top; 1540 goto way_up_top;
1473 } 1541 }
1474 1542
1475 // mlog(0, "this is second time thru, already allocated, " 1543 // mlog(0, "this is second time thru, already allocated, "
1476 // "add the block.\n"); 1544 // "add the block.\n");
1545 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1477 set_bit(request->node_idx, mle->maybe_map); 1546 set_bit(request->node_idx, mle->maybe_map);
1478 list_add(&mle->list, &dlm->master_list); 1547 list_add(&mle->list, &dlm->master_list);
1479 response = DLM_MASTER_RESP_NO; 1548 response = DLM_MASTER_RESP_NO;
@@ -1556,6 +1625,8 @@ again:
1556 dlm_node_iter_init(nodemap, &iter); 1625 dlm_node_iter_init(nodemap, &iter);
1557 while ((to = dlm_node_iter_next(&iter)) >= 0) { 1626 while ((to = dlm_node_iter_next(&iter)) >= 0) {
1558 int r = 0; 1627 int r = 0;
1628 struct dlm_master_list_entry *mle = NULL;
1629
1559 mlog(0, "sending assert master to %d (%.*s)\n", to, 1630 mlog(0, "sending assert master to %d (%.*s)\n", to,
1560 namelen, lockname); 1631 namelen, lockname);
1561 memset(&assert, 0, sizeof(assert)); 1632 memset(&assert, 0, sizeof(assert));
@@ -1567,20 +1638,28 @@ again:
1567 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, 1638 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1568 &assert, sizeof(assert), to, &r); 1639 &assert, sizeof(assert), to, &r);
1569 if (tmpret < 0) { 1640 if (tmpret < 0) {
1570 mlog(ML_ERROR, "assert_master returned %d!\n", tmpret); 1641 mlog(0, "assert_master returned %d!\n", tmpret);
1571 if (!dlm_is_host_down(tmpret)) { 1642 if (!dlm_is_host_down(tmpret)) {
1572 mlog(ML_ERROR, "unhandled error!\n"); 1643 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1573 BUG(); 1644 BUG();
1574 } 1645 }
1575 /* a node died. finish out the rest of the nodes. */ 1646 /* a node died. finish out the rest of the nodes. */
1576 mlog(ML_ERROR, "link to %d went down!\n", to); 1647 mlog(0, "link to %d went down!\n", to);
1577 /* any nonzero status return will do */ 1648 /* any nonzero status return will do */
1578 ret = tmpret; 1649 ret = tmpret;
1579 } else if (r < 0) { 1650 } else if (r < 0) {
1580 /* ok, something horribly messed. kill thyself. */ 1651 /* ok, something horribly messed. kill thyself. */
1581 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1652 mlog(ML_ERROR,"during assert master of %.*s to %u, "
1582 "got %d.\n", namelen, lockname, to, r); 1653 "got %d.\n", namelen, lockname, to, r);
1583 dlm_dump_lock_resources(dlm); 1654 spin_lock(&dlm->spinlock);
1655 spin_lock(&dlm->master_lock);
1656 if (dlm_find_mle(dlm, &mle, (char *)lockname,
1657 namelen)) {
1658 dlm_print_one_mle(mle);
1659 __dlm_put_mle(mle);
1660 }
1661 spin_unlock(&dlm->master_lock);
1662 spin_unlock(&dlm->spinlock);
1584 BUG(); 1663 BUG();
1585 } else if (r == EAGAIN) { 1664 } else if (r == EAGAIN) {
1586 mlog(0, "%.*s: node %u create mles on other " 1665 mlog(0, "%.*s: node %u create mles on other "
@@ -1612,7 +1691,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1612 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; 1691 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1613 struct dlm_lock_resource *res = NULL; 1692 struct dlm_lock_resource *res = NULL;
1614 char *name; 1693 char *name;
1615 unsigned int namelen; 1694 unsigned int namelen, hash;
1616 u32 flags; 1695 u32 flags;
1617 int master_request = 0; 1696 int master_request = 0;
1618 int ret = 0; 1697 int ret = 0;
@@ -1622,6 +1701,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1622 1701
1623 name = assert->name; 1702 name = assert->name;
1624 namelen = assert->namelen; 1703 namelen = assert->namelen;
1704 hash = dlm_lockid_hash(name, namelen);
1625 flags = be32_to_cpu(assert->flags); 1705 flags = be32_to_cpu(assert->flags);
1626 1706
1627 if (namelen > DLM_LOCKID_NAME_MAX) { 1707 if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -1646,7 +1726,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1646 if (bit >= O2NM_MAX_NODES) { 1726 if (bit >= O2NM_MAX_NODES) {
1647 /* not necessarily an error, though less likely. 1727 /* not necessarily an error, though less likely.
1648 * could be master just re-asserting. */ 1728 * could be master just re-asserting. */
1649 mlog(ML_ERROR, "no bits set in the maybe_map, but %u " 1729 mlog(0, "no bits set in the maybe_map, but %u "
1650 "is asserting! (%.*s)\n", assert->node_idx, 1730 "is asserting! (%.*s)\n", assert->node_idx,
1651 namelen, name); 1731 namelen, name);
1652 } else if (bit != assert->node_idx) { 1732 } else if (bit != assert->node_idx) {
@@ -1658,19 +1738,36 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1658 * number winning the mastery will respond 1738 * number winning the mastery will respond
1659 * YES to mastery requests, but this node 1739 * YES to mastery requests, but this node
1660 * had no way of knowing. let it pass. */ 1740 * had no way of knowing. let it pass. */
1661 mlog(ML_ERROR, "%u is the lowest node, " 1741 mlog(0, "%u is the lowest node, "
1662 "%u is asserting. (%.*s) %u must " 1742 "%u is asserting. (%.*s) %u must "
1663 "have begun after %u won.\n", bit, 1743 "have begun after %u won.\n", bit,
1664 assert->node_idx, namelen, name, bit, 1744 assert->node_idx, namelen, name, bit,
1665 assert->node_idx); 1745 assert->node_idx);
1666 } 1746 }
1667 } 1747 }
1748 if (mle->type == DLM_MLE_MIGRATION) {
1749 if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1750 mlog(0, "%s:%.*s: got cleanup assert"
1751 " from %u for migration\n",
1752 dlm->name, namelen, name,
1753 assert->node_idx);
1754 } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1755 mlog(0, "%s:%.*s: got unrelated assert"
1756 " from %u for migration, ignoring\n",
1757 dlm->name, namelen, name,
1758 assert->node_idx);
1759 __dlm_put_mle(mle);
1760 spin_unlock(&dlm->master_lock);
1761 spin_unlock(&dlm->spinlock);
1762 goto done;
1763 }
1764 }
1668 } 1765 }
1669 spin_unlock(&dlm->master_lock); 1766 spin_unlock(&dlm->master_lock);
1670 1767
1671 /* ok everything checks out with the MLE 1768 /* ok everything checks out with the MLE
1672 * now check to see if there is a lockres */ 1769 * now check to see if there is a lockres */
1673 res = __dlm_lookup_lockres(dlm, name, namelen); 1770 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1674 if (res) { 1771 if (res) {
1675 spin_lock(&res->spinlock); 1772 spin_lock(&res->spinlock);
1676 if (res->state & DLM_LOCK_RES_RECOVERING) { 1773 if (res->state & DLM_LOCK_RES_RECOVERING) {
@@ -1679,7 +1776,8 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1679 goto kill; 1776 goto kill;
1680 } 1777 }
1681 if (!mle) { 1778 if (!mle) {
1682 if (res->owner != assert->node_idx) { 1779 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1780 res->owner != assert->node_idx) {
1683 mlog(ML_ERROR, "assert_master from " 1781 mlog(ML_ERROR, "assert_master from "
1684 "%u, but current owner is " 1782 "%u, but current owner is "
1685 "%u! (%.*s)\n", 1783 "%u! (%.*s)\n",
@@ -1732,6 +1830,7 @@ ok:
1732 if (mle) { 1830 if (mle) {
1733 int extra_ref = 0; 1831 int extra_ref = 0;
1734 int nn = -1; 1832 int nn = -1;
1833 int rr, err = 0;
1735 1834
1736 spin_lock(&mle->spinlock); 1835 spin_lock(&mle->spinlock);
1737 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) 1836 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1751,27 +1850,64 @@ ok:
1751 wake_up(&mle->wq); 1850 wake_up(&mle->wq);
1752 spin_unlock(&mle->spinlock); 1851 spin_unlock(&mle->spinlock);
1753 1852
1754 if (mle->type == DLM_MLE_MIGRATION && res) { 1853 if (res) {
1755 mlog(0, "finishing off migration of lockres %.*s, "
1756 "from %u to %u\n",
1757 res->lockname.len, res->lockname.name,
1758 dlm->node_num, mle->new_master);
1759 spin_lock(&res->spinlock); 1854 spin_lock(&res->spinlock);
1760 res->state &= ~DLM_LOCK_RES_MIGRATING; 1855 if (mle->type == DLM_MLE_MIGRATION) {
1761 dlm_change_lockres_owner(dlm, res, mle->new_master); 1856 mlog(0, "finishing off migration of lockres %.*s, "
1762 BUG_ON(res->state & DLM_LOCK_RES_DIRTY); 1857 "from %u to %u\n",
1858 res->lockname.len, res->lockname.name,
1859 dlm->node_num, mle->new_master);
1860 res->state &= ~DLM_LOCK_RES_MIGRATING;
1861 dlm_change_lockres_owner(dlm, res, mle->new_master);
1862 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1863 } else {
1864 dlm_change_lockres_owner(dlm, res, mle->master);
1865 }
1763 spin_unlock(&res->spinlock); 1866 spin_unlock(&res->spinlock);
1764 } 1867 }
1765 /* master is known, detach if not already detached */ 1868
1766 dlm_mle_detach_hb_events(dlm, mle); 1869 /* master is known, detach if not already detached.
1767 dlm_put_mle(mle); 1870 * ensures that only one assert_master call will happen
1768 1871 * on this mle. */
1872 spin_lock(&dlm->spinlock);
1873 spin_lock(&dlm->master_lock);
1874
1875 rr = atomic_read(&mle->mle_refs.refcount);
1876 if (mle->inuse > 0) {
1877 if (extra_ref && rr < 3)
1878 err = 1;
1879 else if (!extra_ref && rr < 2)
1880 err = 1;
1881 } else {
1882 if (extra_ref && rr < 2)
1883 err = 1;
1884 else if (!extra_ref && rr < 1)
1885 err = 1;
1886 }
1887 if (err) {
1888 mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1889 "that will mess up this node, refs=%d, extra=%d, "
1890 "inuse=%d\n", dlm->name, namelen, name,
1891 assert->node_idx, rr, extra_ref, mle->inuse);
1892 dlm_print_one_mle(mle);
1893 }
1894 list_del_init(&mle->list);
1895 __dlm_mle_detach_hb_events(dlm, mle);
1896 __dlm_put_mle(mle);
1769 if (extra_ref) { 1897 if (extra_ref) {
1770 /* the assert master message now balances the extra 1898 /* the assert master message now balances the extra
1771 * ref given by the master / migration request message. 1899 * ref given by the master / migration request message.
1772 * if this is the last put, it will be removed 1900 * if this is the last put, it will be removed
1773 * from the list. */ 1901 * from the list. */
1774 dlm_put_mle(mle); 1902 __dlm_put_mle(mle);
1903 }
1904 spin_unlock(&dlm->master_lock);
1905 spin_unlock(&dlm->spinlock);
1906 } else if (res) {
1907 if (res->owner != assert->node_idx) {
1908 mlog(0, "assert_master from %u, but current "
1909 "owner is %u (%.*s), no mle\n", assert->node_idx,
1910 res->owner, namelen, name);
1775 } 1911 }
1776 } 1912 }
1777 1913
@@ -1788,12 +1924,12 @@ done:
1788 1924
1789kill: 1925kill:
1790 /* kill the caller! */ 1926 /* kill the caller! */
1927 mlog(ML_ERROR, "Bad message received from another node. Dumping state "
1928 "and killing the other node now! This node is OK and can continue.\n");
1929 __dlm_print_one_lock_resource(res);
1791 spin_unlock(&res->spinlock); 1930 spin_unlock(&res->spinlock);
1792 spin_unlock(&dlm->spinlock); 1931 spin_unlock(&dlm->spinlock);
1793 dlm_lockres_put(res); 1932 dlm_lockres_put(res);
1794 mlog(ML_ERROR, "Bad message received from another node. Dumping state "
1795 "and killing the other node now! This node is OK and can continue.\n");
1796 dlm_dump_lock_resources(dlm);
1797 dlm_put(dlm); 1933 dlm_put(dlm);
1798 return -EINVAL; 1934 return -EINVAL;
1799} 1935}
@@ -1803,7 +1939,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1803 int ignore_higher, u8 request_from, u32 flags) 1939 int ignore_higher, u8 request_from, u32 flags)
1804{ 1940{
1805 struct dlm_work_item *item; 1941 struct dlm_work_item *item;
1806 item = kcalloc(1, sizeof(*item), GFP_KERNEL); 1942 item = kcalloc(1, sizeof(*item), GFP_NOFS);
1807 if (!item) 1943 if (!item)
1808 return -ENOMEM; 1944 return -ENOMEM;
1809 1945
@@ -1825,7 +1961,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1825 list_add_tail(&item->list, &dlm->work_list); 1961 list_add_tail(&item->list, &dlm->work_list);
1826 spin_unlock(&dlm->work_lock); 1962 spin_unlock(&dlm->work_lock);
1827 1963
1828 schedule_work(&dlm->dispatched_work); 1964 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1829 return 0; 1965 return 0;
1830} 1966}
1831 1967
@@ -1866,6 +2002,23 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1866 } 2002 }
1867 } 2003 }
1868 2004
2005 /*
2006 * If we're migrating this lock to someone else, we are no
2007 * longer allowed to assert our own mastery. OTOH, we need to
2008 * prevent migration from starting while we're still asserting
2009 * our dominance. The reserved ast delays migration.
2010 */
2011 spin_lock(&res->spinlock);
2012 if (res->state & DLM_LOCK_RES_MIGRATING) {
2013 mlog(0, "Someone asked us to assert mastery, but we're "
2014 "in the middle of migration. Skipping assert, "
2015 "the new master will handle that.\n");
2016 spin_unlock(&res->spinlock);
2017 goto put;
2018 } else
2019 __dlm_lockres_reserve_ast(res);
2020 spin_unlock(&res->spinlock);
2021
1869 /* this call now finishes out the nodemap 2022 /* this call now finishes out the nodemap
1870 * even if one or more nodes die */ 2023 * even if one or more nodes die */
1871 mlog(0, "worker about to master %.*s here, this=%u\n", 2024 mlog(0, "worker about to master %.*s here, this=%u\n",
@@ -1875,9 +2028,14 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1875 nodemap, flags); 2028 nodemap, flags);
1876 if (ret < 0) { 2029 if (ret < 0) {
1877 /* no need to restart, we are done */ 2030 /* no need to restart, we are done */
1878 mlog_errno(ret); 2031 if (!dlm_is_host_down(ret))
2032 mlog_errno(ret);
1879 } 2033 }
1880 2034
2035 /* Ok, we've asserted ourselves. Let's let migration start. */
2036 dlm_lockres_release_ast(dlm, res);
2037
2038put:
1881 dlm_lockres_put(res); 2039 dlm_lockres_put(res);
1882 2040
1883 mlog(0, "finished with dlm_assert_master_worker\n"); 2041 mlog(0, "finished with dlm_assert_master_worker\n");
@@ -1916,6 +2074,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
1916 BUG(); 2074 BUG();
1917 /* host is down, so answer for that node would be 2075 /* host is down, so answer for that node would be
1918 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ 2076 * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
2077 ret = 0;
1919 } 2078 }
1920 2079
1921 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { 2080 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
@@ -2016,14 +2175,14 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2016 */ 2175 */
2017 2176
2018 ret = -ENOMEM; 2177 ret = -ENOMEM;
2019 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL); 2178 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2020 if (!mres) { 2179 if (!mres) {
2021 mlog_errno(ret); 2180 mlog_errno(ret);
2022 goto leave; 2181 goto leave;
2023 } 2182 }
2024 2183
2025 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, 2184 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2026 GFP_KERNEL); 2185 GFP_NOFS);
2027 if (!mle) { 2186 if (!mle) {
2028 mlog_errno(ret); 2187 mlog_errno(ret);
2029 goto leave; 2188 goto leave;
@@ -2117,7 +2276,7 @@ fail:
2117 * take both dlm->spinlock and dlm->master_lock */ 2276 * take both dlm->spinlock and dlm->master_lock */
2118 spin_lock(&dlm->spinlock); 2277 spin_lock(&dlm->spinlock);
2119 spin_lock(&dlm->master_lock); 2278 spin_lock(&dlm->master_lock);
2120 dlm_get_mle(mle); 2279 dlm_get_mle_inuse(mle);
2121 spin_unlock(&dlm->master_lock); 2280 spin_unlock(&dlm->master_lock);
2122 spin_unlock(&dlm->spinlock); 2281 spin_unlock(&dlm->spinlock);
2123 2282
@@ -2134,7 +2293,10 @@ fail:
2134 /* migration failed, detach and clean up mle */ 2293 /* migration failed, detach and clean up mle */
2135 dlm_mle_detach_hb_events(dlm, mle); 2294 dlm_mle_detach_hb_events(dlm, mle);
2136 dlm_put_mle(mle); 2295 dlm_put_mle(mle);
2137 dlm_put_mle(mle); 2296 dlm_put_mle_inuse(mle);
2297 spin_lock(&res->spinlock);
2298 res->state &= ~DLM_LOCK_RES_MIGRATING;
2299 spin_unlock(&res->spinlock);
2138 goto leave; 2300 goto leave;
2139 } 2301 }
2140 2302
@@ -2164,8 +2326,8 @@ fail:
2164 /* avoid hang during shutdown when migrating lockres 2326 /* avoid hang during shutdown when migrating lockres
2165 * to a node which also goes down */ 2327 * to a node which also goes down */
2166 if (dlm_is_node_dead(dlm, target)) { 2328 if (dlm_is_node_dead(dlm, target)) {
2167 mlog(0, "%s:%.*s: expected migration target %u " 2329 mlog(0, "%s:%.*s: expected migration "
2168 "is no longer up. restarting.\n", 2330 "target %u is no longer up, restarting\n",
2169 dlm->name, res->lockname.len, 2331 dlm->name, res->lockname.len,
2170 res->lockname.name, target); 2332 res->lockname.name, target);
2171 ret = -ERESTARTSYS; 2333 ret = -ERESTARTSYS;
@@ -2175,7 +2337,10 @@ fail:
2175 /* migration failed, detach and clean up mle */ 2337 /* migration failed, detach and clean up mle */
2176 dlm_mle_detach_hb_events(dlm, mle); 2338 dlm_mle_detach_hb_events(dlm, mle);
2177 dlm_put_mle(mle); 2339 dlm_put_mle(mle);
2178 dlm_put_mle(mle); 2340 dlm_put_mle_inuse(mle);
2341 spin_lock(&res->spinlock);
2342 res->state &= ~DLM_LOCK_RES_MIGRATING;
2343 spin_unlock(&res->spinlock);
2179 goto leave; 2344 goto leave;
2180 } 2345 }
2181 /* TODO: if node died: stop, clean up, return error */ 2346 /* TODO: if node died: stop, clean up, return error */
@@ -2191,7 +2356,7 @@ fail:
2191 2356
2192 /* master is known, detach if not already detached */ 2357 /* master is known, detach if not already detached */
2193 dlm_mle_detach_hb_events(dlm, mle); 2358 dlm_mle_detach_hb_events(dlm, mle);
2194 dlm_put_mle(mle); 2359 dlm_put_mle_inuse(mle);
2195 ret = 0; 2360 ret = 0;
2196 2361
2197 dlm_lockres_calc_usage(dlm, res); 2362 dlm_lockres_calc_usage(dlm, res);
@@ -2462,7 +2627,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2462 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; 2627 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2463 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; 2628 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2464 const char *name; 2629 const char *name;
2465 unsigned int namelen; 2630 unsigned int namelen, hash;
2466 int ret = 0; 2631 int ret = 0;
2467 2632
2468 if (!dlm_grab(dlm)) 2633 if (!dlm_grab(dlm))
@@ -2470,10 +2635,11 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2470 2635
2471 name = migrate->name; 2636 name = migrate->name;
2472 namelen = migrate->namelen; 2637 namelen = migrate->namelen;
2638 hash = dlm_lockid_hash(name, namelen);
2473 2639
2474 /* preallocate.. if this fails, abort */ 2640 /* preallocate.. if this fails, abort */
2475 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, 2641 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2476 GFP_KERNEL); 2642 GFP_NOFS);
2477 2643
2478 if (!mle) { 2644 if (!mle) {
2479 ret = -ENOMEM; 2645 ret = -ENOMEM;
@@ -2482,7 +2648,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2482 2648
2483 /* check for pre-existing lock */ 2649 /* check for pre-existing lock */
2484 spin_lock(&dlm->spinlock); 2650 spin_lock(&dlm->spinlock);
2485 res = __dlm_lookup_lockres(dlm, name, namelen); 2651 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
2486 spin_lock(&dlm->master_lock); 2652 spin_lock(&dlm->master_lock);
2487 2653
2488 if (res) { 2654 if (res) {
@@ -2580,6 +2746,7 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2580 /* remove it from the list so that only one 2746 /* remove it from the list so that only one
2581 * mle will be found */ 2747 * mle will be found */
2582 list_del_init(&tmp->list); 2748 list_del_init(&tmp->list);
2749 __dlm_mle_detach_hb_events(dlm, mle);
2583 } 2750 }
2584 spin_unlock(&tmp->spinlock); 2751 spin_unlock(&tmp->spinlock);
2585 } 2752 }
@@ -2601,6 +2768,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2601 struct list_head *iter, *iter2; 2768 struct list_head *iter, *iter2;
2602 struct dlm_master_list_entry *mle; 2769 struct dlm_master_list_entry *mle;
2603 struct dlm_lock_resource *res; 2770 struct dlm_lock_resource *res;
2771 unsigned int hash;
2604 2772
2605 mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); 2773 mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2606top: 2774top:
@@ -2640,7 +2808,7 @@ top:
2640 * may result in the mle being unlinked and 2808 * may result in the mle being unlinked and
2641 * freed, but there may still be a process 2809 * freed, but there may still be a process
2642 * waiting in the dlmlock path which is fine. */ 2810 * waiting in the dlmlock path which is fine. */
2643 mlog(ML_ERROR, "node %u was expected master\n", 2811 mlog(0, "node %u was expected master\n",
2644 dead_node); 2812 dead_node);
2645 atomic_set(&mle->woken, 1); 2813 atomic_set(&mle->woken, 1);
2646 spin_unlock(&mle->spinlock); 2814 spin_unlock(&mle->spinlock);
@@ -2673,19 +2841,21 @@ top:
2673 2841
2674 /* remove from the list early. NOTE: unlinking 2842 /* remove from the list early. NOTE: unlinking
2675 * list_head while in list_for_each_safe */ 2843 * list_head while in list_for_each_safe */
2844 __dlm_mle_detach_hb_events(dlm, mle);
2676 spin_lock(&mle->spinlock); 2845 spin_lock(&mle->spinlock);
2677 list_del_init(&mle->list); 2846 list_del_init(&mle->list);
2678 atomic_set(&mle->woken, 1); 2847 atomic_set(&mle->woken, 1);
2679 spin_unlock(&mle->spinlock); 2848 spin_unlock(&mle->spinlock);
2680 wake_up(&mle->wq); 2849 wake_up(&mle->wq);
2681 2850
2682 mlog(0, "node %u died during migration from " 2851 mlog(0, "%s: node %u died during migration from "
2683 "%u to %u!\n", dead_node, 2852 "%u to %u!\n", dlm->name, dead_node,
2684 mle->master, mle->new_master); 2853 mle->master, mle->new_master);
2685 /* if there is a lockres associated with this 2854 /* if there is a lockres associated with this
2686 * mle, find it and set its owner to UNKNOWN */ 2855 * mle, find it and set its owner to UNKNOWN */
2856 hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
2687 res = __dlm_lookup_lockres(dlm, mle->u.name.name, 2857 res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2688 mle->u.name.len); 2858 mle->u.name.len, hash);
2689 if (res) { 2859 if (res) {
2690 /* unfortunately if we hit this rare case, our 2860 /* unfortunately if we hit this rare case, our
2691 * lock ordering is messed. we need to drop 2861 * lock ordering is messed. we need to drop