author		Jonathan Herman <hermanjl@cs.unc.edu>	2013-01-17 16:15:55 -0500
committer	Jonathan Herman <hermanjl@cs.unc.edu>	2013-01-17 16:15:55 -0500
commit		8dea78da5cee153b8af9c07a2745f6c55057fe12 (patch)
tree		a8f4d49d63b1ecc92f2fddceba0655b2472c5bd9 /fs/dlm/recover.c
parent		406089d01562f1e2bf9f089fd7637009ebaad589 (diff)
Patched in Tegra support.
Diffstat (limited to 'fs/dlm/recover.c')
-rw-r--r--	fs/dlm/recover.c	452
1 file changed, 141 insertions(+), 311 deletions(-)
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index aedea28a86a..14638235f7b 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -36,23 +36,30 @@
  * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another
  * function thinks it could have completed the waited-on task, they should wake
  * up ls_wait_general to get an immediate response rather than waiting for the
- * timeout. This uses a timeout so it can check periodically if the wait
- * should abort due to node failure (which doesn't cause a wake_up).
- * This should only be called by the dlm_recoverd thread.
+ * timer to detect the result. A timer wakes us up periodically while waiting
+ * to see if we should abort due to a node failure. This should only be called
+ * by the dlm_recoverd thread.
  */
 
+static void dlm_wait_timer_fn(unsigned long data)
+{
+	struct dlm_ls *ls = (struct dlm_ls *) data;
+	mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
+	wake_up(&ls->ls_wait_general);
+}
+
 int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
 {
 	int error = 0;
-	int rv;
 
-	while (1) {
-		rv = wait_event_timeout(ls->ls_wait_general,
-					testfn(ls) || dlm_recovery_stopped(ls),
-					dlm_config.ci_recover_timer * HZ);
-		if (rv)
-			break;
-	}
+	init_timer(&ls->ls_timer);
+	ls->ls_timer.function = dlm_wait_timer_fn;
+	ls->ls_timer.data = (long) ls;
+	ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
+	add_timer(&ls->ls_timer);
+
+	wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
+	del_timer_sync(&ls->ls_timer);
 
 	if (dlm_recovery_stopped(ls)) {
 		log_debug(ls, "dlm_wait_function aborted");
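
The hunk above trades a wait_event_timeout() polling loop for an explicit struct timer_list that re-arms itself and wakes the waiter, since a node failure sets LS_RECOVERY_STOP without issuing a wake_up(). A minimal userspace sketch of the same periodic re-check pattern, written against pthreads; the names below (wait_general, task_done, recovery_stopped) are illustrative stand-ins, not part of the dlm code:

/* Sketch only: pthread analogue of dlm_wait_function's periodic re-check. */
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wait_general = PTHREAD_COND_INITIALIZER;
static bool task_done;		/* stand-in for testfn(ls) */
static bool recovery_stopped;	/* stand-in for dlm_recovery_stopped(ls) */

static int wait_function(int recover_timer_sec)
{
	struct timespec deadline;
	int error = 0;

	pthread_mutex_lock(&lock);
	while (!task_done && !recovery_stopped) {
		clock_gettime(CLOCK_REALTIME, &deadline);
		deadline.tv_sec += recover_timer_sec;
		/* returns early on a signal, or after the interval so the
		   abort flag gets re-checked (it is set without a wake) */
		pthread_cond_timedwait(&wait_general, &lock, &deadline);
	}
	if (recovery_stopped)
		error = -1;	/* the kernel code returns -EINTR here */
	pthread_mutex_unlock(&lock);
	return error;
}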
@@ -78,20 +85,14 @@ uint32_t dlm_recover_status(struct dlm_ls *ls)
 	return status;
 }
 
-static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
-{
-	ls->ls_recover_status |= status;
-}
-
 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 {
 	spin_lock(&ls->ls_recover_lock);
-	_set_recover_status(ls, status);
+	ls->ls_recover_status |= status;
 	spin_unlock(&ls->ls_recover_lock);
 }
 
-static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
-			   int save_slots)
+static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
 {
 	struct dlm_rcom *rc = ls->ls_recover_buf;
 	struct dlm_member *memb;
@@ -105,13 +106,10 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
 			goto out;
 		}
 
-		error = dlm_rcom_status(ls, memb->nodeid, 0);
+		error = dlm_rcom_status(ls, memb->nodeid);
 		if (error)
 			goto out;
 
-		if (save_slots)
-			dlm_slot_save(ls, rc, memb);
-
 		if (rc->rc_result & wait_status)
 			break;
 		if (delay < 1000)
@@ -123,8 +121,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
 	return error;
 }
 
-static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
-			   uint32_t status_flags)
+static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
 {
 	struct dlm_rcom *rc = ls->ls_recover_buf;
 	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -135,7 +132,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
 			goto out;
 		}
 
-		error = dlm_rcom_status(ls, nodeid, status_flags);
+		error = dlm_rcom_status(ls, nodeid);
 		if (error)
 			break;
 
@@ -155,56 +152,18 @@ static int wait_status(struct dlm_ls *ls, uint32_t status)
 	int error;
 
 	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
-		error = wait_status_all(ls, status, 0);
+		error = wait_status_all(ls, status);
 		if (!error)
 			dlm_set_recover_status(ls, status_all);
 	} else
-		error = wait_status_low(ls, status_all, 0);
+		error = wait_status_low(ls, status_all);
 
 	return error;
 }
 
 int dlm_recover_members_wait(struct dlm_ls *ls)
 {
-	struct dlm_member *memb;
-	struct dlm_slot *slots;
-	int num_slots, slots_size;
-	int error, rv;
-	uint32_t gen;
-
-	list_for_each_entry(memb, &ls->ls_nodes, list) {
-		memb->slot = -1;
-		memb->generation = 0;
-	}
-
-	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
-		error = wait_status_all(ls, DLM_RS_NODES, 1);
-		if (error)
-			goto out;
-
-		/* slots array is sparse, slots_size may be > num_slots */
-
-		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
-		if (!rv) {
-			spin_lock(&ls->ls_recover_lock);
-			_set_recover_status(ls, DLM_RS_NODES_ALL);
-			ls->ls_num_slots = num_slots;
-			ls->ls_slots_size = slots_size;
-			ls->ls_slots = slots;
-			ls->ls_generation = gen;
-			spin_unlock(&ls->ls_recover_lock);
-		} else {
-			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
-		}
-	} else {
-		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
-		if (error)
-			goto out;
-
-		dlm_slots_copy_in(ls);
-	}
- out:
-	return error;
+	return wait_status(ls, DLM_RS_NODES);
 }
 
 int dlm_recover_directory_wait(struct dlm_ls *ls)
@@ -270,6 +229,22 @@ static void recover_list_del(struct dlm_rsb *r)
 	dlm_put_rsb(r);
 }
 
+static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
+{
+	struct dlm_rsb *r = NULL;
+
+	spin_lock(&ls->ls_recover_list_lock);
+
+	list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
+		if (id == (unsigned long) r)
+			goto out;
+	}
+	r = NULL;
+ out:
+	spin_unlock(&ls->ls_recover_list_lock);
+	return r;
+}
+
 static void recover_list_clear(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r, *s;
@@ -290,94 +265,6 @@ static void recover_list_clear(struct dlm_ls *ls)
 	spin_unlock(&ls->ls_recover_list_lock);
 }
 
-static int recover_idr_empty(struct dlm_ls *ls)
-{
-	int empty = 1;
-
-	spin_lock(&ls->ls_recover_idr_lock);
-	if (ls->ls_recover_list_count)
-		empty = 0;
-	spin_unlock(&ls->ls_recover_idr_lock);
-
-	return empty;
-}
-
-static int recover_idr_add(struct dlm_rsb *r)
-{
-	struct dlm_ls *ls = r->res_ls;
-	int rv, id;
-
-	rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
-	if (!rv)
-		return -ENOMEM;
-
-	spin_lock(&ls->ls_recover_idr_lock);
-	if (r->res_id) {
-		spin_unlock(&ls->ls_recover_idr_lock);
-		return -1;
-	}
-	rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
-	if (rv) {
-		spin_unlock(&ls->ls_recover_idr_lock);
-		return rv;
-	}
-	r->res_id = id;
-	ls->ls_recover_list_count++;
-	dlm_hold_rsb(r);
-	spin_unlock(&ls->ls_recover_idr_lock);
-	return 0;
-}
-
-static void recover_idr_del(struct dlm_rsb *r)
-{
-	struct dlm_ls *ls = r->res_ls;
-
-	spin_lock(&ls->ls_recover_idr_lock);
-	idr_remove(&ls->ls_recover_idr, r->res_id);
-	r->res_id = 0;
-	ls->ls_recover_list_count--;
-	spin_unlock(&ls->ls_recover_idr_lock);
-
-	dlm_put_rsb(r);
-}
-
-static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
-{
-	struct dlm_rsb *r;
-
-	spin_lock(&ls->ls_recover_idr_lock);
-	r = idr_find(&ls->ls_recover_idr, (int)id);
-	spin_unlock(&ls->ls_recover_idr_lock);
-	return r;
-}
-
-static int recover_idr_clear_rsb(int id, void *p, void *data)
-{
-	struct dlm_ls *ls = data;
-	struct dlm_rsb *r = p;
-
-	r->res_id = 0;
-	r->res_recover_locks_count = 0;
-	ls->ls_recover_list_count--;
-
-	dlm_put_rsb(r);
-	return 0;
-}
-
-static void recover_idr_clear(struct dlm_ls *ls)
-{
-	spin_lock(&ls->ls_recover_idr_lock);
-	idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
-	idr_remove_all(&ls->ls_recover_idr);
-
-	if (ls->ls_recover_list_count != 0) {
-		log_error(ls, "warning: recover_list_count %d",
-			  ls->ls_recover_list_count);
-		ls->ls_recover_list_count = 0;
-	}
-	spin_unlock(&ls->ls_recover_idr_lock);
-}
-
 
 /* Master recovery: find new master node for rsb's that were
    mastered on nodes that have been removed.
@@ -404,12 +291,9 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 {
 	struct dlm_lkb *lkb;
 
-	list_for_each_entry(lkb, queue, lkb_statequeue) {
-		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
+	list_for_each_entry(lkb, queue, lkb_statequeue)
+		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
 			lkb->lkb_nodeid = nodeid;
-			lkb->lkb_remid = 0;
-		}
-	}
 }
 
 static void set_master_lkbs(struct dlm_rsb *r)
@@ -422,93 +306,67 @@ static void set_master_lkbs(struct dlm_rsb *r)
 /*
  * Propagate the new master nodeid to locks
  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
+ * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
  * rsb's to consider.
  */
 
-static void set_new_master(struct dlm_rsb *r)
+static void set_new_master(struct dlm_rsb *r, int nodeid)
 {
+	lock_rsb(r);
+	r->res_nodeid = nodeid;
 	set_master_lkbs(r);
 	rsb_set_flag(r, RSB_NEW_MASTER);
 	rsb_set_flag(r, RSB_NEW_MASTER2);
+	unlock_rsb(r);
 }
 
 /*
  * We do async lookups on rsb's that need new masters. The rsb's
  * waiting for a lookup reply are kept on the recover_list.
- *
- * Another node recovering the master may have sent us a rcom lookup,
- * and our dlm_master_lookup() set it as the new master, along with
- * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
- * equals our_nodeid below).
  */
 
-static int recover_master(struct dlm_rsb *r, unsigned int *count)
+static int recover_master(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
-	int our_nodeid, dir_nodeid;
-	int is_removed = 0;
-	int error;
-
-	if (is_master(r))
-		return 0;
-
-	is_removed = dlm_is_removed(ls, r->res_nodeid);
-
-	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
-		return 0;
-
-	our_nodeid = dlm_our_nodeid();
+	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
+
 	dir_nodeid = dlm_dir_nodeid(r);
 
 	if (dir_nodeid == our_nodeid) {
-		if (is_removed) {
-			r->res_master_nodeid = our_nodeid;
-			r->res_nodeid = 0;
-		}
+		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
+				       r->res_length, &ret_nodeid);
+		if (error)
+			log_error(ls, "recover dir lookup error %d", error);
 
-		/* set master of lkbs to ourself when is_removed, or to
-		   another new master which we set along with NEW_MASTER
-		   in dlm_master_lookup */
-		set_new_master(r);
-		error = 0;
+		if (ret_nodeid == our_nodeid)
+			ret_nodeid = 0;
+		set_new_master(r, ret_nodeid);
 	} else {
-		recover_idr_add(r);
+		recover_list_add(r);
 		error = dlm_send_rcom_lookup(r, dir_nodeid);
 	}
 
-	(*count)++;
 	return error;
 }
 
 /*
- * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
- * This is necessary because recovery can be started, aborted and restarted,
- * causing the master nodeid to briefly change during the aborted recovery, and
- * change back to the original value in the second recovery. The MSTCPY locks
- * may or may not have been purged during the aborted recovery. Another node
- * with an outstanding request in waiters list and a request reply saved in the
- * requestqueue, cannot know whether it should ignore the reply and resend the
- * request, or accept the reply and complete the request. It must do the
- * former if the remote node purged MSTCPY locks, and it must do the latter if
- * the remote node did not. This is solved by always purging MSTCPY locks, in
- * which case, the request reply would always be ignored and the request
- * resent.
+ * When not using a directory, most resource names will hash to a new static
+ * master nodeid and the resource will need to be remastered.
  */
 
-static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
+static int recover_master_static(struct dlm_rsb *r)
 {
-	int dir_nodeid = dlm_dir_nodeid(r);
-	int new_master = dir_nodeid;
+	int master = dlm_dir_nodeid(r);
 
-	if (dir_nodeid == dlm_our_nodeid())
-		new_master = 0;
+	if (master == dlm_our_nodeid())
+		master = 0;
 
-	dlm_purge_mstcpy_locks(r);
-	r->res_master_nodeid = dir_nodeid;
-	r->res_nodeid = new_master;
-	set_new_master(r);
-	(*count)++;
+	if (r->res_nodeid != master) {
+		if (is_master(r))
+			dlm_purge_mstcpy_locks(r);
+		set_new_master(r, master);
+		return 1;
+	}
 	return 0;
 }
 
@@ -525,10 +383,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 int dlm_recover_masters(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	unsigned int total = 0;
-	unsigned int count = 0;
-	int nodir = dlm_no_directory(ls);
-	int error;
+	int error = 0, count = 0;
 
 	log_debug(ls, "dlm_recover_masters");
 
@@ -540,58 +395,48 @@ int dlm_recover_masters(struct dlm_ls *ls)
 			goto out;
 		}
 
-		lock_rsb(r);
-		if (nodir)
-			error = recover_master_static(r, &count);
-		else
-			error = recover_master(r, &count);
-		unlock_rsb(r);
-		cond_resched();
-		total++;
-
-		if (error) {
-			up_read(&ls->ls_root_sem);
-			goto out;
+		if (dlm_no_directory(ls))
+			count += recover_master_static(r);
+		else if (!is_master(r) &&
+			 (dlm_is_removed(ls, r->res_nodeid) ||
+			  rsb_flag(r, RSB_NEW_MASTER))) {
+			recover_master(r);
+			count++;
 		}
+
+		schedule();
 	}
 	up_read(&ls->ls_root_sem);
 
-	log_debug(ls, "dlm_recover_masters %u of %u", count, total);
+	log_debug(ls, "dlm_recover_masters %d resources", count);
 
-	error = dlm_wait_function(ls, &recover_idr_empty);
+	error = dlm_wait_function(ls, &recover_list_empty);
  out:
 	if (error)
-		recover_idr_clear(ls);
+		recover_list_clear(ls);
 	return error;
 }
 
 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
 	struct dlm_rsb *r;
-	int ret_nodeid, new_master;
+	int nodeid;
 
-	r = recover_idr_find(ls, rc->rc_id);
+	r = recover_list_find(ls, rc->rc_id);
 	if (!r) {
 		log_error(ls, "dlm_recover_master_reply no id %llx",
 			  (unsigned long long)rc->rc_id);
 		goto out;
 	}
 
-	ret_nodeid = rc->rc_result;
-
-	if (ret_nodeid == dlm_our_nodeid())
-		new_master = 0;
-	else
-		new_master = ret_nodeid;
+	nodeid = rc->rc_result;
+	if (nodeid == dlm_our_nodeid())
+		nodeid = 0;
 
-	lock_rsb(r);
-	r->res_master_nodeid = ret_nodeid;
-	r->res_nodeid = new_master;
-	set_new_master(r);
-	unlock_rsb(r);
-	recover_idr_del(r);
+	set_new_master(r, nodeid);
+	recover_list_del(r);
 
-	if (recover_idr_empty(ls))
+	if (recover_list_empty(ls))
 		wake_up(&ls->ls_wait_general);
  out:
 	return 0;
@@ -663,6 +508,8 @@ int dlm_recover_locks(struct dlm_ls *ls)
 	struct dlm_rsb *r;
 	int error, count = 0;
 
+	log_debug(ls, "dlm_recover_locks");
+
 	down_read(&ls->ls_root_sem);
 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 		if (is_master(r)) {
@@ -689,12 +536,14 @@ int dlm_recover_locks(struct dlm_ls *ls)
 	}
 	up_read(&ls->ls_root_sem);
 
-	log_debug(ls, "dlm_recover_locks %d out", count);
+	log_debug(ls, "dlm_recover_locks %d locks", count);
 
 	error = dlm_wait_function(ls, &recover_list_empty);
  out:
 	if (error)
 		recover_list_clear(ls);
+	else
+		dlm_set_recover_status(ls, DLM_RS_LOCKS);
 	return error;
 }
 
@@ -717,14 +566,8 @@ void dlm_recovered_lock(struct dlm_rsb *r)
  * the VALNOTVALID flag if necessary, and determining the correct lvb contents
  * based on the lvb's of the locks held on the rsb.
  *
- * RSB_VALNOTVALID is set in two cases:
- *
- * 1. we are master, but not new, and we purged an EX/PW lock held by a
- *    failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
- *
- * 2. we are a new master, and there are only NL/CR locks left.
- *    (We could probably improve this by only invaliding in this way when
- *    the previous master left uncleanly. VMS docs mention that.)
+ * RSB_VALNOTVALID is set if there are only NL/CR locks on the rsb. If it
+ * was already set prior to recovery, it's not cleared, regardless of locks.
  *
  * The LVB contents are only considered for changing when this is a new master
  * of the rsb (NEW_MASTER2). Then, the rsb's lvb is taken from any lkb with
@@ -740,19 +583,6 @@ static void recover_lvb(struct dlm_rsb *r)
 	int big_lock_exists = 0;
 	int lvblen = r->res_ls->ls_lvblen;
 
-	if (!rsb_flag(r, RSB_NEW_MASTER2) &&
-	    rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
-		/* case 1 above */
-		rsb_set_flag(r, RSB_VALNOTVALID);
-		return;
-	}
-
-	if (!rsb_flag(r, RSB_NEW_MASTER2))
-		return;
-
-	/* we are the new master, so figure out if VALNOTVALID should
-	   be set, and set the rsb lvb from the best lkb available. */
-
 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
 			continue;
@@ -791,10 +621,13 @@ static void recover_lvb(struct dlm_rsb *r)
 	if (!lock_lvb_exists)
 		goto out;
 
-	/* lvb is invalidated if only NL/CR locks remain */
 	if (!big_lock_exists)
 		rsb_set_flag(r, RSB_VALNOTVALID);
 
+	/* don't mess with the lvb unless we're the new master */
+	if (!rsb_flag(r, RSB_NEW_MASTER2))
+		goto out;
+
 	if (!r->res_lvbptr) {
 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
 		if (!r->res_lvbptr)
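
Per the revised comment, RSB_VALNOTVALID now hinges on one test: whether any surviving lock with a value block holds a granted mode above CR. A standalone sketch of that test; the mode ordering mirrors DLM_LOCK_IV through DLM_LOCK_EX, the rest is illustrative:

/* Sketch only: the "only NL/CR locks remain" test behind VALNOTVALID.
   A holder above CR could have written the LVB, so the value block is
   still trustworthy; otherwise it must be flagged invalid. */
enum mode { MODE_IV = -1, MODE_NL, MODE_CR, MODE_CW, MODE_PR, MODE_PW, MODE_EX };

static int lvb_still_valid(const enum mode *granted, int nlocks)
{
	int i;

	for (i = 0; i < nlocks; i++)
		if (granted[i] > MODE_CR)
			return 1;	/* a "big lock" exists */
	return 0;	/* only NL/CR left: set RSB_VALNOTVALID */
}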
@@ -820,7 +653,6 @@ static void recover_lvb(struct dlm_rsb *r)
 
 static void recover_conversion(struct dlm_rsb *r)
 {
-	struct dlm_ls *ls = r->res_ls;
 	struct dlm_lkb *lkb;
 	int grmode = -1;
 
@@ -835,32 +667,29 @@ static void recover_conversion(struct dlm_rsb *r)
 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
 		if (lkb->lkb_grmode != DLM_LOCK_IV)
 			continue;
-		if (grmode == -1) {
-			log_debug(ls, "recover_conversion %x set gr to rq %d",
-				  lkb->lkb_id, lkb->lkb_rqmode);
+		if (grmode == -1)
 			lkb->lkb_grmode = lkb->lkb_rqmode;
-		} else {
-			log_debug(ls, "recover_conversion %x set gr %d",
-				  lkb->lkb_id, grmode);
+		else
 			lkb->lkb_grmode = grmode;
-		}
 	}
 }
 
 /* We've become the new master for this rsb and waiting/converting locks may
-   need to be granted in dlm_recover_grant() due to locks that may have
+   need to be granted in dlm_grant_after_purge() due to locks that may have
    existed from a removed node. */
 
-static void recover_grant(struct dlm_rsb *r)
+static void set_locks_purged(struct dlm_rsb *r)
 {
 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
-		rsb_set_flag(r, RSB_RECOVER_GRANT);
+		rsb_set_flag(r, RSB_LOCKS_PURGED);
 }
 
 void dlm_recover_rsbs(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	unsigned int count = 0;
+	int count = 0;
+
+	log_debug(ls, "dlm_recover_rsbs");
 
 	down_read(&ls->ls_root_sem);
 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
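
set_locks_purged() only marks rsbs that might have grantable work (a non-empty wait or convert queue); the actual granting happens later in dlm_grant_after_purge(). A small sketch of that mark-then-scan split, under illustrative names:

/* Sketch only: recovery cheaply flags candidates; a later pass revisits
   just the flagged resources to re-run the grant logic. */
#include <stdbool.h>

struct resource {
	struct resource *next;
	bool has_waiters;	/* stand-in for non-empty wait/convert queues */
	bool purged;		/* stand-in for RSB_LOCKS_PURGED */
};

static void mark_purged(struct resource *r)
{
	if (r->has_waiters)
		r->purged = true;
}

static void grant_after_purge(struct resource *head)
{
	struct resource *r;

	for (r = head; r; r = r->next) {
		if (!r->purged)
			continue;
		r->purged = false;
		/* grant logic for this resource would run here */
	}
}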
@@ -868,33 +697,24 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
 		if (is_master(r)) {
 			if (rsb_flag(r, RSB_RECOVER_CONVERT))
 				recover_conversion(r);
-
-			/* recover lvb before granting locks so the updated
-			   lvb/VALNOTVALID is presented in the completion */
-			recover_lvb(r);
-
 			if (rsb_flag(r, RSB_NEW_MASTER2))
-				recover_grant(r);
+				set_locks_purged(r);
+			recover_lvb(r);
 			count++;
-		} else {
-			rsb_clear_flag(r, RSB_VALNOTVALID);
 		}
 		rsb_clear_flag(r, RSB_RECOVER_CONVERT);
-		rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
 		rsb_clear_flag(r, RSB_NEW_MASTER2);
 		unlock_rsb(r);
 	}
 	up_read(&ls->ls_root_sem);
 
-	if (count)
-		log_debug(ls, "dlm_recover_rsbs %d done", count);
+	log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
 }
 
 /* Create a single list of all root rsb's to be used during recovery */
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
-	struct rb_node *n;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -907,14 +727,24 @@ int dlm_create_root_list(struct dlm_ls *ls)
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
 
-		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
-			log_error(ls, "dlm_create_root_list toss not empty");
+		/* If we're using a directory, add tossed rsbs to the root
+		   list; they'll have entries created in the new directory,
+		   but no other recovery steps should do anything with them. */
+
+		if (dlm_no_directory(ls)) {
+			spin_unlock(&ls->ls_rsbtbl[i].lock);
+			continue;
+		}
+
+		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+			list_add(&r->res_root_list, &ls->ls_root_list);
+			dlm_hold_rsb(r);
+		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
  out:
@@ -934,26 +764,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
 	up_write(&ls->ls_root_sem);
 }
 
-void dlm_clear_toss(struct dlm_ls *ls)
+/* If not using a directory, clear the entire toss list, there's no benefit to
+   caching the master value since it's fixed. If we are using a dir, keep the
+   rsb's we're the master of. Recovery will add them to the root list and from
+   there they'll be entered in the rebuilt directory. */
+
+void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct rb_node *n, *next;
-	struct dlm_rsb *r;
-	unsigned int count = 0;
+	struct dlm_rsb *r, *safe;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
-			next = rb_next(n);
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			rb_erase(n, &ls->ls_rsbtbl[i].toss);
-			dlm_free_rsb(r);
-			count++;
+		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
+					 res_hashchain) {
+			if (dlm_no_directory(ls) || !is_master(r)) {
+				list_del(&r->res_hashchain);
+				dlm_free_rsb(r);
+			}
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
-
-	if (count)
-		log_debug(ls, "dlm_clear_toss %u done", count);
 }
 
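The rationale in the new dlm_clear_toss_list comment rests on "no directory" mode, where a resource's master is a fixed function of its name, so cached toss entries buy nothing. A sketch of such a fixed mapping; the FNV-1a hash and the nodeid table are illustrative only, not dlm's actual placement algorithm:

/* Sketch only: with no directory, the master nodeid derives from the
   resource name alone, so it never needs caching or a remote lookup. */
#include <stdint.h>
#include <stddef.h>

static uint32_t name_hash(const char *name, size_t len)
{
	uint32_t h = 2166136261u;	/* FNV-1a offset basis */

	while (len--)
		h = (h ^ (uint8_t)*name++) * 16777619u;
	return h;
}

static int static_master(const char *name, size_t len,
			 const int *nodeids, int num_nodes)
{
	return nodeids[name_hash(name, len) % num_nodes];
}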