aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/ast.c3
-rw-r--r--fs/dlm/dlm_internal.h8
-rw-r--r--fs/dlm/lock.c286
-rw-r--r--fs/dlm/lock.h4
-rw-r--r--fs/dlm/lockspace.c20
-rw-r--r--fs/dlm/rcom.c23
-rw-r--r--fs/dlm/recover.c73
-rw-r--r--fs/dlm/recoverd.c9
-rw-r--r--fs/dlm/requestqueue.c39
-rw-r--r--fs/gfs2/incore.h1
-rw-r--r--fs/gfs2/lock_dlm.c2
-rw-r--r--fs/gfs2/ops_fstype.c7
-rw-r--r--include/linux/dlm.h1
13 files changed, 304 insertions, 172 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 90e5997262ea..63dc19c54d5a 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
310 } 310 }
311 mutex_unlock(&ls->ls_cb_mutex); 311 mutex_unlock(&ls->ls_cb_mutex);
312 312
313 log_debug(ls, "dlm_callback_resume %d", count); 313 if (count)
314 log_debug(ls, "dlm_callback_resume %d", count);
314} 315}
315 316
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 0e74832c021b..bc342f7ac3af 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -271,6 +271,8 @@ struct dlm_lkb {
271 ktime_t lkb_last_cast_time; /* for debugging */ 271 ktime_t lkb_last_cast_time; /* for debugging */
272 ktime_t lkb_last_bast_time; /* for debugging */ 272 ktime_t lkb_last_bast_time; /* for debugging */
273 273
274 uint64_t lkb_recover_seq; /* from ls_recover_seq */
275
274 char *lkb_lvbptr; 276 char *lkb_lvbptr;
275 struct dlm_lksb *lkb_lksb; /* caller's status block */ 277 struct dlm_lksb *lkb_lksb; /* caller's status block */
276 void (*lkb_astfn) (void *astparam); 278 void (*lkb_astfn) (void *astparam);
@@ -325,7 +327,7 @@ enum rsb_flags {
325 RSB_NEW_MASTER, 327 RSB_NEW_MASTER,
326 RSB_NEW_MASTER2, 328 RSB_NEW_MASTER2,
327 RSB_RECOVER_CONVERT, 329 RSB_RECOVER_CONVERT,
328 RSB_LOCKS_PURGED, 330 RSB_RECOVER_GRANT,
329}; 331};
330 332
331static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) 333static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@@ -571,6 +573,7 @@ struct dlm_ls {
571 struct mutex ls_requestqueue_mutex; 573 struct mutex ls_requestqueue_mutex;
572 struct dlm_rcom *ls_recover_buf; 574 struct dlm_rcom *ls_recover_buf;
573 int ls_recover_nodeid; /* for debugging */ 575 int ls_recover_nodeid; /* for debugging */
576 unsigned int ls_recover_locks_in; /* for log info */
574 uint64_t ls_rcom_seq; 577 uint64_t ls_rcom_seq;
575 spinlock_t ls_rcom_spin; 578 spinlock_t ls_rcom_spin;
576 struct list_head ls_recover_list; 579 struct list_head ls_recover_list;
@@ -597,6 +600,7 @@ struct dlm_ls {
597#define LSFL_UEVENT_WAIT 5 600#define LSFL_UEVENT_WAIT 5
598#define LSFL_TIMEWARN 6 601#define LSFL_TIMEWARN 6
599#define LSFL_CB_DELAY 7 602#define LSFL_CB_DELAY 7
603#define LSFL_NODIR 8
600 604
601/* much of this is just saving user space pointers associated with the 605/* much of this is just saving user space pointers associated with the
602 lock that we pass back to the user lib with an ast */ 606 lock that we pass back to the user lib with an ast */
@@ -644,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
644 648
645static inline int dlm_no_directory(struct dlm_ls *ls) 649static inline int dlm_no_directory(struct dlm_ls *ls)
646{ 650{
647 return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0; 651 return test_bit(LSFL_NODIR, &ls->ls_flags);
648} 652}
649 653
650int dlm_netlink_init(void); 654int dlm_netlink_init(void);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f3ba70301a45..bdafb65a5234 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -161,10 +161,11 @@ static const int __quecvt_compat_matrix[8][8] = {
161void dlm_print_lkb(struct dlm_lkb *lkb) 161void dlm_print_lkb(struct dlm_lkb *lkb)
162{ 162{
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n", 164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid); 167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
168} 169}
169 170
170static void dlm_print_rsb(struct dlm_rsb *r) 171static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
251 252
252static inline int is_master_copy(struct dlm_lkb *lkb) 253static inline int is_master_copy(struct dlm_lkb *lkb)
253{ 254{
254 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257} 256}
258 257
@@ -1519,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1519 } 1518 }
1520 1519
1521 lkb->lkb_rqmode = DLM_LOCK_IV; 1520 lkb->lkb_rqmode = DLM_LOCK_IV;
1521 lkb->lkb_highbast = 0;
1522} 1522}
1523 1523
1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1525{ 1525{
1526 set_lvb_lock(r, lkb); 1526 set_lvb_lock(r, lkb);
1527 _grant_lock(r, lkb); 1527 _grant_lock(r, lkb);
1528 lkb->lkb_highbast = 0;
1529} 1528}
1530 1529
1531static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1530static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1887,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1887/* Returns the highest requested mode of all blocked conversions; sets 1886/* Returns the highest requested mode of all blocked conversions; sets
1888 cw if there's a blocked conversion to DLM_LOCK_CW. */ 1887 cw if there's a blocked conversion to DLM_LOCK_CW. */
1889 1888
1890static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) 1889static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1890 unsigned int *count)
1891{ 1891{
1892 struct dlm_lkb *lkb, *s; 1892 struct dlm_lkb *lkb, *s;
1893 int hi, demoted, quit, grant_restart, demote_restart; 1893 int hi, demoted, quit, grant_restart, demote_restart;
@@ -1906,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1906 if (can_be_granted(r, lkb, 0, &deadlk)) { 1906 if (can_be_granted(r, lkb, 0, &deadlk)) {
1907 grant_lock_pending(r, lkb); 1907 grant_lock_pending(r, lkb);
1908 grant_restart = 1; 1908 grant_restart = 1;
1909 if (count)
1910 (*count)++;
1909 continue; 1911 continue;
1910 } 1912 }
1911 1913
@@ -1939,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1939 return max_t(int, high, hi); 1941 return max_t(int, high, hi);
1940} 1942}
1941 1943
1942static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) 1944static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
1945 unsigned int *count)
1943{ 1946{
1944 struct dlm_lkb *lkb, *s; 1947 struct dlm_lkb *lkb, *s;
1945 1948
1946 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 1949 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1947 if (can_be_granted(r, lkb, 0, NULL)) 1950 if (can_be_granted(r, lkb, 0, NULL)) {
1948 grant_lock_pending(r, lkb); 1951 grant_lock_pending(r, lkb);
1949 else { 1952 if (count)
1953 (*count)++;
1954 } else {
1950 high = max_t(int, lkb->lkb_rqmode, high); 1955 high = max_t(int, lkb->lkb_rqmode, high);
1951 if (lkb->lkb_rqmode == DLM_LOCK_CW) 1956 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1952 *cw = 1; 1957 *cw = 1;
@@ -1975,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1975 return 0; 1980 return 0;
1976} 1981}
1977 1982
1978static void grant_pending_locks(struct dlm_rsb *r) 1983static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
1979{ 1984{
1980 struct dlm_lkb *lkb, *s; 1985 struct dlm_lkb *lkb, *s;
1981 int high = DLM_LOCK_IV; 1986 int high = DLM_LOCK_IV;
1982 int cw = 0; 1987 int cw = 0;
1983 1988
1984 DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); 1989 if (!is_master(r)) {
1990 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
1991 dlm_dump_rsb(r);
1992 return;
1993 }
1985 1994
1986 high = grant_pending_convert(r, high, &cw); 1995 high = grant_pending_convert(r, high, &cw, count);
1987 high = grant_pending_wait(r, high, &cw); 1996 high = grant_pending_wait(r, high, &cw, count);
1988 1997
1989 if (high == DLM_LOCK_IV) 1998 if (high == DLM_LOCK_IV)
1990 return; 1999 return;
@@ -2520,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2520 before we try again to grant this one. */ 2529 before we try again to grant this one. */
2521 2530
2522 if (is_demoted(lkb)) { 2531 if (is_demoted(lkb)) {
2523 grant_pending_convert(r, DLM_LOCK_IV, NULL); 2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2524 if (_can_be_granted(r, lkb, 1)) { 2533 if (_can_be_granted(r, lkb, 1)) {
2525 grant_lock(r, lkb); 2534 grant_lock(r, lkb);
2526 queue_cast(r, lkb, 0); 2535 queue_cast(r, lkb, 0);
@@ -2548,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2548{ 2557{
2549 switch (error) { 2558 switch (error) {
2550 case 0: 2559 case 0:
2551 grant_pending_locks(r); 2560 grant_pending_locks(r, NULL);
2552 /* grant_pending_locks also sends basts */ 2561 /* grant_pending_locks also sends basts */
2553 break; 2562 break;
2554 case -EAGAIN: 2563 case -EAGAIN:
@@ -2571,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2571static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2580static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2572 int error) 2581 int error)
2573{ 2582{
2574 grant_pending_locks(r); 2583 grant_pending_locks(r, NULL);
2575} 2584}
2576 2585
2577/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2592,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2592 int error) 2601 int error)
2593{ 2602{
2594 if (error) 2603 if (error)
2595 grant_pending_locks(r); 2604 grant_pending_locks(r, NULL);
2596} 2605}
2597 2606
2598/* 2607/*
@@ -3452,8 +3461,9 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3452 goto fail; 3461 goto fail;
3453 3462
3454 if (lkb->lkb_remid != ms->m_lkid) { 3463 if (lkb->lkb_remid != ms->m_lkid) {
3455 log_error(ls, "receive_convert %x remid %x remote %d %x", 3464 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3456 lkb->lkb_id, lkb->lkb_remid, 3465 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3466 (unsigned long long)lkb->lkb_recover_seq,
3457 ms->m_header.h_nodeid, ms->m_lkid); 3467 ms->m_header.h_nodeid, ms->m_lkid);
3458 error = -ENOENT; 3468 error = -ENOENT;
3459 goto fail; 3469 goto fail;
@@ -3631,6 +3641,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3631 goto out; 3641 goto out;
3632 3642
3633 queue_bast(r, lkb, ms->m_bastmode); 3643 queue_bast(r, lkb, ms->m_bastmode);
3644 lkb->lkb_highbast = ms->m_bastmode;
3634 out: 3645 out:
3635 unlock_rsb(r); 3646 unlock_rsb(r);
3636 put_rsb(r); 3647 put_rsb(r);
@@ -3710,8 +3721,13 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3710 3721
3711 mstype = lkb->lkb_wait_type; 3722 mstype = lkb->lkb_wait_type;
3712 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3713 if (error) 3724 if (error) {
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3727 ms->m_result);
3728 dlm_dump_rsb(r);
3714 goto out; 3729 goto out;
3730 }
3715 3731
3716 /* Optimization: the dir node was also the master, so it took our 3732 /* Optimization: the dir node was also the master, so it took our
3717 lookup as a request and sent request reply instead of lookup reply */ 3733 lookup as a request and sent request reply instead of lookup reply */
@@ -4122,21 +4138,28 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4122 * happen in normal usage for the async messages and cancel, so 4138 * happen in normal usage for the async messages and cancel, so
4123 * only use log_debug for them. 4139 * only use log_debug for them.
4124 * 4140 *
4125 * Other errors are expected and normal. 4141 * Some errors are expected and normal.
4126 */ 4142 */
4127 4143
4128 if (error == -ENOENT && noent) { 4144 if (error == -ENOENT && noent) {
4129 log_debug(ls, "receive %d no %x remote %d %x seq %u", 4145 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4130 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4146 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4131 ms->m_lkid, saved_seq); 4147 ms->m_lkid, saved_seq);
4132 } else if (error == -ENOENT) { 4148 } else if (error == -ENOENT) {
4133 log_error(ls, "receive %d no %x remote %d %x seq %u", 4149 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4134 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4150 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4135 ms->m_lkid, saved_seq); 4151 ms->m_lkid, saved_seq);
4136 4152
4137 if (ms->m_type == DLM_MSG_CONVERT) 4153 if (ms->m_type == DLM_MSG_CONVERT)
4138 dlm_dump_rsb_hash(ls, ms->m_hash); 4154 dlm_dump_rsb_hash(ls, ms->m_hash);
4139 } 4155 }
4156
4157 if (error == -EINVAL) {
4158 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4159 "saved_seq %u",
4160 ms->m_type, ms->m_header.h_nodeid,
4161 ms->m_lkid, ms->m_remid, saved_seq);
4162 }
4140} 4163}
4141 4164
4142/* If the lockspace is in recovery mode (locking stopped), then normal 4165/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4200,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4200 4223
4201 ls = dlm_find_lockspace_global(hd->h_lockspace); 4224 ls = dlm_find_lockspace_global(hd->h_lockspace);
4202 if (!ls) { 4225 if (!ls) {
4203 if (dlm_config.ci_log_debug) 4226 if (dlm_config.ci_log_debug) {
4204 log_print("invalid lockspace %x from %d cmd %d type %d", 4227 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4205 hd->h_lockspace, nodeid, hd->h_cmd, type); 4228 "%u from %d cmd %d type %d\n",
4229 hd->h_lockspace, nodeid, hd->h_cmd, type);
4230 }
4206 4231
4207 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4232 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4208 dlm_send_ls_not_ready(nodeid, &p->rcom); 4233 dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4253,16 +4278,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4253static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 4278static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4254 int dir_nodeid) 4279 int dir_nodeid)
4255{ 4280{
4256 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 4281 if (dlm_no_directory(ls))
4257 return 1;
4258
4259 if (!dlm_no_directory(ls))
4260 return 0;
4261
4262 if (dir_nodeid == dlm_our_nodeid())
4263 return 1; 4282 return 1;
4264 4283
4265 if (dir_nodeid != lkb->lkb_wait_nodeid) 4284 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4266 return 1; 4285 return 1;
4267 4286
4268 return 0; 4287 return 0;
@@ -4519,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4519 return error; 4538 return error;
4520} 4539}
4521 4540
4522static void purge_queue(struct dlm_rsb *r, struct list_head *queue, 4541static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
4523 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) 4542 struct list_head *list)
4524{ 4543{
4525 struct dlm_ls *ls = r->res_ls;
4526 struct dlm_lkb *lkb, *safe; 4544 struct dlm_lkb *lkb, *safe;
4527 4545
4528 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { 4546 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4529 if (test(ls, lkb)) { 4547 if (!is_master_copy(lkb))
4530 rsb_set_flag(r, RSB_LOCKS_PURGED); 4548 continue;
4531 del_lkb(r, lkb); 4549
4532 /* this put should free the lkb */ 4550 /* don't purge lkbs we've added in recover_master_copy for
4533 if (!dlm_put_lkb(lkb)) 4551 the current recovery seq */
4534 log_error(ls, "purged lkb not released"); 4552
4535 } 4553 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
4554 continue;
4555
4556 del_lkb(r, lkb);
4557
4558 /* this put should free the lkb */
4559 if (!dlm_put_lkb(lkb))
4560 log_error(ls, "purged mstcpy lkb not released");
4536 } 4561 }
4537} 4562}
4538 4563
4539static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4564void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4540{ 4565{
4541 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); 4566 struct dlm_ls *ls = r->res_ls;
4542}
4543 4567
4544static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4568 purge_mstcpy_list(ls, r, &r->res_grantqueue);
4545{ 4569 purge_mstcpy_list(ls, r, &r->res_convertqueue);
4546 return is_master_copy(lkb); 4570 purge_mstcpy_list(ls, r, &r->res_waitqueue);
4547} 4571}
4548 4572
4549static void purge_dead_locks(struct dlm_rsb *r) 4573static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
4574 struct list_head *list,
4575 int nodeid_gone, unsigned int *count)
4550{ 4576{
4551 purge_queue(r, &r->res_grantqueue, &purge_dead_test); 4577 struct dlm_lkb *lkb, *safe;
4552 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4553 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4554}
4555 4578
4556void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 4579 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4557{ 4580 if (!is_master_copy(lkb))
4558 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); 4581 continue;
4559 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); 4582
4560 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); 4583 if ((lkb->lkb_nodeid == nodeid_gone) ||
4584 dlm_is_removed(ls, lkb->lkb_nodeid)) {
4585
4586 del_lkb(r, lkb);
4587
4588 /* this put should free the lkb */
4589 if (!dlm_put_lkb(lkb))
4590 log_error(ls, "purged dead lkb not released");
4591
4592 rsb_set_flag(r, RSB_RECOVER_GRANT);
4593
4594 (*count)++;
4595 }
4596 }
4561} 4597}
4562 4598
4563/* Get rid of locks held by nodes that are gone. */ 4599/* Get rid of locks held by nodes that are gone. */
4564 4600
4565int dlm_purge_locks(struct dlm_ls *ls) 4601void dlm_recover_purge(struct dlm_ls *ls)
4566{ 4602{
4567 struct dlm_rsb *r; 4603 struct dlm_rsb *r;
4604 struct dlm_member *memb;
4605 int nodes_count = 0;
4606 int nodeid_gone = 0;
4607 unsigned int lkb_count = 0;
4608
4609 /* cache one removed nodeid to optimize the common
4610 case of a single node removed */
4611
4612 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
4613 nodes_count++;
4614 nodeid_gone = memb->nodeid;
4615 }
4568 4616
4569 log_debug(ls, "dlm_purge_locks"); 4617 if (!nodes_count)
4618 return;
4570 4619
4571 down_write(&ls->ls_root_sem); 4620 down_write(&ls->ls_root_sem);
4572 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 4621 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4573 hold_rsb(r); 4622 hold_rsb(r);
4574 lock_rsb(r); 4623 lock_rsb(r);
4575 if (is_master(r)) 4624 if (is_master(r)) {
4576 purge_dead_locks(r); 4625 purge_dead_list(ls, r, &r->res_grantqueue,
4626 nodeid_gone, &lkb_count);
4627 purge_dead_list(ls, r, &r->res_convertqueue,
4628 nodeid_gone, &lkb_count);
4629 purge_dead_list(ls, r, &r->res_waitqueue,
4630 nodeid_gone, &lkb_count);
4631 }
4577 unlock_rsb(r); 4632 unlock_rsb(r);
4578 unhold_rsb(r); 4633 unhold_rsb(r);
4579 4634 cond_resched();
4580 schedule();
4581 } 4635 }
4582 up_write(&ls->ls_root_sem); 4636 up_write(&ls->ls_root_sem);
4583 4637
4584 return 0; 4638 if (lkb_count)
4639 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
4640 lkb_count, nodes_count);
4585} 4641}
4586 4642
4587static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) 4643static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4588{ 4644{
4589 struct rb_node *n; 4645 struct rb_node *n;
4590 struct dlm_rsb *r, *r_ret = NULL; 4646 struct dlm_rsb *r;
4591 4647
4592 spin_lock(&ls->ls_rsbtbl[bucket].lock); 4648 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4593 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 4649 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4594 r = rb_entry(n, struct dlm_rsb, res_hashnode); 4650 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4595 if (!rsb_flag(r, RSB_LOCKS_PURGED)) 4651
4652 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4653 continue;
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4655 if (!is_master(r))
4596 continue; 4656 continue;
4597 hold_rsb(r); 4657 hold_rsb(r);
4598 rsb_clear_flag(r, RSB_LOCKS_PURGED); 4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4599 r_ret = r; 4659 return r;
4600 break;
4601 } 4660 }
4602 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 4661 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4603 return r_ret; 4662 return NULL;
4604} 4663}
4605 4664
4606void dlm_grant_after_purge(struct dlm_ls *ls) 4665/*
4666 * Attempt to grant locks on resources that we are the master of.
4667 * Locks may have become grantable during recovery because locks
4668 * from departed nodes have been purged (or not rebuilt), allowing
4669 * previously blocked locks to now be granted. The subset of rsb's
4670 * we are interested in are those with lkb's on either the convert or
4671 * waiting queues.
4672 *
4673 * Simplest would be to go through each master rsb and check for non-empty
4674 * convert or waiting queues, and attempt to grant on those rsbs.
4675 * Checking the queues requires lock_rsb, though, for which we'd need
4676 * to release the rsbtbl lock. This would make iterating through all
4677 * rsb's very inefficient. So, we rely on earlier recovery routines
4678 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
4679 * locks for.
4680 */
4681
4682void dlm_recover_grant(struct dlm_ls *ls)
4607{ 4683{
4608 struct dlm_rsb *r; 4684 struct dlm_rsb *r;
4609 int bucket = 0; 4685 int bucket = 0;
4686 unsigned int count = 0;
4687 unsigned int rsb_count = 0;
4688 unsigned int lkb_count = 0;
4610 4689
4611 while (1) { 4690 while (1) {
4612 r = find_purged_rsb(ls, bucket); 4691 r = find_grant_rsb(ls, bucket);
4613 if (!r) { 4692 if (!r) {
4614 if (bucket == ls->ls_rsbtbl_size - 1) 4693 if (bucket == ls->ls_rsbtbl_size - 1)
4615 break; 4694 break;
4616 bucket++; 4695 bucket++;
4617 continue; 4696 continue;
4618 } 4697 }
4698 rsb_count++;
4699 count = 0;
4619 lock_rsb(r); 4700 lock_rsb(r);
4620 if (is_master(r)) { 4701 grant_pending_locks(r, &count);
4621 grant_pending_locks(r); 4702 lkb_count += count;
4622 confirm_master(r, 0); 4703 confirm_master(r, 0);
4623 }
4624 unlock_rsb(r); 4704 unlock_rsb(r);
4625 put_rsb(r); 4705 put_rsb(r);
4626 schedule(); 4706 cond_resched();
4627 } 4707 }
4708
4709 if (lkb_count)
4710 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
4711 lkb_count, rsb_count);
4628} 4712}
4629 4713
4630static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 4714static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4723,11 +4807,26 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4723 4807
4724 remid = le32_to_cpu(rl->rl_lkid); 4808 remid = le32_to_cpu(rl->rl_lkid);
4725 4809
4726 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 4810 /* In general we expect the rsb returned to be R_MASTER, but we don't
4727 R_MASTER, &r); 4811 have to require it. Recovery of masters on one node can overlap
4812 recovery of locks on another node, so one node can send us MSTCPY
4813 locks before we've made ourselves master of this rsb. We can still
4814 add new MSTCPY locks that we receive here without any harm; when
4815 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */
4817
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
4728 if (error) 4819 if (error)
4729 goto out; 4820 goto out;
4730 4821
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid);
4825 error = -EBADR;
4826 put_rsb(r);
4827 goto out;
4828 }
4829
4731 lock_rsb(r); 4830 lock_rsb(r);
4732 4831
4733 lkb = search_remid(r, rc->rc_header.h_nodeid, remid); 4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
@@ -4749,12 +4848,18 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4749 attach_lkb(r, lkb); 4848 attach_lkb(r, lkb);
4750 add_lkb(r, lkb, rl->rl_status); 4849 add_lkb(r, lkb, rl->rl_status);
4751 error = 0; 4850 error = 0;
4851 ls->ls_recover_locks_in++;
4852
4853 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
4854 rsb_set_flag(r, RSB_RECOVER_GRANT);
4752 4855
4753 out_remid: 4856 out_remid:
4754 /* this is the new value returned to the lock holder for 4857 /* this is the new value returned to the lock holder for
4755 saving in its process-copy lkb */ 4858 saving in its process-copy lkb */
4756 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 4859 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4757 4860
4861 lkb->lkb_recover_seq = ls->ls_recover_seq;
4862
4758 out_unlock: 4863 out_unlock:
4759 unlock_rsb(r); 4864 unlock_rsb(r);
4760 put_rsb(r); 4865 put_rsb(r);
@@ -4786,17 +4891,20 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4786 return error; 4891 return error;
4787 } 4892 }
4788 4893
4894 r = lkb->lkb_resource;
4895 hold_rsb(r);
4896 lock_rsb(r);
4897
4789 if (!is_process_copy(lkb)) { 4898 if (!is_process_copy(lkb)) {
4790 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 4899 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
4791 lkid, rc->rc_header.h_nodeid, remid, result); 4900 lkid, rc->rc_header.h_nodeid, remid, result);
4792 dlm_print_lkb(lkb); 4901 dlm_dump_rsb(r);
4902 unlock_rsb(r);
4903 put_rsb(r);
4904 dlm_put_lkb(lkb);
4793 return -EINVAL; 4905 return -EINVAL;
4794 } 4906 }
4795 4907
4796 r = lkb->lkb_resource;
4797 hold_rsb(r);
4798 lock_rsb(r);
4799
4800 switch (result) { 4908 switch (result) {
4801 case -EBADR: 4909 case -EBADR:
4802 /* There's a chance the new master received our lock before 4910 /* There's a chance the new master received our lock before
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 56e2bc646565..c8b226c62807 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -32,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
32int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 32int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
33 unsigned int flags, struct dlm_rsb **r_ret); 33 unsigned int flags, struct dlm_rsb **r_ret);
34 34
35int dlm_purge_locks(struct dlm_ls *ls); 35void dlm_recover_purge(struct dlm_ls *ls);
36void dlm_purge_mstcpy_locks(struct dlm_rsb *r); 36void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
37void dlm_grant_after_purge(struct dlm_ls *ls); 37void dlm_recover_grant(struct dlm_ls *ls);
38int dlm_recover_waiters_post(struct dlm_ls *ls); 38int dlm_recover_waiters_post(struct dlm_ls *ls);
39void dlm_recover_waiters_pre(struct dlm_ls *ls); 39void dlm_recover_waiters_pre(struct dlm_ls *ls);
40int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); 40int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1ea25face82..ca506abbdd3b 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
74 return len; 74 return len;
75} 75}
76 76
77static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78{
79 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80}
81
82static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83{
84 int val = simple_strtoul(buf, NULL, 0);
85 if (val == 1)
86 set_bit(LSFL_NODIR, &ls->ls_flags);
87 return len;
88}
89
77static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 90static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78{ 91{
79 uint32_t status = dlm_recover_status(ls); 92 uint32_t status = dlm_recover_status(ls);
@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
107 .store = dlm_id_store 120 .store = dlm_id_store
108}; 121};
109 122
123static struct dlm_attr dlm_attr_nodir = {
124 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125 .show = dlm_nodir_show,
126 .store = dlm_nodir_store
127};
128
110static struct dlm_attr dlm_attr_recover_status = { 129static struct dlm_attr dlm_attr_recover_status = {
111 .attr = {.name = "recover_status", .mode = S_IRUGO}, 130 .attr = {.name = "recover_status", .mode = S_IRUGO},
112 .show = dlm_recover_status_show 131 .show = dlm_recover_status_show
@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
121 &dlm_attr_control.attr, 140 &dlm_attr_control.attr,
122 &dlm_attr_event.attr, 141 &dlm_attr_event.attr,
123 &dlm_attr_id.attr, 142 &dlm_attr_id.attr,
143 &dlm_attr_nodir.attr,
124 &dlm_attr_recover_status.attr, 144 &dlm_attr_recover_status.attr,
125 &dlm_attr_recover_nodeid.attr, 145 &dlm_attr_recover_nodeid.attr,
126 NULL, 146 NULL,
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 6565fd5e28ef..64d3e2b958c7 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -492,30 +492,41 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
492void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 492void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
493{ 493{
494 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 494 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
495 int stop, reply = 0; 495 int stop, reply = 0, lock = 0;
496 uint32_t status;
496 uint64_t seq; 497 uint64_t seq;
497 498
498 switch (rc->rc_type) { 499 switch (rc->rc_type) {
500 case DLM_RCOM_LOCK:
501 lock = 1;
502 break;
503 case DLM_RCOM_LOCK_REPLY:
504 lock = 1;
505 reply = 1;
506 break;
499 case DLM_RCOM_STATUS_REPLY: 507 case DLM_RCOM_STATUS_REPLY:
500 case DLM_RCOM_NAMES_REPLY: 508 case DLM_RCOM_NAMES_REPLY:
501 case DLM_RCOM_LOOKUP_REPLY: 509 case DLM_RCOM_LOOKUP_REPLY:
502 case DLM_RCOM_LOCK_REPLY:
503 reply = 1; 510 reply = 1;
504 }; 511 };
505 512
506 spin_lock(&ls->ls_recover_lock); 513 spin_lock(&ls->ls_recover_lock);
514 status = ls->ls_recover_status;
507 stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 515 stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
508 seq = ls->ls_recover_seq; 516 seq = ls->ls_recover_seq;
509 spin_unlock(&ls->ls_recover_lock); 517 spin_unlock(&ls->ls_recover_lock);
510 518
511 if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || 519 if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
512 (reply && (rc->rc_seq_reply != seq))) { 520 (reply && (rc->rc_seq_reply != seq)) ||
521 (lock && !(status & DLM_RS_DIR))) {
513 log_limit(ls, "dlm_receive_rcom ignore msg %d " 522 log_limit(ls, "dlm_receive_rcom ignore msg %d "
514 "from %d %llu %llu seq %llu", 523 "from %d %llu %llu recover seq %llu sts %x gen %u",
515 rc->rc_type, nodeid, 524 rc->rc_type,
525 nodeid,
516 (unsigned long long)rc->rc_seq, 526 (unsigned long long)rc->rc_seq,
517 (unsigned long long)rc->rc_seq_reply, 527 (unsigned long long)rc->rc_seq_reply,
518 (unsigned long long)seq); 528 (unsigned long long)seq,
529 status, ls->ls_generation);
519 goto out; 530 goto out;
520 } 531 }
521 532
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 34d5adf1fce7..7554e4dac6bb 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
339{ 339{
340 struct dlm_lkb *lkb; 340 struct dlm_lkb *lkb;
341 341
342 list_for_each_entry(lkb, queue, lkb_statequeue) 342 list_for_each_entry(lkb, queue, lkb_statequeue) {
343 if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) 343 if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
344 lkb->lkb_nodeid = nodeid; 344 lkb->lkb_nodeid = nodeid;
345 lkb->lkb_remid = 0;
346 }
347 }
345} 348}
346 349
347static void set_master_lkbs(struct dlm_rsb *r) 350static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
354/* 357/*
355 * Propagate the new master nodeid to locks 358 * Propagate the new master nodeid to locks
356 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. 359 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
357 * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which 360 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
358 * rsb's to consider. 361 * rsb's to consider.
359 */ 362 */
360 363
361static void set_new_master(struct dlm_rsb *r, int nodeid) 364static void set_new_master(struct dlm_rsb *r, int nodeid)
362{ 365{
363 lock_rsb(r);
364 r->res_nodeid = nodeid; 366 r->res_nodeid = nodeid;
365 set_master_lkbs(r); 367 set_master_lkbs(r);
366 rsb_set_flag(r, RSB_NEW_MASTER); 368 rsb_set_flag(r, RSB_NEW_MASTER);
367 rsb_set_flag(r, RSB_NEW_MASTER2); 369 rsb_set_flag(r, RSB_NEW_MASTER2);
368 unlock_rsb(r);
369} 370}
370 371
371/* 372/*
@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
376static int recover_master(struct dlm_rsb *r) 377static int recover_master(struct dlm_rsb *r)
377{ 378{
378 struct dlm_ls *ls = r->res_ls; 379 struct dlm_ls *ls = r->res_ls;
379 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); 380 int error, ret_nodeid;
380 381 int our_nodeid = dlm_our_nodeid();
381 dir_nodeid = dlm_dir_nodeid(r); 382 int dir_nodeid = dlm_dir_nodeid(r);
382 383
383 if (dir_nodeid == our_nodeid) { 384 if (dir_nodeid == our_nodeid) {
384 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 385 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
388 389
389 if (ret_nodeid == our_nodeid) 390 if (ret_nodeid == our_nodeid)
390 ret_nodeid = 0; 391 ret_nodeid = 0;
392 lock_rsb(r);
391 set_new_master(r, ret_nodeid); 393 set_new_master(r, ret_nodeid);
394 unlock_rsb(r);
392 } else { 395 } else {
393 recover_list_add(r); 396 recover_list_add(r);
394 error = dlm_send_rcom_lookup(r, dir_nodeid); 397 error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
398} 401}
399 402
400/* 403/*
401 * When not using a directory, most resource names will hash to a new static 404 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
402 * master nodeid and the resource will need to be remastered. 405 * This is necessary because recovery can be started, aborted and restarted,
406 * causing the master nodeid to briefly change during the aborted recovery, and
407 * change back to the original value in the second recovery. The MSTCPY locks
408 * may or may not have been purged during the aborted recovery. Another node
409 * with an outstanding request in waiters list and a request reply saved in the
410 * requestqueue, cannot know whether it should ignore the reply and resend the
411 * request, or accept the reply and complete the request. It must do the
412 * former if the remote node purged MSTCPY locks, and it must do the later if
413 * the remote node did not. This is solved by always purging MSTCPY locks, in
414 * which case, the request reply would always be ignored and the request
415 * resent.
403 */ 416 */
404 417
405static int recover_master_static(struct dlm_rsb *r) 418static int recover_master_static(struct dlm_rsb *r)
406{ 419{
407 int master = dlm_dir_nodeid(r); 420 int dir_nodeid = dlm_dir_nodeid(r);
421 int new_master = dir_nodeid;
408 422
409 if (master == dlm_our_nodeid()) 423 if (dir_nodeid == dlm_our_nodeid())
410 master = 0; 424 new_master = 0;
411 425
412 if (r->res_nodeid != master) { 426 lock_rsb(r);
413 if (is_master(r)) 427 dlm_purge_mstcpy_locks(r);
414 dlm_purge_mstcpy_locks(r); 428 set_new_master(r, new_master);
415 set_new_master(r, master); 429 unlock_rsb(r);
416 return 1; 430 return 1;
417 }
418 return 0;
419} 431}
420 432
421/* 433/*
@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
481 if (nodeid == dlm_our_nodeid()) 493 if (nodeid == dlm_our_nodeid())
482 nodeid = 0; 494 nodeid = 0;
483 495
496 lock_rsb(r);
484 set_new_master(r, nodeid); 497 set_new_master(r, nodeid);
498 unlock_rsb(r);
485 recover_list_del(r); 499 recover_list_del(r);
486 500
487 if (recover_list_empty(ls)) 501 if (recover_list_empty(ls))
@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
556 struct dlm_rsb *r; 570 struct dlm_rsb *r;
557 int error, count = 0; 571 int error, count = 0;
558 572
559 log_debug(ls, "dlm_recover_locks");
560
561 down_read(&ls->ls_root_sem); 573 down_read(&ls->ls_root_sem);
562 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 574 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
563 if (is_master(r)) { 575 if (is_master(r)) {
@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
584 } 596 }
585 up_read(&ls->ls_root_sem); 597 up_read(&ls->ls_root_sem);
586 598
587 log_debug(ls, "dlm_recover_locks %d locks", count); 599 log_debug(ls, "dlm_recover_locks %d out", count);
588 600
589 error = dlm_wait_function(ls, &recover_list_empty); 601 error = dlm_wait_function(ls, &recover_list_empty);
590 out: 602 out:
@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
721} 733}
722 734
723/* We've become the new master for this rsb and waiting/converting locks may 735/* We've become the new master for this rsb and waiting/converting locks may
724 need to be granted in dlm_grant_after_purge() due to locks that may have 736 need to be granted in dlm_recover_grant() due to locks that may have
725 existed from a removed node. */ 737 existed from a removed node. */
726 738
727static void set_locks_purged(struct dlm_rsb *r) 739static void recover_grant(struct dlm_rsb *r)
728{ 740{
729 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 741 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
730 rsb_set_flag(r, RSB_LOCKS_PURGED); 742 rsb_set_flag(r, RSB_RECOVER_GRANT);
731} 743}
732 744
733void dlm_recover_rsbs(struct dlm_ls *ls) 745void dlm_recover_rsbs(struct dlm_ls *ls)
734{ 746{
735 struct dlm_rsb *r; 747 struct dlm_rsb *r;
736 int count = 0; 748 unsigned int count = 0;
737
738 log_debug(ls, "dlm_recover_rsbs");
739 749
740 down_read(&ls->ls_root_sem); 750 down_read(&ls->ls_root_sem);
741 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 751 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
744 if (rsb_flag(r, RSB_RECOVER_CONVERT)) 754 if (rsb_flag(r, RSB_RECOVER_CONVERT))
745 recover_conversion(r); 755 recover_conversion(r);
746 if (rsb_flag(r, RSB_NEW_MASTER2)) 756 if (rsb_flag(r, RSB_NEW_MASTER2))
747 set_locks_purged(r); 757 recover_grant(r);
748 recover_lvb(r); 758 recover_lvb(r);
749 count++; 759 count++;
750 } 760 }
@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
754 } 764 }
755 up_read(&ls->ls_root_sem); 765 up_read(&ls->ls_root_sem);
756 766
757 log_debug(ls, "dlm_recover_rsbs %d rsbs", count); 767 if (count)
768 log_debug(ls, "dlm_recover_rsbs %d done", count);
758} 769}
759 770
760/* Create a single list of all root rsb's to be used during recovery */ 771/* Create a single list of all root rsb's to be used during recovery */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 11351b57c781..f1a9073c0835 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
84 goto fail; 84 goto fail;
85 } 85 }
86 86
87 ls->ls_recover_locks_in = 0;
88
87 dlm_set_recover_status(ls, DLM_RS_NODES); 89 dlm_set_recover_status(ls, DLM_RS_NODES);
88 90
89 error = dlm_recover_members_wait(ls); 91 error = dlm_recover_members_wait(ls);
@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
130 * Clear lkb's for departed nodes. 132 * Clear lkb's for departed nodes.
131 */ 133 */
132 134
133 dlm_purge_locks(ls); 135 dlm_recover_purge(ls);
134 136
135 /* 137 /*
136 * Get new master nodeid's for rsb's that were mastered on 138 * Get new master nodeid's for rsb's that were mastered on
@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
161 goto fail; 163 goto fail;
162 } 164 }
163 165
166 log_debug(ls, "dlm_recover_locks %u in",
167 ls->ls_recover_locks_in);
168
164 /* 169 /*
165 * Finalize state in master rsb's now that all locks can be 170 * Finalize state in master rsb's now that all locks can be
166 * checked. This includes conversion resolution and lvb 171 * checked. This includes conversion resolution and lvb
@@ -225,7 +230,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
225 goto fail; 230 goto fail;
226 } 231 }
227 232
228 dlm_grant_after_purge(ls); 233 dlm_recover_grant(ls);
229 234
230 log_debug(ls, "dlm_recover %llu generation %u done: %u ms", 235 log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
231 (unsigned long long)rv->seq, ls->ls_generation, 236 (unsigned long long)rv->seq, ls->ls_generation,
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index d3191bf03a68..1695f1b0dd45 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -65,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
65int dlm_process_requestqueue(struct dlm_ls *ls) 65int dlm_process_requestqueue(struct dlm_ls *ls)
66{ 66{
67 struct rq_entry *e; 67 struct rq_entry *e;
68 struct dlm_message *ms;
68 int error = 0; 69 int error = 0;
69 70
70 mutex_lock(&ls->ls_requestqueue_mutex); 71 mutex_lock(&ls->ls_requestqueue_mutex);
@@ -78,6 +79,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
78 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); 79 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
79 mutex_unlock(&ls->ls_requestqueue_mutex); 80 mutex_unlock(&ls->ls_requestqueue_mutex);
80 81
82 ms = &e->request;
83
84 log_limit(ls, "dlm_process_requestqueue msg %d from %d "
85 "lkid %x remid %x result %d seq %u",
86 ms->m_type, ms->m_header.h_nodeid,
87 ms->m_lkid, ms->m_remid, ms->m_result,
88 e->recover_seq);
89
81 dlm_receive_message_saved(ls, &e->request, e->recover_seq); 90 dlm_receive_message_saved(ls, &e->request, e->recover_seq);
82 91
83 mutex_lock(&ls->ls_requestqueue_mutex); 92 mutex_lock(&ls->ls_requestqueue_mutex);
@@ -140,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
140 if (!dlm_no_directory(ls)) 149 if (!dlm_no_directory(ls))
141 return 0; 150 return 0;
142 151
143 /* with no directory, the master is likely to change as a part of 152 return 1;
144 recovery; requests to/from the defunct master need to be purged */
145
146 switch (type) {
147 case DLM_MSG_REQUEST:
148 case DLM_MSG_CONVERT:
149 case DLM_MSG_UNLOCK:
150 case DLM_MSG_CANCEL:
151 /* we're no longer the master of this resource, the sender
152 will resend to the new master (see waiter_needs_recovery) */
153
154 if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
155 return 1;
156 break;
157
158 case DLM_MSG_REQUEST_REPLY:
159 case DLM_MSG_CONVERT_REPLY:
160 case DLM_MSG_UNLOCK_REPLY:
161 case DLM_MSG_CANCEL_REPLY:
162 case DLM_MSG_GRANT:
163 /* this reply is from the former master of the resource,
164 we'll resend to the new master if needed */
165
166 if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
167 return 1;
168 break;
169 }
170
171 return 0;
172} 153}
173 154
174void dlm_purge_requestqueue(struct dlm_ls *ls) 155void dlm_purge_requestqueue(struct dlm_ls *ls)
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index 47d0bda5ac2b..c7975bf4fd43 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -556,7 +556,6 @@ struct gfs2_sb_host {
556struct lm_lockstruct { 556struct lm_lockstruct {
557 int ls_jid; 557 int ls_jid;
558 unsigned int ls_first; 558 unsigned int ls_first;
559 unsigned int ls_nodir;
560 const struct lm_lockops *ls_ops; 559 const struct lm_lockops *ls_ops;
561 dlm_lockspace_t *ls_dlm; 560 dlm_lockspace_t *ls_dlm;
562 561
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 5f5e70e047dc..4a38db739ca0 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
1209 fsname++; 1209 fsname++;
1210 1210
1211 flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL; 1211 flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
1212 if (ls->ls_nodir)
1213 flags |= DLM_LSFL_NODIR;
1214 1212
1215 /* 1213 /*
1216 * create/join lockspace 1214 * create/join lockspace
diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c
index 6f3a18f9e176..018b4c13db92 100644
--- a/fs/gfs2/ops_fstype.c
+++ b/fs/gfs2/ops_fstype.c
@@ -994,6 +994,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
994 ls->ls_jid = option; 994 ls->ls_jid = option;
995 break; 995 break;
996 case Opt_id: 996 case Opt_id:
997 case Opt_nodir:
997 /* Obsolete, but left for backward compat purposes */ 998 /* Obsolete, but left for backward compat purposes */
998 break; 999 break;
999 case Opt_first: 1000 case Opt_first:
@@ -1002,12 +1003,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
1002 goto hostdata_error; 1003 goto hostdata_error;
1003 ls->ls_first = option; 1004 ls->ls_first = option;
1004 break; 1005 break;
1005 case Opt_nodir:
1006 ret = match_int(&tmp[0], &option);
1007 if (ret || (option != 0 && option != 1))
1008 goto hostdata_error;
1009 ls->ls_nodir = option;
1010 break;
1011 case Opt_err: 1006 case Opt_err:
1012 default: 1007 default:
1013hostdata_error: 1008hostdata_error:
diff --git a/include/linux/dlm.h b/include/linux/dlm.h
index 6c7f6e9546c7..520152411cd1 100644
--- a/include/linux/dlm.h
+++ b/include/linux/dlm.h
@@ -67,7 +67,6 @@ struct dlm_lksb {
67 67
68/* dlm_new_lockspace() flags */ 68/* dlm_new_lockspace() flags */
69 69
70#define DLM_LSFL_NODIR 0x00000001
71#define DLM_LSFL_TIMEWARN 0x00000002 70#define DLM_LSFL_TIMEWARN 0x00000002
72#define DLM_LSFL_FS 0x00000004 71#define DLM_LSFL_FS 0x00000004
73#define DLM_LSFL_NEWEXCL 0x00000008 72#define DLM_LSFL_NEWEXCL 0x00000008