aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-05-22 22:31:38 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2012-05-22 22:31:38 -0400
commit6101167727932a929e37fb8a6eeb68bdbf54d58e (patch)
treeda3e9c8244f86082c6ea4d150f7fa653a7843192
parent6133308ad1a386e7e7f776003a1c44e8b54e2166 (diff)
parent75af271ed5f51b1f3506c7c1d567b1f32e5c9206 (diff)
Merge tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland: "This set includes some minor fixes and improvements. The one large patch addresses the special "nodir" mode, which has been a long neglected proof of concept, but with these fixes seems to be quite usable. It allows the resource master to be assigned statically instead of dynamically, which can improve performance if there is little locality and most resources are shared." * tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: dlm: NULL dereference on failure in kmem_cache_create() gfs2: fix recovery during unmount dlm: fixes for nodir mode dlm: improve error and debug messages dlm: avoid unnecessary search in search_rsb dlm: limit rcom debug messages dlm: fix waiter recovery dlm: prevent connections during shutdown
-rw-r--r--fs/dlm/ast.c3
-rw-r--r--fs/dlm/dlm_internal.h16
-rw-r--r--fs/dlm/lock.c541
-rw-r--r--fs/dlm/lock.h7
-rw-r--r--fs/dlm/lockspace.c20
-rw-r--r--fs/dlm/lowcomms.c28
-rw-r--r--fs/dlm/memory.c8
-rw-r--r--fs/dlm/rcom.c61
-rw-r--r--fs/dlm/recover.c73
-rw-r--r--fs/dlm/recoverd.c15
-rw-r--r--fs/dlm/requestqueue.c43
-rw-r--r--fs/gfs2/incore.h1
-rw-r--r--fs/gfs2/lock_dlm.c2
-rw-r--r--fs/gfs2/ops_fstype.c7
-rw-r--r--fs/gfs2/sys.c10
-rw-r--r--include/linux/dlm.h1
16 files changed, 538 insertions, 298 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 90e5997262ea..63dc19c54d5a 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
310 } 310 }
311 mutex_unlock(&ls->ls_cb_mutex); 311 mutex_unlock(&ls->ls_cb_mutex);
312 312
313 log_debug(ls, "dlm_callback_resume %d", count); 313 if (count)
314 log_debug(ls, "dlm_callback_resume %d", count);
314} 315}
315 316
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3a564d197e99..bc342f7ac3af 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -38,6 +38,7 @@
38#include <linux/miscdevice.h> 38#include <linux/miscdevice.h>
39#include <linux/mutex.h> 39#include <linux/mutex.h>
40#include <linux/idr.h> 40#include <linux/idr.h>
41#include <linux/ratelimit.h>
41#include <asm/uaccess.h> 42#include <asm/uaccess.h>
42 43
43#include <linux/dlm.h> 44#include <linux/dlm.h>
@@ -74,6 +75,13 @@ do { \
74 (ls)->ls_name , ##args); \ 75 (ls)->ls_name , ##args); \
75} while (0) 76} while (0)
76 77
78#define log_limit(ls, fmt, args...) \
79do { \
80 if (dlm_config.ci_log_debug) \
81 printk_ratelimited(KERN_DEBUG "dlm: %s: " fmt "\n", \
82 (ls)->ls_name , ##args); \
83} while (0)
84
77#define DLM_ASSERT(x, do) \ 85#define DLM_ASSERT(x, do) \
78{ \ 86{ \
79 if (!(x)) \ 87 if (!(x)) \
@@ -263,6 +271,8 @@ struct dlm_lkb {
263 ktime_t lkb_last_cast_time; /* for debugging */ 271 ktime_t lkb_last_cast_time; /* for debugging */
264 ktime_t lkb_last_bast_time; /* for debugging */ 272 ktime_t lkb_last_bast_time; /* for debugging */
265 273
274 uint64_t lkb_recover_seq; /* from ls_recover_seq */
275
266 char *lkb_lvbptr; 276 char *lkb_lvbptr;
267 struct dlm_lksb *lkb_lksb; /* caller's status block */ 277 struct dlm_lksb *lkb_lksb; /* caller's status block */
268 void (*lkb_astfn) (void *astparam); 278 void (*lkb_astfn) (void *astparam);
@@ -317,7 +327,7 @@ enum rsb_flags {
317 RSB_NEW_MASTER, 327 RSB_NEW_MASTER,
318 RSB_NEW_MASTER2, 328 RSB_NEW_MASTER2,
319 RSB_RECOVER_CONVERT, 329 RSB_RECOVER_CONVERT,
320 RSB_LOCKS_PURGED, 330 RSB_RECOVER_GRANT,
321}; 331};
322 332
323static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) 333static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@@ -563,6 +573,7 @@ struct dlm_ls {
563 struct mutex ls_requestqueue_mutex; 573 struct mutex ls_requestqueue_mutex;
564 struct dlm_rcom *ls_recover_buf; 574 struct dlm_rcom *ls_recover_buf;
565 int ls_recover_nodeid; /* for debugging */ 575 int ls_recover_nodeid; /* for debugging */
576 unsigned int ls_recover_locks_in; /* for log info */
566 uint64_t ls_rcom_seq; 577 uint64_t ls_rcom_seq;
567 spinlock_t ls_rcom_spin; 578 spinlock_t ls_rcom_spin;
568 struct list_head ls_recover_list; 579 struct list_head ls_recover_list;
@@ -589,6 +600,7 @@ struct dlm_ls {
589#define LSFL_UEVENT_WAIT 5 600#define LSFL_UEVENT_WAIT 5
590#define LSFL_TIMEWARN 6 601#define LSFL_TIMEWARN 6
591#define LSFL_CB_DELAY 7 602#define LSFL_CB_DELAY 7
603#define LSFL_NODIR 8
592 604
593/* much of this is just saving user space pointers associated with the 605/* much of this is just saving user space pointers associated with the
594 lock that we pass back to the user lib with an ast */ 606 lock that we pass back to the user lib with an ast */
@@ -636,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
636 648
637static inline int dlm_no_directory(struct dlm_ls *ls) 649static inline int dlm_no_directory(struct dlm_ls *ls)
638{ 650{
639 return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0; 651 return test_bit(LSFL_NODIR, &ls->ls_flags);
640} 652}
641 653
642int dlm_netlink_init(void); 654int dlm_netlink_init(void);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 4c58d4a3adc4..bdafb65a5234 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,11 +160,12 @@ static const int __quecvt_compat_matrix[8][8] = {
160 160
161void dlm_print_lkb(struct dlm_lkb *lkb) 161void dlm_print_lkb(struct dlm_lkb *lkb)
162{ 162{
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" 163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 " status %d rqmode %d grmode %d wait_type %d\n", 164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type); 167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
168} 169}
169 170
170static void dlm_print_rsb(struct dlm_rsb *r) 171static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
251 252
252static inline int is_master_copy(struct dlm_lkb *lkb) 253static inline int is_master_copy(struct dlm_lkb *lkb)
253{ 254{
254 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257} 256}
258 257
@@ -479,6 +478,9 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
479 kref_get(&r->res_ref); 478 kref_get(&r->res_ref);
480 goto out; 479 goto out;
481 } 480 }
481 if (error == -ENOTBLK)
482 goto out;
483
482 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 484 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
483 if (error) 485 if (error)
484 goto out; 486 goto out;
@@ -586,6 +588,23 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
586 return error; 588 return error;
587} 589}
588 590
591static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
592{
593 struct rb_node *n;
594 struct dlm_rsb *r;
595 int i;
596
597 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
598 spin_lock(&ls->ls_rsbtbl[i].lock);
599 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
600 r = rb_entry(n, struct dlm_rsb, res_hashnode);
601 if (r->res_hash == hash)
602 dlm_dump_rsb(r);
603 }
604 spin_unlock(&ls->ls_rsbtbl[i].lock);
605 }
606}
607
589/* This is only called to add a reference when the code already holds 608/* This is only called to add a reference when the code already holds
590 a valid reference to the rsb, so there's no need for locking. */ 609 a valid reference to the rsb, so there's no need for locking. */
591 610
@@ -1064,8 +1083,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1064 goto out_del; 1083 goto out_del;
1065 } 1084 }
1066 1085
1067 log_error(ls, "remwait error %x reply %d flags %x no wait_type", 1086 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1068 lkb->lkb_id, mstype, lkb->lkb_flags); 1087 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1088 mstype, lkb->lkb_flags);
1069 return -1; 1089 return -1;
1070 1090
1071 out_del: 1091 out_del:
@@ -1498,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1498 } 1518 }
1499 1519
1500 lkb->lkb_rqmode = DLM_LOCK_IV; 1520 lkb->lkb_rqmode = DLM_LOCK_IV;
1521 lkb->lkb_highbast = 0;
1501} 1522}
1502 1523
1503static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1504{ 1525{
1505 set_lvb_lock(r, lkb); 1526 set_lvb_lock(r, lkb);
1506 _grant_lock(r, lkb); 1527 _grant_lock(r, lkb);
1507 lkb->lkb_highbast = 0;
1508} 1528}
1509 1529
1510static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1530static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1866,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1866/* Returns the highest requested mode of all blocked conversions; sets 1886/* Returns the highest requested mode of all blocked conversions; sets
1867 cw if there's a blocked conversion to DLM_LOCK_CW. */ 1887 cw if there's a blocked conversion to DLM_LOCK_CW. */
1868 1888
1869static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) 1889static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1890 unsigned int *count)
1870{ 1891{
1871 struct dlm_lkb *lkb, *s; 1892 struct dlm_lkb *lkb, *s;
1872 int hi, demoted, quit, grant_restart, demote_restart; 1893 int hi, demoted, quit, grant_restart, demote_restart;
@@ -1885,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1885 if (can_be_granted(r, lkb, 0, &deadlk)) { 1906 if (can_be_granted(r, lkb, 0, &deadlk)) {
1886 grant_lock_pending(r, lkb); 1907 grant_lock_pending(r, lkb);
1887 grant_restart = 1; 1908 grant_restart = 1;
1909 if (count)
1910 (*count)++;
1888 continue; 1911 continue;
1889 } 1912 }
1890 1913
@@ -1918,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1918 return max_t(int, high, hi); 1941 return max_t(int, high, hi);
1919} 1942}
1920 1943
1921static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) 1944static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
1945 unsigned int *count)
1922{ 1946{
1923 struct dlm_lkb *lkb, *s; 1947 struct dlm_lkb *lkb, *s;
1924 1948
1925 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 1949 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1926 if (can_be_granted(r, lkb, 0, NULL)) 1950 if (can_be_granted(r, lkb, 0, NULL)) {
1927 grant_lock_pending(r, lkb); 1951 grant_lock_pending(r, lkb);
1928 else { 1952 if (count)
1953 (*count)++;
1954 } else {
1929 high = max_t(int, lkb->lkb_rqmode, high); 1955 high = max_t(int, lkb->lkb_rqmode, high);
1930 if (lkb->lkb_rqmode == DLM_LOCK_CW) 1956 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1931 *cw = 1; 1957 *cw = 1;
@@ -1954,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1954 return 0; 1980 return 0;
1955} 1981}
1956 1982
1957static void grant_pending_locks(struct dlm_rsb *r) 1983static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
1958{ 1984{
1959 struct dlm_lkb *lkb, *s; 1985 struct dlm_lkb *lkb, *s;
1960 int high = DLM_LOCK_IV; 1986 int high = DLM_LOCK_IV;
1961 int cw = 0; 1987 int cw = 0;
1962 1988
1963 DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); 1989 if (!is_master(r)) {
1990 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
1991 dlm_dump_rsb(r);
1992 return;
1993 }
1964 1994
1965 high = grant_pending_convert(r, high, &cw); 1995 high = grant_pending_convert(r, high, &cw, count);
1966 high = grant_pending_wait(r, high, &cw); 1996 high = grant_pending_wait(r, high, &cw, count);
1967 1997
1968 if (high == DLM_LOCK_IV) 1998 if (high == DLM_LOCK_IV)
1969 return; 1999 return;
@@ -2499,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2499 before we try again to grant this one. */ 2529 before we try again to grant this one. */
2500 2530
2501 if (is_demoted(lkb)) { 2531 if (is_demoted(lkb)) {
2502 grant_pending_convert(r, DLM_LOCK_IV, NULL); 2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2503 if (_can_be_granted(r, lkb, 1)) { 2533 if (_can_be_granted(r, lkb, 1)) {
2504 grant_lock(r, lkb); 2534 grant_lock(r, lkb);
2505 queue_cast(r, lkb, 0); 2535 queue_cast(r, lkb, 0);
@@ -2527,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2527{ 2557{
2528 switch (error) { 2558 switch (error) {
2529 case 0: 2559 case 0:
2530 grant_pending_locks(r); 2560 grant_pending_locks(r, NULL);
2531 /* grant_pending_locks also sends basts */ 2561 /* grant_pending_locks also sends basts */
2532 break; 2562 break;
2533 case -EAGAIN: 2563 case -EAGAIN:
@@ -2550,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2550static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2580static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2551 int error) 2581 int error)
2552{ 2582{
2553 grant_pending_locks(r); 2583 grant_pending_locks(r, NULL);
2554} 2584}
2555 2585
2556/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2571,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2571 int error) 2601 int error)
2572{ 2602{
2573 if (error) 2603 if (error)
2574 grant_pending_locks(r); 2604 grant_pending_locks(r, NULL);
2575} 2605}
2576 2606
2577/* 2607/*
@@ -3372,7 +3402,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3372 return error; 3402 return error;
3373} 3403}
3374 3404
3375static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) 3405static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3376{ 3406{
3377 struct dlm_lkb *lkb; 3407 struct dlm_lkb *lkb;
3378 struct dlm_rsb *r; 3408 struct dlm_rsb *r;
@@ -3412,14 +3442,15 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3412 error = 0; 3442 error = 0;
3413 if (error) 3443 if (error)
3414 dlm_put_lkb(lkb); 3444 dlm_put_lkb(lkb);
3415 return; 3445 return 0;
3416 3446
3417 fail: 3447 fail:
3418 setup_stub_lkb(ls, ms); 3448 setup_stub_lkb(ls, ms);
3419 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3449 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3450 return error;
3420} 3451}
3421 3452
3422static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 3453static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3423{ 3454{
3424 struct dlm_lkb *lkb; 3455 struct dlm_lkb *lkb;
3425 struct dlm_rsb *r; 3456 struct dlm_rsb *r;
@@ -3429,6 +3460,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3429 if (error) 3460 if (error)
3430 goto fail; 3461 goto fail;
3431 3462
3463 if (lkb->lkb_remid != ms->m_lkid) {
3464 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3465 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3466 (unsigned long long)lkb->lkb_recover_seq,
3467 ms->m_header.h_nodeid, ms->m_lkid);
3468 error = -ENOENT;
3469 goto fail;
3470 }
3471
3432 r = lkb->lkb_resource; 3472 r = lkb->lkb_resource;
3433 3473
3434 hold_rsb(r); 3474 hold_rsb(r);
@@ -3456,14 +3496,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3456 unlock_rsb(r); 3496 unlock_rsb(r);
3457 put_rsb(r); 3497 put_rsb(r);
3458 dlm_put_lkb(lkb); 3498 dlm_put_lkb(lkb);
3459 return; 3499 return 0;
3460 3500
3461 fail: 3501 fail:
3462 setup_stub_lkb(ls, ms); 3502 setup_stub_lkb(ls, ms);
3463 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3503 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3504 return error;
3464} 3505}
3465 3506
3466static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 3507static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3467{ 3508{
3468 struct dlm_lkb *lkb; 3509 struct dlm_lkb *lkb;
3469 struct dlm_rsb *r; 3510 struct dlm_rsb *r;
@@ -3473,6 +3514,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3473 if (error) 3514 if (error)
3474 goto fail; 3515 goto fail;
3475 3516
3517 if (lkb->lkb_remid != ms->m_lkid) {
3518 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3519 lkb->lkb_id, lkb->lkb_remid,
3520 ms->m_header.h_nodeid, ms->m_lkid);
3521 error = -ENOENT;
3522 goto fail;
3523 }
3524
3476 r = lkb->lkb_resource; 3525 r = lkb->lkb_resource;
3477 3526
3478 hold_rsb(r); 3527 hold_rsb(r);
@@ -3497,14 +3546,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3497 unlock_rsb(r); 3546 unlock_rsb(r);
3498 put_rsb(r); 3547 put_rsb(r);
3499 dlm_put_lkb(lkb); 3548 dlm_put_lkb(lkb);
3500 return; 3549 return 0;
3501 3550
3502 fail: 3551 fail:
3503 setup_stub_lkb(ls, ms); 3552 setup_stub_lkb(ls, ms);
3504 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3553 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3554 return error;
3505} 3555}
3506 3556
3507static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 3557static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3508{ 3558{
3509 struct dlm_lkb *lkb; 3559 struct dlm_lkb *lkb;
3510 struct dlm_rsb *r; 3560 struct dlm_rsb *r;
@@ -3532,25 +3582,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3532 unlock_rsb(r); 3582 unlock_rsb(r);
3533 put_rsb(r); 3583 put_rsb(r);
3534 dlm_put_lkb(lkb); 3584 dlm_put_lkb(lkb);
3535 return; 3585 return 0;
3536 3586
3537 fail: 3587 fail:
3538 setup_stub_lkb(ls, ms); 3588 setup_stub_lkb(ls, ms);
3539 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3589 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3590 return error;
3540} 3591}
3541 3592
3542static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 3593static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3543{ 3594{
3544 struct dlm_lkb *lkb; 3595 struct dlm_lkb *lkb;
3545 struct dlm_rsb *r; 3596 struct dlm_rsb *r;
3546 int error; 3597 int error;
3547 3598
3548 error = find_lkb(ls, ms->m_remid, &lkb); 3599 error = find_lkb(ls, ms->m_remid, &lkb);
3549 if (error) { 3600 if (error)
3550 log_debug(ls, "receive_grant from %d no lkb %x", 3601 return error;
3551 ms->m_header.h_nodeid, ms->m_remid);
3552 return;
3553 }
3554 3602
3555 r = lkb->lkb_resource; 3603 r = lkb->lkb_resource;
3556 3604
@@ -3570,20 +3618,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3570 unlock_rsb(r); 3618 unlock_rsb(r);
3571 put_rsb(r); 3619 put_rsb(r);
3572 dlm_put_lkb(lkb); 3620 dlm_put_lkb(lkb);
3621 return 0;
3573} 3622}
3574 3623
3575static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 3624static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3576{ 3625{
3577 struct dlm_lkb *lkb; 3626 struct dlm_lkb *lkb;
3578 struct dlm_rsb *r; 3627 struct dlm_rsb *r;
3579 int error; 3628 int error;
3580 3629
3581 error = find_lkb(ls, ms->m_remid, &lkb); 3630 error = find_lkb(ls, ms->m_remid, &lkb);
3582 if (error) { 3631 if (error)
3583 log_debug(ls, "receive_bast from %d no lkb %x", 3632 return error;
3584 ms->m_header.h_nodeid, ms->m_remid);
3585 return;
3586 }
3587 3633
3588 r = lkb->lkb_resource; 3634 r = lkb->lkb_resource;
3589 3635
@@ -3595,10 +3641,12 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3595 goto out; 3641 goto out;
3596 3642
3597 queue_bast(r, lkb, ms->m_bastmode); 3643 queue_bast(r, lkb, ms->m_bastmode);
3644 lkb->lkb_highbast = ms->m_bastmode;
3598 out: 3645 out:
3599 unlock_rsb(r); 3646 unlock_rsb(r);
3600 put_rsb(r); 3647 put_rsb(r);
3601 dlm_put_lkb(lkb); 3648 dlm_put_lkb(lkb);
3649 return 0;
3602} 3650}
3603 3651
3604static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 3652static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3653,18 +3701,15 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3653 do_purge(ls, ms->m_nodeid, ms->m_pid); 3701 do_purge(ls, ms->m_nodeid, ms->m_pid);
3654} 3702}
3655 3703
3656static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 3704static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3657{ 3705{
3658 struct dlm_lkb *lkb; 3706 struct dlm_lkb *lkb;
3659 struct dlm_rsb *r; 3707 struct dlm_rsb *r;
3660 int error, mstype, result; 3708 int error, mstype, result;
3661 3709
3662 error = find_lkb(ls, ms->m_remid, &lkb); 3710 error = find_lkb(ls, ms->m_remid, &lkb);
3663 if (error) { 3711 if (error)
3664 log_debug(ls, "receive_request_reply from %d no lkb %x", 3712 return error;
3665 ms->m_header.h_nodeid, ms->m_remid);
3666 return;
3667 }
3668 3713
3669 r = lkb->lkb_resource; 3714 r = lkb->lkb_resource;
3670 hold_rsb(r); 3715 hold_rsb(r);
@@ -3676,8 +3721,13 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3676 3721
3677 mstype = lkb->lkb_wait_type; 3722 mstype = lkb->lkb_wait_type;
3678 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3679 if (error) 3724 if (error) {
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3727 ms->m_result);
3728 dlm_dump_rsb(r);
3680 goto out; 3729 goto out;
3730 }
3681 3731
3682 /* Optimization: the dir node was also the master, so it took our 3732 /* Optimization: the dir node was also the master, so it took our
3683 lookup as a request and sent request reply instead of lookup reply */ 3733 lookup as a request and sent request reply instead of lookup reply */
@@ -3755,6 +3805,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3755 unlock_rsb(r); 3805 unlock_rsb(r);
3756 put_rsb(r); 3806 put_rsb(r);
3757 dlm_put_lkb(lkb); 3807 dlm_put_lkb(lkb);
3808 return 0;
3758} 3809}
3759 3810
3760static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3811static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3793,8 +3844,11 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3793 break; 3844 break;
3794 3845
3795 default: 3846 default:
3796 log_error(r->res_ls, "receive_convert_reply %x error %d", 3847 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3797 lkb->lkb_id, ms->m_result); 3848 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3849 ms->m_result);
3850 dlm_print_rsb(r);
3851 dlm_print_lkb(lkb);
3798 } 3852 }
3799} 3853}
3800 3854
@@ -3821,20 +3875,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3821 put_rsb(r); 3875 put_rsb(r);
3822} 3876}
3823 3877
3824static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 3878static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3825{ 3879{
3826 struct dlm_lkb *lkb; 3880 struct dlm_lkb *lkb;
3827 int error; 3881 int error;
3828 3882
3829 error = find_lkb(ls, ms->m_remid, &lkb); 3883 error = find_lkb(ls, ms->m_remid, &lkb);
3830 if (error) { 3884 if (error)
3831 log_debug(ls, "receive_convert_reply from %d no lkb %x", 3885 return error;
3832 ms->m_header.h_nodeid, ms->m_remid);
3833 return;
3834 }
3835 3886
3836 _receive_convert_reply(lkb, ms); 3887 _receive_convert_reply(lkb, ms);
3837 dlm_put_lkb(lkb); 3888 dlm_put_lkb(lkb);
3889 return 0;
3838} 3890}
3839 3891
3840static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3892static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3873,20 +3925,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3873 put_rsb(r); 3925 put_rsb(r);
3874} 3926}
3875 3927
3876static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 3928static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3877{ 3929{
3878 struct dlm_lkb *lkb; 3930 struct dlm_lkb *lkb;
3879 int error; 3931 int error;
3880 3932
3881 error = find_lkb(ls, ms->m_remid, &lkb); 3933 error = find_lkb(ls, ms->m_remid, &lkb);
3882 if (error) { 3934 if (error)
3883 log_debug(ls, "receive_unlock_reply from %d no lkb %x", 3935 return error;
3884 ms->m_header.h_nodeid, ms->m_remid);
3885 return;
3886 }
3887 3936
3888 _receive_unlock_reply(lkb, ms); 3937 _receive_unlock_reply(lkb, ms);
3889 dlm_put_lkb(lkb); 3938 dlm_put_lkb(lkb);
3939 return 0;
3890} 3940}
3891 3941
3892static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3942static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3925,20 +3975,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3925 put_rsb(r); 3975 put_rsb(r);
3926} 3976}
3927 3977
3928static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 3978static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3929{ 3979{
3930 struct dlm_lkb *lkb; 3980 struct dlm_lkb *lkb;
3931 int error; 3981 int error;
3932 3982
3933 error = find_lkb(ls, ms->m_remid, &lkb); 3983 error = find_lkb(ls, ms->m_remid, &lkb);
3934 if (error) { 3984 if (error)
3935 log_debug(ls, "receive_cancel_reply from %d no lkb %x", 3985 return error;
3936 ms->m_header.h_nodeid, ms->m_remid);
3937 return;
3938 }
3939 3986
3940 _receive_cancel_reply(lkb, ms); 3987 _receive_cancel_reply(lkb, ms);
3941 dlm_put_lkb(lkb); 3988 dlm_put_lkb(lkb);
3989 return 0;
3942} 3990}
3943 3991
3944static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 3992static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3949,7 +3997,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3949 3997
3950 error = find_lkb(ls, ms->m_lkid, &lkb); 3998 error = find_lkb(ls, ms->m_lkid, &lkb);
3951 if (error) { 3999 if (error) {
3952 log_error(ls, "receive_lookup_reply no lkb"); 4000 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
3953 return; 4001 return;
3954 } 4002 }
3955 4003
@@ -3993,8 +4041,11 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3993 dlm_put_lkb(lkb); 4041 dlm_put_lkb(lkb);
3994} 4042}
3995 4043
3996static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) 4044static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4045 uint32_t saved_seq)
3997{ 4046{
4047 int error = 0, noent = 0;
4048
3998 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4049 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3999 log_debug(ls, "ignore non-member message %d from %d %x %x %d", 4050 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4000 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4051 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
@@ -4007,47 +4058,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4007 /* messages sent to a master node */ 4058 /* messages sent to a master node */
4008 4059
4009 case DLM_MSG_REQUEST: 4060 case DLM_MSG_REQUEST:
4010 receive_request(ls, ms); 4061 error = receive_request(ls, ms);
4011 break; 4062 break;
4012 4063
4013 case DLM_MSG_CONVERT: 4064 case DLM_MSG_CONVERT:
4014 receive_convert(ls, ms); 4065 error = receive_convert(ls, ms);
4015 break; 4066 break;
4016 4067
4017 case DLM_MSG_UNLOCK: 4068 case DLM_MSG_UNLOCK:
4018 receive_unlock(ls, ms); 4069 error = receive_unlock(ls, ms);
4019 break; 4070 break;
4020 4071
4021 case DLM_MSG_CANCEL: 4072 case DLM_MSG_CANCEL:
4022 receive_cancel(ls, ms); 4073 noent = 1;
4074 error = receive_cancel(ls, ms);
4023 break; 4075 break;
4024 4076
4025 /* messages sent from a master node (replies to above) */ 4077 /* messages sent from a master node (replies to above) */
4026 4078
4027 case DLM_MSG_REQUEST_REPLY: 4079 case DLM_MSG_REQUEST_REPLY:
4028 receive_request_reply(ls, ms); 4080 error = receive_request_reply(ls, ms);
4029 break; 4081 break;
4030 4082
4031 case DLM_MSG_CONVERT_REPLY: 4083 case DLM_MSG_CONVERT_REPLY:
4032 receive_convert_reply(ls, ms); 4084 error = receive_convert_reply(ls, ms);
4033 break; 4085 break;
4034 4086
4035 case DLM_MSG_UNLOCK_REPLY: 4087 case DLM_MSG_UNLOCK_REPLY:
4036 receive_unlock_reply(ls, ms); 4088 error = receive_unlock_reply(ls, ms);
4037 break; 4089 break;
4038 4090
4039 case DLM_MSG_CANCEL_REPLY: 4091 case DLM_MSG_CANCEL_REPLY:
4040 receive_cancel_reply(ls, ms); 4092 error = receive_cancel_reply(ls, ms);
4041 break; 4093 break;
4042 4094
4043 /* messages sent from a master node (only two types of async msg) */ 4095 /* messages sent from a master node (only two types of async msg) */
4044 4096
4045 case DLM_MSG_GRANT: 4097 case DLM_MSG_GRANT:
4046 receive_grant(ls, ms); 4098 noent = 1;
4099 error = receive_grant(ls, ms);
4047 break; 4100 break;
4048 4101
4049 case DLM_MSG_BAST: 4102 case DLM_MSG_BAST:
4050 receive_bast(ls, ms); 4103 noent = 1;
4104 error = receive_bast(ls, ms);
4051 break; 4105 break;
4052 4106
4053 /* messages sent to a dir node */ 4107 /* messages sent to a dir node */
@@ -4075,6 +4129,37 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4075 default: 4129 default:
4076 log_error(ls, "unknown message type %d", ms->m_type); 4130 log_error(ls, "unknown message type %d", ms->m_type);
4077 } 4131 }
4132
4133 /*
4134 * When checking for ENOENT, we're checking the result of
4135 * find_lkb(m_remid):
4136 *
4137 * The lock id referenced in the message wasn't found. This may
4138 * happen in normal usage for the async messages and cancel, so
4139 * only use log_debug for them.
4140 *
4141 * Some errors are expected and normal.
4142 */
4143
4144 if (error == -ENOENT && noent) {
4145 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4146 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4147 ms->m_lkid, saved_seq);
4148 } else if (error == -ENOENT) {
4149 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4150 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4151 ms->m_lkid, saved_seq);
4152
4153 if (ms->m_type == DLM_MSG_CONVERT)
4154 dlm_dump_rsb_hash(ls, ms->m_hash);
4155 }
4156
4157 if (error == -EINVAL) {
4158 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4159 "saved_seq %u",
4160 ms->m_type, ms->m_header.h_nodeid,
4161 ms->m_lkid, ms->m_remid, saved_seq);
4162 }
4078} 4163}
4079 4164
4080/* If the lockspace is in recovery mode (locking stopped), then normal 4165/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4092,16 +4177,17 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4092 dlm_add_requestqueue(ls, nodeid, ms); 4177 dlm_add_requestqueue(ls, nodeid, ms);
4093 } else { 4178 } else {
4094 dlm_wait_requestqueue(ls); 4179 dlm_wait_requestqueue(ls);
4095 _receive_message(ls, ms); 4180 _receive_message(ls, ms, 0);
4096 } 4181 }
4097} 4182}
4098 4183
4099/* This is called by dlm_recoverd to process messages that were saved on 4184/* This is called by dlm_recoverd to process messages that were saved on
4100 the requestqueue. */ 4185 the requestqueue. */
4101 4186
4102void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) 4187void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4188 uint32_t saved_seq)
4103{ 4189{
4104 _receive_message(ls, ms); 4190 _receive_message(ls, ms, saved_seq);
4105} 4191}
4106 4192
4107/* This is called by the midcomms layer when something is received for 4193/* This is called by the midcomms layer when something is received for
@@ -4137,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4137 4223
4138 ls = dlm_find_lockspace_global(hd->h_lockspace); 4224 ls = dlm_find_lockspace_global(hd->h_lockspace);
4139 if (!ls) { 4225 if (!ls) {
4140 if (dlm_config.ci_log_debug) 4226 if (dlm_config.ci_log_debug) {
4141 log_print("invalid lockspace %x from %d cmd %d type %d", 4227 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4142 hd->h_lockspace, nodeid, hd->h_cmd, type); 4228 "%u from %d cmd %d type %d\n",
4229 hd->h_lockspace, nodeid, hd->h_cmd, type);
4230 }
4143 4231
4144 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4232 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4145 dlm_send_ls_not_ready(nodeid, &p->rcom); 4233 dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4187,15 +4275,13 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4187/* A waiting lkb needs recovery if the master node has failed, or 4275/* A waiting lkb needs recovery if the master node has failed, or
4188 the master node is changing (only when no directory is used) */ 4276 the master node is changing (only when no directory is used) */
4189 4277
4190static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) 4278static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4279 int dir_nodeid)
4191{ 4280{
4192 if (dlm_is_removed(ls, lkb->lkb_nodeid)) 4281 if (dlm_no_directory(ls))
4193 return 1; 4282 return 1;
4194 4283
4195 if (!dlm_no_directory(ls)) 4284 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4196 return 0;
4197
4198 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4199 return 1; 4285 return 1;
4200 4286
4201 return 0; 4287 return 0;
@@ -4212,6 +4298,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4212 struct dlm_lkb *lkb, *safe; 4298 struct dlm_lkb *lkb, *safe;
4213 struct dlm_message *ms_stub; 4299 struct dlm_message *ms_stub;
4214 int wait_type, stub_unlock_result, stub_cancel_result; 4300 int wait_type, stub_unlock_result, stub_cancel_result;
4301 int dir_nodeid;
4215 4302
4216 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL); 4303 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4217 if (!ms_stub) { 4304 if (!ms_stub) {
@@ -4223,13 +4310,21 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4223 4310
4224 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 4311 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4225 4312
4313 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4314
4226 /* exclude debug messages about unlocks because there can be so 4315 /* exclude debug messages about unlocks because there can be so
4227 many and they aren't very interesting */ 4316 many and they aren't very interesting */
4228 4317
4229 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 4318 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4230 log_debug(ls, "recover_waiter %x nodeid %d " 4319 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4231 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid, 4320 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4232 lkb->lkb_wait_type, lkb->lkb_wait_nodeid); 4321 lkb->lkb_id,
4322 lkb->lkb_remid,
4323 lkb->lkb_wait_type,
4324 lkb->lkb_resource->res_nodeid,
4325 lkb->lkb_nodeid,
4326 lkb->lkb_wait_nodeid,
4327 dir_nodeid);
4233 } 4328 }
4234 4329
4235 /* all outstanding lookups, regardless of destination will be 4330 /* all outstanding lookups, regardless of destination will be
@@ -4240,7 +4335,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4240 continue; 4335 continue;
4241 } 4336 }
4242 4337
4243 if (!waiter_needs_recovery(ls, lkb)) 4338 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4244 continue; 4339 continue;
4245 4340
4246 wait_type = lkb->lkb_wait_type; 4341 wait_type = lkb->lkb_wait_type;
@@ -4373,8 +4468,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4373 ou = is_overlap_unlock(lkb); 4468 ou = is_overlap_unlock(lkb);
4374 err = 0; 4469 err = 0;
4375 4470
4376 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d", 4471 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4377 lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid); 4472 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
4473 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
4474 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
4475 dlm_dir_nodeid(r), oc, ou);
4378 4476
4379 /* At this point we assume that we won't get a reply to any 4477 /* At this point we assume that we won't get a reply to any
4380 previous op or overlap op on this lock. First, do a big 4478 previous op or overlap op on this lock. First, do a big
@@ -4426,9 +4524,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4426 } 4524 }
4427 } 4525 }
4428 4526
4429 if (err) 4527 if (err) {
4430 log_error(ls, "recover_waiters_post %x %d %x %d %d", 4528 log_error(ls, "waiter %x msg %d r_nodeid %d "
4431 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); 4529 "dir_nodeid %d overlap %d %d",
4530 lkb->lkb_id, mstype, r->res_nodeid,
4531 dlm_dir_nodeid(r), oc, ou);
4532 }
4432 unlock_rsb(r); 4533 unlock_rsb(r);
4433 put_rsb(r); 4534 put_rsb(r);
4434 dlm_put_lkb(lkb); 4535 dlm_put_lkb(lkb);
@@ -4437,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4437 return error; 4538 return error;
4438} 4539}
4439 4540
4440static void purge_queue(struct dlm_rsb *r, struct list_head *queue, 4541static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
4441 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) 4542 struct list_head *list)
4442{ 4543{
4443 struct dlm_ls *ls = r->res_ls;
4444 struct dlm_lkb *lkb, *safe; 4544 struct dlm_lkb *lkb, *safe;
4445 4545
4446 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { 4546 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4447 if (test(ls, lkb)) { 4547 if (!is_master_copy(lkb))
4448 rsb_set_flag(r, RSB_LOCKS_PURGED); 4548 continue;
4449 del_lkb(r, lkb); 4549
4450 /* this put should free the lkb */ 4550 /* don't purge lkbs we've added in recover_master_copy for
4451 if (!dlm_put_lkb(lkb)) 4551 the current recovery seq */
4452 log_error(ls, "purged lkb not released"); 4552
4453 } 4553 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
4554 continue;
4555
4556 del_lkb(r, lkb);
4557
4558 /* this put should free the lkb */
4559 if (!dlm_put_lkb(lkb))
4560 log_error(ls, "purged mstcpy lkb not released");
4454 } 4561 }
4455} 4562}
4456 4563
4457static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4564void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4458{ 4565{
4459 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); 4566 struct dlm_ls *ls = r->res_ls;
4460}
4461 4567
4462static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4568 purge_mstcpy_list(ls, r, &r->res_grantqueue);
4463{ 4569 purge_mstcpy_list(ls, r, &r->res_convertqueue);
4464 return is_master_copy(lkb); 4570 purge_mstcpy_list(ls, r, &r->res_waitqueue);
4465} 4571}
4466 4572
4467static void purge_dead_locks(struct dlm_rsb *r) 4573static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
4574 struct list_head *list,
4575 int nodeid_gone, unsigned int *count)
4468{ 4576{
4469 purge_queue(r, &r->res_grantqueue, &purge_dead_test); 4577 struct dlm_lkb *lkb, *safe;
4470 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4471 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4472}
4473 4578
4474void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 4579 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4475{ 4580 if (!is_master_copy(lkb))
4476 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); 4581 continue;
4477 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); 4582
4478 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); 4583 if ((lkb->lkb_nodeid == nodeid_gone) ||
4584 dlm_is_removed(ls, lkb->lkb_nodeid)) {
4585
4586 del_lkb(r, lkb);
4587
4588 /* this put should free the lkb */
4589 if (!dlm_put_lkb(lkb))
4590 log_error(ls, "purged dead lkb not released");
4591
4592 rsb_set_flag(r, RSB_RECOVER_GRANT);
4593
4594 (*count)++;
4595 }
4596 }
4479} 4597}
4480 4598
4481/* Get rid of locks held by nodes that are gone. */ 4599/* Get rid of locks held by nodes that are gone. */
4482 4600
4483int dlm_purge_locks(struct dlm_ls *ls) 4601void dlm_recover_purge(struct dlm_ls *ls)
4484{ 4602{
4485 struct dlm_rsb *r; 4603 struct dlm_rsb *r;
4604 struct dlm_member *memb;
4605 int nodes_count = 0;
4606 int nodeid_gone = 0;
4607 unsigned int lkb_count = 0;
4486 4608
4487 log_debug(ls, "dlm_purge_locks"); 4609 /* cache one removed nodeid to optimize the common
4610 case of a single node removed */
4611
4612 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
4613 nodes_count++;
4614 nodeid_gone = memb->nodeid;
4615 }
4616
4617 if (!nodes_count)
4618 return;
4488 4619
4489 down_write(&ls->ls_root_sem); 4620 down_write(&ls->ls_root_sem);
4490 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 4621 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4491 hold_rsb(r); 4622 hold_rsb(r);
4492 lock_rsb(r); 4623 lock_rsb(r);
4493 if (is_master(r)) 4624 if (is_master(r)) {
4494 purge_dead_locks(r); 4625 purge_dead_list(ls, r, &r->res_grantqueue,
4626 nodeid_gone, &lkb_count);
4627 purge_dead_list(ls, r, &r->res_convertqueue,
4628 nodeid_gone, &lkb_count);
4629 purge_dead_list(ls, r, &r->res_waitqueue,
4630 nodeid_gone, &lkb_count);
4631 }
4495 unlock_rsb(r); 4632 unlock_rsb(r);
4496 unhold_rsb(r); 4633 unhold_rsb(r);
4497 4634 cond_resched();
4498 schedule();
4499 } 4635 }
4500 up_write(&ls->ls_root_sem); 4636 up_write(&ls->ls_root_sem);
4501 4637
4502 return 0; 4638 if (lkb_count)
4639 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
4640 lkb_count, nodes_count);
4503} 4641}
4504 4642
4505static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) 4643static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4506{ 4644{
4507 struct rb_node *n; 4645 struct rb_node *n;
4508 struct dlm_rsb *r, *r_ret = NULL; 4646 struct dlm_rsb *r;
4509 4647
4510 spin_lock(&ls->ls_rsbtbl[bucket].lock); 4648 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4511 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 4649 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4512 r = rb_entry(n, struct dlm_rsb, res_hashnode); 4650 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4513 if (!rsb_flag(r, RSB_LOCKS_PURGED)) 4651
4652 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4653 continue;
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4655 if (!is_master(r))
4514 continue; 4656 continue;
4515 hold_rsb(r); 4657 hold_rsb(r);
4516 rsb_clear_flag(r, RSB_LOCKS_PURGED); 4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4517 r_ret = r; 4659 return r;
4518 break;
4519 } 4660 }
4520 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 4661 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4521 return r_ret; 4662 return NULL;
4522} 4663}
4523 4664
4524void dlm_grant_after_purge(struct dlm_ls *ls) 4665/*
4666 * Attempt to grant locks on resources that we are the master of.
4667 * Locks may have become grantable during recovery because locks
4668 * from departed nodes have been purged (or not rebuilt), allowing
4669 * previously blocked locks to now be granted. The subset of rsb's
4670 * we are interested in are those with lkb's on either the convert or
4671 * waiting queues.
4672 *
4673 * Simplest would be to go through each master rsb and check for non-empty
4674 * convert or waiting queues, and attempt to grant on those rsbs.
4675 * Checking the queues requires lock_rsb, though, for which we'd need
4676 * to release the rsbtbl lock. This would make iterating through all
4677 * rsb's very inefficient. So, we rely on earlier recovery routines
4678 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
4679 * locks for.
4680 */
4681
4682void dlm_recover_grant(struct dlm_ls *ls)
4525{ 4683{
4526 struct dlm_rsb *r; 4684 struct dlm_rsb *r;
4527 int bucket = 0; 4685 int bucket = 0;
4686 unsigned int count = 0;
4687 unsigned int rsb_count = 0;
4688 unsigned int lkb_count = 0;
4528 4689
4529 while (1) { 4690 while (1) {
4530 r = find_purged_rsb(ls, bucket); 4691 r = find_grant_rsb(ls, bucket);
4531 if (!r) { 4692 if (!r) {
4532 if (bucket == ls->ls_rsbtbl_size - 1) 4693 if (bucket == ls->ls_rsbtbl_size - 1)
4533 break; 4694 break;
4534 bucket++; 4695 bucket++;
4535 continue; 4696 continue;
4536 } 4697 }
4698 rsb_count++;
4699 count = 0;
4537 lock_rsb(r); 4700 lock_rsb(r);
4538 if (is_master(r)) { 4701 grant_pending_locks(r, &count);
4539 grant_pending_locks(r); 4702 lkb_count += count;
4540 confirm_master(r, 0); 4703 confirm_master(r, 0);
4541 }
4542 unlock_rsb(r); 4704 unlock_rsb(r);
4543 put_rsb(r); 4705 put_rsb(r);
4544 schedule(); 4706 cond_resched();
4545 } 4707 }
4708
4709 if (lkb_count)
4710 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
4711 lkb_count, rsb_count);
4546} 4712}
4547 4713
4548static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 4714static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4631,6 +4797,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4631 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4797 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4632 struct dlm_rsb *r; 4798 struct dlm_rsb *r;
4633 struct dlm_lkb *lkb; 4799 struct dlm_lkb *lkb;
4800 uint32_t remid = 0;
4634 int error; 4801 int error;
4635 4802
4636 if (rl->rl_parent_lkid) { 4803 if (rl->rl_parent_lkid) {
@@ -4638,14 +4805,31 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4638 goto out; 4805 goto out;
4639 } 4806 }
4640 4807
4641 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 4808 remid = le32_to_cpu(rl->rl_lkid);
4642 R_MASTER, &r); 4809
4810 /* In general we expect the rsb returned to be R_MASTER, but we don't
4811 have to require it. Recovery of masters on one node can overlap
4812 recovery of locks on another node, so one node can send us MSTCPY
4813 locks before we've made ourselves master of this rsb. We can still
4814 add new MSTCPY locks that we receive here without any harm; when
4815 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */
4817
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
4643 if (error) 4819 if (error)
4644 goto out; 4820 goto out;
4645 4821
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid);
4825 error = -EBADR;
4826 put_rsb(r);
4827 goto out;
4828 }
4829
4646 lock_rsb(r); 4830 lock_rsb(r);
4647 4831
4648 lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid)); 4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
4649 if (lkb) { 4833 if (lkb) {
4650 error = -EEXIST; 4834 error = -EEXIST;
4651 goto out_remid; 4835 goto out_remid;
@@ -4664,19 +4848,25 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4664 attach_lkb(r, lkb); 4848 attach_lkb(r, lkb);
4665 add_lkb(r, lkb, rl->rl_status); 4849 add_lkb(r, lkb, rl->rl_status);
4666 error = 0; 4850 error = 0;
4851 ls->ls_recover_locks_in++;
4852
4853 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
4854 rsb_set_flag(r, RSB_RECOVER_GRANT);
4667 4855
4668 out_remid: 4856 out_remid:
4669 /* this is the new value returned to the lock holder for 4857 /* this is the new value returned to the lock holder for
4670 saving in its process-copy lkb */ 4858 saving in its process-copy lkb */
4671 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 4859 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4672 4860
4861 lkb->lkb_recover_seq = ls->ls_recover_seq;
4862
4673 out_unlock: 4863 out_unlock:
4674 unlock_rsb(r); 4864 unlock_rsb(r);
4675 put_rsb(r); 4865 put_rsb(r);
4676 out: 4866 out:
4677 if (error) 4867 if (error && error != -EEXIST)
4678 log_debug(ls, "recover_master_copy %d %x", error, 4868 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
4679 le32_to_cpu(rl->rl_lkid)); 4869 rc->rc_header.h_nodeid, remid, error);
4680 rl->rl_result = cpu_to_le32(error); 4870 rl->rl_result = cpu_to_le32(error);
4681 return error; 4871 return error;
4682} 4872}
@@ -4687,41 +4877,52 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4687 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4877 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4688 struct dlm_rsb *r; 4878 struct dlm_rsb *r;
4689 struct dlm_lkb *lkb; 4879 struct dlm_lkb *lkb;
4690 int error; 4880 uint32_t lkid, remid;
4881 int error, result;
4882
4883 lkid = le32_to_cpu(rl->rl_lkid);
4884 remid = le32_to_cpu(rl->rl_remid);
4885 result = le32_to_cpu(rl->rl_result);
4691 4886
4692 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb); 4887 error = find_lkb(ls, lkid, &lkb);
4693 if (error) { 4888 if (error) {
4694 log_error(ls, "recover_process_copy no lkid %x", 4889 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
4695 le32_to_cpu(rl->rl_lkid)); 4890 lkid, rc->rc_header.h_nodeid, remid, result);
4696 return error; 4891 return error;
4697 } 4892 }
4698 4893
4699 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4700
4701 error = le32_to_cpu(rl->rl_result);
4702
4703 r = lkb->lkb_resource; 4894 r = lkb->lkb_resource;
4704 hold_rsb(r); 4895 hold_rsb(r);
4705 lock_rsb(r); 4896 lock_rsb(r);
4706 4897
4707 switch (error) { 4898 if (!is_process_copy(lkb)) {
4899 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
4900 lkid, rc->rc_header.h_nodeid, remid, result);
4901 dlm_dump_rsb(r);
4902 unlock_rsb(r);
4903 put_rsb(r);
4904 dlm_put_lkb(lkb);
4905 return -EINVAL;
4906 }
4907
4908 switch (result) {
4708 case -EBADR: 4909 case -EBADR:
4709 /* There's a chance the new master received our lock before 4910 /* There's a chance the new master received our lock before
4710 dlm_recover_master_reply(), this wouldn't happen if we did 4911 dlm_recover_master_reply(), this wouldn't happen if we did
4711 a barrier between recover_masters and recover_locks. */ 4912 a barrier between recover_masters and recover_locks. */
4712 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, 4913
4713 (unsigned long)r, r->res_name); 4914 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
4915 lkid, rc->rc_header.h_nodeid, remid, result);
4916
4714 dlm_send_rcom_lock(r, lkb); 4917 dlm_send_rcom_lock(r, lkb);
4715 goto out; 4918 goto out;
4716 case -EEXIST: 4919 case -EEXIST:
4717 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4718 /* fall through */
4719 case 0: 4920 case 0:
4720 lkb->lkb_remid = le32_to_cpu(rl->rl_remid); 4921 lkb->lkb_remid = remid;
4721 break; 4922 break;
4722 default: 4923 default:
4723 log_error(ls, "dlm_recover_process_copy unknown error %d %x", 4924 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
4724 error, lkb->lkb_id); 4925 lkid, rc->rc_header.h_nodeid, remid, result);
4725 } 4926 }
4726 4927
4727 /* an ack for dlm_recover_locks() which waits for replies from 4928 /* an ack for dlm_recover_locks() which waits for replies from
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1a255307f6ff..c8b226c62807 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -15,7 +15,8 @@
15 15
16void dlm_dump_rsb(struct dlm_rsb *r); 16void dlm_dump_rsb(struct dlm_rsb *r);
17void dlm_print_lkb(struct dlm_lkb *lkb); 17void dlm_print_lkb(struct dlm_lkb *lkb);
18void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms); 18void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
19 uint32_t saved_seq);
19void dlm_receive_buffer(union dlm_packet *p, int nodeid); 20void dlm_receive_buffer(union dlm_packet *p, int nodeid);
20int dlm_modes_compat(int mode1, int mode2); 21int dlm_modes_compat(int mode1, int mode2);
21void dlm_put_rsb(struct dlm_rsb *r); 22void dlm_put_rsb(struct dlm_rsb *r);
@@ -31,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
31int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 32int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
32 unsigned int flags, struct dlm_rsb **r_ret); 33 unsigned int flags, struct dlm_rsb **r_ret);
33 34
34int dlm_purge_locks(struct dlm_ls *ls); 35void dlm_recover_purge(struct dlm_ls *ls);
35void dlm_purge_mstcpy_locks(struct dlm_rsb *r); 36void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
36void dlm_grant_after_purge(struct dlm_ls *ls); 37void dlm_recover_grant(struct dlm_ls *ls);
37int dlm_recover_waiters_post(struct dlm_ls *ls); 38int dlm_recover_waiters_post(struct dlm_ls *ls);
38void dlm_recover_waiters_pre(struct dlm_ls *ls); 39void dlm_recover_waiters_pre(struct dlm_ls *ls);
39int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); 40int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1ea25face82..ca506abbdd3b 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
74 return len; 74 return len;
75} 75}
76 76
77static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78{
79 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80}
81
82static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83{
84 int val = simple_strtoul(buf, NULL, 0);
85 if (val == 1)
86 set_bit(LSFL_NODIR, &ls->ls_flags);
87 return len;
88}
89
77static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 90static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78{ 91{
79 uint32_t status = dlm_recover_status(ls); 92 uint32_t status = dlm_recover_status(ls);
@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
107 .store = dlm_id_store 120 .store = dlm_id_store
108}; 121};
109 122
123static struct dlm_attr dlm_attr_nodir = {
124 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125 .show = dlm_nodir_show,
126 .store = dlm_nodir_store
127};
128
110static struct dlm_attr dlm_attr_recover_status = { 129static struct dlm_attr dlm_attr_recover_status = {
111 .attr = {.name = "recover_status", .mode = S_IRUGO}, 130 .attr = {.name = "recover_status", .mode = S_IRUGO},
112 .show = dlm_recover_status_show 131 .show = dlm_recover_status_show
@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
121 &dlm_attr_control.attr, 140 &dlm_attr_control.attr,
122 &dlm_attr_event.attr, 141 &dlm_attr_event.attr,
123 &dlm_attr_id.attr, 142 &dlm_attr_id.attr,
143 &dlm_attr_nodir.attr,
124 &dlm_attr_recover_status.attr, 144 &dlm_attr_recover_status.attr,
125 &dlm_attr_recover_nodeid.attr, 145 &dlm_attr_recover_nodeid.attr,
126 NULL, 146 NULL,
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 133ef6dc7cb7..5c1b0e38c7a4 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -142,6 +142,7 @@ struct writequeue_entry {
142 142
143static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 143static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
144static int dlm_local_count; 144static int dlm_local_count;
145static int dlm_allow_conn;
145 146
146/* Work queues */ 147/* Work queues */
147static struct workqueue_struct *recv_workqueue; 148static struct workqueue_struct *recv_workqueue;
@@ -710,6 +711,13 @@ static int tcp_accept_from_sock(struct connection *con)
710 struct connection *newcon; 711 struct connection *newcon;
711 struct connection *addcon; 712 struct connection *addcon;
712 713
714 mutex_lock(&connections_lock);
715 if (!dlm_allow_conn) {
716 mutex_unlock(&connections_lock);
717 return -1;
718 }
719 mutex_unlock(&connections_lock);
720
713 memset(&peeraddr, 0, sizeof(peeraddr)); 721 memset(&peeraddr, 0, sizeof(peeraddr));
714 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 722 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
715 IPPROTO_TCP, &newsock); 723 IPPROTO_TCP, &newsock);
@@ -1503,6 +1511,7 @@ void dlm_lowcomms_stop(void)
1503 socket activity. 1511 socket activity.
1504 */ 1512 */
1505 mutex_lock(&connections_lock); 1513 mutex_lock(&connections_lock);
1514 dlm_allow_conn = 0;
1506 foreach_conn(stop_conn); 1515 foreach_conn(stop_conn);
1507 mutex_unlock(&connections_lock); 1516 mutex_unlock(&connections_lock);
1508 1517
@@ -1530,7 +1539,7 @@ int dlm_lowcomms_start(void)
1530 if (!dlm_local_count) { 1539 if (!dlm_local_count) {
1531 error = -ENOTCONN; 1540 error = -ENOTCONN;
1532 log_print("no local IP address has been set"); 1541 log_print("no local IP address has been set");
1533 goto out; 1542 goto fail;
1534 } 1543 }
1535 1544
1536 error = -ENOMEM; 1545 error = -ENOMEM;
@@ -1538,7 +1547,13 @@ int dlm_lowcomms_start(void)
1538 __alignof__(struct connection), 0, 1547 __alignof__(struct connection), 0,
1539 NULL); 1548 NULL);
1540 if (!con_cache) 1549 if (!con_cache)
1541 goto out; 1550 goto fail;
1551
1552 error = work_start();
1553 if (error)
1554 goto fail_destroy;
1555
1556 dlm_allow_conn = 1;
1542 1557
1543 /* Start listening */ 1558 /* Start listening */
1544 if (dlm_config.ci_protocol == 0) 1559 if (dlm_config.ci_protocol == 0)
@@ -1548,20 +1563,17 @@ int dlm_lowcomms_start(void)
1548 if (error) 1563 if (error)
1549 goto fail_unlisten; 1564 goto fail_unlisten;
1550 1565
1551 error = work_start();
1552 if (error)
1553 goto fail_unlisten;
1554
1555 return 0; 1566 return 0;
1556 1567
1557fail_unlisten: 1568fail_unlisten:
1569 dlm_allow_conn = 0;
1558 con = nodeid2con(0,0); 1570 con = nodeid2con(0,0);
1559 if (con) { 1571 if (con) {
1560 close_connection(con, false); 1572 close_connection(con, false);
1561 kmem_cache_free(con_cache, con); 1573 kmem_cache_free(con_cache, con);
1562 } 1574 }
1575fail_destroy:
1563 kmem_cache_destroy(con_cache); 1576 kmem_cache_destroy(con_cache);
1564 1577fail:
1565out:
1566 return error; 1578 return error;
1567} 1579}
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index da64df7576e1..7cd24bccd4fe 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -21,21 +21,19 @@ static struct kmem_cache *rsb_cache;
21 21
22int __init dlm_memory_init(void) 22int __init dlm_memory_init(void)
23{ 23{
24 int ret = 0;
25
26 lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb), 24 lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
27 __alignof__(struct dlm_lkb), 0, NULL); 25 __alignof__(struct dlm_lkb), 0, NULL);
28 if (!lkb_cache) 26 if (!lkb_cache)
29 ret = -ENOMEM; 27 return -ENOMEM;
30 28
31 rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb), 29 rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
32 __alignof__(struct dlm_rsb), 0, NULL); 30 __alignof__(struct dlm_rsb), 0, NULL);
33 if (!rsb_cache) { 31 if (!rsb_cache) {
34 kmem_cache_destroy(lkb_cache); 32 kmem_cache_destroy(lkb_cache);
35 ret = -ENOMEM; 33 return -ENOMEM;
36 } 34 }
37 35
38 return ret; 36 return 0;
39} 37}
40 38
41void dlm_memory_exit(void) 39void dlm_memory_exit(void)
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index ac5c616c9696..64d3e2b958c7 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -486,47 +486,50 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
486 return 0; 486 return 0;
487} 487}
488 488
489static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 489/* Called by dlm_recv; corresponds to dlm_receive_message() but special
490 recovery-only comms are sent through here. */
491
492void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
490{ 493{
494 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
495 int stop, reply = 0, lock = 0;
496 uint32_t status;
491 uint64_t seq; 497 uint64_t seq;
492 int rv = 0;
493 498
494 switch (rc->rc_type) { 499 switch (rc->rc_type) {
500 case DLM_RCOM_LOCK:
501 lock = 1;
502 break;
503 case DLM_RCOM_LOCK_REPLY:
504 lock = 1;
505 reply = 1;
506 break;
495 case DLM_RCOM_STATUS_REPLY: 507 case DLM_RCOM_STATUS_REPLY:
496 case DLM_RCOM_NAMES_REPLY: 508 case DLM_RCOM_NAMES_REPLY:
497 case DLM_RCOM_LOOKUP_REPLY: 509 case DLM_RCOM_LOOKUP_REPLY:
498 case DLM_RCOM_LOCK_REPLY: 510 reply = 1;
499 spin_lock(&ls->ls_recover_lock); 511 };
500 seq = ls->ls_recover_seq;
501 spin_unlock(&ls->ls_recover_lock);
502 if (rc->rc_seq_reply != seq) {
503 log_debug(ls, "ignoring old reply %x from %d "
504 "seq_reply %llx expect %llx",
505 rc->rc_type, rc->rc_header.h_nodeid,
506 (unsigned long long)rc->rc_seq_reply,
507 (unsigned long long)seq);
508 rv = 1;
509 }
510 }
511 return rv;
512}
513
514/* Called by dlm_recv; corresponds to dlm_receive_message() but special
515 recovery-only comms are sent through here. */
516 512
517void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 513 spin_lock(&ls->ls_recover_lock);
518{ 514 status = ls->ls_recover_status;
519 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 515 stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
516 seq = ls->ls_recover_seq;
517 spin_unlock(&ls->ls_recover_lock);
520 518
521 if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { 519 if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
522 log_debug(ls, "ignoring recovery message %x from %d", 520 (reply && (rc->rc_seq_reply != seq)) ||
523 rc->rc_type, nodeid); 521 (lock && !(status & DLM_RS_DIR))) {
522 log_limit(ls, "dlm_receive_rcom ignore msg %d "
523 "from %d %llu %llu recover seq %llu sts %x gen %u",
524 rc->rc_type,
525 nodeid,
526 (unsigned long long)rc->rc_seq,
527 (unsigned long long)rc->rc_seq_reply,
528 (unsigned long long)seq,
529 status, ls->ls_generation);
524 goto out; 530 goto out;
525 } 531 }
526 532
527 if (is_old_reply(ls, rc))
528 goto out;
529
530 switch (rc->rc_type) { 533 switch (rc->rc_type) {
531 case DLM_RCOM_STATUS: 534 case DLM_RCOM_STATUS:
532 receive_rcom_status(ls, rc); 535 receive_rcom_status(ls, rc);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 34d5adf1fce7..7554e4dac6bb 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
339{ 339{
340 struct dlm_lkb *lkb; 340 struct dlm_lkb *lkb;
341 341
342 list_for_each_entry(lkb, queue, lkb_statequeue) 342 list_for_each_entry(lkb, queue, lkb_statequeue) {
343 if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) 343 if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
344 lkb->lkb_nodeid = nodeid; 344 lkb->lkb_nodeid = nodeid;
345 lkb->lkb_remid = 0;
346 }
347 }
345} 348}
346 349
347static void set_master_lkbs(struct dlm_rsb *r) 350static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
354/* 357/*
355 * Propagate the new master nodeid to locks 358 * Propagate the new master nodeid to locks
356 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. 359 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
357 * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which 360 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
358 * rsb's to consider. 361 * rsb's to consider.
359 */ 362 */
360 363
361static void set_new_master(struct dlm_rsb *r, int nodeid) 364static void set_new_master(struct dlm_rsb *r, int nodeid)
362{ 365{
363 lock_rsb(r);
364 r->res_nodeid = nodeid; 366 r->res_nodeid = nodeid;
365 set_master_lkbs(r); 367 set_master_lkbs(r);
366 rsb_set_flag(r, RSB_NEW_MASTER); 368 rsb_set_flag(r, RSB_NEW_MASTER);
367 rsb_set_flag(r, RSB_NEW_MASTER2); 369 rsb_set_flag(r, RSB_NEW_MASTER2);
368 unlock_rsb(r);
369} 370}
370 371
371/* 372/*
@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
376static int recover_master(struct dlm_rsb *r) 377static int recover_master(struct dlm_rsb *r)
377{ 378{
378 struct dlm_ls *ls = r->res_ls; 379 struct dlm_ls *ls = r->res_ls;
379 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); 380 int error, ret_nodeid;
380 381 int our_nodeid = dlm_our_nodeid();
381 dir_nodeid = dlm_dir_nodeid(r); 382 int dir_nodeid = dlm_dir_nodeid(r);
382 383
383 if (dir_nodeid == our_nodeid) { 384 if (dir_nodeid == our_nodeid) {
384 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 385 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
388 389
389 if (ret_nodeid == our_nodeid) 390 if (ret_nodeid == our_nodeid)
390 ret_nodeid = 0; 391 ret_nodeid = 0;
392 lock_rsb(r);
391 set_new_master(r, ret_nodeid); 393 set_new_master(r, ret_nodeid);
394 unlock_rsb(r);
392 } else { 395 } else {
393 recover_list_add(r); 396 recover_list_add(r);
394 error = dlm_send_rcom_lookup(r, dir_nodeid); 397 error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
398} 401}
399 402
400/* 403/*
401 * When not using a directory, most resource names will hash to a new static 404 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
402 * master nodeid and the resource will need to be remastered. 405 * This is necessary because recovery can be started, aborted and restarted,
406 * causing the master nodeid to briefly change during the aborted recovery, and
407 * change back to the original value in the second recovery. The MSTCPY locks
408 * may or may not have been purged during the aborted recovery. Another node
409 * with an outstanding request in waiters list and a request reply saved in the
410 * requestqueue, cannot know whether it should ignore the reply and resend the
411 * request, or accept the reply and complete the request. It must do the
412 * former if the remote node purged MSTCPY locks, and it must do the later if
413 * the remote node did not. This is solved by always purging MSTCPY locks, in
414 * which case, the request reply would always be ignored and the request
415 * resent.
403 */ 416 */
404 417
405static int recover_master_static(struct dlm_rsb *r) 418static int recover_master_static(struct dlm_rsb *r)
406{ 419{
407 int master = dlm_dir_nodeid(r); 420 int dir_nodeid = dlm_dir_nodeid(r);
421 int new_master = dir_nodeid;
408 422
409 if (master == dlm_our_nodeid()) 423 if (dir_nodeid == dlm_our_nodeid())
410 master = 0; 424 new_master = 0;
411 425
412 if (r->res_nodeid != master) { 426 lock_rsb(r);
413 if (is_master(r)) 427 dlm_purge_mstcpy_locks(r);
414 dlm_purge_mstcpy_locks(r); 428 set_new_master(r, new_master);
415 set_new_master(r, master); 429 unlock_rsb(r);
416 return 1; 430 return 1;
417 }
418 return 0;
419} 431}
420 432
421/* 433/*
@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
481 if (nodeid == dlm_our_nodeid()) 493 if (nodeid == dlm_our_nodeid())
482 nodeid = 0; 494 nodeid = 0;
483 495
496 lock_rsb(r);
484 set_new_master(r, nodeid); 497 set_new_master(r, nodeid);
498 unlock_rsb(r);
485 recover_list_del(r); 499 recover_list_del(r);
486 500
487 if (recover_list_empty(ls)) 501 if (recover_list_empty(ls))
@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
556 struct dlm_rsb *r; 570 struct dlm_rsb *r;
557 int error, count = 0; 571 int error, count = 0;
558 572
559 log_debug(ls, "dlm_recover_locks");
560
561 down_read(&ls->ls_root_sem); 573 down_read(&ls->ls_root_sem);
562 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 574 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
563 if (is_master(r)) { 575 if (is_master(r)) {
@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
584 } 596 }
585 up_read(&ls->ls_root_sem); 597 up_read(&ls->ls_root_sem);
586 598
587 log_debug(ls, "dlm_recover_locks %d locks", count); 599 log_debug(ls, "dlm_recover_locks %d out", count);
588 600
589 error = dlm_wait_function(ls, &recover_list_empty); 601 error = dlm_wait_function(ls, &recover_list_empty);
590 out: 602 out:
@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
721} 733}
722 734
723/* We've become the new master for this rsb and waiting/converting locks may 735/* We've become the new master for this rsb and waiting/converting locks may
724 need to be granted in dlm_grant_after_purge() due to locks that may have 736 need to be granted in dlm_recover_grant() due to locks that may have
725 existed from a removed node. */ 737 existed from a removed node. */
726 738
727static void set_locks_purged(struct dlm_rsb *r) 739static void recover_grant(struct dlm_rsb *r)
728{ 740{
729 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 741 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
730 rsb_set_flag(r, RSB_LOCKS_PURGED); 742 rsb_set_flag(r, RSB_RECOVER_GRANT);
731} 743}
732 744
733void dlm_recover_rsbs(struct dlm_ls *ls) 745void dlm_recover_rsbs(struct dlm_ls *ls)
734{ 746{
735 struct dlm_rsb *r; 747 struct dlm_rsb *r;
736 int count = 0; 748 unsigned int count = 0;
737
738 log_debug(ls, "dlm_recover_rsbs");
739 749
740 down_read(&ls->ls_root_sem); 750 down_read(&ls->ls_root_sem);
741 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 751 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
744 if (rsb_flag(r, RSB_RECOVER_CONVERT)) 754 if (rsb_flag(r, RSB_RECOVER_CONVERT))
745 recover_conversion(r); 755 recover_conversion(r);
746 if (rsb_flag(r, RSB_NEW_MASTER2)) 756 if (rsb_flag(r, RSB_NEW_MASTER2))
747 set_locks_purged(r); 757 recover_grant(r);
748 recover_lvb(r); 758 recover_lvb(r);
749 count++; 759 count++;
750 } 760 }
@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
754 } 764 }
755 up_read(&ls->ls_root_sem); 765 up_read(&ls->ls_root_sem);
756 766
757 log_debug(ls, "dlm_recover_rsbs %d rsbs", count); 767 if (count)
768 log_debug(ls, "dlm_recover_rsbs %d done", count);
758} 769}
759 770
760/* Create a single list of all root rsb's to be used during recovery */ 771/* Create a single list of all root rsb's to be used during recovery */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 3780caf7ae0c..f1a9073c0835 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -54,7 +54,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
54 unsigned long start; 54 unsigned long start;
55 int error, neg = 0; 55 int error, neg = 0;
56 56
57 log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq); 57 log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
58 58
59 mutex_lock(&ls->ls_recoverd_active); 59 mutex_lock(&ls->ls_recoverd_active);
60 60
@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
84 goto fail; 84 goto fail;
85 } 85 }
86 86
87 ls->ls_recover_locks_in = 0;
88
87 dlm_set_recover_status(ls, DLM_RS_NODES); 89 dlm_set_recover_status(ls, DLM_RS_NODES);
88 90
89 error = dlm_recover_members_wait(ls); 91 error = dlm_recover_members_wait(ls);
@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
130 * Clear lkb's for departed nodes. 132 * Clear lkb's for departed nodes.
131 */ 133 */
132 134
133 dlm_purge_locks(ls); 135 dlm_recover_purge(ls);
134 136
135 /* 137 /*
136 * Get new master nodeid's for rsb's that were mastered on 138 * Get new master nodeid's for rsb's that were mastered on
@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
161 goto fail; 163 goto fail;
162 } 164 }
163 165
166 log_debug(ls, "dlm_recover_locks %u in",
167 ls->ls_recover_locks_in);
168
164 /* 169 /*
165 * Finalize state in master rsb's now that all locks can be 170 * Finalize state in master rsb's now that all locks can be
166 * checked. This includes conversion resolution and lvb 171 * checked. This includes conversion resolution and lvb
@@ -225,9 +230,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
225 goto fail; 230 goto fail;
226 } 231 }
227 232
228 dlm_grant_after_purge(ls); 233 dlm_recover_grant(ls);
229 234
230 log_debug(ls, "dlm_recover %llx generation %u done: %u ms", 235 log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
231 (unsigned long long)rv->seq, ls->ls_generation, 236 (unsigned long long)rv->seq, ls->ls_generation,
232 jiffies_to_msecs(jiffies - start)); 237 jiffies_to_msecs(jiffies - start));
233 mutex_unlock(&ls->ls_recoverd_active); 238 mutex_unlock(&ls->ls_recoverd_active);
@@ -237,7 +242,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
237 242
238 fail: 243 fail:
239 dlm_release_root_list(ls); 244 dlm_release_root_list(ls);
240 log_debug(ls, "dlm_recover %llx error %d", 245 log_debug(ls, "dlm_recover %llu error %d",
241 (unsigned long long)rv->seq, error); 246 (unsigned long long)rv->seq, error);
242 mutex_unlock(&ls->ls_recoverd_active); 247 mutex_unlock(&ls->ls_recoverd_active);
243 return error; 248 return error;
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index a44fa22890e1..1695f1b0dd45 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -19,6 +19,7 @@
19 19
20struct rq_entry { 20struct rq_entry {
21 struct list_head list; 21 struct list_head list;
22 uint32_t recover_seq;
22 int nodeid; 23 int nodeid;
23 struct dlm_message request; 24 struct dlm_message request;
24}; 25};
@@ -41,6 +42,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
41 return; 42 return;
42 } 43 }
43 44
45 e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
44 e->nodeid = nodeid; 46 e->nodeid = nodeid;
45 memcpy(&e->request, ms, ms->m_header.h_length); 47 memcpy(&e->request, ms, ms->m_header.h_length);
46 48
@@ -63,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
63int dlm_process_requestqueue(struct dlm_ls *ls) 65int dlm_process_requestqueue(struct dlm_ls *ls)
64{ 66{
65 struct rq_entry *e; 67 struct rq_entry *e;
68 struct dlm_message *ms;
66 int error = 0; 69 int error = 0;
67 70
68 mutex_lock(&ls->ls_requestqueue_mutex); 71 mutex_lock(&ls->ls_requestqueue_mutex);
@@ -76,7 +79,15 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
76 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); 79 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
77 mutex_unlock(&ls->ls_requestqueue_mutex); 80 mutex_unlock(&ls->ls_requestqueue_mutex);
78 81
79 dlm_receive_message_saved(ls, &e->request); 82 ms = &e->request;
83
84 log_limit(ls, "dlm_process_requestqueue msg %d from %d "
85 "lkid %x remid %x result %d seq %u",
86 ms->m_type, ms->m_header.h_nodeid,
87 ms->m_lkid, ms->m_remid, ms->m_result,
88 e->recover_seq);
89
90 dlm_receive_message_saved(ls, &e->request, e->recover_seq);
80 91
81 mutex_lock(&ls->ls_requestqueue_mutex); 92 mutex_lock(&ls->ls_requestqueue_mutex);
82 list_del(&e->list); 93 list_del(&e->list);
@@ -138,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
138 if (!dlm_no_directory(ls)) 149 if (!dlm_no_directory(ls))
139 return 0; 150 return 0;
140 151
141 /* with no directory, the master is likely to change as a part of 152 return 1;
142 recovery; requests to/from the defunct master need to be purged */
143
144 switch (type) {
145 case DLM_MSG_REQUEST:
146 case DLM_MSG_CONVERT:
147 case DLM_MSG_UNLOCK:
148 case DLM_MSG_CANCEL:
149 /* we're no longer the master of this resource, the sender
150 will resend to the new master (see waiter_needs_recovery) */
151
152 if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
153 return 1;
154 break;
155
156 case DLM_MSG_REQUEST_REPLY:
157 case DLM_MSG_CONVERT_REPLY:
158 case DLM_MSG_UNLOCK_REPLY:
159 case DLM_MSG_CANCEL_REPLY:
160 case DLM_MSG_GRANT:
161 /* this reply is from the former master of the resource,
162 we'll resend to the new master if needed */
163
164 if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
165 return 1;
166 break;
167 }
168
169 return 0;
170} 153}
171 154
172void dlm_purge_requestqueue(struct dlm_ls *ls) 155void dlm_purge_requestqueue(struct dlm_ls *ls)
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index aa9949e5de26..67fd6beffece 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -543,7 +543,6 @@ struct gfs2_sb_host {
543struct lm_lockstruct { 543struct lm_lockstruct {
544 int ls_jid; 544 int ls_jid;
545 unsigned int ls_first; 545 unsigned int ls_first;
546 unsigned int ls_nodir;
547 const struct lm_lockops *ls_ops; 546 const struct lm_lockops *ls_ops;
548 dlm_lockspace_t *ls_dlm; 547 dlm_lockspace_t *ls_dlm;
549 548
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 5f5e70e047dc..4a38db739ca0 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
1209 fsname++; 1209 fsname++;
1210 1210
1211 flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL; 1211 flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
1212 if (ls->ls_nodir)
1213 flags |= DLM_LSFL_NODIR;
1214 1212
1215 /* 1213 /*
1216 * create/join lockspace 1214 * create/join lockspace
diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c
index c5871ae40561..b8c250fc4922 100644
--- a/fs/gfs2/ops_fstype.c
+++ b/fs/gfs2/ops_fstype.c
@@ -993,6 +993,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
993 ls->ls_jid = option; 993 ls->ls_jid = option;
994 break; 994 break;
995 case Opt_id: 995 case Opt_id:
996 case Opt_nodir:
996 /* Obsolete, but left for backward compat purposes */ 997 /* Obsolete, but left for backward compat purposes */
997 break; 998 break;
998 case Opt_first: 999 case Opt_first:
@@ -1001,12 +1002,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
1001 goto hostdata_error; 1002 goto hostdata_error;
1002 ls->ls_first = option; 1003 ls->ls_first = option;
1003 break; 1004 break;
1004 case Opt_nodir:
1005 ret = match_int(&tmp[0], &option);
1006 if (ret || (option != 0 && option != 1))
1007 goto hostdata_error;
1008 ls->ls_nodir = option;
1009 break;
1010 case Opt_err: 1005 case Opt_err:
1011 default: 1006 default:
1012hostdata_error: 1007hostdata_error:
diff --git a/fs/gfs2/sys.c b/fs/gfs2/sys.c
index d33172c291ba..9c2592b1d5ff 100644
--- a/fs/gfs2/sys.c
+++ b/fs/gfs2/sys.c
@@ -368,10 +368,7 @@ int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid)
368 struct gfs2_jdesc *jd; 368 struct gfs2_jdesc *jd;
369 int rv; 369 int rv;
370 370
371 rv = -ESHUTDOWN;
372 spin_lock(&sdp->sd_jindex_spin); 371 spin_lock(&sdp->sd_jindex_spin);
373 if (test_bit(SDF_NORECOVERY, &sdp->sd_flags))
374 goto out;
375 rv = -EBUSY; 372 rv = -EBUSY;
376 if (sdp->sd_jdesc->jd_jid == jid) 373 if (sdp->sd_jdesc->jd_jid == jid)
377 goto out; 374 goto out;
@@ -396,8 +393,13 @@ static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
396 if (rv != 1) 393 if (rv != 1)
397 return -EINVAL; 394 return -EINVAL;
398 395
399 rv = gfs2_recover_set(sdp, jid); 396 if (test_bit(SDF_NORECOVERY, &sdp->sd_flags)) {
397 rv = -ESHUTDOWN;
398 goto out;
399 }
400 400
401 rv = gfs2_recover_set(sdp, jid);
402out:
401 return rv ? rv : len; 403 return rv ? rv : len;
402} 404}
403 405
diff --git a/include/linux/dlm.h b/include/linux/dlm.h
index 6c7f6e9546c7..520152411cd1 100644
--- a/include/linux/dlm.h
+++ b/include/linux/dlm.h
@@ -67,7 +67,6 @@ struct dlm_lksb {
67 67
68/* dlm_new_lockspace() flags */ 68/* dlm_new_lockspace() flags */
69 69
70#define DLM_LSFL_NODIR 0x00000001
71#define DLM_LSFL_TIMEWARN 0x00000002 70#define DLM_LSFL_TIMEWARN 0x00000002
72#define DLM_LSFL_FS 0x00000004 71#define DLM_LSFL_FS 0x00000004
73#define DLM_LSFL_NEWEXCL 0x00000008 72#define DLM_LSFL_NEWEXCL 0x00000008