aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c286
1 files changed, 197 insertions, 89 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f3ba70301a45..bdafb65a5234 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -161,10 +161,11 @@ static const int __quecvt_compat_matrix[8][8] = {
161void dlm_print_lkb(struct dlm_lkb *lkb) 161void dlm_print_lkb(struct dlm_lkb *lkb)
162{ 162{
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n", 164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid); 167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
168} 169}
169 170
170static void dlm_print_rsb(struct dlm_rsb *r) 171static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
251 252
252static inline int is_master_copy(struct dlm_lkb *lkb) 253static inline int is_master_copy(struct dlm_lkb *lkb)
253{ 254{
254 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257} 256}
258 257
@@ -1519,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1519 } 1518 }
1520 1519
1521 lkb->lkb_rqmode = DLM_LOCK_IV; 1520 lkb->lkb_rqmode = DLM_LOCK_IV;
1521 lkb->lkb_highbast = 0;
1522} 1522}
1523 1523
1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1525{ 1525{
1526 set_lvb_lock(r, lkb); 1526 set_lvb_lock(r, lkb);
1527 _grant_lock(r, lkb); 1527 _grant_lock(r, lkb);
1528 lkb->lkb_highbast = 0;
1529} 1528}
1530 1529
1531static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1530static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1887,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1887/* Returns the highest requested mode of all blocked conversions; sets 1886/* Returns the highest requested mode of all blocked conversions; sets
1888 cw if there's a blocked conversion to DLM_LOCK_CW. */ 1887 cw if there's a blocked conversion to DLM_LOCK_CW. */
1889 1888
1890static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) 1889static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1890 unsigned int *count)
1891{ 1891{
1892 struct dlm_lkb *lkb, *s; 1892 struct dlm_lkb *lkb, *s;
1893 int hi, demoted, quit, grant_restart, demote_restart; 1893 int hi, demoted, quit, grant_restart, demote_restart;
@@ -1906,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1906 if (can_be_granted(r, lkb, 0, &deadlk)) { 1906 if (can_be_granted(r, lkb, 0, &deadlk)) {
1907 grant_lock_pending(r, lkb); 1907 grant_lock_pending(r, lkb);
1908 grant_restart = 1; 1908 grant_restart = 1;
1909 if (count)
1910 (*count)++;
1909 continue; 1911 continue;
1910 } 1912 }
1911 1913
@@ -1939,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1939 return max_t(int, high, hi); 1941 return max_t(int, high, hi);
1940} 1942}
1941 1943
1942static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) 1944static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
1945 unsigned int *count)
1943{ 1946{
1944 struct dlm_lkb *lkb, *s; 1947 struct dlm_lkb *lkb, *s;
1945 1948
1946 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 1949 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1947 if (can_be_granted(r, lkb, 0, NULL)) 1950 if (can_be_granted(r, lkb, 0, NULL)) {
1948 grant_lock_pending(r, lkb); 1951 grant_lock_pending(r, lkb);
1949 else { 1952 if (count)
1953 (*count)++;
1954 } else {
1950 high = max_t(int, lkb->lkb_rqmode, high); 1955 high = max_t(int, lkb->lkb_rqmode, high);
1951 if (lkb->lkb_rqmode == DLM_LOCK_CW) 1956 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1952 *cw = 1; 1957 *cw = 1;
@@ -1975,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1975 return 0; 1980 return 0;
1976} 1981}
1977 1982
1978static void grant_pending_locks(struct dlm_rsb *r) 1983static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
1979{ 1984{
1980 struct dlm_lkb *lkb, *s; 1985 struct dlm_lkb *lkb, *s;
1981 int high = DLM_LOCK_IV; 1986 int high = DLM_LOCK_IV;
1982 int cw = 0; 1987 int cw = 0;
1983 1988
1984 DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); 1989 if (!is_master(r)) {
1990 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
1991 dlm_dump_rsb(r);
1992 return;
1993 }
1985 1994
1986 high = grant_pending_convert(r, high, &cw); 1995 high = grant_pending_convert(r, high, &cw, count);
1987 high = grant_pending_wait(r, high, &cw); 1996 high = grant_pending_wait(r, high, &cw, count);
1988 1997
1989 if (high == DLM_LOCK_IV) 1998 if (high == DLM_LOCK_IV)
1990 return; 1999 return;
@@ -2520,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2520 before we try again to grant this one. */ 2529 before we try again to grant this one. */
2521 2530
2522 if (is_demoted(lkb)) { 2531 if (is_demoted(lkb)) {
2523 grant_pending_convert(r, DLM_LOCK_IV, NULL); 2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2524 if (_can_be_granted(r, lkb, 1)) { 2533 if (_can_be_granted(r, lkb, 1)) {
2525 grant_lock(r, lkb); 2534 grant_lock(r, lkb);
2526 queue_cast(r, lkb, 0); 2535 queue_cast(r, lkb, 0);
@@ -2548,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2548{ 2557{
2549 switch (error) { 2558 switch (error) {
2550 case 0: 2559 case 0:
2551 grant_pending_locks(r); 2560 grant_pending_locks(r, NULL);
2552 /* grant_pending_locks also sends basts */ 2561 /* grant_pending_locks also sends basts */
2553 break; 2562 break;
2554 case -EAGAIN: 2563 case -EAGAIN:
@@ -2571,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2571static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2580static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2572 int error) 2581 int error)
2573{ 2582{
2574 grant_pending_locks(r); 2583 grant_pending_locks(r, NULL);
2575} 2584}
2576 2585
2577/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2592,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2592 int error) 2601 int error)
2593{ 2602{
2594 if (error) 2603 if (error)
2595 grant_pending_locks(r); 2604 grant_pending_locks(r, NULL);
2596} 2605}
2597 2606
2598/* 2607/*
@@ -3452,8 +3461,9 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3452 goto fail; 3461 goto fail;
3453 3462
3454 if (lkb->lkb_remid != ms->m_lkid) { 3463 if (lkb->lkb_remid != ms->m_lkid) {
3455 log_error(ls, "receive_convert %x remid %x remote %d %x", 3464 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3456 lkb->lkb_id, lkb->lkb_remid, 3465 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3466 (unsigned long long)lkb->lkb_recover_seq,
3457 ms->m_header.h_nodeid, ms->m_lkid); 3467 ms->m_header.h_nodeid, ms->m_lkid);
3458 error = -ENOENT; 3468 error = -ENOENT;
3459 goto fail; 3469 goto fail;
@@ -3631,6 +3641,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3631 goto out; 3641 goto out;
3632 3642
3633 queue_bast(r, lkb, ms->m_bastmode); 3643 queue_bast(r, lkb, ms->m_bastmode);
3644 lkb->lkb_highbast = ms->m_bastmode;
3634 out: 3645 out:
3635 unlock_rsb(r); 3646 unlock_rsb(r);
3636 put_rsb(r); 3647 put_rsb(r);
@@ -3710,8 +3721,13 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3710 3721
3711 mstype = lkb->lkb_wait_type; 3722 mstype = lkb->lkb_wait_type;
3712 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3713 if (error) 3724 if (error) {
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3727 ms->m_result);
3728 dlm_dump_rsb(r);
3714 goto out; 3729 goto out;
3730 }
3715 3731
3716 /* Optimization: the dir node was also the master, so it took our 3732 /* Optimization: the dir node was also the master, so it took our
3717 lookup as a request and sent request reply instead of lookup reply */ 3733 lookup as a request and sent request reply instead of lookup reply */
@@ -4122,21 +4138,28 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4122 * happen in normal usage for the async messages and cancel, so 4138 * happen in normal usage for the async messages and cancel, so
4123 * only use log_debug for them. 4139 * only use log_debug for them.
4124 * 4140 *
4125 * Other errors are expected and normal. 4141 * Some errors are expected and normal.
4126 */ 4142 */
4127 4143
4128 if (error == -ENOENT && noent) { 4144 if (error == -ENOENT && noent) {
4129 log_debug(ls, "receive %d no %x remote %d %x seq %u", 4145 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4130 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4146 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4131 ms->m_lkid, saved_seq); 4147 ms->m_lkid, saved_seq);
4132 } else if (error == -ENOENT) { 4148 } else if (error == -ENOENT) {
4133 log_error(ls, "receive %d no %x remote %d %x seq %u", 4149 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4134 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4150 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4135 ms->m_lkid, saved_seq); 4151 ms->m_lkid, saved_seq);
4136 4152
4137 if (ms->m_type == DLM_MSG_CONVERT) 4153 if (ms->m_type == DLM_MSG_CONVERT)
4138 dlm_dump_rsb_hash(ls, ms->m_hash); 4154 dlm_dump_rsb_hash(ls, ms->m_hash);
4139 } 4155 }
4156
4157 if (error == -EINVAL) {
4158 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4159 "saved_seq %u",
4160 ms->m_type, ms->m_header.h_nodeid,
4161 ms->m_lkid, ms->m_remid, saved_seq);
4162 }
4140} 4163}
4141 4164
4142/* If the lockspace is in recovery mode (locking stopped), then normal 4165/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4200,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4200 4223
4201 ls = dlm_find_lockspace_global(hd->h_lockspace); 4224 ls = dlm_find_lockspace_global(hd->h_lockspace);
4202 if (!ls) { 4225 if (!ls) {
4203 if (dlm_config.ci_log_debug) 4226 if (dlm_config.ci_log_debug) {
4204 log_print("invalid lockspace %x from %d cmd %d type %d", 4227 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4205 hd->h_lockspace, nodeid, hd->h_cmd, type); 4228 "%u from %d cmd %d type %d\n",
4229 hd->h_lockspace, nodeid, hd->h_cmd, type);
4230 }
4206 4231
4207 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4232 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4208 dlm_send_ls_not_ready(nodeid, &p->rcom); 4233 dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4253,16 +4278,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4253static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 4278static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4254 int dir_nodeid) 4279 int dir_nodeid)
4255{ 4280{
4256 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 4281 if (dlm_no_directory(ls))
4257 return 1;
4258
4259 if (!dlm_no_directory(ls))
4260 return 0;
4261
4262 if (dir_nodeid == dlm_our_nodeid())
4263 return 1; 4282 return 1;
4264 4283
4265 if (dir_nodeid != lkb->lkb_wait_nodeid) 4284 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4266 return 1; 4285 return 1;
4267 4286
4268 return 0; 4287 return 0;
@@ -4519,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4519 return error; 4538 return error;
4520} 4539}
4521 4540
4522static void purge_queue(struct dlm_rsb *r, struct list_head *queue, 4541static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
4523 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) 4542 struct list_head *list)
4524{ 4543{
4525 struct dlm_ls *ls = r->res_ls;
4526 struct dlm_lkb *lkb, *safe; 4544 struct dlm_lkb *lkb, *safe;
4527 4545
4528 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { 4546 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4529 if (test(ls, lkb)) { 4547 if (!is_master_copy(lkb))
4530 rsb_set_flag(r, RSB_LOCKS_PURGED); 4548 continue;
4531 del_lkb(r, lkb); 4549
4532 /* this put should free the lkb */ 4550 /* don't purge lkbs we've added in recover_master_copy for
4533 if (!dlm_put_lkb(lkb)) 4551 the current recovery seq */
4534 log_error(ls, "purged lkb not released"); 4552
4535 } 4553 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
4554 continue;
4555
4556 del_lkb(r, lkb);
4557
4558 /* this put should free the lkb */
4559 if (!dlm_put_lkb(lkb))
4560 log_error(ls, "purged mstcpy lkb not released");
4536 } 4561 }
4537} 4562}
4538 4563
4539static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4564void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4540{ 4565{
4541 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); 4566 struct dlm_ls *ls = r->res_ls;
4542}
4543 4567
4544static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4568 purge_mstcpy_list(ls, r, &r->res_grantqueue);
4545{ 4569 purge_mstcpy_list(ls, r, &r->res_convertqueue);
4546 return is_master_copy(lkb); 4570 purge_mstcpy_list(ls, r, &r->res_waitqueue);
4547} 4571}
4548 4572
4549static void purge_dead_locks(struct dlm_rsb *r) 4573static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
4574 struct list_head *list,
4575 int nodeid_gone, unsigned int *count)
4550{ 4576{
4551 purge_queue(r, &r->res_grantqueue, &purge_dead_test); 4577 struct dlm_lkb *lkb, *safe;
4552 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4553 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4554}
4555 4578
4556void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 4579 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4557{ 4580 if (!is_master_copy(lkb))
4558 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); 4581 continue;
4559 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); 4582
4560 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); 4583 if ((lkb->lkb_nodeid == nodeid_gone) ||
4584 dlm_is_removed(ls, lkb->lkb_nodeid)) {
4585
4586 del_lkb(r, lkb);
4587
4588 /* this put should free the lkb */
4589 if (!dlm_put_lkb(lkb))
4590 log_error(ls, "purged dead lkb not released");
4591
4592 rsb_set_flag(r, RSB_RECOVER_GRANT);
4593
4594 (*count)++;
4595 }
4596 }
4561} 4597}
4562 4598
4563/* Get rid of locks held by nodes that are gone. */ 4599/* Get rid of locks held by nodes that are gone. */
4564 4600
4565int dlm_purge_locks(struct dlm_ls *ls) 4601void dlm_recover_purge(struct dlm_ls *ls)
4566{ 4602{
4567 struct dlm_rsb *r; 4603 struct dlm_rsb *r;
4604 struct dlm_member *memb;
4605 int nodes_count = 0;
4606 int nodeid_gone = 0;
4607 unsigned int lkb_count = 0;
4608
4609 /* cache one removed nodeid to optimize the common
4610 case of a single node removed */
4611
4612 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
4613 nodes_count++;
4614 nodeid_gone = memb->nodeid;
4615 }
4568 4616
4569 log_debug(ls, "dlm_purge_locks"); 4617 if (!nodes_count)
4618 return;
4570 4619
4571 down_write(&ls->ls_root_sem); 4620 down_write(&ls->ls_root_sem);
4572 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 4621 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4573 hold_rsb(r); 4622 hold_rsb(r);
4574 lock_rsb(r); 4623 lock_rsb(r);
4575 if (is_master(r)) 4624 if (is_master(r)) {
4576 purge_dead_locks(r); 4625 purge_dead_list(ls, r, &r->res_grantqueue,
4626 nodeid_gone, &lkb_count);
4627 purge_dead_list(ls, r, &r->res_convertqueue,
4628 nodeid_gone, &lkb_count);
4629 purge_dead_list(ls, r, &r->res_waitqueue,
4630 nodeid_gone, &lkb_count);
4631 }
4577 unlock_rsb(r); 4632 unlock_rsb(r);
4578 unhold_rsb(r); 4633 unhold_rsb(r);
4579 4634 cond_resched();
4580 schedule();
4581 } 4635 }
4582 up_write(&ls->ls_root_sem); 4636 up_write(&ls->ls_root_sem);
4583 4637
4584 return 0; 4638 if (lkb_count)
4639 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
4640 lkb_count, nodes_count);
4585} 4641}
4586 4642
4587static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) 4643static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4588{ 4644{
4589 struct rb_node *n; 4645 struct rb_node *n;
4590 struct dlm_rsb *r, *r_ret = NULL; 4646 struct dlm_rsb *r;
4591 4647
4592 spin_lock(&ls->ls_rsbtbl[bucket].lock); 4648 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4593 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 4649 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4594 r = rb_entry(n, struct dlm_rsb, res_hashnode); 4650 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4595 if (!rsb_flag(r, RSB_LOCKS_PURGED)) 4651
4652 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4653 continue;
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4655 if (!is_master(r))
4596 continue; 4656 continue;
4597 hold_rsb(r); 4657 hold_rsb(r);
4598 rsb_clear_flag(r, RSB_LOCKS_PURGED); 4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4599 r_ret = r; 4659 return r;
4600 break;
4601 } 4660 }
4602 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 4661 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4603 return r_ret; 4662 return NULL;
4604} 4663}
4605 4664
4606void dlm_grant_after_purge(struct dlm_ls *ls) 4665/*
4666 * Attempt to grant locks on resources that we are the master of.
4667 * Locks may have become grantable during recovery because locks
4668 * from departed nodes have been purged (or not rebuilt), allowing
4669 * previously blocked locks to now be granted. The subset of rsb's
4670 * we are interested in are those with lkb's on either the convert or
4671 * waiting queues.
4672 *
4673 * Simplest would be to go through each master rsb and check for non-empty
4674 * convert or waiting queues, and attempt to grant on those rsbs.
4675 * Checking the queues requires lock_rsb, though, for which we'd need
4676 * to release the rsbtbl lock. This would make iterating through all
4677 * rsb's very inefficient. So, we rely on earlier recovery routines
4678 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
4679 * locks for.
4680 */
4681
4682void dlm_recover_grant(struct dlm_ls *ls)
4607{ 4683{
4608 struct dlm_rsb *r; 4684 struct dlm_rsb *r;
4609 int bucket = 0; 4685 int bucket = 0;
4686 unsigned int count = 0;
4687 unsigned int rsb_count = 0;
4688 unsigned int lkb_count = 0;
4610 4689
4611 while (1) { 4690 while (1) {
4612 r = find_purged_rsb(ls, bucket); 4691 r = find_grant_rsb(ls, bucket);
4613 if (!r) { 4692 if (!r) {
4614 if (bucket == ls->ls_rsbtbl_size - 1) 4693 if (bucket == ls->ls_rsbtbl_size - 1)
4615 break; 4694 break;
4616 bucket++; 4695 bucket++;
4617 continue; 4696 continue;
4618 } 4697 }
4698 rsb_count++;
4699 count = 0;
4619 lock_rsb(r); 4700 lock_rsb(r);
4620 if (is_master(r)) { 4701 grant_pending_locks(r, &count);
4621 grant_pending_locks(r); 4702 lkb_count += count;
4622 confirm_master(r, 0); 4703 confirm_master(r, 0);
4623 }
4624 unlock_rsb(r); 4704 unlock_rsb(r);
4625 put_rsb(r); 4705 put_rsb(r);
4626 schedule(); 4706 cond_resched();
4627 } 4707 }
4708
4709 if (lkb_count)
4710 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
4711 lkb_count, rsb_count);
4628} 4712}
4629 4713
4630static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 4714static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4723,11 +4807,26 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4723 4807
4724 remid = le32_to_cpu(rl->rl_lkid); 4808 remid = le32_to_cpu(rl->rl_lkid);
4725 4809
4726 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 4810 /* In general we expect the rsb returned to be R_MASTER, but we don't
4727 R_MASTER, &r); 4811 have to require it. Recovery of masters on one node can overlap
4812 recovery of locks on another node, so one node can send us MSTCPY
4813 locks before we've made ourselves master of this rsb. We can still
4814 add new MSTCPY locks that we receive here without any harm; when
4815 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */
4817
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
4728 if (error) 4819 if (error)
4729 goto out; 4820 goto out;
4730 4821
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid);
4825 error = -EBADR;
4826 put_rsb(r);
4827 goto out;
4828 }
4829
4731 lock_rsb(r); 4830 lock_rsb(r);
4732 4831
4733 lkb = search_remid(r, rc->rc_header.h_nodeid, remid); 4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
@@ -4749,12 +4848,18 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4749 attach_lkb(r, lkb); 4848 attach_lkb(r, lkb);
4750 add_lkb(r, lkb, rl->rl_status); 4849 add_lkb(r, lkb, rl->rl_status);
4751 error = 0; 4850 error = 0;
4851 ls->ls_recover_locks_in++;
4852
4853 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
4854 rsb_set_flag(r, RSB_RECOVER_GRANT);
4752 4855
4753 out_remid: 4856 out_remid:
4754 /* this is the new value returned to the lock holder for 4857 /* this is the new value returned to the lock holder for
4755 saving in its process-copy lkb */ 4858 saving in its process-copy lkb */
4756 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 4859 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4757 4860
4861 lkb->lkb_recover_seq = ls->ls_recover_seq;
4862
4758 out_unlock: 4863 out_unlock:
4759 unlock_rsb(r); 4864 unlock_rsb(r);
4760 put_rsb(r); 4865 put_rsb(r);
@@ -4786,17 +4891,20 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4786 return error; 4891 return error;
4787 } 4892 }
4788 4893
4894 r = lkb->lkb_resource;
4895 hold_rsb(r);
4896 lock_rsb(r);
4897
4789 if (!is_process_copy(lkb)) { 4898 if (!is_process_copy(lkb)) {
4790 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 4899 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
4791 lkid, rc->rc_header.h_nodeid, remid, result); 4900 lkid, rc->rc_header.h_nodeid, remid, result);
4792 dlm_print_lkb(lkb); 4901 dlm_dump_rsb(r);
4902 unlock_rsb(r);
4903 put_rsb(r);
4904 dlm_put_lkb(lkb);
4793 return -EINVAL; 4905 return -EINVAL;
4794 } 4906 }
4795 4907
4796 r = lkb->lkb_resource;
4797 hold_rsb(r);
4798 lock_rsb(r);
4799
4800 switch (result) { 4908 switch (result) {
4801 case -EBADR: 4909 case -EBADR:
4802 /* There's a chance the new master received our lock before 4910 /* There's a chance the new master received our lock before