aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/config.c7
-rw-r--r--fs/dlm/config.h1
-rw-r--r--fs/dlm/debug_fs.c103
-rw-r--r--fs/dlm/dir.c287
-rw-r--r--fs/dlm/dir.h7
-rw-r--r--fs/dlm/dlm_internal.h46
-rw-r--r--fs/dlm/lock.c1022
-rw-r--r--fs/dlm/lock.h5
-rw-r--r--fs/dlm/lockspace.c23
-rw-r--r--fs/dlm/rcom.c145
-rw-r--r--fs/dlm/rcom.h1
-rw-r--r--fs/dlm/recover.c140
-rw-r--r--fs/dlm/recover.h2
-rw-r--r--fs/dlm/recoverd.c14
14 files changed, 1215 insertions, 588 deletions
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index e7e327d43fa5..9ccf7346834a 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -96,7 +96,6 @@ struct dlm_cluster {
96 unsigned int cl_tcp_port; 96 unsigned int cl_tcp_port;
97 unsigned int cl_buffer_size; 97 unsigned int cl_buffer_size;
98 unsigned int cl_rsbtbl_size; 98 unsigned int cl_rsbtbl_size;
99 unsigned int cl_dirtbl_size;
100 unsigned int cl_recover_timer; 99 unsigned int cl_recover_timer;
101 unsigned int cl_toss_secs; 100 unsigned int cl_toss_secs;
102 unsigned int cl_scan_secs; 101 unsigned int cl_scan_secs;
@@ -113,7 +112,6 @@ enum {
113 CLUSTER_ATTR_TCP_PORT = 0, 112 CLUSTER_ATTR_TCP_PORT = 0,
114 CLUSTER_ATTR_BUFFER_SIZE, 113 CLUSTER_ATTR_BUFFER_SIZE,
115 CLUSTER_ATTR_RSBTBL_SIZE, 114 CLUSTER_ATTR_RSBTBL_SIZE,
116 CLUSTER_ATTR_DIRTBL_SIZE,
117 CLUSTER_ATTR_RECOVER_TIMER, 115 CLUSTER_ATTR_RECOVER_TIMER,
118 CLUSTER_ATTR_TOSS_SECS, 116 CLUSTER_ATTR_TOSS_SECS,
119 CLUSTER_ATTR_SCAN_SECS, 117 CLUSTER_ATTR_SCAN_SECS,
@@ -189,7 +187,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
189CLUSTER_ATTR(tcp_port, 1); 187CLUSTER_ATTR(tcp_port, 1);
190CLUSTER_ATTR(buffer_size, 1); 188CLUSTER_ATTR(buffer_size, 1);
191CLUSTER_ATTR(rsbtbl_size, 1); 189CLUSTER_ATTR(rsbtbl_size, 1);
192CLUSTER_ATTR(dirtbl_size, 1);
193CLUSTER_ATTR(recover_timer, 1); 190CLUSTER_ATTR(recover_timer, 1);
194CLUSTER_ATTR(toss_secs, 1); 191CLUSTER_ATTR(toss_secs, 1);
195CLUSTER_ATTR(scan_secs, 1); 192CLUSTER_ATTR(scan_secs, 1);
@@ -204,7 +201,6 @@ static struct configfs_attribute *cluster_attrs[] = {
204 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 201 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
205 [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, 202 [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
206 [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, 203 [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
207 [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
208 [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, 204 [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
209 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, 205 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
210 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, 206 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
@@ -478,7 +474,6 @@ static struct config_group *make_cluster(struct config_group *g,
478 cl->cl_tcp_port = dlm_config.ci_tcp_port; 474 cl->cl_tcp_port = dlm_config.ci_tcp_port;
479 cl->cl_buffer_size = dlm_config.ci_buffer_size; 475 cl->cl_buffer_size = dlm_config.ci_buffer_size;
480 cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; 476 cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
481 cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
482 cl->cl_recover_timer = dlm_config.ci_recover_timer; 477 cl->cl_recover_timer = dlm_config.ci_recover_timer;
483 cl->cl_toss_secs = dlm_config.ci_toss_secs; 478 cl->cl_toss_secs = dlm_config.ci_toss_secs;
484 cl->cl_scan_secs = dlm_config.ci_scan_secs; 479 cl->cl_scan_secs = dlm_config.ci_scan_secs;
@@ -1050,7 +1045,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
1050#define DEFAULT_TCP_PORT 21064 1045#define DEFAULT_TCP_PORT 21064
1051#define DEFAULT_BUFFER_SIZE 4096 1046#define DEFAULT_BUFFER_SIZE 4096
1052#define DEFAULT_RSBTBL_SIZE 1024 1047#define DEFAULT_RSBTBL_SIZE 1024
1053#define DEFAULT_DIRTBL_SIZE 1024
1054#define DEFAULT_RECOVER_TIMER 5 1048#define DEFAULT_RECOVER_TIMER 5
1055#define DEFAULT_TOSS_SECS 10 1049#define DEFAULT_TOSS_SECS 10
1056#define DEFAULT_SCAN_SECS 5 1050#define DEFAULT_SCAN_SECS 5
@@ -1066,7 +1060,6 @@ struct dlm_config_info dlm_config = {
1066 .ci_tcp_port = DEFAULT_TCP_PORT, 1060 .ci_tcp_port = DEFAULT_TCP_PORT,
1067 .ci_buffer_size = DEFAULT_BUFFER_SIZE, 1061 .ci_buffer_size = DEFAULT_BUFFER_SIZE,
1068 .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, 1062 .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
1069 .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
1070 .ci_recover_timer = DEFAULT_RECOVER_TIMER, 1063 .ci_recover_timer = DEFAULT_RECOVER_TIMER,
1071 .ci_toss_secs = DEFAULT_TOSS_SECS, 1064 .ci_toss_secs = DEFAULT_TOSS_SECS,
1072 .ci_scan_secs = DEFAULT_SCAN_SECS, 1065 .ci_scan_secs = DEFAULT_SCAN_SECS,
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 9f5e3663bb0c..dbd35a08f3a5 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -27,7 +27,6 @@ struct dlm_config_info {
27 int ci_tcp_port; 27 int ci_tcp_port;
28 int ci_buffer_size; 28 int ci_buffer_size;
29 int ci_rsbtbl_size; 29 int ci_rsbtbl_size;
30 int ci_dirtbl_size;
31 int ci_recover_timer; 30 int ci_recover_timer;
32 int ci_toss_secs; 31 int ci_toss_secs;
33 int ci_scan_secs; 32 int ci_scan_secs;
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 1c9b08095f98..b969deef9ebb 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s)
344 return rv; 344 return rv;
345} 345}
346 346
347static int print_format4(struct dlm_rsb *r, struct seq_file *s)
348{
349 int our_nodeid = dlm_our_nodeid();
350 int print_name = 1;
351 int i, rv;
352
353 lock_rsb(r);
354
355 rv = seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ",
356 r,
357 r->res_nodeid,
358 r->res_master_nodeid,
359 r->res_dir_nodeid,
360 our_nodeid,
361 r->res_toss_time,
362 r->res_flags,
363 r->res_length);
364 if (rv)
365 goto out;
366
367 for (i = 0; i < r->res_length; i++) {
368 if (!isascii(r->res_name[i]) || !isprint(r->res_name[i]))
369 print_name = 0;
370 }
371
372 seq_printf(s, "%s", print_name ? "str " : "hex");
373
374 for (i = 0; i < r->res_length; i++) {
375 if (print_name)
376 seq_printf(s, "%c", r->res_name[i]);
377 else
378 seq_printf(s, " %02x", (unsigned char)r->res_name[i]);
379 }
380 rv = seq_printf(s, "\n");
381 out:
382 unlock_rsb(r);
383 return rv;
384}
385
347struct rsbtbl_iter { 386struct rsbtbl_iter {
348 struct dlm_rsb *rsb; 387 struct dlm_rsb *rsb;
349 unsigned bucket; 388 unsigned bucket;
@@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
382 } 421 }
383 rv = print_format3(ri->rsb, seq); 422 rv = print_format3(ri->rsb, seq);
384 break; 423 break;
424 case 4:
425 if (ri->header) {
426 seq_printf(seq, "version 4 rsb 2\n");
427 ri->header = 0;
428 }
429 rv = print_format4(ri->rsb, seq);
430 break;
385 } 431 }
386 432
387 return rv; 433 return rv;
@@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr)
390static const struct seq_operations format1_seq_ops; 436static const struct seq_operations format1_seq_ops;
391static const struct seq_operations format2_seq_ops; 437static const struct seq_operations format2_seq_ops;
392static const struct seq_operations format3_seq_ops; 438static const struct seq_operations format3_seq_ops;
439static const struct seq_operations format4_seq_ops;
393 440
394static void *table_seq_start(struct seq_file *seq, loff_t *pos) 441static void *table_seq_start(struct seq_file *seq, loff_t *pos)
395{ 442{
443 struct rb_root *tree;
396 struct rb_node *node; 444 struct rb_node *node;
397 struct dlm_ls *ls = seq->private; 445 struct dlm_ls *ls = seq->private;
398 struct rsbtbl_iter *ri; 446 struct rsbtbl_iter *ri;
399 struct dlm_rsb *r; 447 struct dlm_rsb *r;
400 loff_t n = *pos; 448 loff_t n = *pos;
401 unsigned bucket, entry; 449 unsigned bucket, entry;
450 int toss = (seq->op == &format4_seq_ops);
402 451
403 bucket = n >> 32; 452 bucket = n >> 32;
404 entry = n & ((1LL << 32) - 1); 453 entry = n & ((1LL << 32) - 1);
@@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
417 ri->format = 2; 466 ri->format = 2;
418 if (seq->op == &format3_seq_ops) 467 if (seq->op == &format3_seq_ops)
419 ri->format = 3; 468 ri->format = 3;
469 if (seq->op == &format4_seq_ops)
470 ri->format = 4;
471
472 tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
420 473
421 spin_lock(&ls->ls_rsbtbl[bucket].lock); 474 spin_lock(&ls->ls_rsbtbl[bucket].lock);
422 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { 475 if (!RB_EMPTY_ROOT(tree)) {
423 for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node; 476 for (node = rb_first(tree); node; node = rb_next(node)) {
424 node = rb_next(node)) {
425 r = rb_entry(node, struct dlm_rsb, res_hashnode); 477 r = rb_entry(node, struct dlm_rsb, res_hashnode);
426 if (!entry--) { 478 if (!entry--) {
427 dlm_hold_rsb(r); 479 dlm_hold_rsb(r);
@@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
449 kfree(ri); 501 kfree(ri);
450 return NULL; 502 return NULL;
451 } 503 }
504 tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
452 505
453 spin_lock(&ls->ls_rsbtbl[bucket].lock); 506 spin_lock(&ls->ls_rsbtbl[bucket].lock);
454 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { 507 if (!RB_EMPTY_ROOT(tree)) {
455 node = rb_first(&ls->ls_rsbtbl[bucket].keep); 508 node = rb_first(tree);
456 r = rb_entry(node, struct dlm_rsb, res_hashnode); 509 r = rb_entry(node, struct dlm_rsb, res_hashnode);
457 dlm_hold_rsb(r); 510 dlm_hold_rsb(r);
458 ri->rsb = r; 511 ri->rsb = r;
@@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
469{ 522{
470 struct dlm_ls *ls = seq->private; 523 struct dlm_ls *ls = seq->private;
471 struct rsbtbl_iter *ri = iter_ptr; 524 struct rsbtbl_iter *ri = iter_ptr;
525 struct rb_root *tree;
472 struct rb_node *next; 526 struct rb_node *next;
473 struct dlm_rsb *r, *rp; 527 struct dlm_rsb *r, *rp;
474 loff_t n = *pos; 528 loff_t n = *pos;
475 unsigned bucket; 529 unsigned bucket;
530 int toss = (seq->op == &format4_seq_ops);
476 531
477 bucket = n >> 32; 532 bucket = n >> 32;
478 533
@@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
511 kfree(ri); 566 kfree(ri);
512 return NULL; 567 return NULL;
513 } 568 }
569 tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
514 570
515 spin_lock(&ls->ls_rsbtbl[bucket].lock); 571 spin_lock(&ls->ls_rsbtbl[bucket].lock);
516 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { 572 if (!RB_EMPTY_ROOT(tree)) {
517 next = rb_first(&ls->ls_rsbtbl[bucket].keep); 573 next = rb_first(tree);
518 r = rb_entry(next, struct dlm_rsb, res_hashnode); 574 r = rb_entry(next, struct dlm_rsb, res_hashnode);
519 dlm_hold_rsb(r); 575 dlm_hold_rsb(r);
520 ri->rsb = r; 576 ri->rsb = r;
@@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = {
558 .show = table_seq_show, 614 .show = table_seq_show,
559}; 615};
560 616
617static const struct seq_operations format4_seq_ops = {
618 .start = table_seq_start,
619 .next = table_seq_next,
620 .stop = table_seq_stop,
621 .show = table_seq_show,
622};
623
561static const struct file_operations format1_fops; 624static const struct file_operations format1_fops;
562static const struct file_operations format2_fops; 625static const struct file_operations format2_fops;
563static const struct file_operations format3_fops; 626static const struct file_operations format3_fops;
627static const struct file_operations format4_fops;
564 628
565static int table_open(struct inode *inode, struct file *file) 629static int table_open(struct inode *inode, struct file *file)
566{ 630{
@@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file)
573 ret = seq_open(file, &format2_seq_ops); 637 ret = seq_open(file, &format2_seq_ops);
574 else if (file->f_op == &format3_fops) 638 else if (file->f_op == &format3_fops)
575 ret = seq_open(file, &format3_seq_ops); 639 ret = seq_open(file, &format3_seq_ops);
640 else if (file->f_op == &format4_fops)
641 ret = seq_open(file, &format4_seq_ops);
576 642
577 if (ret) 643 if (ret)
578 return ret; 644 return ret;
@@ -606,6 +672,14 @@ static const struct file_operations format3_fops = {
606 .release = seq_release 672 .release = seq_release
607}; 673};
608 674
675static const struct file_operations format4_fops = {
676 .owner = THIS_MODULE,
677 .open = table_open,
678 .read = seq_read,
679 .llseek = seq_lseek,
680 .release = seq_release
681};
682
609/* 683/*
610 * dump lkb's on the ls_waiters list 684 * dump lkb's on the ls_waiters list
611 */ 685 */
@@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls)
652 debugfs_remove(ls->ls_debug_locks_dentry); 726 debugfs_remove(ls->ls_debug_locks_dentry);
653 if (ls->ls_debug_all_dentry) 727 if (ls->ls_debug_all_dentry)
654 debugfs_remove(ls->ls_debug_all_dentry); 728 debugfs_remove(ls->ls_debug_all_dentry);
729 if (ls->ls_debug_toss_dentry)
730 debugfs_remove(ls->ls_debug_toss_dentry);
655} 731}
656 732
657int dlm_create_debug_file(struct dlm_ls *ls) 733int dlm_create_debug_file(struct dlm_ls *ls)
@@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls)
694 if (!ls->ls_debug_all_dentry) 770 if (!ls->ls_debug_all_dentry)
695 goto fail; 771 goto fail;
696 772
773 /* format 4 */
774
775 memset(name, 0, sizeof(name));
776 snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_toss", ls->ls_name);
777
778 ls->ls_debug_toss_dentry = debugfs_create_file(name,
779 S_IFREG | S_IRUGO,
780 dlm_root,
781 ls,
782 &format4_fops);
783 if (!ls->ls_debug_toss_dentry)
784 goto fail;
785
697 memset(name, 0, sizeof(name)); 786 memset(name, 0, sizeof(name));
698 snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name); 787 snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name);
699 788
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index dc5eb598b81f..278a75cda446 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -23,50 +23,6 @@
23#include "lock.h" 23#include "lock.h"
24#include "dir.h" 24#include "dir.h"
25 25
26
27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28{
29 spin_lock(&ls->ls_recover_list_lock);
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
32}
33
34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35{
36 int found = 0;
37 struct dlm_direntry *de;
38
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
41 if (de->length == len) {
42 list_del(&de->list);
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
45 found = 1;
46 break;
47 }
48 }
49 spin_unlock(&ls->ls_recover_list_lock);
50
51 if (!found)
52 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 return de;
54}
55
56void dlm_clear_free_entries(struct dlm_ls *ls)
57{
58 struct dlm_direntry *de;
59
60 spin_lock(&ls->ls_recover_list_lock);
61 while (!list_empty(&ls->ls_recover_list)) {
62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 list);
64 list_del(&de->list);
65 kfree(de);
66 }
67 spin_unlock(&ls->ls_recover_list_lock);
68}
69
70/* 26/*
71 * We use the upper 16 bits of the hash value to select the directory node. 27 * We use the upper 16 bits of the hash value to select the directory node.
72 * Low bits are used for distribution of rsb's among hash buckets on each node. 28 * Low bits are used for distribution of rsb's among hash buckets on each node.
@@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
78 34
79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80{ 36{
81 struct list_head *tmp; 37 uint32_t node;
82 struct dlm_member *memb = NULL;
83 uint32_t node, n = 0;
84 int nodeid;
85
86 if (ls->ls_num_nodes == 1) {
87 nodeid = dlm_our_nodeid();
88 goto out;
89 }
90 38
91 if (ls->ls_node_array) { 39 if (ls->ls_num_nodes == 1)
40 return dlm_our_nodeid();
41 else {
92 node = (hash >> 16) % ls->ls_total_weight; 42 node = (hash >> 16) % ls->ls_total_weight;
93 nodeid = ls->ls_node_array[node]; 43 return ls->ls_node_array[node];
94 goto out;
95 }
96
97 /* make_member_array() failed to kmalloc ls_node_array... */
98
99 node = (hash >> 16) % ls->ls_num_nodes;
100
101 list_for_each(tmp, &ls->ls_nodes) {
102 if (n++ != node)
103 continue;
104 memb = list_entry(tmp, struct dlm_member, list);
105 break;
106 } 44 }
107
108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 ls->ls_num_nodes, n, node););
110 nodeid = memb->nodeid;
111 out:
112 return nodeid;
113} 45}
114 46
115int dlm_dir_nodeid(struct dlm_rsb *r) 47int dlm_dir_nodeid(struct dlm_rsb *r)
116{ 48{
117 return dlm_hash2nodeid(r->res_ls, r->res_hash); 49 return r->res_dir_nodeid;
118}
119
120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121{
122 uint32_t val;
123
124 val = jhash(name, len, 0);
125 val &= (ls->ls_dirtbl_size - 1);
126
127 return val;
128}
129
130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131{
132 uint32_t bucket;
133
134 bucket = dir_hash(ls, de->name, de->length);
135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136} 50}
137 51
138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, 52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
139 int namelen, uint32_t bucket)
140{ 53{
141 struct dlm_direntry *de; 54 struct dlm_rsb *r;
142
143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 if (de->length == namelen && !memcmp(name, de->name, namelen))
145 goto out;
146 }
147 de = NULL;
148 out:
149 return de;
150}
151
152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153{
154 struct dlm_direntry *de;
155 uint32_t bucket;
156
157 bucket = dir_hash(ls, name, namelen);
158
159 spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161 de = search_bucket(ls, name, namelen, bucket);
162
163 if (!de) {
164 log_error(ls, "remove fr %u none", nodeid);
165 goto out;
166 }
167
168 if (de->master_nodeid != nodeid) {
169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 goto out;
171 }
172
173 list_del(&de->list);
174 kfree(de);
175 out:
176 spin_unlock(&ls->ls_dirtbl[bucket].lock);
177}
178 55
179void dlm_dir_clear(struct dlm_ls *ls) 56 down_read(&ls->ls_root_sem);
180{ 57 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
181 struct list_head *head; 58 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
182 struct dlm_direntry *de;
183 int i;
184
185 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187 for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 spin_lock(&ls->ls_dirtbl[i].lock);
189 head = &ls->ls_dirtbl[i].list;
190 while (!list_empty(head)) {
191 de = list_entry(head->next, struct dlm_direntry, list);
192 list_del(&de->list);
193 put_free_de(ls, de);
194 }
195 spin_unlock(&ls->ls_dirtbl[i].lock);
196 } 59 }
60 up_read(&ls->ls_root_sem);
197} 61}
198 62
199int dlm_recover_directory(struct dlm_ls *ls) 63int dlm_recover_directory(struct dlm_ls *ls)
200{ 64{
201 struct dlm_member *memb; 65 struct dlm_member *memb;
202 struct dlm_direntry *de;
203 char *b, *last_name = NULL; 66 char *b, *last_name = NULL;
204 int error = -ENOMEM, last_len, count = 0; 67 int error = -ENOMEM, last_len, nodeid, result;
205 uint16_t namelen; 68 uint16_t namelen;
69 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
206 70
207 log_debug(ls, "dlm_recover_directory"); 71 log_debug(ls, "dlm_recover_directory");
208 72
209 if (dlm_no_directory(ls)) 73 if (dlm_no_directory(ls))
210 goto out_status; 74 goto out_status;
211 75
212 dlm_dir_clear(ls);
213
214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); 76 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 if (!last_name) 77 if (!last_name)
216 goto out; 78 goto out;
217 79
218 list_for_each_entry(memb, &ls->ls_nodes, list) { 80 list_for_each_entry(memb, &ls->ls_nodes, list) {
81 if (memb->nodeid == dlm_our_nodeid())
82 continue;
83
219 memset(last_name, 0, DLM_RESNAME_MAXLEN); 84 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 last_len = 0; 85 last_len = 0;
221 86
@@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
230 if (error) 95 if (error)
231 goto out_free; 96 goto out_free;
232 97
233 schedule(); 98 cond_resched();
234 99
235 /* 100 /*
236 * pick namelen/name pairs out of received buffer 101 * pick namelen/name pairs out of received buffer
@@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls)
267 if (namelen > DLM_RESNAME_MAXLEN) 132 if (namelen > DLM_RESNAME_MAXLEN)
268 goto out_free; 133 goto out_free;
269 134
270 error = -ENOMEM; 135 error = dlm_master_lookup(ls, memb->nodeid,
271 de = get_free_de(ls, namelen); 136 b, namelen,
272 if (!de) 137 DLM_LU_RECOVER_DIR,
138 &nodeid, &result);
139 if (error) {
140 log_error(ls, "recover_dir lookup %d",
141 error);
273 goto out_free; 142 goto out_free;
143 }
144
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
149
150 if (result == DLM_LU_MATCH &&
151 nodeid != memb->nodeid) {
152 count_bad++;
153 log_error(ls, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result, nodeid, memb->nodeid,
156 count_bad);
157 print_hex_dump_bytes("dlm_recover_dir ",
158 DUMP_PREFIX_NONE,
159 b, namelen);
160 }
161
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
164
165 if (result == DLM_LU_MATCH &&
166 nodeid == memb->nodeid) {
167 count_match++;
168 }
169
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
172
173 if (result == DLM_LU_ADD) {
174 count_add++;
175 }
274 176
275 de->master_nodeid = memb->nodeid;
276 de->length = namelen;
277 last_len = namelen; 177 last_len = namelen;
278 memcpy(de->name, b, namelen);
279 memcpy(last_name, b, namelen); 178 memcpy(last_name, b, namelen);
280 b += namelen; 179 b += namelen;
281 left -= namelen; 180 left -= namelen;
282
283 add_entry_to_hash(ls, de);
284 count++; 181 count++;
285 } 182 }
286 } 183 }
287 done: 184 done:
288 ; 185 ;
289 } 186 }
290 187
291 out_status: 188 out_status:
292 error = 0; 189 error = 0;
293 log_debug(ls, "dlm_recover_directory %d entries", count); 190 dlm_set_recover_status(ls, DLM_RS_DIR);
191
192 log_debug(ls, "dlm_recover_directory %u in %u new",
193 count, count_add);
294 out_free: 194 out_free:
295 kfree(last_name); 195 kfree(last_name);
296 out: 196 out:
297 dlm_clear_free_entries(ls);
298 return error; 197 return error;
299} 198}
300 199
301static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302 int namelen, int *r_nodeid)
303{
304 struct dlm_direntry *de, *tmp;
305 uint32_t bucket;
306
307 bucket = dir_hash(ls, name, namelen);
308
309 spin_lock(&ls->ls_dirtbl[bucket].lock);
310 de = search_bucket(ls, name, namelen, bucket);
311 if (de) {
312 *r_nodeid = de->master_nodeid;
313 spin_unlock(&ls->ls_dirtbl[bucket].lock);
314 if (*r_nodeid == nodeid)
315 return -EEXIST;
316 return 0;
317 }
318
319 spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321 if (namelen > DLM_RESNAME_MAXLEN)
322 return -EINVAL;
323
324 de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325 if (!de)
326 return -ENOMEM;
327
328 de->master_nodeid = nodeid;
329 de->length = namelen;
330 memcpy(de->name, name, namelen);
331
332 spin_lock(&ls->ls_dirtbl[bucket].lock);
333 tmp = search_bucket(ls, name, namelen, bucket);
334 if (tmp) {
335 kfree(de);
336 de = tmp;
337 } else {
338 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339 }
340 *r_nodeid = de->master_nodeid;
341 spin_unlock(&ls->ls_dirtbl[bucket].lock);
342 return 0;
343}
344
345int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346 int *r_nodeid)
347{
348 return get_entry(ls, nodeid, name, namelen, r_nodeid);
349}
350
351static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) 200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352{ 201{
353 struct dlm_rsb *r; 202 struct dlm_rsb *r;
@@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
358 bucket = hash & (ls->ls_rsbtbl_size - 1); 207 bucket = hash & (ls->ls_rsbtbl_size - 1);
359 208
360 spin_lock(&ls->ls_rsbtbl[bucket].lock); 209 spin_lock(&ls->ls_rsbtbl[bucket].lock);
361 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r); 210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
362 if (rv) 211 if (rv)
363 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, 212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364 name, len, 0, &r); 213 name, len, &r);
365 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 214 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366 215
367 if (!rv) 216 if (!rv)
@@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
371 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 221 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373 up_read(&ls->ls_root_sem); 222 up_read(&ls->ls_root_sem);
374 log_error(ls, "find_rsb_root revert to root_list %s", 223 log_debug(ls, "find_rsb_root revert to root_list %s",
375 r->res_name); 224 r->res_name);
376 return r; 225 return r;
377 } 226 }
@@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
429 be_namelen = cpu_to_be16(0); 278 be_namelen = cpu_to_be16(0);
430 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431 offset += sizeof(__be16); 280 offset += sizeof(__be16);
281 ls->ls_recover_dir_sent_msg++;
432 goto out; 282 goto out;
433 } 283 }
434 284
@@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
437 offset += sizeof(__be16); 287 offset += sizeof(__be16);
438 memcpy(outbuf + offset, r->res_name, r->res_length); 288 memcpy(outbuf + offset, r->res_name, r->res_length);
439 offset += r->res_length; 289 offset += r->res_length;
290 ls->ls_recover_dir_sent_res++;
440 } 291 }
441 292
442 /* 293 /*
@@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
449 be_namelen = cpu_to_be16(0xFFFF); 300 be_namelen = cpu_to_be16(0xFFFF);
450 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451 offset += sizeof(__be16); 302 offset += sizeof(__be16);
303 ls->ls_recover_dir_sent_msg++;
452 } 304 }
453
454 out: 305 out:
455 up_read(&ls->ls_root_sem); 306 up_read(&ls->ls_root_sem);
456} 307}
diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h
index 0b0eb1267b6e..417506344456 100644
--- a/fs/dlm/dir.h
+++ b/fs/dlm/dir.h
@@ -14,15 +14,10 @@
14#ifndef __DIR_DOT_H__ 14#ifndef __DIR_DOT_H__
15#define __DIR_DOT_H__ 15#define __DIR_DOT_H__
16 16
17
18int dlm_dir_nodeid(struct dlm_rsb *rsb); 17int dlm_dir_nodeid(struct dlm_rsb *rsb);
19int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); 18int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
20void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len); 19void dlm_recover_dir_nodeid(struct dlm_ls *ls);
21void dlm_dir_clear(struct dlm_ls *ls);
22void dlm_clear_free_entries(struct dlm_ls *ls);
23int dlm_recover_directory(struct dlm_ls *ls); 20int dlm_recover_directory(struct dlm_ls *ls);
24int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
25 int *r_nodeid);
26void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, 21void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
27 char *outbuf, int outlen, int nodeid); 22 char *outbuf, int outlen, int nodeid);
28 23
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index bc342f7ac3af..3093207a7684 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -55,8 +55,6 @@ struct dlm_lkb;
55struct dlm_rsb; 55struct dlm_rsb;
56struct dlm_member; 56struct dlm_member;
57struct dlm_rsbtable; 57struct dlm_rsbtable;
58struct dlm_dirtable;
59struct dlm_direntry;
60struct dlm_recover; 58struct dlm_recover;
61struct dlm_header; 59struct dlm_header;
62struct dlm_message; 60struct dlm_message;
@@ -98,18 +96,6 @@ do { \
98} 96}
99 97
100 98
101struct dlm_direntry {
102 struct list_head list;
103 uint32_t master_nodeid;
104 uint16_t length;
105 char name[1];
106};
107
108struct dlm_dirtable {
109 struct list_head list;
110 spinlock_t lock;
111};
112
113struct dlm_rsbtable { 99struct dlm_rsbtable {
114 struct rb_root keep; 100 struct rb_root keep;
115 struct rb_root toss; 101 struct rb_root toss;
@@ -283,6 +269,15 @@ struct dlm_lkb {
283 }; 269 };
284}; 270};
285 271
272/*
273 * res_master_nodeid is "normal": 0 is unset/invalid, non-zero is the real
274 * nodeid, even when nodeid is our_nodeid.
275 *
276 * res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid,
277 * greater than zero when another nodeid.
278 *
279 * (TODO: remove res_nodeid and only use res_master_nodeid)
280 */
286 281
287struct dlm_rsb { 282struct dlm_rsb {
288 struct dlm_ls *res_ls; /* the lockspace */ 283 struct dlm_ls *res_ls; /* the lockspace */
@@ -291,6 +286,8 @@ struct dlm_rsb {
291 unsigned long res_flags; 286 unsigned long res_flags;
292 int res_length; /* length of rsb name */ 287 int res_length; /* length of rsb name */
293 int res_nodeid; 288 int res_nodeid;
289 int res_master_nodeid;
290 int res_dir_nodeid;
294 uint32_t res_lvbseq; 291 uint32_t res_lvbseq;
295 uint32_t res_hash; 292 uint32_t res_hash;
296 uint32_t res_bucket; /* rsbtbl */ 293 uint32_t res_bucket; /* rsbtbl */
@@ -313,10 +310,21 @@ struct dlm_rsb {
313 char res_name[DLM_RESNAME_MAXLEN+1]; 310 char res_name[DLM_RESNAME_MAXLEN+1];
314}; 311};
315 312
313/* dlm_master_lookup() flags */
314
315#define DLM_LU_RECOVER_DIR 1
316#define DLM_LU_RECOVER_MASTER 2
317
318/* dlm_master_lookup() results */
319
320#define DLM_LU_MATCH 1
321#define DLM_LU_ADD 2
322
316/* find_rsb() flags */ 323/* find_rsb() flags */
317 324
318#define R_MASTER 1 /* only return rsb if it's a master */ 325#define R_REQUEST 0x00000001
319#define R_CREATE 2 /* create/add rsb if not found */ 326#define R_RECEIVE_REQUEST 0x00000002
327#define R_RECEIVE_RECOVER 0x00000004
320 328
321/* rsb_flags */ 329/* rsb_flags */
322 330
@@ -509,9 +517,6 @@ struct dlm_ls {
509 struct dlm_rsbtable *ls_rsbtbl; 517 struct dlm_rsbtable *ls_rsbtbl;
510 uint32_t ls_rsbtbl_size; 518 uint32_t ls_rsbtbl_size;
511 519
512 struct dlm_dirtable *ls_dirtbl;
513 uint32_t ls_dirtbl_size;
514
515 struct mutex ls_waiters_mutex; 520 struct mutex ls_waiters_mutex;
516 struct list_head ls_waiters; /* lkbs needing a reply */ 521 struct list_head ls_waiters; /* lkbs needing a reply */
517 522
@@ -545,6 +550,7 @@ struct dlm_ls {
545 struct dentry *ls_debug_waiters_dentry; /* debugfs */ 550 struct dentry *ls_debug_waiters_dentry; /* debugfs */
546 struct dentry *ls_debug_locks_dentry; /* debugfs */ 551 struct dentry *ls_debug_locks_dentry; /* debugfs */
547 struct dentry *ls_debug_all_dentry; /* debugfs */ 552 struct dentry *ls_debug_all_dentry; /* debugfs */
553 struct dentry *ls_debug_toss_dentry; /* debugfs */
548 554
549 wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ 555 wait_queue_head_t ls_uevent_wait; /* user part of join/leave */
550 int ls_uevent_result; 556 int ls_uevent_result;
@@ -573,6 +579,8 @@ struct dlm_ls {
573 struct mutex ls_requestqueue_mutex; 579 struct mutex ls_requestqueue_mutex;
574 struct dlm_rcom *ls_recover_buf; 580 struct dlm_rcom *ls_recover_buf;
575 int ls_recover_nodeid; /* for debugging */ 581 int ls_recover_nodeid; /* for debugging */
582 unsigned int ls_recover_dir_sent_res; /* for log info */
583 unsigned int ls_recover_dir_sent_msg; /* for log info */
576 unsigned int ls_recover_locks_in; /* for log info */ 584 unsigned int ls_recover_locks_in; /* for log info */
577 uint64_t ls_rcom_seq; 585 uint64_t ls_rcom_seq;
578 spinlock_t ls_rcom_spin; 586 spinlock_t ls_rcom_spin;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index bdafb65a5234..d9ee1b96549a 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -90,6 +90,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
90static int receive_extralen(struct dlm_message *ms); 90static int receive_extralen(struct dlm_message *ms);
91static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 91static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92static void del_timeout(struct dlm_lkb *lkb); 92static void del_timeout(struct dlm_lkb *lkb);
93static void toss_rsb(struct kref *kref);
93 94
94/* 95/*
95 * Lock compatibilty matrix - thanks Steve 96 * Lock compatibilty matrix - thanks Steve
@@ -170,9 +171,11 @@ void dlm_print_lkb(struct dlm_lkb *lkb)
170 171
171static void dlm_print_rsb(struct dlm_rsb *r) 172static void dlm_print_rsb(struct dlm_rsb *r)
172{ 173{
173 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", 174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 r->res_nodeid, r->res_flags, r->res_first_lkid, 175 "rlc %d name %s\n",
175 r->res_recover_locks_count, r->res_name); 176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178 r->res_name);
176} 179}
177 180
178void dlm_dump_rsb(struct dlm_rsb *r) 181void dlm_dump_rsb(struct dlm_rsb *r)
@@ -327,6 +330,37 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
327 * Basic operations on rsb's and lkb's 330 * Basic operations on rsb's and lkb's
328 */ 331 */
329 332
333/* This is only called to add a reference when the code already holds
334 a valid reference to the rsb, so there's no need for locking. */
335
336static inline void hold_rsb(struct dlm_rsb *r)
337{
338 kref_get(&r->res_ref);
339}
340
341void dlm_hold_rsb(struct dlm_rsb *r)
342{
343 hold_rsb(r);
344}
345
346/* When all references to the rsb are gone it's transferred to
347 the tossed list for later disposal. */
348
349static void put_rsb(struct dlm_rsb *r)
350{
351 struct dlm_ls *ls = r->res_ls;
352 uint32_t bucket = r->res_bucket;
353
354 spin_lock(&ls->ls_rsbtbl[bucket].lock);
355 kref_put(&r->res_ref, toss_rsb);
356 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357}
358
359void dlm_put_rsb(struct dlm_rsb *r)
360{
361 put_rsb(r);
362}
363
330static int pre_rsb_struct(struct dlm_ls *ls) 364static int pre_rsb_struct(struct dlm_ls *ls)
331{ 365{
332 struct dlm_rsb *r1, *r2; 366 struct dlm_rsb *r1, *r2;
@@ -411,11 +445,10 @@ static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
411} 445}
412 446
413int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 447int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
414 unsigned int flags, struct dlm_rsb **r_ret) 448 struct dlm_rsb **r_ret)
415{ 449{
416 struct rb_node *node = tree->rb_node; 450 struct rb_node *node = tree->rb_node;
417 struct dlm_rsb *r; 451 struct dlm_rsb *r;
418 int error = 0;
419 int rc; 452 int rc;
420 453
421 while (node) { 454 while (node) {
@@ -432,10 +465,8 @@ int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
432 return -EBADR; 465 return -EBADR;
433 466
434 found: 467 found:
435 if (r->res_nodeid && (flags & R_MASTER))
436 error = -ENOTBLK;
437 *r_ret = r; 468 *r_ret = r;
438 return error; 469 return 0;
439} 470}
440 471
441static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) 472static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
@@ -467,124 +498,587 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
467 return 0; 498 return 0;
468} 499}
469 500
470static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, 501/*
471 unsigned int flags, struct dlm_rsb **r_ret) 502 * Find rsb in rsbtbl and potentially create/add one
503 *
504 * Delaying the release of rsb's has a similar benefit to applications keeping
505 * NL locks on an rsb, but without the guarantee that the cached master value
506 * will still be valid when the rsb is reused. Apps aren't always smart enough
507 * to keep NL locks on an rsb that they may lock again shortly; this can lead
508 * to excessive master lookups and removals if we don't delay the release.
509 *
510 * Searching for an rsb means looking through both the normal list and toss
511 * list. When found on the toss list the rsb is moved to the normal list with
512 * ref count of 1; when found on normal list the ref count is incremented.
513 *
514 * rsb's on the keep list are being used locally and refcounted.
515 * rsb's on the toss list are not being used locally, and are not refcounted.
516 *
517 * The toss list rsb's were either
518 * - previously used locally but not any more (were on keep list, then
519 * moved to toss list when last refcount dropped)
520 * - created and put on toss list as a directory record for a lookup
521 * (we are the dir node for the res, but are not using the res right now,
522 * but some other node is)
523 *
524 * The purpose of find_rsb() is to return a refcounted rsb for local use.
525 * So, if the given rsb is on the toss list, it is moved to the keep list
526 * before being returned.
527 *
528 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529 * more refcounts exist, so the rsb is moved from the keep list to the
530 * toss list.
531 *
532 * rsb's on both keep and toss lists are used for doing a name to master
533 * lookups. rsb's that are in use locally (and being refcounted) are on
534 * the keep list, rsb's that are not in use locally (not refcounted) and
535 * only exist for name/master lookups are on the toss list.
536 *
537 * rsb's on the toss list who's dir_nodeid is not local can have stale
538 * name/master mappings. So, remote requests on such rsb's can potentially
539 * return with an error, which means the mapping is stale and needs to
540 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
541 * first_lkid is to keep only a single outstanding request on an rsb
542 * while that rsb has a potentially stale master.)
543 */
544
545static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546 uint32_t hash, uint32_t b,
547 int dir_nodeid, int from_nodeid,
548 unsigned int flags, struct dlm_rsb **r_ret)
472{ 549{
473 struct dlm_rsb *r; 550 struct dlm_rsb *r = NULL;
551 int our_nodeid = dlm_our_nodeid();
552 int from_local = 0;
553 int from_other = 0;
554 int from_dir = 0;
555 int create = 0;
474 int error; 556 int error;
475 557
476 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r); 558 if (flags & R_RECEIVE_REQUEST) {
477 if (!error) { 559 if (from_nodeid == dir_nodeid)
478 kref_get(&r->res_ref); 560 from_dir = 1;
479 goto out; 561 else
562 from_other = 1;
563 } else if (flags & R_REQUEST) {
564 from_local = 1;
565 }
566
567 /*
568 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569 * from_nodeid has sent us a lock in dlm_recover_locks, believing
570 * we're the new master. Our local recovery may not have set
571 * res_master_nodeid to our_nodeid yet, so allow either. Don't
572 * create the rsb; dlm_recover_process_copy() will handle EBADR
573 * by resending.
574 *
575 * If someone sends us a request, we are the dir node, and we do
576 * not find the rsb anywhere, then recreate it. This happens if
577 * someone sends us a request after we have removed/freed an rsb
578 * from our toss list. (They sent a request instead of lookup
579 * because they are using an rsb from their toss list.)
580 */
581
582 if (from_local || from_dir ||
583 (from_other && (dir_nodeid == our_nodeid))) {
584 create = 1;
480 } 585 }
481 if (error == -ENOTBLK)
482 goto out;
483 586
484 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 587 retry:
588 if (create) {
589 error = pre_rsb_struct(ls);
590 if (error < 0)
591 goto out;
592 }
593
594 spin_lock(&ls->ls_rsbtbl[b].lock);
595
596 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
485 if (error) 597 if (error)
486 goto out; 598 goto do_toss;
599
600 /*
601 * rsb is active, so we can't check master_nodeid without lock_rsb.
602 */
487 603
488 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 604 kref_get(&r->res_ref);
489 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 605 error = 0;
606 goto out_unlock;
607
608
609 do_toss:
610 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
490 if (error) 611 if (error)
491 return error; 612 goto do_new;
492 613
493 if (dlm_no_directory(ls)) 614 /*
494 goto out; 615 * rsb found inactive (master_nodeid may be out of date unless
616 * we are the dir_nodeid or were the master) No other thread
617 * is using this rsb because it's on the toss list, so we can
618 * look at or update res_master_nodeid without lock_rsb.
619 */
495 620
496 if (r->res_nodeid == -1) { 621 if ((r->res_master_nodeid != our_nodeid) && from_other) {
622 /* our rsb was not master, and another node (not the dir node)
623 has sent us a request */
624 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625 from_nodeid, r->res_master_nodeid, dir_nodeid,
626 r->res_name);
627 error = -ENOTBLK;
628 goto out_unlock;
629 }
630
631 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632 /* don't think this should ever happen */
633 log_error(ls, "find_rsb toss from_dir %d master %d",
634 from_nodeid, r->res_master_nodeid);
635 dlm_print_rsb(r);
636 /* fix it and go on */
637 r->res_master_nodeid = our_nodeid;
638 r->res_nodeid = 0;
497 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 639 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
498 r->res_first_lkid = 0; 640 r->res_first_lkid = 0;
499 } else if (r->res_nodeid > 0) { 641 }
642
643 if (from_local && (r->res_master_nodeid != our_nodeid)) {
644 /* Because we have held no locks on this rsb,
645 res_master_nodeid could have become stale. */
500 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 646 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
501 r->res_first_lkid = 0; 647 r->res_first_lkid = 0;
648 }
649
650 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652 goto out_unlock;
653
654
655 do_new:
656 /*
657 * rsb not found
658 */
659
660 if (error == -EBADR && !create)
661 goto out_unlock;
662
663 error = get_rsb_struct(ls, name, len, &r);
664 if (error == -EAGAIN) {
665 spin_unlock(&ls->ls_rsbtbl[b].lock);
666 goto retry;
667 }
668 if (error)
669 goto out_unlock;
670
671 r->res_hash = hash;
672 r->res_bucket = b;
673 r->res_dir_nodeid = dir_nodeid;
674 kref_init(&r->res_ref);
675
676 if (from_dir) {
677 /* want to see how often this happens */
678 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679 from_nodeid, r->res_name);
680 r->res_master_nodeid = our_nodeid;
681 r->res_nodeid = 0;
682 goto out_add;
683 }
684
685 if (from_other && (dir_nodeid != our_nodeid)) {
686 /* should never happen */
687 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689 dlm_free_rsb(r);
690 error = -ENOTBLK;
691 goto out_unlock;
692 }
693
694 if (from_other) {
695 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696 from_nodeid, dir_nodeid, r->res_name);
697 }
698
699 if (dir_nodeid == our_nodeid) {
700 /* When we are the dir nodeid, we can set the master
701 node immediately */
702 r->res_master_nodeid = our_nodeid;
703 r->res_nodeid = 0;
502 } else { 704 } else {
503 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r);); 705 /* set_master will send_lookup to dir_nodeid */
504 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),); 706 r->res_master_nodeid = 0;
707 r->res_nodeid = -1;
505 } 708 }
709
710 out_add:
711 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712 out_unlock:
713 spin_unlock(&ls->ls_rsbtbl[b].lock);
506 out: 714 out:
507 *r_ret = r; 715 *r_ret = r;
508 return error; 716 return error;
509} 717}
510 718
719/* During recovery, other nodes can send us new MSTCPY locks (from
720 dlm_recover_locks) before we've made ourself master (in
721 dlm_recover_masters). */
722
723static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724 uint32_t hash, uint32_t b,
725 int dir_nodeid, int from_nodeid,
726 unsigned int flags, struct dlm_rsb **r_ret)
727{
728 struct dlm_rsb *r = NULL;
729 int our_nodeid = dlm_our_nodeid();
730 int recover = (flags & R_RECEIVE_RECOVER);
731 int error;
732
733 retry:
734 error = pre_rsb_struct(ls);
735 if (error < 0)
736 goto out;
737
738 spin_lock(&ls->ls_rsbtbl[b].lock);
739
740 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741 if (error)
742 goto do_toss;
743
744 /*
745 * rsb is active, so we can't check master_nodeid without lock_rsb.
746 */
747
748 kref_get(&r->res_ref);
749 goto out_unlock;
750
751
752 do_toss:
753 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754 if (error)
755 goto do_new;
756
757 /*
758 * rsb found inactive. No other thread is using this rsb because
759 * it's on the toss list, so we can look at or update
760 * res_master_nodeid without lock_rsb.
761 */
762
763 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764 /* our rsb is not master, and another node has sent us a
765 request; this should never happen */
766 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767 from_nodeid, r->res_master_nodeid, dir_nodeid);
768 dlm_print_rsb(r);
769 error = -ENOTBLK;
770 goto out_unlock;
771 }
772
773 if (!recover && (r->res_master_nodeid != our_nodeid) &&
774 (dir_nodeid == our_nodeid)) {
775 /* our rsb is not master, and we are dir; may as well fix it;
776 this should never happen */
777 log_error(ls, "find_rsb toss our %d master %d dir %d",
778 our_nodeid, r->res_master_nodeid, dir_nodeid);
779 dlm_print_rsb(r);
780 r->res_master_nodeid = our_nodeid;
781 r->res_nodeid = 0;
782 }
783
784 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786 goto out_unlock;
787
788
789 do_new:
790 /*
791 * rsb not found
792 */
793
794 error = get_rsb_struct(ls, name, len, &r);
795 if (error == -EAGAIN) {
796 spin_unlock(&ls->ls_rsbtbl[b].lock);
797 goto retry;
798 }
799 if (error)
800 goto out_unlock;
801
802 r->res_hash = hash;
803 r->res_bucket = b;
804 r->res_dir_nodeid = dir_nodeid;
805 r->res_master_nodeid = dir_nodeid;
806 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807 kref_init(&r->res_ref);
808
809 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810 out_unlock:
811 spin_unlock(&ls->ls_rsbtbl[b].lock);
812 out:
813 *r_ret = r;
814 return error;
815}
816
817static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818 unsigned int flags, struct dlm_rsb **r_ret)
819{
820 uint32_t hash, b;
821 int dir_nodeid;
822
823 if (len > DLM_RESNAME_MAXLEN)
824 return -EINVAL;
825
826 hash = jhash(name, len, 0);
827 b = hash & (ls->ls_rsbtbl_size - 1);
828
829 dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831 if (dlm_no_directory(ls))
832 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833 from_nodeid, flags, r_ret);
834 else
835 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836 from_nodeid, flags, r_ret);
837}
838
839/* we have received a request and found that res_master_nodeid != our_nodeid,
840 so we need to return an error or make ourself the master */
841
842static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843 int from_nodeid)
844{
845 if (dlm_no_directory(ls)) {
846 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847 from_nodeid, r->res_master_nodeid,
848 r->res_dir_nodeid);
849 dlm_print_rsb(r);
850 return -ENOTBLK;
851 }
852
853 if (from_nodeid != r->res_dir_nodeid) {
854 /* our rsb is not master, and another node (not the dir node)
855 has sent us a request. this is much more common when our
856 master_nodeid is zero, so limit debug to non-zero. */
857
858 if (r->res_master_nodeid) {
859 log_debug(ls, "validate master from_other %d master %d "
860 "dir %d first %x %s", from_nodeid,
861 r->res_master_nodeid, r->res_dir_nodeid,
862 r->res_first_lkid, r->res_name);
863 }
864 return -ENOTBLK;
865 } else {
866 /* our rsb is not master, but the dir nodeid has sent us a
867 request; this could happen with master 0 / res_nodeid -1 */
868
869 if (r->res_master_nodeid) {
870 log_error(ls, "validate master from_dir %d master %d "
871 "first %x %s",
872 from_nodeid, r->res_master_nodeid,
873 r->res_first_lkid, r->res_name);
874 }
875
876 r->res_master_nodeid = dlm_our_nodeid();
877 r->res_nodeid = 0;
878 return 0;
879 }
880}
881
511/* 882/*
512 * Find rsb in rsbtbl and potentially create/add one 883 * We're the dir node for this res and another node wants to know the
884 * master nodeid. During normal operation (non recovery) this is only
885 * called from receive_lookup(); master lookups when the local node is
886 * the dir node are done by find_rsb().
513 * 887 *
514 * Delaying the release of rsb's has a similar benefit to applications keeping 888 * normal operation, we are the dir node for a resource
515 * NL locks on an rsb, but without the guarantee that the cached master value 889 * . _request_lock
516 * will still be valid when the rsb is reused. Apps aren't always smart enough 890 * . set_master
517 * to keep NL locks on an rsb that they may lock again shortly; this can lead 891 * . send_lookup
518 * to excessive master lookups and removals if we don't delay the release. 892 * . receive_lookup
893 * . dlm_master_lookup flags 0
519 * 894 *
520 * Searching for an rsb means looking through both the normal list and toss 895 * recover directory, we are rebuilding dir for all resources
521 * list. When found on the toss list the rsb is moved to the normal list with 896 * . dlm_recover_directory
522 * ref count of 1; when found on normal list the ref count is incremented. 897 * . dlm_rcom_names
898 * remote node sends back the rsb names it is master of and we are dir of
899 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900 * we either create new rsb setting remote node as master, or find existing
901 * rsb and set master to be the remote node.
902 *
903 * recover masters, we are finding the new master for resources
904 * . dlm_recover_masters
905 * . recover_master
906 * . dlm_send_rcom_lookup
907 * . receive_rcom_lookup
908 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
523 */ 909 */
524 910
525static int find_rsb(struct dlm_ls *ls, char *name, int namelen, 911int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
526 unsigned int flags, struct dlm_rsb **r_ret) 912 unsigned int flags, int *r_nodeid, int *result)
527{ 913{
528 struct dlm_rsb *r = NULL; 914 struct dlm_rsb *r = NULL;
529 uint32_t hash, bucket; 915 uint32_t hash, b;
530 int error; 916 int from_master = (flags & DLM_LU_RECOVER_DIR);
917 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918 int our_nodeid = dlm_our_nodeid();
919 int dir_nodeid, error, toss_list = 0;
531 920
532 if (namelen > DLM_RESNAME_MAXLEN) { 921 if (len > DLM_RESNAME_MAXLEN)
533 error = -EINVAL; 922 return -EINVAL;
534 goto out; 923
924 if (from_nodeid == our_nodeid) {
925 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926 our_nodeid, flags);
927 return -EINVAL;
535 } 928 }
536 929
537 if (dlm_no_directory(ls)) 930 hash = jhash(name, len, 0);
538 flags |= R_CREATE; 931 b = hash & (ls->ls_rsbtbl_size - 1);
539 932
540 hash = jhash(name, namelen, 0); 933 dir_nodeid = dlm_hash2nodeid(ls, hash);
541 bucket = hash & (ls->ls_rsbtbl_size - 1); 934 if (dir_nodeid != our_nodeid) {
935 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936 from_nodeid, dir_nodeid, our_nodeid, hash,
937 ls->ls_num_nodes);
938 *r_nodeid = -1;
939 return -EINVAL;
940 }
542 941
543 retry: 942 retry:
544 if (flags & R_CREATE) { 943 error = pre_rsb_struct(ls);
545 error = pre_rsb_struct(ls); 944 if (error < 0)
546 if (error < 0) 945 return error;
547 goto out; 946
947 spin_lock(&ls->ls_rsbtbl[b].lock);
948 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949 if (!error) {
950 /* because the rsb is active, we need to lock_rsb before
951 checking/changing re_master_nodeid */
952
953 hold_rsb(r);
954 spin_unlock(&ls->ls_rsbtbl[b].lock);
955 lock_rsb(r);
956 goto found;
548 } 957 }
549 958
550 spin_lock(&ls->ls_rsbtbl[bucket].lock); 959 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960 if (error)
961 goto not_found;
551 962
552 error = _search_rsb(ls, name, namelen, bucket, flags, &r); 963 /* because the rsb is inactive (on toss list), it's not refcounted
553 if (!error) 964 and lock_rsb is not used, but is protected by the rsbtbl lock */
554 goto out_unlock;
555 965
556 if (error == -EBADR && !(flags & R_CREATE)) 966 toss_list = 1;
557 goto out_unlock; 967 found:
968 if (r->res_dir_nodeid != our_nodeid) {
969 /* should not happen, but may as well fix it and carry on */
970 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971 r->res_dir_nodeid, our_nodeid, r->res_name);
972 r->res_dir_nodeid = our_nodeid;
973 }
974
975 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976 /* Recovery uses this function to set a new master when
977 the previous master failed. Setting NEW_MASTER will
978 force dlm_recover_masters to call recover_master on this
979 rsb even though the res_nodeid is no longer removed. */
980
981 r->res_master_nodeid = from_nodeid;
982 r->res_nodeid = from_nodeid;
983 rsb_set_flag(r, RSB_NEW_MASTER);
984
985 if (toss_list) {
986 /* I don't think we should ever find it on toss list. */
987 log_error(ls, "dlm_master_lookup fix_master on toss");
988 dlm_dump_rsb(r);
989 }
990 }
558 991
559 /* the rsb was found but wasn't a master copy */ 992 if (from_master && (r->res_master_nodeid != from_nodeid)) {
560 if (error == -ENOTBLK) 993 /* this will happen if from_nodeid became master during
561 goto out_unlock; 994 a previous recovery cycle, and we aborted the previous
995 cycle before recovering this master value */
562 996
563 error = get_rsb_struct(ls, name, namelen, &r); 997 log_limit(ls, "dlm_master_lookup from_master %d "
998 "master_nodeid %d res_nodeid %d first %x %s",
999 from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000 r->res_first_lkid, r->res_name);
1001
1002 if (r->res_master_nodeid == our_nodeid) {
1003 log_error(ls, "from_master %d our_master", from_nodeid);
1004 dlm_dump_rsb(r);
1005 dlm_send_rcom_lookup_dump(r, from_nodeid);
1006 goto out_found;
1007 }
1008
1009 r->res_master_nodeid = from_nodeid;
1010 r->res_nodeid = from_nodeid;
1011 rsb_set_flag(r, RSB_NEW_MASTER);
1012 }
1013
1014 if (!r->res_master_nodeid) {
1015 /* this will happen if recovery happens while we're looking
1016 up the master for this rsb */
1017
1018 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019 from_nodeid, r->res_first_lkid, r->res_name);
1020 r->res_master_nodeid = from_nodeid;
1021 r->res_nodeid = from_nodeid;
1022 }
1023
1024 if (!from_master && !fix_master &&
1025 (r->res_master_nodeid == from_nodeid)) {
1026 /* this can happen when the master sends remove, the dir node
1027 finds the rsb on the keep list and ignores the remove,
1028 and the former master sends a lookup */
1029
1030 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031 "first %x %s", from_nodeid, flags,
1032 r->res_first_lkid, r->res_name);
1033 }
1034
1035 out_found:
1036 *r_nodeid = r->res_master_nodeid;
1037 if (result)
1038 *result = DLM_LU_MATCH;
1039
1040 if (toss_list) {
1041 r->res_toss_time = jiffies;
1042 /* the rsb was inactive (on toss list) */
1043 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044 } else {
1045 /* the rsb was active */
1046 unlock_rsb(r);
1047 put_rsb(r);
1048 }
1049 return 0;
1050
1051 not_found:
1052 error = get_rsb_struct(ls, name, len, &r);
564 if (error == -EAGAIN) { 1053 if (error == -EAGAIN) {
565 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 1054 spin_unlock(&ls->ls_rsbtbl[b].lock);
566 goto retry; 1055 goto retry;
567 } 1056 }
568 if (error) 1057 if (error)
569 goto out_unlock; 1058 goto out_unlock;
570 1059
571 r->res_hash = hash; 1060 r->res_hash = hash;
572 r->res_bucket = bucket; 1061 r->res_bucket = b;
573 r->res_nodeid = -1; 1062 r->res_dir_nodeid = our_nodeid;
1063 r->res_master_nodeid = from_nodeid;
1064 r->res_nodeid = from_nodeid;
574 kref_init(&r->res_ref); 1065 kref_init(&r->res_ref);
1066 r->res_toss_time = jiffies;
575 1067
576 /* With no directory, the master can be set immediately */ 1068 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
577 if (dlm_no_directory(ls)) { 1069 if (error) {
578 int nodeid = dlm_dir_nodeid(r); 1070 /* should never happen */
579 if (nodeid == dlm_our_nodeid()) 1071 dlm_free_rsb(r);
580 nodeid = 0; 1072 spin_unlock(&ls->ls_rsbtbl[b].lock);
581 r->res_nodeid = nodeid; 1073 goto retry;
582 } 1074 }
583 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep); 1075
1076 if (result)
1077 *result = DLM_LU_ADD;
1078 *r_nodeid = from_nodeid;
1079 error = 0;
584 out_unlock: 1080 out_unlock:
585 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 1081 spin_unlock(&ls->ls_rsbtbl[b].lock);
586 out:
587 *r_ret = r;
588 return error; 1082 return error;
589} 1083}
590 1084
@@ -605,17 +1099,27 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
605 } 1099 }
606} 1100}
607 1101
608/* This is only called to add a reference when the code already holds 1102void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
609 a valid reference to the rsb, so there's no need for locking. */
610
611static inline void hold_rsb(struct dlm_rsb *r)
612{ 1103{
613 kref_get(&r->res_ref); 1104 struct dlm_rsb *r = NULL;
614} 1105 uint32_t hash, b;
1106 int error;
615 1107
616void dlm_hold_rsb(struct dlm_rsb *r) 1108 hash = jhash(name, len, 0);
617{ 1109 b = hash & (ls->ls_rsbtbl_size - 1);
618 hold_rsb(r); 1110
1111 spin_lock(&ls->ls_rsbtbl[b].lock);
1112 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113 if (!error)
1114 goto out_dump;
1115
1116 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117 if (error)
1118 goto out;
1119 out_dump:
1120 dlm_dump_rsb(r);
1121 out:
1122 spin_unlock(&ls->ls_rsbtbl[b].lock);
619} 1123}
620 1124
621static void toss_rsb(struct kref *kref) 1125static void toss_rsb(struct kref *kref)
@@ -634,24 +1138,6 @@ static void toss_rsb(struct kref *kref)
634 } 1138 }
635} 1139}
636 1140
637/* When all references to the rsb are gone it's transferred to
638 the tossed list for later disposal. */
639
640static void put_rsb(struct dlm_rsb *r)
641{
642 struct dlm_ls *ls = r->res_ls;
643 uint32_t bucket = r->res_bucket;
644
645 spin_lock(&ls->ls_rsbtbl[bucket].lock);
646 kref_put(&r->res_ref, toss_rsb);
647 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
648}
649
650void dlm_put_rsb(struct dlm_rsb *r)
651{
652 put_rsb(r);
653}
654
655/* See comment for unhold_lkb */ 1141/* See comment for unhold_lkb */
656 1142
657static void unhold_rsb(struct dlm_rsb *r) 1143static void unhold_rsb(struct dlm_rsb *r)
@@ -1138,27 +1624,13 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1138 return error; 1624 return error;
1139} 1625}
1140 1626
1141static void dir_remove(struct dlm_rsb *r)
1142{
1143 int to_nodeid;
1144
1145 if (dlm_no_directory(r->res_ls))
1146 return;
1147
1148 to_nodeid = dlm_dir_nodeid(r);
1149 if (to_nodeid != dlm_our_nodeid())
1150 send_remove(r);
1151 else
1152 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1153 r->res_name, r->res_length);
1154}
1155
1156/* FIXME: make this more efficient */ 1627/* FIXME: make this more efficient */
1157 1628
1158static int shrink_bucket(struct dlm_ls *ls, int b) 1629static int shrink_bucket(struct dlm_ls *ls, int b)
1159{ 1630{
1160 struct rb_node *n; 1631 struct rb_node *n;
1161 struct dlm_rsb *r; 1632 struct dlm_rsb *r;
1633 int our_nodeid = dlm_our_nodeid();
1162 int count = 0, found; 1634 int count = 0, found;
1163 1635
1164 for (;;) { 1636 for (;;) {
@@ -1166,6 +1638,17 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
1166 spin_lock(&ls->ls_rsbtbl[b].lock); 1638 spin_lock(&ls->ls_rsbtbl[b].lock);
1167 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { 1639 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1168 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1640 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1641
1642 /* If we're the directory record for this rsb, and
1643 we're not the master of it, then we need to wait
1644 for the master node to send us a dir remove for
1645 before removing the dir record. */
1646
1647 if (!dlm_no_directory(ls) && !is_master(r) &&
1648 (dlm_dir_nodeid(r) == our_nodeid)) {
1649 continue;
1650 }
1651
1169 if (!time_after_eq(jiffies, r->res_toss_time + 1652 if (!time_after_eq(jiffies, r->res_toss_time +
1170 dlm_config.ci_toss_secs * HZ)) 1653 dlm_config.ci_toss_secs * HZ))
1171 continue; 1654 continue;
@@ -1182,8 +1665,15 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
1182 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1665 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1183 spin_unlock(&ls->ls_rsbtbl[b].lock); 1666 spin_unlock(&ls->ls_rsbtbl[b].lock);
1184 1667
1185 if (is_master(r)) 1668 /* We're the master of this rsb but we're not
1186 dir_remove(r); 1669 the directory record, so we need to tell the
1670 dir node to remove the dir record. */
1671
1672 if (!dlm_no_directory(ls) && is_master(r) &&
1673 (dlm_dir_nodeid(r) != our_nodeid)) {
1674 send_remove(r);
1675 }
1676
1187 dlm_free_rsb(r); 1677 dlm_free_rsb(r);
1188 count++; 1678 count++;
1189 } else { 1679 } else {
@@ -2078,8 +2568,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2078 2568
2079static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2569static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2080{ 2570{
2081 struct dlm_ls *ls = r->res_ls; 2571 int our_nodeid = dlm_our_nodeid();
2082 int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2083 2572
2084 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2573 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2085 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2574 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
@@ -2093,53 +2582,35 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2093 return 1; 2582 return 1;
2094 } 2583 }
2095 2584
2096 if (r->res_nodeid == 0) { 2585 if (r->res_master_nodeid == our_nodeid) {
2097 lkb->lkb_nodeid = 0; 2586 lkb->lkb_nodeid = 0;
2098 return 0; 2587 return 0;
2099 } 2588 }
2100 2589
2101 if (r->res_nodeid > 0) { 2590 if (r->res_master_nodeid) {
2102 lkb->lkb_nodeid = r->res_nodeid; 2591 lkb->lkb_nodeid = r->res_master_nodeid;
2103 return 0; 2592 return 0;
2104 } 2593 }
2105 2594
2106 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r);); 2595 if (dlm_dir_nodeid(r) == our_nodeid) {
2107 2596 /* This is a somewhat unusual case; find_rsb will usually
2108 dir_nodeid = dlm_dir_nodeid(r); 2597 have set res_master_nodeid when dir nodeid is local, but
2109 2598 there are cases where we become the dir node after we've
2110 if (dir_nodeid != our_nodeid) { 2599 past find_rsb and go through _request_lock again.
2111 r->res_first_lkid = lkb->lkb_id; 2600 confirm_master() or process_lookup_list() needs to be
2112 send_lookup(r, lkb); 2601 called after this. */
2113 return 1; 2602 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2114 } 2603 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2115 2604 r->res_name);
2116 for (i = 0; i < 2; i++) { 2605 r->res_master_nodeid = our_nodeid;
2117 /* It's possible for dlm_scand to remove an old rsb for
2118 this same resource from the toss list, us to create
2119 a new one, look up the master locally, and find it
2120 already exists just before dlm_scand does the
2121 dir_remove() on the previous rsb. */
2122
2123 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2124 r->res_length, &ret_nodeid);
2125 if (!error)
2126 break;
2127 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2128 schedule();
2129 }
2130 if (error && error != -EEXIST)
2131 return error;
2132
2133 if (ret_nodeid == our_nodeid) {
2134 r->res_first_lkid = 0;
2135 r->res_nodeid = 0; 2606 r->res_nodeid = 0;
2136 lkb->lkb_nodeid = 0; 2607 lkb->lkb_nodeid = 0;
2137 } else { 2608 return 0;
2138 r->res_first_lkid = lkb->lkb_id;
2139 r->res_nodeid = ret_nodeid;
2140 lkb->lkb_nodeid = ret_nodeid;
2141 } 2609 }
2142 return 0; 2610
2611 r->res_first_lkid = lkb->lkb_id;
2612 send_lookup(r, lkb);
2613 return 1;
2143} 2614}
2144 2615
2145static void process_lookup_list(struct dlm_rsb *r) 2616static void process_lookup_list(struct dlm_rsb *r)
@@ -2584,7 +3055,7 @@ static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2584} 3055}
2585 3056
2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3057/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2587 3058
2588static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3059static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589{ 3060{
2590 int error; 3061 int error;
@@ -2708,11 +3179,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2708 3179
2709 error = validate_lock_args(ls, lkb, args); 3180 error = validate_lock_args(ls, lkb, args);
2710 if (error) 3181 if (error)
2711 goto out; 3182 return error;
2712 3183
2713 error = find_rsb(ls, name, len, R_CREATE, &r); 3184 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
2714 if (error) 3185 if (error)
2715 goto out; 3186 return error;
2716 3187
2717 lock_rsb(r); 3188 lock_rsb(r);
2718 3189
@@ -2723,8 +3194,6 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2723 3194
2724 unlock_rsb(r); 3195 unlock_rsb(r);
2725 put_rsb(r); 3196 put_rsb(r);
2726
2727 out:
2728 return error; 3197 return error;
2729} 3198}
2730 3199
@@ -3406,8 +3875,11 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3406{ 3875{
3407 struct dlm_lkb *lkb; 3876 struct dlm_lkb *lkb;
3408 struct dlm_rsb *r; 3877 struct dlm_rsb *r;
3878 int from_nodeid;
3409 int error, namelen; 3879 int error, namelen;
3410 3880
3881 from_nodeid = ms->m_header.h_nodeid;
3882
3411 error = create_lkb(ls, &lkb); 3883 error = create_lkb(ls, &lkb);
3412 if (error) 3884 if (error)
3413 goto fail; 3885 goto fail;
@@ -3420,9 +3892,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3420 goto fail; 3892 goto fail;
3421 } 3893 }
3422 3894
3895 /* The dir node is the authority on whether we are the master
3896 for this rsb or not, so if the master sends us a request, we should
3897 recreate the rsb if we've destroyed it. This race happens when we
3898 send a remove message to the dir node at the same time that the dir
3899 node sends us a request for the rsb. */
3900
3423 namelen = receive_extralen(ms); 3901 namelen = receive_extralen(ms);
3424 3902
3425 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r); 3903 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3904 R_RECEIVE_REQUEST, &r);
3426 if (error) { 3905 if (error) {
3427 __put_lkb(ls, lkb); 3906 __put_lkb(ls, lkb);
3428 goto fail; 3907 goto fail;
@@ -3430,6 +3909,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3430 3909
3431 lock_rsb(r); 3910 lock_rsb(r);
3432 3911
3912 if (r->res_master_nodeid != dlm_our_nodeid()) {
3913 error = validate_master_nodeid(ls, r, from_nodeid);
3914 if (error) {
3915 unlock_rsb(r);
3916 put_rsb(r);
3917 __put_lkb(ls, lkb);
3918 goto fail;
3919 }
3920 }
3921
3433 attach_lkb(r, lkb); 3922 attach_lkb(r, lkb);
3434 error = do_request(r, lkb); 3923 error = do_request(r, lkb);
3435 send_request_reply(r, lkb, error); 3924 send_request_reply(r, lkb, error);
@@ -3445,6 +3934,23 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3445 return 0; 3934 return 0;
3446 3935
3447 fail: 3936 fail:
3937 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3938 and do this receive_request again from process_lookup_list once
3939 we get the lookup reply. This would avoid a many repeated
3940 ENOTBLK request failures when the lookup reply designating us
3941 as master is delayed. */
3942
3943 /* We could repeatedly return -EBADR here if our send_remove() is
3944 delayed in being sent/arriving/being processed on the dir node.
3945 Another node would repeatedly lookup up the master, and the dir
3946 node would continue returning our nodeid until our send_remove
3947 took effect. */
3948
3949 if (error != -ENOTBLK) {
3950 log_limit(ls, "receive_request %x from %d %d",
3951 ms->m_lkid, from_nodeid, error);
3952 }
3953
3448 setup_stub_lkb(ls, ms); 3954 setup_stub_lkb(ls, ms);
3449 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 3955 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3450 return error; 3956 return error;
@@ -3651,49 +4157,110 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3651 4157
3652static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4158static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3653{ 4159{
3654 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 4160 int len, error, ret_nodeid, from_nodeid, our_nodeid;
3655 4161
3656 from_nodeid = ms->m_header.h_nodeid; 4162 from_nodeid = ms->m_header.h_nodeid;
3657 our_nodeid = dlm_our_nodeid(); 4163 our_nodeid = dlm_our_nodeid();
3658 4164
3659 len = receive_extralen(ms); 4165 len = receive_extralen(ms);
3660 4166
3661 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 4167 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
3662 if (dir_nodeid != our_nodeid) { 4168 &ret_nodeid, NULL);
3663 log_error(ls, "lookup dir_nodeid %d from %d",
3664 dir_nodeid, from_nodeid);
3665 error = -EINVAL;
3666 ret_nodeid = -1;
3667 goto out;
3668 }
3669
3670 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3671 4169
3672 /* Optimization: we're master so treat lookup as a request */ 4170 /* Optimization: we're master so treat lookup as a request */
3673 if (!error && ret_nodeid == our_nodeid) { 4171 if (!error && ret_nodeid == our_nodeid) {
3674 receive_request(ls, ms); 4172 receive_request(ls, ms);
3675 return; 4173 return;
3676 } 4174 }
3677 out:
3678 send_lookup_reply(ls, ms, ret_nodeid, error); 4175 send_lookup_reply(ls, ms, ret_nodeid, error);
3679} 4176}
3680 4177
3681static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4178static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3682{ 4179{
3683 int len, dir_nodeid, from_nodeid; 4180 char name[DLM_RESNAME_MAXLEN+1];
4181 struct dlm_rsb *r;
4182 uint32_t hash, b;
4183 int rv, len, dir_nodeid, from_nodeid;
3684 4184
3685 from_nodeid = ms->m_header.h_nodeid; 4185 from_nodeid = ms->m_header.h_nodeid;
3686 4186
3687 len = receive_extralen(ms); 4187 len = receive_extralen(ms);
3688 4188
4189 if (len > DLM_RESNAME_MAXLEN) {
4190 log_error(ls, "receive_remove from %d bad len %d",
4191 from_nodeid, len);
4192 return;
4193 }
4194
3689 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 4195 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3690 if (dir_nodeid != dlm_our_nodeid()) { 4196 if (dir_nodeid != dlm_our_nodeid()) {
3691 log_error(ls, "remove dir entry dir_nodeid %d from %d", 4197 log_error(ls, "receive_remove from %d bad nodeid %d",
3692 dir_nodeid, from_nodeid); 4198 from_nodeid, dir_nodeid);
3693 return; 4199 return;
3694 } 4200 }
3695 4201
3696 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 4202 /* Look for name on rsbtbl.toss, if it's there, kill it.
4203 If it's on rsbtbl.keep, it's being used, and we should ignore this
4204 message. This is an expected race between the dir node sending a
4205 request to the master node at the same time as the master node sends
4206 a remove to the dir node. The resolution to that race is for the
4207 dir node to ignore the remove message, and the master node to
4208 recreate the master rsb when it gets a request from the dir node for
4209 an rsb it doesn't have. */
4210
4211 memset(name, 0, sizeof(name));
4212 memcpy(name, ms->m_extra, len);
4213
4214 hash = jhash(name, len, 0);
4215 b = hash & (ls->ls_rsbtbl_size - 1);
4216
4217 spin_lock(&ls->ls_rsbtbl[b].lock);
4218
4219 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4220 if (rv) {
4221 /* verify the rsb is on keep list per comment above */
4222 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4223 if (rv) {
4224 /* should not happen */
4225 log_error(ls, "receive_remove from %d not found %s",
4226 from_nodeid, name);
4227 spin_unlock(&ls->ls_rsbtbl[b].lock);
4228 return;
4229 }
4230 if (r->res_master_nodeid != from_nodeid) {
4231 /* should not happen */
4232 log_error(ls, "receive_remove keep from %d master %d",
4233 from_nodeid, r->res_master_nodeid);
4234 dlm_print_rsb(r);
4235 spin_unlock(&ls->ls_rsbtbl[b].lock);
4236 return;
4237 }
4238
4239 log_debug(ls, "receive_remove from %d master %d first %x %s",
4240 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4241 name);
4242 spin_unlock(&ls->ls_rsbtbl[b].lock);
4243 return;
4244 }
4245
4246 if (r->res_master_nodeid != from_nodeid) {
4247 log_error(ls, "receive_remove toss from %d master %d",
4248 from_nodeid, r->res_master_nodeid);
4249 dlm_print_rsb(r);
4250 spin_unlock(&ls->ls_rsbtbl[b].lock);
4251 return;
4252 }
4253
4254 if (kref_put(&r->res_ref, kill_rsb)) {
4255 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4256 spin_unlock(&ls->ls_rsbtbl[b].lock);
4257 dlm_free_rsb(r);
4258 } else {
4259 log_error(ls, "receive_remove from %d rsb ref error",
4260 from_nodeid);
4261 dlm_print_rsb(r);
4262 spin_unlock(&ls->ls_rsbtbl[b].lock);
4263 }
3697} 4264}
3698 4265
3699static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4266static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3706,6 +4273,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3706 struct dlm_lkb *lkb; 4273 struct dlm_lkb *lkb;
3707 struct dlm_rsb *r; 4274 struct dlm_rsb *r;
3708 int error, mstype, result; 4275 int error, mstype, result;
4276 int from_nodeid = ms->m_header.h_nodeid;
3709 4277
3710 error = find_lkb(ls, ms->m_remid, &lkb); 4278 error = find_lkb(ls, ms->m_remid, &lkb);
3711 if (error) 4279 if (error)
@@ -3723,8 +4291,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4291 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3724 if (error) { 4292 if (error) {
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4293 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, 4294 lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
3727 ms->m_result);
3728 dlm_dump_rsb(r); 4295 dlm_dump_rsb(r);
3729 goto out; 4296 goto out;
3730 } 4297 }
@@ -3732,8 +4299,9 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3732 /* Optimization: the dir node was also the master, so it took our 4299 /* Optimization: the dir node was also the master, so it took our
3733 lookup as a request and sent request reply instead of lookup reply */ 4300 lookup as a request and sent request reply instead of lookup reply */
3734 if (mstype == DLM_MSG_LOOKUP) { 4301 if (mstype == DLM_MSG_LOOKUP) {
3735 r->res_nodeid = ms->m_header.h_nodeid; 4302 r->res_master_nodeid = from_nodeid;
3736 lkb->lkb_nodeid = r->res_nodeid; 4303 r->res_nodeid = from_nodeid;
4304 lkb->lkb_nodeid = from_nodeid;
3737 } 4305 }
3738 4306
3739 /* this is the value returned from do_request() on the master */ 4307 /* this is the value returned from do_request() on the master */
@@ -3767,18 +4335,30 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3767 case -EBADR: 4335 case -EBADR:
3768 case -ENOTBLK: 4336 case -ENOTBLK:
3769 /* find_rsb failed to find rsb or rsb wasn't master */ 4337 /* find_rsb failed to find rsb or rsb wasn't master */
3770 log_debug(ls, "receive_request_reply %x %x master diff %d %d", 4338 log_limit(ls, "receive_request_reply %x from %d %d "
3771 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); 4339 "master %d dir %d first %x %s", lkb->lkb_id,
3772 r->res_nodeid = -1; 4340 from_nodeid, result, r->res_master_nodeid,
3773 lkb->lkb_nodeid = -1; 4341 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4342
4343 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4344 r->res_master_nodeid != dlm_our_nodeid()) {
4345 /* cause _request_lock->set_master->send_lookup */
4346 r->res_master_nodeid = 0;
4347 r->res_nodeid = -1;
4348 lkb->lkb_nodeid = -1;
4349 }
3774 4350
3775 if (is_overlap(lkb)) { 4351 if (is_overlap(lkb)) {
3776 /* we'll ignore error in cancel/unlock reply */ 4352 /* we'll ignore error in cancel/unlock reply */
3777 queue_cast_overlap(r, lkb); 4353 queue_cast_overlap(r, lkb);
3778 confirm_master(r, result); 4354 confirm_master(r, result);
3779 unhold_lkb(lkb); /* undoes create_lkb() */ 4355 unhold_lkb(lkb); /* undoes create_lkb() */
3780 } else 4356 } else {
3781 _request_lock(r, lkb); 4357 _request_lock(r, lkb);
4358
4359 if (r->res_master_nodeid == dlm_our_nodeid())
4360 confirm_master(r, 0);
4361 }
3782 break; 4362 break;
3783 4363
3784 default: 4364 default:
@@ -3994,6 +4574,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3994 struct dlm_lkb *lkb; 4574 struct dlm_lkb *lkb;
3995 struct dlm_rsb *r; 4575 struct dlm_rsb *r;
3996 int error, ret_nodeid; 4576 int error, ret_nodeid;
4577 int do_lookup_list = 0;
3997 4578
3998 error = find_lkb(ls, ms->m_lkid, &lkb); 4579 error = find_lkb(ls, ms->m_lkid, &lkb);
3999 if (error) { 4580 if (error) {
@@ -4001,7 +4582,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4001 return; 4582 return;
4002 } 4583 }
4003 4584
4004 /* ms->m_result is the value returned by dlm_dir_lookup on dir node 4585 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4005 FIXME: will a non-zero error ever be returned? */ 4586 FIXME: will a non-zero error ever be returned? */
4006 4587
4007 r = lkb->lkb_resource; 4588 r = lkb->lkb_resource;
@@ -4013,12 +4594,37 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4013 goto out; 4594 goto out;
4014 4595
4015 ret_nodeid = ms->m_nodeid; 4596 ret_nodeid = ms->m_nodeid;
4597
4598 /* We sometimes receive a request from the dir node for this
4599 rsb before we've received the dir node's loookup_reply for it.
4600 The request from the dir node implies we're the master, so we set
4601 ourself as master in receive_request_reply, and verify here that
4602 we are indeed the master. */
4603
4604 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4605 /* This should never happen */
4606 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4607 "master %d dir %d our %d first %x %s",
4608 lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4609 r->res_master_nodeid, r->res_dir_nodeid,
4610 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4611 }
4612
4016 if (ret_nodeid == dlm_our_nodeid()) { 4613 if (ret_nodeid == dlm_our_nodeid()) {
4614 r->res_master_nodeid = ret_nodeid;
4017 r->res_nodeid = 0; 4615 r->res_nodeid = 0;
4018 ret_nodeid = 0; 4616 do_lookup_list = 1;
4019 r->res_first_lkid = 0; 4617 r->res_first_lkid = 0;
4618 } else if (ret_nodeid == -1) {
4619 /* the remote node doesn't believe it's the dir node */
4620 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4621 lkb->lkb_id, ms->m_header.h_nodeid);
4622 r->res_master_nodeid = 0;
4623 r->res_nodeid = -1;
4624 lkb->lkb_nodeid = -1;
4020 } else { 4625 } else {
4021 /* set_master() will copy res_nodeid to lkb_nodeid */ 4626 /* set_master() will set lkb_nodeid from r */
4627 r->res_master_nodeid = ret_nodeid;
4022 r->res_nodeid = ret_nodeid; 4628 r->res_nodeid = ret_nodeid;
4023 } 4629 }
4024 4630
@@ -4033,7 +4639,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4033 _request_lock(r, lkb); 4639 _request_lock(r, lkb);
4034 4640
4035 out_list: 4641 out_list:
4036 if (!ret_nodeid) 4642 if (do_lookup_list)
4037 process_lookup_list(r); 4643 process_lookup_list(r);
4038 out: 4644 out:
4039 unlock_rsb(r); 4645 unlock_rsb(r);
@@ -4047,7 +4653,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4047 int error = 0, noent = 0; 4653 int error = 0, noent = 0;
4048 4654
4049 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4655 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4050 log_debug(ls, "ignore non-member message %d from %d %x %x %d", 4656 log_limit(ls, "receive %d from non-member %d %x %x %d",
4051 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4657 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4052 ms->m_remid, ms->m_result); 4658 ms->m_remid, ms->m_result);
4053 return; 4659 return;
@@ -4174,6 +4780,15 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4174 int nodeid) 4780 int nodeid)
4175{ 4781{
4176 if (dlm_locking_stopped(ls)) { 4782 if (dlm_locking_stopped(ls)) {
4783 /* If we were a member of this lockspace, left, and rejoined,
4784 other nodes may still be sending us messages from the
4785 lockspace generation before we left. */
4786 if (!ls->ls_generation) {
4787 log_limit(ls, "receive %d from %d ignore old gen",
4788 ms->m_type, nodeid);
4789 return;
4790 }
4791
4177 dlm_add_requestqueue(ls, nodeid, ms); 4792 dlm_add_requestqueue(ls, nodeid, ms);
4178 } else { 4793 } else {
4179 dlm_wait_requestqueue(ls); 4794 dlm_wait_requestqueue(ls);
@@ -4798,6 +5413,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4798 struct dlm_rsb *r; 5413 struct dlm_rsb *r;
4799 struct dlm_lkb *lkb; 5414 struct dlm_lkb *lkb;
4800 uint32_t remid = 0; 5415 uint32_t remid = 0;
5416 int from_nodeid = rc->rc_header.h_nodeid;
4801 int error; 5417 int error;
4802 5418
4803 if (rl->rl_parent_lkid) { 5419 if (rl->rl_parent_lkid) {
@@ -4815,21 +5431,21 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4815 we make ourselves master, dlm_recover_masters() won't touch the 5431 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */ 5432 MSTCPY locks we've received early. */
4817 5433
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r); 5434 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5435 from_nodeid, R_RECEIVE_RECOVER, &r);
4819 if (error) 5436 if (error)
4820 goto out; 5437 goto out;
4821 5438
5439 lock_rsb(r);
5440
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5441 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5442 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid); 5443 from_nodeid, remid);
4825 error = -EBADR; 5444 error = -EBADR;
4826 put_rsb(r); 5445 goto out_unlock;
4827 goto out;
4828 } 5446 }
4829 5447
4830 lock_rsb(r); 5448 lkb = search_remid(r, from_nodeid, remid);
4831
4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
4833 if (lkb) { 5449 if (lkb) {
4834 error = -EEXIST; 5450 error = -EEXIST;
4835 goto out_remid; 5451 goto out_remid;
@@ -4866,7 +5482,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4866 out: 5482 out:
4867 if (error && error != -EEXIST) 5483 if (error && error != -EEXIST)
4868 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d", 5484 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
4869 rc->rc_header.h_nodeid, remid, error); 5485 from_nodeid, remid, error);
4870 rl->rl_result = cpu_to_le32(error); 5486 rl->rl_result = cpu_to_le32(error);
4871 return error; 5487 return error;
4872} 5488}
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index c8b226c62807..5e0c72e36a9b 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -14,6 +14,7 @@
14#define __LOCK_DOT_H__ 14#define __LOCK_DOT_H__
15 15
16void dlm_dump_rsb(struct dlm_rsb *r); 16void dlm_dump_rsb(struct dlm_rsb *r);
17void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len);
17void dlm_print_lkb(struct dlm_lkb *lkb); 18void dlm_print_lkb(struct dlm_lkb *lkb);
18void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 19void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
19 uint32_t saved_seq); 20 uint32_t saved_seq);
@@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
28void dlm_scan_waiters(struct dlm_ls *ls); 29void dlm_scan_waiters(struct dlm_ls *ls);
29void dlm_scan_timeout(struct dlm_ls *ls); 30void dlm_scan_timeout(struct dlm_ls *ls);
30void dlm_adjust_timeouts(struct dlm_ls *ls); 31void dlm_adjust_timeouts(struct dlm_ls *ls);
32int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len,
33 unsigned int flags, int *r_nodeid, int *result);
31 34
32int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 35int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
33 unsigned int flags, struct dlm_rsb **r_ret); 36 struct dlm_rsb **r_ret);
34 37
35void dlm_recover_purge(struct dlm_ls *ls); 38void dlm_recover_purge(struct dlm_ls *ls);
36void dlm_purge_mstcpy_locks(struct dlm_rsb *r); 39void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index ca506abbdd3b..065bb75ed609 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -509,17 +509,6 @@ static int new_lockspace(const char *name, const char *cluster,
509 idr_init(&ls->ls_lkbidr); 509 idr_init(&ls->ls_lkbidr);
510 spin_lock_init(&ls->ls_lkbidr_spin); 510 spin_lock_init(&ls->ls_lkbidr_spin);
511 511
512 size = dlm_config.ci_dirtbl_size;
513 ls->ls_dirtbl_size = size;
514
515 ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
516 if (!ls->ls_dirtbl)
517 goto out_lkbfree;
518 for (i = 0; i < size; i++) {
519 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
520 spin_lock_init(&ls->ls_dirtbl[i].lock);
521 }
522
523 INIT_LIST_HEAD(&ls->ls_waiters); 512 INIT_LIST_HEAD(&ls->ls_waiters);
524 mutex_init(&ls->ls_waiters_mutex); 513 mutex_init(&ls->ls_waiters_mutex);
525 INIT_LIST_HEAD(&ls->ls_orphans); 514 INIT_LIST_HEAD(&ls->ls_orphans);
@@ -567,7 +556,7 @@ static int new_lockspace(const char *name, const char *cluster,
567 556
568 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); 557 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
569 if (!ls->ls_recover_buf) 558 if (!ls->ls_recover_buf)
570 goto out_dirfree; 559 goto out_lkbfree;
571 560
572 ls->ls_slot = 0; 561 ls->ls_slot = 0;
573 ls->ls_num_slots = 0; 562 ls->ls_num_slots = 0;
@@ -648,8 +637,6 @@ static int new_lockspace(const char *name, const char *cluster,
648 list_del(&ls->ls_list); 637 list_del(&ls->ls_list);
649 spin_unlock(&lslist_lock); 638 spin_unlock(&lslist_lock);
650 kfree(ls->ls_recover_buf); 639 kfree(ls->ls_recover_buf);
651 out_dirfree:
652 vfree(ls->ls_dirtbl);
653 out_lkbfree: 640 out_lkbfree:
654 idr_destroy(&ls->ls_lkbidr); 641 idr_destroy(&ls->ls_lkbidr);
655 vfree(ls->ls_rsbtbl); 642 vfree(ls->ls_rsbtbl);
@@ -779,13 +766,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
779 kfree(ls->ls_recover_buf); 766 kfree(ls->ls_recover_buf);
780 767
781 /* 768 /*
782 * Free direntry structs.
783 */
784
785 dlm_dir_clear(ls);
786 vfree(ls->ls_dirtbl);
787
788 /*
789 * Free all lkb's in idr 769 * Free all lkb's in idr
790 */ 770 */
791 771
@@ -826,7 +806,6 @@ static int release_lockspace(struct dlm_ls *ls, int force)
826 806
827 dlm_purge_requestqueue(ls); 807 dlm_purge_requestqueue(ls);
828 kfree(ls->ls_recover_args); 808 kfree(ls->ls_recover_args);
829 dlm_clear_free_entries(ls);
830 dlm_clear_members(ls); 809 dlm_clear_members(ls);
831 dlm_clear_members_gone(ls); 810 dlm_clear_members_gone(ls);
832 kfree(ls->ls_node_array); 811 kfree(ls->ls_node_array);
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 64d3e2b958c7..c8c298d81463 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,8 +23,6 @@
23#include "memory.h" 23#include "memory.h"
24#include "lock.h" 24#include "lock.h"
25#include "util.h" 25#include "util.h"
26#include "member.h"
27
28 26
29static int rcom_response(struct dlm_ls *ls) 27static int rcom_response(struct dlm_ls *ls)
30{ 28{
@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
275 struct dlm_rcom *rc; 273 struct dlm_rcom *rc;
276 struct dlm_mhandle *mh; 274 struct dlm_mhandle *mh;
277 int error = 0; 275 int error = 0;
278 int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
279 276
280 ls->ls_recover_nodeid = nodeid; 277 ls->ls_recover_nodeid = nodeid;
281 278
282 if (nodeid == dlm_our_nodeid()) {
283 ls->ls_recover_buf->rc_header.h_length =
284 dlm_config.ci_buffer_size;
285 dlm_copy_master_names(ls, last_name, last_len,
286 ls->ls_recover_buf->rc_buf,
287 max_size, nodeid);
288 goto out;
289 }
290
291 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); 279 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
292 if (error) 280 if (error)
293 goto out; 281 goto out;
@@ -344,6 +332,25 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
344 return error; 332 return error;
345} 333}
346 334
335int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
336{
337 struct dlm_rcom *rc;
338 struct dlm_mhandle *mh;
339 struct dlm_ls *ls = r->res_ls;
340 int error;
341
342 error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
343 &rc, &mh);
344 if (error)
345 goto out;
346 memcpy(rc->rc_buf, r->res_name, r->res_length);
347 rc->rc_id = 0xFFFFFFFF;
348
349 send_rcom(ls, mh, rc);
350 out:
351 return error;
352}
353
347static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) 354static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
348{ 355{
349 struct dlm_rcom *rc; 356 struct dlm_rcom *rc;
@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
355 if (error) 362 if (error)
356 return; 363 return;
357 364
358 error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); 365 if (rc_in->rc_id == 0xFFFFFFFF) {
366 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
367 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
368 return;
369 }
370
371 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
372 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
359 if (error) 373 if (error)
360 ret_nodeid = error; 374 ret_nodeid = error;
361 rc->rc_result = ret_nodeid; 375 rc->rc_result = ret_nodeid;
@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
486 return 0; 500 return 0;
487} 501}
488 502
503/*
504 * Ignore messages for stage Y before we set
505 * recover_status bit for stage X:
506 *
507 * recover_status = 0
508 *
509 * dlm_recover_members()
510 * - send nothing
511 * - recv nothing
512 * - ignore NAMES, NAMES_REPLY
513 * - ignore LOOKUP, LOOKUP_REPLY
514 * - ignore LOCK, LOCK_REPLY
515 *
516 * recover_status |= NODES
517 *
518 * dlm_recover_members_wait()
519 *
520 * dlm_recover_directory()
521 * - send NAMES
522 * - recv NAMES_REPLY
523 * - ignore LOOKUP, LOOKUP_REPLY
524 * - ignore LOCK, LOCK_REPLY
525 *
526 * recover_status |= DIR
527 *
528 * dlm_recover_directory_wait()
529 *
530 * dlm_recover_masters()
531 * - send LOOKUP
532 * - recv LOOKUP_REPLY
533 *
534 * dlm_recover_locks()
535 * - send LOCKS
536 * - recv LOCKS_REPLY
537 *
538 * recover_status |= LOCKS
539 *
540 * dlm_recover_locks_wait()
541 *
542 * recover_status |= DONE
543 */
544
489/* Called by dlm_recv; corresponds to dlm_receive_message() but special 545/* Called by dlm_recv; corresponds to dlm_receive_message() but special
490 recovery-only comms are sent through here. */ 546 recovery-only comms are sent through here. */
491 547
492void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 548void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
493{ 549{
494 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 550 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
495 int stop, reply = 0, lock = 0; 551 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
496 uint32_t status; 552 uint32_t status;
497 uint64_t seq; 553 uint64_t seq;
498 554
499 switch (rc->rc_type) { 555 switch (rc->rc_type) {
556 case DLM_RCOM_STATUS_REPLY:
557 reply = 1;
558 break;
559 case DLM_RCOM_NAMES:
560 names = 1;
561 break;
562 case DLM_RCOM_NAMES_REPLY:
563 names = 1;
564 reply = 1;
565 break;
566 case DLM_RCOM_LOOKUP:
567 lookup = 1;
568 break;
569 case DLM_RCOM_LOOKUP_REPLY:
570 lookup = 1;
571 reply = 1;
572 break;
500 case DLM_RCOM_LOCK: 573 case DLM_RCOM_LOCK:
501 lock = 1; 574 lock = 1;
502 break; 575 break;
@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
504 lock = 1; 577 lock = 1;
505 reply = 1; 578 reply = 1;
506 break; 579 break;
507 case DLM_RCOM_STATUS_REPLY:
508 case DLM_RCOM_NAMES_REPLY:
509 case DLM_RCOM_LOOKUP_REPLY:
510 reply = 1;
511 }; 580 };
512 581
513 spin_lock(&ls->ls_recover_lock); 582 spin_lock(&ls->ls_recover_lock);
@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
516 seq = ls->ls_recover_seq; 585 seq = ls->ls_recover_seq;
517 spin_unlock(&ls->ls_recover_lock); 586 spin_unlock(&ls->ls_recover_lock);
518 587
519 if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || 588 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
520 (reply && (rc->rc_seq_reply != seq)) || 589 goto ignore;
521 (lock && !(status & DLM_RS_DIR))) { 590
522 log_limit(ls, "dlm_receive_rcom ignore msg %d " 591 if (reply && (rc->rc_seq_reply != seq))
523 "from %d %llu %llu recover seq %llu sts %x gen %u", 592 goto ignore;
524 rc->rc_type, 593
525 nodeid, 594 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
526 (unsigned long long)rc->rc_seq, 595 goto ignore;
527 (unsigned long long)rc->rc_seq_reply, 596
528 (unsigned long long)seq, 597 if (!(status & DLM_RS_DIR) && (lookup || lock))
529 status, ls->ls_generation); 598 goto ignore;
530 goto out;
531 }
532 599
533 switch (rc->rc_type) { 600 switch (rc->rc_type) {
534 case DLM_RCOM_STATUS: 601 case DLM_RCOM_STATUS:
@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
570 default: 637 default:
571 log_error(ls, "receive_rcom bad type %d", rc->rc_type); 638 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
572 } 639 }
573out: 640 return;
641
642ignore:
643 log_limit(ls, "dlm_receive_rcom ignore msg %d "
644 "from %d %llu %llu recover seq %llu sts %x gen %u",
645 rc->rc_type,
646 nodeid,
647 (unsigned long long)rc->rc_seq,
648 (unsigned long long)rc->rc_seq_reply,
649 (unsigned long long)seq,
650 status, ls->ls_generation);
574 return; 651 return;
575Eshort: 652Eshort:
576 log_error(ls, "recovery message %x from %d is too short", 653 log_error(ls, "recovery message %d from %d is too short",
577 rc->rc_type, nodeid); 654 rc->rc_type, nodeid);
578} 655}
579 656
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index 206723ab744d..f8e243463c15 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -17,6 +17,7 @@
17int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); 17int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); 18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); 19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
20int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid);
20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 21int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
21void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); 22void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
22int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); 23int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4dac6bb..3c025fe49ad3 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -361,9 +361,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
361 * rsb's to consider. 361 * rsb's to consider.
362 */ 362 */
363 363
364static void set_new_master(struct dlm_rsb *r, int nodeid) 364static void set_new_master(struct dlm_rsb *r)
365{ 365{
366 r->res_nodeid = nodeid;
367 set_master_lkbs(r); 366 set_master_lkbs(r);
368 rsb_set_flag(r, RSB_NEW_MASTER); 367 rsb_set_flag(r, RSB_NEW_MASTER);
369 rsb_set_flag(r, RSB_NEW_MASTER2); 368 rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +371,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
372/* 371/*
373 * We do async lookups on rsb's that need new masters. The rsb's 372 * We do async lookups on rsb's that need new masters. The rsb's
374 * waiting for a lookup reply are kept on the recover_list. 373 * waiting for a lookup reply are kept on the recover_list.
374 *
375 * Another node recovering the master may have sent us a rcom lookup,
376 * and our dlm_master_lookup() set it as the new master, along with
377 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
378 * equals our_nodeid below).
375 */ 379 */
376 380
377static int recover_master(struct dlm_rsb *r) 381static int recover_master(struct dlm_rsb *r, unsigned int *count)
378{ 382{
379 struct dlm_ls *ls = r->res_ls; 383 struct dlm_ls *ls = r->res_ls;
380 int error, ret_nodeid; 384 int our_nodeid, dir_nodeid;
381 int our_nodeid = dlm_our_nodeid(); 385 int is_removed = 0;
382 int dir_nodeid = dlm_dir_nodeid(r); 386 int error;
387
388 if (is_master(r))
389 return 0;
390
391 is_removed = dlm_is_removed(ls, r->res_nodeid);
392
393 if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
394 return 0;
395
396 our_nodeid = dlm_our_nodeid();
397 dir_nodeid = dlm_dir_nodeid(r);
383 398
384 if (dir_nodeid == our_nodeid) { 399 if (dir_nodeid == our_nodeid) {
385 error = dlm_dir_lookup(ls, our_nodeid, r->res_name, 400 if (is_removed) {
386 r->res_length, &ret_nodeid); 401 r->res_master_nodeid = our_nodeid;
387 if (error) 402 r->res_nodeid = 0;
388 log_error(ls, "recover dir lookup error %d", error); 403 }
389 404
390 if (ret_nodeid == our_nodeid) 405 /* set master of lkbs to ourself when is_removed, or to
391 ret_nodeid = 0; 406 another new master which we set along with NEW_MASTER
392 lock_rsb(r); 407 in dlm_master_lookup */
393 set_new_master(r, ret_nodeid); 408 set_new_master(r);
394 unlock_rsb(r); 409 error = 0;
395 } else { 410 } else {
396 recover_list_add(r); 411 recover_list_add(r);
397 error = dlm_send_rcom_lookup(r, dir_nodeid); 412 error = dlm_send_rcom_lookup(r, dir_nodeid);
398 } 413 }
399 414
415 (*count)++;
400 return error; 416 return error;
401} 417}
402 418
@@ -415,7 +431,7 @@ static int recover_master(struct dlm_rsb *r)
415 * resent. 431 * resent.
416 */ 432 */
417 433
418static int recover_master_static(struct dlm_rsb *r) 434static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
419{ 435{
420 int dir_nodeid = dlm_dir_nodeid(r); 436 int dir_nodeid = dlm_dir_nodeid(r);
421 int new_master = dir_nodeid; 437 int new_master = dir_nodeid;
@@ -423,11 +439,12 @@ static int recover_master_static(struct dlm_rsb *r)
423 if (dir_nodeid == dlm_our_nodeid()) 439 if (dir_nodeid == dlm_our_nodeid())
424 new_master = 0; 440 new_master = 0;
425 441
426 lock_rsb(r);
427 dlm_purge_mstcpy_locks(r); 442 dlm_purge_mstcpy_locks(r);
428 set_new_master(r, new_master); 443 r->res_master_nodeid = dir_nodeid;
429 unlock_rsb(r); 444 r->res_nodeid = new_master;
430 return 1; 445 set_new_master(r);
446 (*count)++;
447 return 0;
431} 448}
432 449
433/* 450/*
@@ -443,7 +460,10 @@ static int recover_master_static(struct dlm_rsb *r)
443int dlm_recover_masters(struct dlm_ls *ls) 460int dlm_recover_masters(struct dlm_ls *ls)
444{ 461{
445 struct dlm_rsb *r; 462 struct dlm_rsb *r;
446 int error = 0, count = 0; 463 unsigned int total = 0;
464 unsigned int count = 0;
465 int nodir = dlm_no_directory(ls);
466 int error;
447 467
448 log_debug(ls, "dlm_recover_masters"); 468 log_debug(ls, "dlm_recover_masters");
449 469
@@ -455,20 +475,23 @@ int dlm_recover_masters(struct dlm_ls *ls)
455 goto out; 475 goto out;
456 } 476 }
457 477
458 if (dlm_no_directory(ls)) 478 lock_rsb(r);
459 count += recover_master_static(r); 479 if (nodir)
460 else if (!is_master(r) && 480 error = recover_master_static(r, &count);
461 (dlm_is_removed(ls, r->res_nodeid) || 481 else
462 rsb_flag(r, RSB_NEW_MASTER))) { 482 error = recover_master(r, &count);
463 recover_master(r); 483 unlock_rsb(r);
464 count++; 484 cond_resched();
465 } 485 total++;
466 486
467 schedule(); 487 if (error) {
488 up_read(&ls->ls_root_sem);
489 goto out;
490 }
468 } 491 }
469 up_read(&ls->ls_root_sem); 492 up_read(&ls->ls_root_sem);
470 493
471 log_debug(ls, "dlm_recover_masters %d resources", count); 494 log_debug(ls, "dlm_recover_masters %u of %u", count, total);
472 495
473 error = dlm_wait_function(ls, &recover_list_empty); 496 error = dlm_wait_function(ls, &recover_list_empty);
474 out: 497 out:
@@ -480,7 +503,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
480int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 503int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
481{ 504{
482 struct dlm_rsb *r; 505 struct dlm_rsb *r;
483 int nodeid; 506 int ret_nodeid, new_master;
484 507
485 r = recover_list_find(ls, rc->rc_id); 508 r = recover_list_find(ls, rc->rc_id);
486 if (!r) { 509 if (!r) {
@@ -489,12 +512,17 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
489 goto out; 512 goto out;
490 } 513 }
491 514
492 nodeid = rc->rc_result; 515 ret_nodeid = rc->rc_result;
493 if (nodeid == dlm_our_nodeid()) 516
494 nodeid = 0; 517 if (ret_nodeid == dlm_our_nodeid())
518 new_master = 0;
519 else
520 new_master = ret_nodeid;
495 521
496 lock_rsb(r); 522 lock_rsb(r);
497 set_new_master(r, nodeid); 523 r->res_master_nodeid = ret_nodeid;
524 r->res_nodeid = new_master;
525 set_new_master(r);
498 unlock_rsb(r); 526 unlock_rsb(r);
499 recover_list_del(r); 527 recover_list_del(r);
500 528
@@ -791,20 +819,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
791 dlm_hold_rsb(r); 819 dlm_hold_rsb(r);
792 } 820 }
793 821
794 /* If we're using a directory, add tossed rsbs to the root 822 if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
795 list; they'll have entries created in the new directory, 823 log_error(ls, "dlm_create_root_list toss not empty");
796 but no other recovery steps should do anything with them. */
797
798 if (dlm_no_directory(ls)) {
799 spin_unlock(&ls->ls_rsbtbl[i].lock);
800 continue;
801 }
802
803 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
804 r = rb_entry(n, struct dlm_rsb, res_hashnode);
805 list_add(&r->res_root_list, &ls->ls_root_list);
806 dlm_hold_rsb(r);
807 }
808 spin_unlock(&ls->ls_rsbtbl[i].lock); 824 spin_unlock(&ls->ls_rsbtbl[i].lock);
809 } 825 }
810 out: 826 out:
@@ -824,28 +840,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
824 up_write(&ls->ls_root_sem); 840 up_write(&ls->ls_root_sem);
825} 841}
826 842
827/* If not using a directory, clear the entire toss list, there's no benefit to 843void dlm_clear_toss(struct dlm_ls *ls)
828 caching the master value since it's fixed. If we are using a dir, keep the
829 rsb's we're the master of. Recovery will add them to the root list and from
830 there they'll be entered in the rebuilt directory. */
831
832void dlm_clear_toss_list(struct dlm_ls *ls)
833{ 844{
834 struct rb_node *n, *next; 845 struct rb_node *n, *next;
835 struct dlm_rsb *rsb; 846 struct dlm_rsb *r;
847 unsigned int count = 0;
836 int i; 848 int i;
837 849
838 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 850 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
839 spin_lock(&ls->ls_rsbtbl[i].lock); 851 spin_lock(&ls->ls_rsbtbl[i].lock);
840 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { 852 for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
841 next = rb_next(n);; 853 next = rb_next(n);
842 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 854 r = rb_entry(n, struct dlm_rsb, res_hashnode);
843 if (dlm_no_directory(ls) || !is_master(rsb)) { 855 rb_erase(n, &ls->ls_rsbtbl[i].toss);
844 rb_erase(n, &ls->ls_rsbtbl[i].toss); 856 dlm_free_rsb(r);
845 dlm_free_rsb(rsb); 857 count++;
846 }
847 } 858 }
848 spin_unlock(&ls->ls_rsbtbl[i].lock); 859 spin_unlock(&ls->ls_rsbtbl[i].lock);
849 } 860 }
861
862 if (count)
863 log_debug(ls, "dlm_clear_toss %u done", count);
850} 864}
851 865
diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h
index ebd0363f1e08..d8c8738c70eb 100644
--- a/fs/dlm/recover.h
+++ b/fs/dlm/recover.h
@@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls);
27void dlm_recovered_lock(struct dlm_rsb *r); 27void dlm_recovered_lock(struct dlm_rsb *r);
28int dlm_create_root_list(struct dlm_ls *ls); 28int dlm_create_root_list(struct dlm_ls *ls);
29void dlm_release_root_list(struct dlm_ls *ls); 29void dlm_release_root_list(struct dlm_ls *ls);
30void dlm_clear_toss_list(struct dlm_ls *ls); 30void dlm_clear_toss(struct dlm_ls *ls);
31void dlm_recover_rsbs(struct dlm_ls *ls); 31void dlm_recover_rsbs(struct dlm_ls *ls);
32 32
33#endif /* __RECOVER_DOT_H__ */ 33#endif /* __RECOVER_DOT_H__ */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index f1a9073c0835..88ce65ff021e 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -60,12 +60,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
60 60
61 dlm_callback_suspend(ls); 61 dlm_callback_suspend(ls);
62 62
63 /* 63 dlm_clear_toss(ls);
64 * Free non-master tossed rsb's. Master rsb's are kept on toss
65 * list and put on root list to be included in resdir recovery.
66 */
67
68 dlm_clear_toss_list(ls);
69 64
70 /* 65 /*
71 * This list of root rsb's will be the basis of most of the recovery 66 * This list of root rsb's will be the basis of most of the recovery
@@ -84,6 +79,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
84 goto fail; 79 goto fail;
85 } 80 }
86 81
82 dlm_recover_dir_nodeid(ls);
83
84 ls->ls_recover_dir_sent_res = 0;
85 ls->ls_recover_dir_sent_msg = 0;
87 ls->ls_recover_locks_in = 0; 86 ls->ls_recover_locks_in = 0;
88 87
89 dlm_set_recover_status(ls, DLM_RS_NODES); 88 dlm_set_recover_status(ls, DLM_RS_NODES);
@@ -115,6 +114,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
115 goto fail; 114 goto fail;
116 } 115 }
117 116
117 log_debug(ls, "dlm_recover_directory %u out %u messages",
118 ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
119
118 /* 120 /*
119 * We may have outstanding operations that are waiting for a reply from 121 * We may have outstanding operations that are waiting for a reply from
120 * a failed node. Mark these to be resent after recovery. Unlock and 122 * a failed node. Mark these to be resent after recovery. Unlock and