aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h5
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c12
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c18
-rw-r--r--fs/ocfs2/dlm/dlmlock.c25
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c31
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c292
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c13
-rw-r--r--fs/ocfs2/dlm/userdlm.c2
8 files changed, 343 insertions, 55 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 3fecba0a6023..23ceaa7127b4 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -208,6 +208,9 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
208#define DLM_LOCK_RES_IN_PROGRESS 0x00000010 208#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
209#define DLM_LOCK_RES_MIGRATING 0x00000020 209#define DLM_LOCK_RES_MIGRATING 0x00000020
210 210
211/* max milliseconds to wait to sync up a network failure with a node death */
212#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
213
211#define DLM_PURGE_INTERVAL_MS (8 * 1000) 214#define DLM_PURGE_INTERVAL_MS (8 * 1000)
212 215
213struct dlm_lock_resource 216struct dlm_lock_resource
@@ -657,6 +660,8 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
657int dlm_launch_recovery_thread(struct dlm_ctxt *dlm); 660int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
658void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); 661void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
659void dlm_wait_for_recovery(struct dlm_ctxt *dlm); 662void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
663int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
664int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
660 665
661void dlm_put(struct dlm_ctxt *dlm); 666void dlm_put(struct dlm_ctxt *dlm);
662struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); 667struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index 6001b22a997d..f66e2d818ccd 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -392,6 +392,11 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
392 } else { 392 } else {
393 mlog_errno(tmpret); 393 mlog_errno(tmpret);
394 if (dlm_is_host_down(tmpret)) { 394 if (dlm_is_host_down(tmpret)) {
395 /* instead of logging the same network error over
396 * and over, sleep here and wait for the heartbeat
397 * to notice the node is dead. times out after 5s. */
398 dlm_wait_for_node_death(dlm, res->owner,
399 DLM_NODE_DEATH_WAIT_MAX);
395 ret = DLM_RECOVERING; 400 ret = DLM_RECOVERING;
396 mlog(0, "node %u died so returning DLM_RECOVERING " 401 mlog(0, "node %u died so returning DLM_RECOVERING "
397 "from convert message!\n", res->owner); 402 "from convert message!\n", res->owner);
@@ -421,7 +426,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
421 struct dlm_lockstatus *lksb; 426 struct dlm_lockstatus *lksb;
422 enum dlm_status status = DLM_NORMAL; 427 enum dlm_status status = DLM_NORMAL;
423 u32 flags; 428 u32 flags;
424 int call_ast = 0, kick_thread = 0; 429 int call_ast = 0, kick_thread = 0, ast_reserved = 0;
425 430
426 if (!dlm_grab(dlm)) { 431 if (!dlm_grab(dlm)) {
427 dlm_error(DLM_REJECTED); 432 dlm_error(DLM_REJECTED);
@@ -490,6 +495,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
490 status = __dlm_lockres_state_to_status(res); 495 status = __dlm_lockres_state_to_status(res);
491 if (status == DLM_NORMAL) { 496 if (status == DLM_NORMAL) {
492 __dlm_lockres_reserve_ast(res); 497 __dlm_lockres_reserve_ast(res);
498 ast_reserved = 1;
493 res->state |= DLM_LOCK_RES_IN_PROGRESS; 499 res->state |= DLM_LOCK_RES_IN_PROGRESS;
494 status = __dlmconvert_master(dlm, res, lock, flags, 500 status = __dlmconvert_master(dlm, res, lock, flags,
495 cnv->requested_type, 501 cnv->requested_type,
@@ -512,10 +518,10 @@ leave:
512 else 518 else
513 dlm_lock_put(lock); 519 dlm_lock_put(lock);
514 520
515 /* either queue the ast or release it */ 521 /* either queue the ast or release it, if reserved */
516 if (call_ast) 522 if (call_ast)
517 dlm_queue_ast(dlm, lock); 523 dlm_queue_ast(dlm, lock);
518 else 524 else if (ast_reserved)
519 dlm_lockres_release_ast(dlm, res); 525 dlm_lockres_release_ast(dlm, res);
520 526
521 if (kick_thread) 527 if (kick_thread)
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index da3c22045f89..6ee30837389c 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -573,8 +573,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
573 spin_lock(&dlm_domain_lock); 573 spin_lock(&dlm_domain_lock);
574 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 574 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
575 /* Once the dlm ctxt is marked as leaving then we don't want 575 /* Once the dlm ctxt is marked as leaving then we don't want
576 * to be put in someone's domain map. */ 576 * to be put in someone's domain map.
577 * Also, explicitly disallow joining at certain troublesome
578 * times (ie. during recovery). */
577 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) { 579 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
580 int bit = query->node_idx;
578 spin_lock(&dlm->spinlock); 581 spin_lock(&dlm->spinlock);
579 582
580 if (dlm->dlm_state == DLM_CTXT_NEW && 583 if (dlm->dlm_state == DLM_CTXT_NEW &&
@@ -586,6 +589,19 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
586 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 589 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
587 /* Disallow parallel joins. */ 590 /* Disallow parallel joins. */
588 response = JOIN_DISALLOW; 591 response = JOIN_DISALLOW;
592 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
593 mlog(ML_NOTICE, "node %u trying to join, but recovery "
594 "is ongoing.\n", bit);
595 response = JOIN_DISALLOW;
596 } else if (test_bit(bit, dlm->recovery_map)) {
597 mlog(ML_NOTICE, "node %u trying to join, but it "
598 "still needs recovery.\n", bit);
599 response = JOIN_DISALLOW;
600 } else if (test_bit(bit, dlm->domain_map)) {
601 mlog(ML_NOTICE, "node %u trying to join, but it "
602 "is still in the domain! needs recovery?\n",
603 bit);
604 response = JOIN_DISALLOW;
589 } else { 605 } else {
590 /* Alright we're fully a part of this domain 606 /* Alright we're fully a part of this domain
591 * so we keep some state as to who's joining 607 * so we keep some state as to who's joining
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index d1a0038557a3..671d4ff222cc 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -220,6 +220,17 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
220 dlm_error(status); 220 dlm_error(status);
221 dlm_revert_pending_lock(res, lock); 221 dlm_revert_pending_lock(res, lock);
222 dlm_lock_put(lock); 222 dlm_lock_put(lock);
223 } else if (dlm_is_recovery_lock(res->lockname.name,
224 res->lockname.len)) {
225 /* special case for the $RECOVERY lock.
226 * there will never be an AST delivered to put
227 * this lock on the proper secondary queue
228 * (granted), so do it manually. */
229 mlog(0, "%s: $RECOVERY lock for this node (%u) is "
230 "mastered by %u; got lock, manually granting (no ast)\n",
231 dlm->name, dlm->node_num, res->owner);
232 list_del_init(&lock->list);
233 list_add_tail(&lock->list, &res->granted);
223 } 234 }
224 spin_unlock(&res->spinlock); 235 spin_unlock(&res->spinlock);
225 236
@@ -646,7 +657,19 @@ retry_lock:
646 mlog(0, "retrying lock with migration/" 657 mlog(0, "retrying lock with migration/"
647 "recovery/in progress\n"); 658 "recovery/in progress\n");
648 msleep(100); 659 msleep(100);
649 dlm_wait_for_recovery(dlm); 660 /* no waiting for dlm_reco_thread */
661 if (recovery) {
662 if (status == DLM_RECOVERING) {
663 mlog(0, "%s: got RECOVERING "
664 "for $REOCVERY lock, master "
665 "was %u\n", dlm->name,
666 res->owner);
667 dlm_wait_for_node_death(dlm, res->owner,
668 DLM_NODE_DEATH_WAIT_MAX);
669 }
670 } else {
671 dlm_wait_for_recovery(dlm);
672 }
650 goto retry_lock; 673 goto retry_lock;
651 } 674 }
652 675
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 27e984f7e4cd..2e2e95e69499 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -1050,17 +1050,10 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1050 node = dlm_bitmap_diff_iter_next(&bdi, &sc); 1050 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1051 while (node >= 0) { 1051 while (node >= 0) {
1052 if (sc == NODE_UP) { 1052 if (sc == NODE_UP) {
1053 /* a node came up. easy. might not even need 1053 /* a node came up. clear any old vote from
1054 * to talk to it if its node number is higher 1054 * the response map and set it in the vote map
1055 * or if we are already blocked. */ 1055 * then restart the mastery. */
1056 mlog(0, "node up! %d\n", node); 1056 mlog(ML_NOTICE, "node %d up while restarting\n", node);
1057 if (blocked)
1058 goto next;
1059
1060 if (node > dlm->node_num) {
1061 mlog(0, "node > this node. skipping.\n");
1062 goto next;
1063 }
1064 1057
1065 /* redo the master request, but only for the new node */ 1058 /* redo the master request, but only for the new node */
1066 mlog(0, "sending request to new node\n"); 1059 mlog(0, "sending request to new node\n");
@@ -2005,6 +1998,15 @@ fail:
2005 break; 1998 break;
2006 1999
2007 mlog(0, "timed out during migration\n"); 2000 mlog(0, "timed out during migration\n");
2001 /* avoid hang during shutdown when migrating lockres
2002 * to a node which also goes down */
2003 if (dlm_is_node_dead(dlm, target)) {
2004 mlog(0, "%s:%.*s: expected migration target %u "
2005 "is no longer up. restarting.\n",
2006 dlm->name, res->lockname.len,
2007 res->lockname.name, target);
2008 ret = -ERESTARTSYS;
2009 }
2008 } 2010 }
2009 if (ret == -ERESTARTSYS) { 2011 if (ret == -ERESTARTSYS) {
2010 /* migration failed, detach and clean up mle */ 2012 /* migration failed, detach and clean up mle */
@@ -2480,7 +2482,9 @@ top:
2480 atomic_set(&mle->woken, 1); 2482 atomic_set(&mle->woken, 1);
2481 spin_unlock(&mle->spinlock); 2483 spin_unlock(&mle->spinlock);
2482 wake_up(&mle->wq); 2484 wake_up(&mle->wq);
2483 /* final put will take care of list removal */ 2485 /* do not need events any longer, so detach
2486 * from heartbeat */
2487 __dlm_mle_detach_hb_events(dlm, mle);
2484 __dlm_put_mle(mle); 2488 __dlm_put_mle(mle);
2485 } 2489 }
2486 continue; 2490 continue;
@@ -2535,6 +2539,9 @@ top:
2535 spin_unlock(&res->spinlock); 2539 spin_unlock(&res->spinlock);
2536 dlm_lockres_put(res); 2540 dlm_lockres_put(res);
2537 2541
2542 /* about to get rid of mle, detach from heartbeat */
2543 __dlm_mle_detach_hb_events(dlm, mle);
2544
2538 /* dump the mle */ 2545 /* dump the mle */
2539 spin_lock(&dlm->master_lock); 2546 spin_lock(&dlm->master_lock);
2540 __dlm_put_mle(mle); 2547 __dlm_put_mle(mle);
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 0c8eb1093f00..ed76bda1a534 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -39,6 +39,7 @@
39#include <linux/inet.h> 39#include <linux/inet.h>
40#include <linux/timer.h> 40#include <linux/timer.h>
41#include <linux/kthread.h> 41#include <linux/kthread.h>
42#include <linux/delay.h>
42 43
43 44
44#include "cluster/heartbeat.h" 45#include "cluster/heartbeat.h"
@@ -256,6 +257,45 @@ static int dlm_recovery_thread(void *data)
256 return 0; 257 return 0;
257} 258}
258 259
260/* returns true when the recovery master has contacted us */
261static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
262{
263 int ready;
264 spin_lock(&dlm->spinlock);
265 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
266 spin_unlock(&dlm->spinlock);
267 return ready;
268}
269
270/* returns true if node is no longer in the domain
271 * could be dead or just not joined */
272int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
273{
274 int dead;
275 spin_lock(&dlm->spinlock);
276 dead = test_bit(node, dlm->domain_map);
277 spin_unlock(&dlm->spinlock);
278 return dead;
279}
280
281int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
282{
283 if (timeout) {
284 mlog(ML_NOTICE, "%s: waiting %dms for notification of "
285 "death of node %u\n", dlm->name, timeout, node);
286 wait_event_timeout(dlm->dlm_reco_thread_wq,
287 dlm_is_node_dead(dlm, node),
288 msecs_to_jiffies(timeout));
289 } else {
290 mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
291 "of death of node %u\n", dlm->name, node);
292 wait_event(dlm->dlm_reco_thread_wq,
293 dlm_is_node_dead(dlm, node));
294 }
295 /* for now, return 0 */
296 return 0;
297}
298
259/* callers of the top-level api calls (dlmlock/dlmunlock) should 299/* callers of the top-level api calls (dlmlock/dlmunlock) should
260 * block on the dlm->reco.event when recovery is in progress. 300 * block on the dlm->reco.event when recovery is in progress.
261 * the dlm recovery thread will set this state when it begins 301 * the dlm recovery thread will set this state when it begins
@@ -297,6 +337,7 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
297static int dlm_do_recovery(struct dlm_ctxt *dlm) 337static int dlm_do_recovery(struct dlm_ctxt *dlm)
298{ 338{
299 int status = 0; 339 int status = 0;
340 int ret;
300 341
301 spin_lock(&dlm->spinlock); 342 spin_lock(&dlm->spinlock);
302 343
@@ -343,10 +384,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
343 goto master_here; 384 goto master_here;
344 385
345 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { 386 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
346 /* choose a new master */ 387 /* choose a new master, returns 0 if this node
347 if (!dlm_pick_recovery_master(dlm)) { 388 * is the master, -EEXIST if it's another node.
389 * this does not return until a new master is chosen
390 * or recovery completes entirely. */
391 ret = dlm_pick_recovery_master(dlm);
392 if (!ret) {
348 /* already notified everyone. go. */ 393 /* already notified everyone. go. */
349 dlm->reco.new_master = dlm->node_num;
350 goto master_here; 394 goto master_here;
351 } 395 }
352 mlog(0, "another node will master this recovery session.\n"); 396 mlog(0, "another node will master this recovery session.\n");
@@ -371,8 +415,13 @@ master_here:
371 if (status < 0) { 415 if (status < 0) {
372 mlog(ML_ERROR, "error %d remastering locks for node %u, " 416 mlog(ML_ERROR, "error %d remastering locks for node %u, "
373 "retrying.\n", status, dlm->reco.dead_node); 417 "retrying.\n", status, dlm->reco.dead_node);
418 /* yield a bit to allow any final network messages
419 * to get handled on remaining nodes */
420 msleep(100);
374 } else { 421 } else {
375 /* success! see if any other nodes need recovery */ 422 /* success! see if any other nodes need recovery */
423 mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
424 dlm->name, dlm->reco.dead_node, dlm->node_num);
376 dlm_reset_recovery(dlm); 425 dlm_reset_recovery(dlm);
377 } 426 }
378 dlm_end_recovery(dlm); 427 dlm_end_recovery(dlm);
@@ -477,7 +526,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
477 BUG(); 526 BUG();
478 break; 527 break;
479 case DLM_RECO_NODE_DATA_DEAD: 528 case DLM_RECO_NODE_DATA_DEAD:
480 mlog(0, "node %u died after " 529 mlog(ML_NOTICE, "node %u died after "
481 "requesting recovery info for " 530 "requesting recovery info for "
482 "node %u\n", ndata->node_num, 531 "node %u\n", ndata->node_num,
483 dead_node); 532 dead_node);
@@ -485,6 +534,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
485 // start all over 534 // start all over
486 destroy = 1; 535 destroy = 1;
487 status = -EAGAIN; 536 status = -EAGAIN;
537 /* instead of spinning like crazy here,
538 * wait for the domain map to catch up
539 * with the network state. otherwise this
540 * can be hit hundreds of times before
541 * the node is really seen as dead. */
542 wait_event_timeout(dlm->dlm_reco_thread_wq,
543 dlm_is_node_dead(dlm,
544 ndata->node_num),
545 msecs_to_jiffies(1000));
546 mlog(0, "waited 1 sec for %u, "
547 "dead? %s\n", ndata->node_num,
548 dlm_is_node_dead(dlm, ndata->node_num) ?
549 "yes" : "no");
488 goto leave; 550 goto leave;
489 case DLM_RECO_NODE_DATA_RECEIVING: 551 case DLM_RECO_NODE_DATA_RECEIVING:
490 case DLM_RECO_NODE_DATA_REQUESTED: 552 case DLM_RECO_NODE_DATA_REQUESTED:
@@ -678,11 +740,27 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
678 dlm = item->dlm; 740 dlm = item->dlm;
679 dead_node = item->u.ral.dead_node; 741 dead_node = item->u.ral.dead_node;
680 reco_master = item->u.ral.reco_master; 742 reco_master = item->u.ral.reco_master;
743 mres = (struct dlm_migratable_lockres *)data;
744
745 if (dead_node != dlm->reco.dead_node ||
746 reco_master != dlm->reco.new_master) {
747 /* show extra debug info if the recovery state is messed */
748 mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
749 "request(dead=%u, master=%u)\n",
750 dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
751 dead_node, reco_master);
752 mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
753 "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
754 dlm->name, mres->lockname_len, mres->lockname, mres->master,
755 mres->num_locks, mres->total_locks, mres->flags,
756 mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
757 mres->ml[0].type, mres->ml[0].convert_type,
758 mres->ml[0].highest_blocked, mres->ml[0].node);
759 BUG();
760 }
681 BUG_ON(dead_node != dlm->reco.dead_node); 761 BUG_ON(dead_node != dlm->reco.dead_node);
682 BUG_ON(reco_master != dlm->reco.new_master); 762 BUG_ON(reco_master != dlm->reco.new_master);
683 763
684 mres = (struct dlm_migratable_lockres *)data;
685
686 /* lock resources should have already been moved to the 764 /* lock resources should have already been moved to the
687 * dlm->reco.resources list. now move items from that list 765 * dlm->reco.resources list. now move items from that list
688 * to a temp list if the dead owner matches. note that the 766 * to a temp list if the dead owner matches. note that the
@@ -757,15 +835,18 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
757 continue; 835 continue;
758 836
759 switch (ndata->state) { 837 switch (ndata->state) {
838 /* should have moved beyond INIT but not to FINALIZE yet */
760 case DLM_RECO_NODE_DATA_INIT: 839 case DLM_RECO_NODE_DATA_INIT:
761 case DLM_RECO_NODE_DATA_DEAD: 840 case DLM_RECO_NODE_DATA_DEAD:
762 case DLM_RECO_NODE_DATA_DONE:
763 case DLM_RECO_NODE_DATA_FINALIZE_SENT: 841 case DLM_RECO_NODE_DATA_FINALIZE_SENT:
764 mlog(ML_ERROR, "bad ndata state for node %u:" 842 mlog(ML_ERROR, "bad ndata state for node %u:"
765 " state=%d\n", ndata->node_num, 843 " state=%d\n", ndata->node_num,
766 ndata->state); 844 ndata->state);
767 BUG(); 845 BUG();
768 break; 846 break;
847 /* these states are possible at this point, anywhere along
848 * the line of recovery */
849 case DLM_RECO_NODE_DATA_DONE:
769 case DLM_RECO_NODE_DATA_RECEIVING: 850 case DLM_RECO_NODE_DATA_RECEIVING:
770 case DLM_RECO_NODE_DATA_REQUESTED: 851 case DLM_RECO_NODE_DATA_REQUESTED:
771 case DLM_RECO_NODE_DATA_REQUESTING: 852 case DLM_RECO_NODE_DATA_REQUESTING:
@@ -799,13 +880,31 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
799{ 880{
800 struct dlm_lock_resource *res; 881 struct dlm_lock_resource *res;
801 struct list_head *iter, *iter2; 882 struct list_head *iter, *iter2;
883 struct dlm_lock *lock;
802 884
803 spin_lock(&dlm->spinlock); 885 spin_lock(&dlm->spinlock);
804 list_for_each_safe(iter, iter2, &dlm->reco.resources) { 886 list_for_each_safe(iter, iter2, &dlm->reco.resources) {
805 res = list_entry (iter, struct dlm_lock_resource, recovering); 887 res = list_entry (iter, struct dlm_lock_resource, recovering);
888 /* always prune any $RECOVERY entries for dead nodes,
889 * otherwise hangs can occur during later recovery */
806 if (dlm_is_recovery_lock(res->lockname.name, 890 if (dlm_is_recovery_lock(res->lockname.name,
807 res->lockname.len)) 891 res->lockname.len)) {
892 spin_lock(&res->spinlock);
893 list_for_each_entry(lock, &res->granted, list) {
894 if (lock->ml.node == dead_node) {
895 mlog(0, "AHA! there was "
896 "a $RECOVERY lock for dead "
897 "node %u (%s)!\n",
898 dead_node, dlm->name);
899 list_del_init(&lock->list);
900 dlm_lock_put(lock);
901 break;
902 }
903 }
904 spin_unlock(&res->spinlock);
808 continue; 905 continue;
906 }
907
809 if (res->owner == dead_node) { 908 if (res->owner == dead_node) {
810 mlog(0, "found lockres owned by dead node while " 909 mlog(0, "found lockres owned by dead node while "
811 "doing recovery for node %u. sending it.\n", 910 "doing recovery for node %u. sending it.\n",
@@ -1179,7 +1278,7 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
1179again: 1278again:
1180 ret = dlm_lockres_master_requery(dlm, res, &real_master); 1279 ret = dlm_lockres_master_requery(dlm, res, &real_master);
1181 if (ret < 0) { 1280 if (ret < 0) {
1182 mlog(0, "dlm_lockres_master_requery failure: %d\n", 1281 mlog(0, "dlm_lockres_master_requery ret=%d\n",
1183 ret); 1282 ret);
1184 goto again; 1283 goto again;
1185 } 1284 }
@@ -1757,6 +1856,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1757 struct dlm_lock_resource *res; 1856 struct dlm_lock_resource *res;
1758 int i; 1857 int i;
1759 struct list_head *bucket; 1858 struct list_head *bucket;
1859 struct dlm_lock *lock;
1760 1860
1761 1861
1762 /* purge any stale mles */ 1862 /* purge any stale mles */
@@ -1780,10 +1880,25 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
1780 bucket = &(dlm->resources[i]); 1880 bucket = &(dlm->resources[i]);
1781 list_for_each(iter, bucket) { 1881 list_for_each(iter, bucket) {
1782 res = list_entry (iter, struct dlm_lock_resource, list); 1882 res = list_entry (iter, struct dlm_lock_resource, list);
1883 /* always prune any $RECOVERY entries for dead nodes,
1884 * otherwise hangs can occur during later recovery */
1783 if (dlm_is_recovery_lock(res->lockname.name, 1885 if (dlm_is_recovery_lock(res->lockname.name,
1784 res->lockname.len)) 1886 res->lockname.len)) {
1887 spin_lock(&res->spinlock);
1888 list_for_each_entry(lock, &res->granted, list) {
1889 if (lock->ml.node == dead_node) {
1890 mlog(0, "AHA! there was "
1891 "a $RECOVERY lock for dead "
1892 "node %u (%s)!\n",
1893 dead_node, dlm->name);
1894 list_del_init(&lock->list);
1895 dlm_lock_put(lock);
1896 break;
1897 }
1898 }
1899 spin_unlock(&res->spinlock);
1785 continue; 1900 continue;
1786 1901 }
1787 spin_lock(&res->spinlock); 1902 spin_lock(&res->spinlock);
1788 /* zero the lvb if necessary */ 1903 /* zero the lvb if necessary */
1789 dlm_revalidate_lvb(dlm, res, dead_node); 1904 dlm_revalidate_lvb(dlm, res, dead_node);
@@ -1869,12 +1984,9 @@ void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
1869 return; 1984 return;
1870 1985
1871 spin_lock(&dlm->spinlock); 1986 spin_lock(&dlm->spinlock);
1872
1873 set_bit(idx, dlm->live_nodes_map); 1987 set_bit(idx, dlm->live_nodes_map);
1874 1988 /* do NOT notify mle attached to the heartbeat events.
1875 /* notify any mles attached to the heartbeat events */ 1989 * new nodes are not interesting in mastery until joined. */
1876 dlm_hb_event_notify_attached(dlm, idx, 1);
1877
1878 spin_unlock(&dlm->spinlock); 1990 spin_unlock(&dlm->spinlock);
1879 1991
1880 dlm_put(dlm); 1992 dlm_put(dlm);
@@ -1897,7 +2009,18 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
1897 mlog(0, "unlockast for recovery lock fired!\n"); 2009 mlog(0, "unlockast for recovery lock fired!\n");
1898} 2010}
1899 2011
1900 2012/*
2013 * dlm_pick_recovery_master will continually attempt to use
2014 * dlmlock() on the special "$RECOVERY" lockres with the
2015 * LKM_NOQUEUE flag to get an EX. every thread that enters
2016 * this function on each node racing to become the recovery
2017 * master will not stop attempting this until either:
2018 * a) this node gets the EX (and becomes the recovery master),
2019 * or b) dlm->reco.new_master gets set to some nodenum
2020 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
2021 * so each time a recovery master is needed, the entire cluster
2022 * will sync at this point. if the new master dies, that will
2023 * be detected in dlm_do_recovery */
1901static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) 2024static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
1902{ 2025{
1903 enum dlm_status ret; 2026 enum dlm_status ret;
@@ -1906,23 +2029,69 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
1906 2029
1907 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", 2030 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
1908 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); 2031 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
1909retry: 2032again:
1910 memset(&lksb, 0, sizeof(lksb)); 2033 memset(&lksb, 0, sizeof(lksb));
1911 2034
1912 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, 2035 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
1913 DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast); 2036 DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
1914 2037
2038 mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
2039 dlm->name, ret, lksb.status);
2040
1915 if (ret == DLM_NORMAL) { 2041 if (ret == DLM_NORMAL) {
1916 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", 2042 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
1917 dlm->name, dlm->node_num); 2043 dlm->name, dlm->node_num);
1918 /* I am master, send message to all nodes saying 2044
1919 * that I am beginning a recovery session */ 2045 /* got the EX lock. check to see if another node
1920 status = dlm_send_begin_reco_message(dlm, 2046 * just became the reco master */
1921 dlm->reco.dead_node); 2047 if (dlm_reco_master_ready(dlm)) {
2048 mlog(0, "%s: got reco EX lock, but %u will "
2049 "do the recovery\n", dlm->name,
2050 dlm->reco.new_master);
2051 status = -EEXIST;
2052 } else {
2053 status = 0;
2054
2055 /* see if recovery was already finished elsewhere */
2056 spin_lock(&dlm->spinlock);
2057 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
2058 status = -EINVAL;
2059 mlog(0, "%s: got reco EX lock, but "
2060 "node got recovered already\n", dlm->name);
2061 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2062 mlog(ML_ERROR, "%s: new master is %u "
2063 "but no dead node!\n",
2064 dlm->name, dlm->reco.new_master);
2065 BUG();
2066 }
2067 }
2068 spin_unlock(&dlm->spinlock);
2069 }
2070
2071 /* if this node has actually become the recovery master,
2072 * set the master and send the messages to begin recovery */
2073 if (!status) {
2074 mlog(0, "%s: dead=%u, this=%u, sending "
2075 "begin_reco now\n", dlm->name,
2076 dlm->reco.dead_node, dlm->node_num);
2077 status = dlm_send_begin_reco_message(dlm,
2078 dlm->reco.dead_node);
2079 /* this always succeeds */
2080 BUG_ON(status);
2081
2082 /* set the new_master to this node */
2083 spin_lock(&dlm->spinlock);
2084 dlm->reco.new_master = dlm->node_num;
2085 spin_unlock(&dlm->spinlock);
2086 }
1922 2087
1923 /* recovery lock is a special case. ast will not get fired, 2088 /* recovery lock is a special case. ast will not get fired,
1924 * so just go ahead and unlock it. */ 2089 * so just go ahead and unlock it. */
1925 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); 2090 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
2091 if (ret == DLM_DENIED) {
2092 mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
2093 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
2094 }
1926 if (ret != DLM_NORMAL) { 2095 if (ret != DLM_NORMAL) {
1927 /* this would really suck. this could only happen 2096 /* this would really suck. this could only happen
1928 * if there was a network error during the unlock 2097 * if there was a network error during the unlock
@@ -1930,20 +2099,42 @@ retry:
1930 * is actually "done" and the lock structure is 2099 * is actually "done" and the lock structure is
1931 * even freed. we can continue, but only 2100 * even freed. we can continue, but only
1932 * because this specific lock name is special. */ 2101 * because this specific lock name is special. */
1933 mlog(0, "dlmunlock returned %d\n", ret); 2102 mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
1934 }
1935
1936 if (status < 0) {
1937 mlog(0, "failed to send recovery message. "
1938 "must retry with new node map.\n");
1939 goto retry;
1940 } 2103 }
1941 } else if (ret == DLM_NOTQUEUED) { 2104 } else if (ret == DLM_NOTQUEUED) {
1942 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", 2105 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
1943 dlm->name, dlm->node_num); 2106 dlm->name, dlm->node_num);
1944 /* another node is master. wait on 2107 /* another node is master. wait on
1945 * reco.new_master != O2NM_INVALID_NODE_NUM */ 2108 * reco.new_master != O2NM_INVALID_NODE_NUM
2109 * for at most one second */
2110 wait_event_timeout(dlm->dlm_reco_thread_wq,
2111 dlm_reco_master_ready(dlm),
2112 msecs_to_jiffies(1000));
2113 if (!dlm_reco_master_ready(dlm)) {
2114 mlog(0, "%s: reco master taking awhile\n",
2115 dlm->name);
2116 goto again;
2117 }
2118 /* another node has informed this one that it is reco master */
2119 mlog(0, "%s: reco master %u is ready to recover %u\n",
2120 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
1946 status = -EEXIST; 2121 status = -EEXIST;
2122 } else {
2123 struct dlm_lock_resource *res;
2124
2125 /* dlmlock returned something other than NOTQUEUED or NORMAL */
2126 mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
2127 "lksb.status=%s\n", dlm->name, dlm_errname(ret),
2128 dlm_errname(lksb.status));
2129 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
2130 DLM_RECOVERY_LOCK_NAME_LEN);
2131 if (res) {
2132 dlm_print_one_lock_resource(res);
2133 dlm_lockres_put(res);
2134 } else {
2135 mlog(ML_ERROR, "recovery lock not found\n");
2136 }
2137 BUG();
1947 } 2138 }
1948 2139
1949 return status; 2140 return status;
@@ -1982,7 +2173,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
1982 mlog(0, "not sending begin reco to self\n"); 2173 mlog(0, "not sending begin reco to self\n");
1983 continue; 2174 continue;
1984 } 2175 }
1985 2176retry:
1986 ret = -EINVAL; 2177 ret = -EINVAL;
1987 mlog(0, "attempting to send begin reco msg to %d\n", 2178 mlog(0, "attempting to send begin reco msg to %d\n",
1988 nodenum); 2179 nodenum);
@@ -1991,8 +2182,17 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
1991 /* negative status is handled ok by caller here */ 2182 /* negative status is handled ok by caller here */
1992 if (ret >= 0) 2183 if (ret >= 0)
1993 ret = status; 2184 ret = status;
2185 if (dlm_is_host_down(ret)) {
2186 /* node is down. not involved in recovery
2187 * so just keep going */
2188 mlog(0, "%s: node %u was down when sending "
2189 "begin reco msg (%d)\n", dlm->name, nodenum, ret);
2190 ret = 0;
2191 }
1994 if (ret < 0) { 2192 if (ret < 0) {
1995 struct dlm_lock_resource *res; 2193 struct dlm_lock_resource *res;
2194 /* this is now a serious problem, possibly ENOMEM
2195 * in the network stack. must retry */
1996 mlog_errno(ret); 2196 mlog_errno(ret);
1997 mlog(ML_ERROR, "begin reco of dlm %s to node %u " 2197 mlog(ML_ERROR, "begin reco of dlm %s to node %u "
1998 " returned %d\n", dlm->name, nodenum, ret); 2198 " returned %d\n", dlm->name, nodenum, ret);
@@ -2004,7 +2204,10 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2004 } else { 2204 } else {
2005 mlog(ML_ERROR, "recovery lock not found\n"); 2205 mlog(ML_ERROR, "recovery lock not found\n");
2006 } 2206 }
2007 break; 2207 /* sleep for a bit in hopes that we can avoid
2208 * another ENOMEM */
2209 msleep(100);
2210 goto retry;
2008 } 2211 }
2009 } 2212 }
2010 2213
@@ -2027,19 +2230,34 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
2027 2230
2028 spin_lock(&dlm->spinlock); 2231 spin_lock(&dlm->spinlock);
2029 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { 2232 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2030 mlog(0, "new_master already set to %u!\n", 2233 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
2031 dlm->reco.new_master); 2234 mlog(0, "%s: new_master %u died, changing "
2235 "to %u\n", dlm->name, dlm->reco.new_master,
2236 br->node_idx);
2237 } else {
2238 mlog(0, "%s: new_master %u NOT DEAD, changing "
2239 "to %u\n", dlm->name, dlm->reco.new_master,
2240 br->node_idx);
2241 /* may not have seen the new master as dead yet */
2242 }
2032 } 2243 }
2033 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { 2244 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
2034 mlog(0, "dead_node already set to %u!\n", 2245 mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
2035 dlm->reco.dead_node); 2246 "node %u changing it to %u\n", dlm->name,
2247 dlm->reco.dead_node, br->node_idx, br->dead_node);
2036 } 2248 }
2037 dlm->reco.new_master = br->node_idx; 2249 dlm->reco.new_master = br->node_idx;
2038 dlm->reco.dead_node = br->dead_node; 2250 dlm->reco.dead_node = br->dead_node;
2039 if (!test_bit(br->dead_node, dlm->recovery_map)) { 2251 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2040 mlog(ML_ERROR, "recovery master %u sees %u as dead, but this " 2252 mlog(0, "recovery master %u sees %u as dead, but this "
2041 "node has not yet. marking %u as dead\n", 2253 "node has not yet. marking %u as dead\n",
2042 br->node_idx, br->dead_node, br->dead_node); 2254 br->node_idx, br->dead_node, br->dead_node);
2255 if (!test_bit(br->dead_node, dlm->domain_map) ||
2256 !test_bit(br->dead_node, dlm->live_nodes_map))
2257 mlog(0, "%u not in domain/live_nodes map "
2258 "so setting it in reco map manually\n",
2259 br->dead_node);
2260 set_bit(br->dead_node, dlm->recovery_map);
2043 __dlm_hb_node_down(dlm, br->dead_node); 2261 __dlm_hb_node_down(dlm, br->dead_node);
2044 } 2262 }
2045 spin_unlock(&dlm->spinlock); 2263 spin_unlock(&dlm->spinlock);
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index cec2ce1cd318..c95f08d2e925 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -188,6 +188,19 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
188 actions &= ~(DLM_UNLOCK_REMOVE_LOCK| 188 actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
189 DLM_UNLOCK_REGRANT_LOCK| 189 DLM_UNLOCK_REGRANT_LOCK|
190 DLM_UNLOCK_CLEAR_CONVERT_TYPE); 190 DLM_UNLOCK_CLEAR_CONVERT_TYPE);
191 } else if (status == DLM_RECOVERING ||
192 status == DLM_MIGRATING ||
193 status == DLM_FORWARD) {
194 /* must clear the actions because this unlock
195 * is about to be retried. cannot free or do
196 * any list manipulation. */
197 mlog(0, "%s:%.*s: clearing actions, %s\n",
198 dlm->name, res->lockname.len,
199 res->lockname.name,
200 status==DLM_RECOVERING?"recovering":
201 (status==DLM_MIGRATING?"migrating":
202 "forward"));
203 actions = 0;
191 } 204 }
192 if (flags & LKM_CANCEL) 205 if (flags & LKM_CANCEL)
193 lock->cancel_pending = 0; 206 lock->cancel_pending = 0;
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c
index e1fdd288796e..c3764f4744ee 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlm/userdlm.c
@@ -27,7 +27,7 @@
27 * Boston, MA 021110-1307, USA. 27 * Boston, MA 021110-1307, USA.
28 */ 28 */
29 29
30#include <asm/signal.h> 30#include <linux/signal.h>
31 31
32#include <linux/module.h> 32#include <linux/module.h>
33#include <linux/fs.h> 33#include <linux/fs.h>