path: root/fs/ocfs2/dlm/dlmmaster.c
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--  fs/ocfs2/dlm/dlmmaster.c  387
1 file changed, 197 insertions, 190 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0a281394785..f8b653fcd4d 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -73,22 +73,13 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
 				const char *name,
 				unsigned int namelen)
 {
-	struct dlm_lock_resource *res;
-
 	if (dlm != mle->dlm)
 		return 0;
 
-	if (mle->type == DLM_MLE_BLOCK ||
-	    mle->type == DLM_MLE_MIGRATION) {
-		if (namelen != mle->u.name.len ||
-		    memcmp(name, mle->u.name.name, namelen)!=0)
-			return 0;
-	} else {
-		res = mle->u.res;
-		if (namelen != res->lockname.len ||
-		    memcmp(res->lockname.name, name, namelen) != 0)
-			return 0;
-	}
+	if (namelen != mle->mnamelen ||
+	    memcmp(name, mle->mname, namelen) != 0)
+		return 0;
+
 	return 1;
 }
 
@@ -283,7 +274,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
 
 	mle->dlm = dlm;
 	mle->type = type;
-	INIT_LIST_HEAD(&mle->list);
+	INIT_HLIST_NODE(&mle->master_hash_node);
 	INIT_LIST_HEAD(&mle->hb_events);
 	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
 	spin_lock_init(&mle->spinlock);
@@ -295,19 +286,27 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
 	mle->new_master = O2NM_MAX_NODES;
 	mle->inuse = 0;
 
+	BUG_ON(mle->type != DLM_MLE_BLOCK &&
+	       mle->type != DLM_MLE_MASTER &&
+	       mle->type != DLM_MLE_MIGRATION);
+
 	if (mle->type == DLM_MLE_MASTER) {
 		BUG_ON(!res);
-		mle->u.res = res;
-	} else if (mle->type == DLM_MLE_BLOCK) {
-		BUG_ON(!name);
-		memcpy(mle->u.name.name, name, namelen);
-		mle->u.name.len = namelen;
-	} else /* DLM_MLE_MIGRATION */ {
+		mle->mleres = res;
+		memcpy(mle->mname, res->lockname.name, res->lockname.len);
+		mle->mnamelen = res->lockname.len;
+		mle->mnamehash = res->lockname.hash;
+	} else {
 		BUG_ON(!name);
-		memcpy(mle->u.name.name, name, namelen);
-		mle->u.name.len = namelen;
+		mle->mleres = NULL;
+		memcpy(mle->mname, name, namelen);
+		mle->mnamelen = namelen;
+		mle->mnamehash = dlm_lockid_hash(name, namelen);
 	}
 
+	atomic_inc(&dlm->mle_tot_count[mle->type]);
+	atomic_inc(&dlm->mle_cur_count[mle->type]);
+
 	/* copy off the node_map and register hb callbacks on our copy */
 	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
 	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
@@ -318,6 +317,24 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
 	__dlm_mle_attach_hb_events(dlm, mle);
 }
 
+void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
+{
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&dlm->master_lock);
+
+	if (!hlist_unhashed(&mle->master_hash_node))
+		hlist_del_init(&mle->master_hash_node);
+}
+
+void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
+{
+	struct hlist_head *bucket;
+
+	assert_spin_locked(&dlm->master_lock);
+
+	bucket = dlm_master_hash(dlm, mle->mnamehash);
+	hlist_add_head(&mle->master_hash_node, bucket);
+}
 
 /* returns 1 if found, 0 if not */
 static int dlm_find_mle(struct dlm_ctxt *dlm,
@@ -325,10 +342,17 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
 			char *name, unsigned int namelen)
 {
 	struct dlm_master_list_entry *tmpmle;
+	struct hlist_head *bucket;
+	struct hlist_node *list;
+	unsigned int hash;
 
 	assert_spin_locked(&dlm->master_lock);
 
-	list_for_each_entry(tmpmle, &dlm->master_list, list) {
+	hash = dlm_lockid_hash(name, namelen);
+	bucket = dlm_master_hash(dlm, hash);
+	hlist_for_each(list, bucket) {
+		tmpmle = hlist_entry(list, struct dlm_master_list_entry,
+				     master_hash_node);
 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
 			continue;
 		dlm_get_mle(tmpmle);
@@ -408,24 +432,20 @@ static void dlm_mle_release(struct kref *kref)
 	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
 	dlm = mle->dlm;
 
-	if (mle->type != DLM_MLE_MASTER) {
-		mlog(0, "calling mle_release for %.*s, type %d\n",
-		     mle->u.name.len, mle->u.name.name, mle->type);
-	} else {
-		mlog(0, "calling mle_release for %.*s, type %d\n",
-		     mle->u.res->lockname.len,
-		     mle->u.res->lockname.name, mle->type);
-	}
 	assert_spin_locked(&dlm->spinlock);
 	assert_spin_locked(&dlm->master_lock);
 
+	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
+	     mle->type);
+
 	/* remove from list if not already */
-	if (!list_empty(&mle->list))
-		list_del_init(&mle->list);
+	__dlm_unlink_mle(dlm, mle);
 
 	/* detach the mle from the domain node up/down events */
 	__dlm_mle_detach_hb_events(dlm, mle);
 
+	atomic_dec(&dlm->mle_cur_count[mle->type]);
+
 	/* NOTE: kfree under spinlock here.
 	 * if this is bad, we can move this to a freelist. */
 	kmem_cache_free(dlm_mle_cache, mle);
@@ -465,43 +485,6 @@ void dlm_destroy_master_caches(void)
 	kmem_cache_destroy(dlm_lockres_cache);
 }
 
-static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
-				  struct dlm_lock_resource *res,
-				  u8 owner)
-{
-	assert_spin_locked(&res->spinlock);
-
-	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);
-
-	if (owner == dlm->node_num)
-		atomic_inc(&dlm->local_resources);
-	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
-		atomic_inc(&dlm->unknown_resources);
-	else
-		atomic_inc(&dlm->remote_resources);
-
-	res->owner = owner;
-}
-
-void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
-			      struct dlm_lock_resource *res, u8 owner)
-{
-	assert_spin_locked(&res->spinlock);
-
-	if (owner == res->owner)
-		return;
-
-	if (res->owner == dlm->node_num)
-		atomic_dec(&dlm->local_resources);
-	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
-		atomic_dec(&dlm->unknown_resources);
-	else
-		atomic_dec(&dlm->remote_resources);
-
-	dlm_set_lockres_owner(dlm, res, owner);
-}
-
-
 static void dlm_lockres_release(struct kref *kref)
 {
 	struct dlm_lock_resource *res;
@@ -527,6 +510,8 @@ static void dlm_lockres_release(struct kref *kref)
 	}
 	spin_unlock(&dlm->track_lock);
 
+	atomic_dec(&dlm->res_cur_count);
+
 	dlm_put(dlm);
 
 	if (!hlist_unhashed(&res->hash_node) ||
@@ -607,6 +592,9 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
 
 	kref_init(&res->refs);
 
+	atomic_inc(&dlm->res_tot_count);
+	atomic_inc(&dlm->res_cur_count);
+
 	/* just for consistency */
 	spin_lock(&res->spinlock);
 	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
@@ -843,7 +831,7 @@ lookup:
 	alloc_mle = NULL;
 	dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 	set_bit(dlm->node_num, mle->maybe_map);
-	list_add(&mle->list, &dlm->master_list);
+	__dlm_insert_mle(dlm, mle);
 
 	/* still holding the dlm spinlock, check the recovery map
 	 * to see if there are any nodes that still need to be
@@ -1270,7 +1258,7 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
 					    res->lockname.len,
 					    res->lockname.name);
 					mle->type = DLM_MLE_MASTER;
-					mle->u.res = res;
+					mle->mleres = res;
 				}
 			}
 		}
@@ -1315,14 +1303,8 @@ static int dlm_do_master_request(struct dlm_lock_resource *res,
 
 	BUG_ON(mle->type == DLM_MLE_MIGRATION);
 
-	if (mle->type != DLM_MLE_MASTER) {
-		request.namelen = mle->u.name.len;
-		memcpy(request.name, mle->u.name.name, request.namelen);
-	} else {
-		request.namelen = mle->u.res->lockname.len;
-		memcpy(request.name, mle->u.res->lockname.name,
-		       request.namelen);
-	}
+	request.namelen = (u8)mle->mnamelen;
+	memcpy(request.name, mle->mname, request.namelen);
 
 again:
 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
@@ -1575,7 +1557,7 @@ way_up_top:
 		//	  "add the block.\n");
 		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
 		set_bit(request->node_idx, mle->maybe_map);
-		list_add(&mle->list, &dlm->master_list);
+		__dlm_insert_mle(dlm, mle);
 		response = DLM_MASTER_RESP_NO;
 	} else {
 		// mlog(0, "mle was found\n");
@@ -1967,7 +1949,7 @@ ok:
 			     assert->node_idx, rr, extra_ref, mle->inuse);
 			dlm_print_one_mle(mle);
 		}
-		list_del_init(&mle->list);
+		__dlm_unlink_mle(dlm, mle);
 		__dlm_mle_detach_hb_events(dlm, mle);
 		__dlm_put_mle(mle);
 		if (extra_ref) {
@@ -3159,10 +3141,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
 			tmp->master = master;
 			atomic_set(&tmp->woken, 1);
 			wake_up(&tmp->wq);
-			/* remove it from the list so that only one
-			 * mle will be found */
-			list_del_init(&tmp->list);
-			/* this was obviously WRONG. mle is uninited here. should be tmp. */
+			/* remove it so that only one mle will be found */
+			__dlm_unlink_mle(dlm, tmp);
 			__dlm_mle_detach_hb_events(dlm, tmp);
 			ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
 			mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
@@ -3181,137 +3161,164 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
 	mle->master = master;
 	/* do this for consistency with other mle types */
 	set_bit(new_master, mle->maybe_map);
-	list_add(&mle->list, &dlm->master_list);
+	__dlm_insert_mle(dlm, mle);
 
 	return ret;
 }
 
-
-void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
+/*
+ * Sets the owner of the lockres, associated to the mle, to UNKNOWN
+ */
+static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
+					struct dlm_master_list_entry *mle)
 {
-	struct dlm_master_list_entry *mle, *next;
 	struct dlm_lock_resource *res;
-	unsigned int hash;
 
-	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
-top:
-	assert_spin_locked(&dlm->spinlock);
+	/* Find the lockres associated to the mle and set its owner to UNK */
+	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
+				   mle->mnamehash);
+	if (res) {
+		spin_unlock(&dlm->master_lock);
 
-	/* clean the master list */
-	spin_lock(&dlm->master_lock);
-	list_for_each_entry_safe(mle, next, &dlm->master_list, list) {
-		BUG_ON(mle->type != DLM_MLE_BLOCK &&
-		       mle->type != DLM_MLE_MASTER &&
-		       mle->type != DLM_MLE_MIGRATION);
-
-		/* MASTER mles are initiated locally. the waiting
-		 * process will notice the node map change
-		 * shortly. let that happen as normal. */
-		if (mle->type == DLM_MLE_MASTER)
-			continue;
+		/* move lockres onto recovery list */
+		spin_lock(&res->spinlock);
+		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
+		dlm_move_lockres_to_recovery_list(dlm, res);
+		spin_unlock(&res->spinlock);
+		dlm_lockres_put(res);
 
+		/* about to get rid of mle, detach from heartbeat */
+		__dlm_mle_detach_hb_events(dlm, mle);
 
-		/* BLOCK mles are initiated by other nodes.
-		 * need to clean up if the dead node would have
-		 * been the master. */
-		if (mle->type == DLM_MLE_BLOCK) {
-			int bit;
+		/* dump the mle */
+		spin_lock(&dlm->master_lock);
+		__dlm_put_mle(mle);
+		spin_unlock(&dlm->master_lock);
+	}
 
-			spin_lock(&mle->spinlock);
-			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
-			if (bit != dead_node) {
-				mlog(0, "mle found, but dead node %u would "
-				     "not have been master\n", dead_node);
-				spin_unlock(&mle->spinlock);
-			} else {
-				/* must drop the refcount by one since the
-				 * assert_master will never arrive. this
-				 * may result in the mle being unlinked and
-				 * freed, but there may still be a process
-				 * waiting in the dlmlock path which is fine. */
-				mlog(0, "node %u was expected master\n",
-				     dead_node);
-				atomic_set(&mle->woken, 1);
-				spin_unlock(&mle->spinlock);
-				wake_up(&mle->wq);
-				/* do not need events any longer, so detach
-				 * from heartbeat */
-				__dlm_mle_detach_hb_events(dlm, mle);
-				__dlm_put_mle(mle);
-			}
-			continue;
-		}
+	return res;
+}
 
-		/* everything else is a MIGRATION mle */
-
-		/* the rule for MIGRATION mles is that the master
-		 * becomes UNKNOWN if *either* the original or
-		 * the new master dies. all UNKNOWN lockreses
-		 * are sent to whichever node becomes the recovery
-		 * master. the new master is responsible for
-		 * determining if there is still a master for
-		 * this lockres, or if he needs to take over
-		 * mastery. either way, this node should expect
-		 * another message to resolve this. */
-		if (mle->master != dead_node &&
-		    mle->new_master != dead_node)
-			continue;
+static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
+				    struct dlm_master_list_entry *mle)
+{
+	__dlm_mle_detach_hb_events(dlm, mle);
 
-		/* if we have reached this point, this mle needs to
-		 * be removed from the list and freed. */
+	spin_lock(&mle->spinlock);
+	__dlm_unlink_mle(dlm, mle);
+	atomic_set(&mle->woken, 1);
+	spin_unlock(&mle->spinlock);
 
-		/* remove from the list early. NOTE: unlinking
-		 * list_head while in list_for_each_safe */
-		__dlm_mle_detach_hb_events(dlm, mle);
-		spin_lock(&mle->spinlock);
-		list_del_init(&mle->list);
+	wake_up(&mle->wq);
+}
+
+static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
+			struct dlm_master_list_entry *mle, u8 dead_node)
+{
+	int bit;
+
+	BUG_ON(mle->type != DLM_MLE_BLOCK);
+
+	spin_lock(&mle->spinlock);
+	bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
+	if (bit != dead_node) {
+		mlog(0, "mle found, but dead node %u would not have been "
+		     "master\n", dead_node);
+		spin_unlock(&mle->spinlock);
+	} else {
+		/* Must drop the refcount by one since the assert_master will
+		 * never arrive. This may result in the mle being unlinked and
+		 * freed, but there may still be a process waiting in the
+		 * dlmlock path which is fine. */
+		mlog(0, "node %u was expected master\n", dead_node);
 		atomic_set(&mle->woken, 1);
 		spin_unlock(&mle->spinlock);
 		wake_up(&mle->wq);
 
-		mlog(0, "%s: node %u died during migration from "
-		     "%u to %u!\n", dlm->name, dead_node,
-		     mle->master, mle->new_master);
-		/* if there is a lockres associated with this
-		 * mle, find it and set its owner to UNKNOWN */
-		hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
-		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
-					   mle->u.name.len, hash);
-		if (res) {
-			/* unfortunately if we hit this rare case, our
-			 * lock ordering is messed. we need to drop
-			 * the master lock so that we can take the
-			 * lockres lock, meaning that we will have to
-			 * restart from the head of list. */
-			spin_unlock(&dlm->master_lock);
+		/* Do not need events any longer, so detach from heartbeat */
+		__dlm_mle_detach_hb_events(dlm, mle);
+		__dlm_put_mle(mle);
+	}
+}
 
-			/* move lockres onto recovery list */
-			spin_lock(&res->spinlock);
-			dlm_set_lockres_owner(dlm, res,
-					      DLM_LOCK_RES_OWNER_UNKNOWN);
-			dlm_move_lockres_to_recovery_list(dlm, res);
-			spin_unlock(&res->spinlock);
-			dlm_lockres_put(res);
+void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
+{
+	struct dlm_master_list_entry *mle;
+	struct dlm_lock_resource *res;
+	struct hlist_head *bucket;
+	struct hlist_node *list;
+	unsigned int i;
 
-			/* about to get rid of mle, detach from heartbeat */
-			__dlm_mle_detach_hb_events(dlm, mle);
+	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
+top:
+	assert_spin_locked(&dlm->spinlock);
 
-			/* dump the mle */
-			spin_lock(&dlm->master_lock);
-			__dlm_put_mle(mle);
-			spin_unlock(&dlm->master_lock);
+	/* clean the master list */
+	spin_lock(&dlm->master_lock);
+	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+		bucket = dlm_master_hash(dlm, i);
+		hlist_for_each(list, bucket) {
+			mle = hlist_entry(list, struct dlm_master_list_entry,
+					  master_hash_node);
+
+			BUG_ON(mle->type != DLM_MLE_BLOCK &&
+			       mle->type != DLM_MLE_MASTER &&
+			       mle->type != DLM_MLE_MIGRATION);
+
+			/* MASTER mles are initiated locally. The waiting
+			 * process will notice the node map change shortly.
+			 * Let that happen as normal. */
+			if (mle->type == DLM_MLE_MASTER)
+				continue;
+
+			/* BLOCK mles are initiated by other nodes. Need to
+			 * clean up if the dead node would have been the
+			 * master. */
+			if (mle->type == DLM_MLE_BLOCK) {
+				dlm_clean_block_mle(dlm, mle, dead_node);
+				continue;
+			}
 
-			/* restart */
-			goto top;
-		}
+			/* Everything else is a MIGRATION mle */
+
+			/* The rule for MIGRATION mles is that the master
+			 * becomes UNKNOWN if *either* the original or the new
+			 * master dies. All UNKNOWN lockres' are sent to
+			 * whichever node becomes the recovery master. The new
+			 * master is responsible for determining if there is
+			 * still a master for this lockres, or if he needs to
+			 * take over mastery. Either way, this node should
+			 * expect another message to resolve this. */
+
+			if (mle->master != dead_node &&
+			    mle->new_master != dead_node)
+				continue;
+
+			/* If we have reached this point, this mle needs to be
+			 * removed from the list and freed. */
+			dlm_clean_migration_mle(dlm, mle);
+
+			mlog(0, "%s: node %u died during migration from "
+			     "%u to %u!\n", dlm->name, dead_node, mle->master,
+			     mle->new_master);
+
+			/* If we find a lockres associated with the mle, we've
+			 * hit this rare case that messes up our lock ordering.
+			 * If so, we need to drop the master lock so that we can
+			 * take the lockres lock, meaning that we will have to
+			 * restart from the head of list. */
+			res = dlm_reset_mleres_owner(dlm, mle);
+			if (res)
+				/* restart */
+				goto top;
 
-		/* this may be the last reference */
-		__dlm_put_mle(mle);
+			/* This may be the last reference */
+			__dlm_put_mle(mle);
+		}
 	}
 	spin_unlock(&dlm->master_lock);
 }
 
-
 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 			 u8 old_master)
 {